mirror of
https://github.com/mjl-/mox.git
synced 2025-01-27 14:55:56 +03:00
fdc0560ac4
noticed some time ago when looking at my retired messages queue.
1496 lines
48 KiB
Go
1496 lines
48 KiB
Go
package queue
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/ed25519"
|
|
cryptorand "crypto/rand"
|
|
"crypto/sha256"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math/big"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/mjl-/adns"
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/dns"
|
|
"github.com/mjl-/mox/mlog"
|
|
"github.com/mjl-/mox/mox-"
|
|
"github.com/mjl-/mox/mtastsdb"
|
|
"github.com/mjl-/mox/smtp"
|
|
"github.com/mjl-/mox/smtpclient"
|
|
"github.com/mjl-/mox/store"
|
|
"github.com/mjl-/mox/tlsrpt"
|
|
"github.com/mjl-/mox/tlsrptdb"
|
|
"github.com/mjl-/mox/webhook"
|
|
)
|
|
|
|
var ctxbg = context.Background()
|
|
var pkglog = mlog.New("queue", nil)
|
|
|
|
func tcheck(t *testing.T, err error, msg string) {
|
|
if err != nil {
|
|
t.Helper()
|
|
t.Fatalf("%s: %s", msg, err)
|
|
}
|
|
}
|
|
|
|
func tcompare(t *testing.T, got, exp any) {
|
|
t.Helper()
|
|
if !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
|
|
}
|
|
}
|
|
|
|
func setup(t *testing.T) (*store.Account, func()) {
|
|
// Prepare config so email can be delivered to mjl@mox.example.
|
|
os.RemoveAll("../testdata/queue/data")
|
|
log := mlog.New("queue", nil)
|
|
mox.Context = ctxbg
|
|
mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
|
|
mox.MustLoadConfig(true, false)
|
|
err := Init()
|
|
tcheck(t, err, "queue init")
|
|
err = mtastsdb.Init(false)
|
|
tcheck(t, err, "mtastsdb init")
|
|
err = tlsrptdb.Init()
|
|
tcheck(t, err, "tlsrptdb init")
|
|
acc, err := store.OpenAccount(log, "mjl")
|
|
tcheck(t, err, "open account")
|
|
err = acc.SetPassword(log, "testtest")
|
|
tcheck(t, err, "set password")
|
|
switchStop := store.Switchboard()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
return acc, func() {
|
|
acc.Close()
|
|
acc.CheckClosed()
|
|
mox.ShutdownCancel()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
Shutdown()
|
|
err := mtastsdb.Close()
|
|
tcheck(t, err, "mtastsdb close")
|
|
err = tlsrptdb.Close()
|
|
tcheck(t, err, "tlsrptdb close")
|
|
switchStop()
|
|
}
|
|
}
|
|
|
|
var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
|
|
To: <mjl@mox.example>
|
|
Subject: test
|
|
|
|
test email
|
|
`, "\n", "\r\n")
|
|
|
|
func prepareFile(t *testing.T) *os.File {
|
|
t.Helper()
|
|
msgFile, err := store.CreateMessageTemp(pkglog, "queue")
|
|
tcheck(t, err, "create temp message for delivery to queue")
|
|
_, err = msgFile.Write([]byte(testmsg))
|
|
tcheck(t, err, "write message file")
|
|
return msgFile
|
|
}
|
|
|
|
func TestQueue(t *testing.T) {
|
|
acc, cleanup := setup(t)
|
|
defer cleanup()
|
|
|
|
idfilter := func(msgID int64) Filter {
|
|
return Filter{IDs: []int64{msgID}}
|
|
}
|
|
|
|
kick := func(expn int, id int64) {
|
|
t.Helper()
|
|
n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
|
|
tcheck(t, err, "kick queue")
|
|
if n != expn {
|
|
t.Fatalf("kick changed %d messages, expected %d", n, expn)
|
|
}
|
|
}
|
|
|
|
msgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "listing messages in queue")
|
|
if len(msgs) != 0 {
|
|
t.Fatalf("got %d messages in queue, expected 0", len(msgs))
|
|
}
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
var qm Msg
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "listing queue")
|
|
if len(msgs) != 3 {
|
|
t.Fatalf("got msgs %v, expected 1", msgs)
|
|
}
|
|
|
|
yes := true
|
|
n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
|
|
tcheck(t, err, "requiretlsset")
|
|
tcompare(t, n, 1)
|
|
|
|
msg := msgs[0]
|
|
if msg.Attempts != 0 {
|
|
t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
|
|
}
|
|
n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
|
|
tcheck(t, err, "drop")
|
|
if n != 1 {
|
|
t.Fatalf("dropped %d, expected 1", n)
|
|
}
|
|
if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
|
|
t.Fatalf("dropped message not removed from file system")
|
|
}
|
|
|
|
// Fail a message, check the account has a message afterwards, the DSN.
|
|
n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
|
|
tcheck(t, err, "count messages in account")
|
|
tcompare(t, n, 0)
|
|
n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
|
|
tcheck(t, err, "fail")
|
|
if n != 1 {
|
|
t.Fatalf("failed %d, expected 1", n)
|
|
}
|
|
n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
|
|
tcheck(t, err, "count messages in account")
|
|
tcompare(t, n, 1)
|
|
|
|
// Check filter through various List calls. Other code uses the same filtering function.
|
|
filter := func(f Filter, expn int) {
|
|
t.Helper()
|
|
l, err := List(ctxbg, f, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, len(l), expn)
|
|
}
|
|
filter(Filter{}, 1)
|
|
filter(Filter{Account: "mjl"}, 1)
|
|
filter(Filter{Account: "bogus"}, 0)
|
|
filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
|
|
filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
|
|
filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
|
|
filter(Filter{From: "mjl@"}, 1)
|
|
filter(Filter{From: "bogus@"}, 0)
|
|
filter(Filter{To: "mjl@"}, 1)
|
|
filter(Filter{To: "bogus@"}, 0)
|
|
filter(Filter{Hold: &yes}, 0)
|
|
no := false
|
|
filter(Filter{Hold: &no}, 1)
|
|
filter(Filter{Submitted: "<now"}, 1)
|
|
filter(Filter{Submitted: ">now"}, 0)
|
|
filter(Filter{NextAttempt: "<1m"}, 1)
|
|
filter(Filter{NextAttempt: ">1m"}, 0)
|
|
var empty string
|
|
bogus := "bogus"
|
|
filter(Filter{Transport: &empty}, 1)
|
|
filter(Filter{Transport: &bogus}, 0)
|
|
|
|
next := nextWork(ctxbg, pkglog, nil)
|
|
if next > 0 {
|
|
t.Fatalf("nextWork in %s, should be now", next)
|
|
}
|
|
busy := map[string]struct{}{"mox.example": {}}
|
|
if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
|
|
t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
|
|
}
|
|
if nn := launchWork(pkglog, nil, busy); nn != 0 {
|
|
t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
|
|
}
|
|
|
|
mailDomain := dns.Domain{ASCII: "mox.example"}
|
|
mailHost := dns.Domain{ASCII: "mail.mox.example"}
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{
|
|
"mail.mox.example.": {"127.0.0.1"},
|
|
"submission.example.": {"127.0.0.1"},
|
|
},
|
|
MX: map[string][]*net.MX{
|
|
"mox.example.": {{Host: "mail.mox.example", Pref: 10}},
|
|
"other.example.": {{Host: "mail.mox.example", Pref: 10}},
|
|
},
|
|
}
|
|
|
|
// Try a failing delivery attempt.
|
|
var ndial int
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
ndial++
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
n = launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
|
|
// Wait until we see the dial and the failed attempt.
|
|
timer := time.NewTimer(time.Second)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-deliveryResults:
|
|
tcompare(t, ndial, 1)
|
|
m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
|
|
tcheck(t, err, "get")
|
|
tcompare(t, m.Attempts, 1)
|
|
case <-timer.C:
|
|
t.Fatalf("no delivery within 1s")
|
|
}
|
|
|
|
// OpenMessage.
|
|
_, err = OpenMessage(ctxbg, msg.ID+1)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
|
|
}
|
|
reader, err := OpenMessage(ctxbg, msg.ID)
|
|
tcheck(t, err, "open message")
|
|
defer reader.Close()
|
|
msgbuf, err := io.ReadAll(reader)
|
|
tcheck(t, err, "read message")
|
|
if string(msgbuf) != testmsg {
|
|
t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
|
|
}
|
|
|
|
// Reduce by more than first attempt interval of 7.5 minutes.
|
|
n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
|
|
tcheck(t, err, "kick")
|
|
if n != 0 {
|
|
t.Fatalf("kick %d, expected 0", n)
|
|
}
|
|
n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
|
|
tcheck(t, err, "kick")
|
|
if n != 1 {
|
|
t.Fatalf("kicked %d, expected 1", n)
|
|
}
|
|
|
|
nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
for _, ext := range extensions {
|
|
writeline("250-" + ext)
|
|
}
|
|
writeline("250 pipelining")
|
|
for tx := 0; tx < ntx; tx++ {
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
for i := 0; i < rcpts; i++ {
|
|
readline("rcpt")
|
|
if onercpt && i > 0 {
|
|
writeline("552 ok")
|
|
} else {
|
|
writeline("250 ok")
|
|
}
|
|
}
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
}
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
fakeSMTPServer := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 1, 1, false, nil)
|
|
}
|
|
fakeSMTPServer2Rcpts := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 2, 1, false, nil)
|
|
}
|
|
fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
|
|
}
|
|
// Server that returns an error after first recipient. We expect another
|
|
// transaction to deliver the second message.
|
|
fakeSMTPServerRcpt1 := func(server net.Conn) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 pipelining")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("552 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader = smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
|
|
moxCert := fakeCert(t, "mail.mox.example", false)
|
|
goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
|
|
makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
|
|
attempt := 0
|
|
return func(server net.Conn) {
|
|
attempt++
|
|
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 starttls")
|
|
if nstarttls == 0 || attempt <= nstarttls {
|
|
readline("starttls")
|
|
writeline("220 ok")
|
|
tlsConn := tls.Server(server, tlsConfig)
|
|
err := tlsConn.Handshake()
|
|
if err != nil {
|
|
return
|
|
}
|
|
server = tlsConn
|
|
br = bufio.NewReader(server)
|
|
|
|
readline("ehlo")
|
|
if requiretls {
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 requiretls")
|
|
} else {
|
|
writeline("250 mail.mox.example")
|
|
}
|
|
}
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
}
|
|
|
|
fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
|
|
makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
|
|
return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
|
|
}
|
|
|
|
nfakeSubmitServer := func(server net.Conn, nrcpt int) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
br.ReadString('\n') // Should be EHLO.
|
|
fmt.Fprintf(server, "250-localhost\r\n")
|
|
fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
|
|
br.ReadString('\n') // Should be AUTH PLAIN
|
|
fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
|
|
br.ReadString('\n') // Should be MAIL FROM.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
for i := 0; i < nrcpt; i++ {
|
|
br.ReadString('\n') // Should be RCPT TO.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
}
|
|
br.ReadString('\n') // Should be DATA.
|
|
fmt.Fprintf(server, "354 continue\r\n")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be QUIT.
|
|
fmt.Fprintf(server, "221 ok\r\n")
|
|
}
|
|
fakeSubmitServer := func(server net.Conn) {
|
|
nfakeSubmitServer(server, 1)
|
|
}
|
|
fakeSubmitServer2Rcpts := func(server net.Conn) {
|
|
nfakeSubmitServer(server, 2)
|
|
}
|
|
|
|
testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
|
|
t.Helper()
|
|
|
|
var pipes []net.Conn
|
|
defer func() {
|
|
for _, conn := range pipes {
|
|
conn.Close()
|
|
}
|
|
}()
|
|
|
|
var connMu sync.Mutex
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
connMu.Lock()
|
|
defer connMu.Unlock()
|
|
|
|
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
|
|
// client-side to the invocation dial, for the attempted delivery from the queue.
|
|
server, client := net.Pipe()
|
|
pipes = append(pipes, server, client)
|
|
go fakeServer(server)
|
|
|
|
_, wasNetDialer = dialer.(*net.Dialer)
|
|
|
|
return client, nil
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
|
|
tcheck(t, err, "get inbox")
|
|
|
|
inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
|
|
tcheck(t, err, "querying messages in inbox")
|
|
|
|
launchWork(pkglog, resolver, map[string]struct{}{})
|
|
|
|
// Wait for all results.
|
|
timer.Reset(time.Second)
|
|
for i := 0; i < nresults; i++ {
|
|
select {
|
|
case <-deliveryResults:
|
|
case <-timer.C:
|
|
t.Fatalf("no dial within 1s")
|
|
}
|
|
}
|
|
|
|
// Check that queue is now empty.
|
|
xmsgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list queue")
|
|
tcompare(t, len(xmsgs), 0)
|
|
|
|
// And that we possibly got a DSN delivered.
|
|
ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
|
|
tcheck(t, err, "querying messages in inbox")
|
|
if expectDSN && ninbox != inboxCount+1 {
|
|
t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
|
|
} else if !expectDSN && ninbox != inboxCount {
|
|
t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
|
|
}
|
|
|
|
return wasNetDialer
|
|
}
|
|
testDeliver := func(fakeServer func(conn net.Conn)) bool {
|
|
t.Helper()
|
|
return testQueue(false, fakeServer, 1)
|
|
}
|
|
testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
|
|
t.Helper()
|
|
return testQueue(false, fakeServer, nresults)
|
|
}
|
|
testDSN := func(fakeServer func(conn net.Conn)) bool {
|
|
t.Helper()
|
|
return testQueue(true, fakeServer, 1)
|
|
}
|
|
|
|
// Test direct delivery.
|
|
wasNetDialer := testDeliver(fakeSMTPServer)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Single delivery to two recipients at same domain, expecting single connection
|
|
// and single transaction.
|
|
qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
qml := []Msg{qm0, qm0} // Same NextAttempt.
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServer2Rcpts)
|
|
|
|
// Single enqueue to two recipients at different domain, expecting two connections.
|
|
otheraddr, _ := smtp.ParseAddress("mjl@other.example")
|
|
otherpath := otheraddr.Path()
|
|
t0 := time.Now()
|
|
qml = []Msg{
|
|
MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
|
|
MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
|
|
}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
conns := ConnectionCounter()
|
|
testDeliverN(fakeSMTPServer, 2)
|
|
nconns := ConnectionCounter()
|
|
if nconns != conns+2 {
|
|
t.Errorf("saw %d connections, expected 2", nconns-conns)
|
|
}
|
|
|
|
// Single enqueue with two recipients at same domain, but with smtp server that has
|
|
// LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
|
|
qml = []Msg{qm0, qm0}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServerLimitRcpt1)
|
|
|
|
// Single enqueue with two recipients at same domain, but smtp server sends 552 for
|
|
// 2nd recipient, so we expect a single connection with two transactions.
|
|
qml = []Msg{qm0, qm0}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServerRcpt1)
|
|
|
|
// Add a message to be delivered with submit because of its route.
|
|
topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
|
|
qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
wasNetDialer = testDeliver(fakeSubmitServer)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Two messages for submission.
|
|
qml = []Msg{qm, qm}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
transportSubmitTLS := "submittls"
|
|
n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
|
|
tcheck(t, err, "set transport")
|
|
if n != 1 {
|
|
t.Fatalf("TransportSet changed %d messages, expected 1", n)
|
|
}
|
|
// Make fake cert, and make it trusted.
|
|
cert := fakeCert(t, "submission.example", false)
|
|
mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
|
|
mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
|
|
tlsConfig := tls.Config{
|
|
Certificates: []tls.Certificate{cert},
|
|
}
|
|
wasNetDialer = testDeliver(func(conn net.Conn) {
|
|
conn = tls.Server(conn, &tlsConfig)
|
|
fakeSubmitServer(conn)
|
|
})
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Various failure reasons.
|
|
fdNotTrusted := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultCertificateNotTrusted,
|
|
SendingMTAIP: "", // Missing due to pipe.
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
ReceivingMXHelo: "mail.mox.example",
|
|
ReceivingIP: "", // Missing due to pipe.
|
|
FailedSessionCount: 1,
|
|
FailureReasonCode: "",
|
|
}
|
|
fdTLSAUnusable := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultTLSAInvalid,
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
FailedSessionCount: 0,
|
|
FailureReasonCode: "all-unusable-records+ignored",
|
|
}
|
|
fdBadProtocol := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultValidationFailure,
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
ReceivingMXHelo: "mail.mox.example",
|
|
FailedSessionCount: 1,
|
|
FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
|
|
}
|
|
|
|
// Add a message to be delivered with socks.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
|
|
tcheck(t, err, "TransportSet")
|
|
if n != 1 {
|
|
t.Fatalf("TransportSet changed %d messages, expected 1", n)
|
|
}
|
|
kick(1, qml[0].ID)
|
|
wasNetDialer = testDeliver(fakeSMTPServer)
|
|
if wasNetDialer {
|
|
t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
|
|
}
|
|
|
|
// Add message to be delivered with opportunistic TLS verification.
|
|
clearTLSResults(t)
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
|
|
|
|
// Test fallback to plain text with TLS handshake fails.
|
|
clearTLSResults(t)
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
|
|
|
|
// Add message to be delivered with DANE verification.
|
|
clearTLSResults(t)
|
|
resolver.AllAuthentic = true
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
|
|
|
|
// We should know starttls/requiretls by now.
|
|
rdt := store.RecipientDomainTLS{Domain: "mox.example"}
|
|
err = acc.DB.Get(ctxbg, &rdt)
|
|
tcheck(t, err, "get recipientdomaintls")
|
|
tcompare(t, rdt.STARTTLS, true)
|
|
tcompare(t, rdt.RequireTLS, true)
|
|
|
|
// Add message to be delivered with verified TLS and REQUIRETLS.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
|
|
// Check that message is delivered with all unusable DANE records.
|
|
clearTLSResults(t)
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
|
|
|
|
// Check that message is delivered with insecure TLSA records. They should be
|
|
// ignored and regular STARTTLS tried.
|
|
clearTLSResults(t)
|
|
resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
resolver.Inauthentic = nil
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
|
|
|
|
// STARTTLS failed, so not known supported.
|
|
rdt = store.RecipientDomainTLS{Domain: "mox.example"}
|
|
err = acc.DB.Get(ctxbg, &rdt)
|
|
tcheck(t, err, "get recipientdomaintls")
|
|
tcompare(t, rdt.STARTTLS, false)
|
|
tcompare(t, rdt.RequireTLS, false)
|
|
|
|
// Check that message is delivered with TLS-Required: No and non-matching DANE record.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
|
|
// Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
|
|
// Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDSN(makeBadFakeSMTPSTARTTLSServer(false))
|
|
|
|
// Restore pre-DANE behaviour.
|
|
resolver.AllAuthentic = false
|
|
resolver.TLSA = nil
|
|
|
|
// Add message with requiretls that fails immediately due to no verification policy for recipient domain.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
// Based on DNS lookups, there won't be any dialing or SMTP connection.
|
|
testDSN(func(conn net.Conn) {})
|
|
|
|
// Add another message that we'll fail to deliver entirely.
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list queue")
|
|
if len(msgs) != 1 {
|
|
t.Fatalf("queue has %d messages, expected 1", len(msgs))
|
|
}
|
|
msg = msgs[0]
|
|
|
|
prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
|
|
server, client := net.Pipe()
|
|
go func() {
|
|
fn(server)
|
|
server.Close()
|
|
}()
|
|
return client, func() {
|
|
server.Close()
|
|
client.Close()
|
|
}
|
|
}
|
|
|
|
conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
|
|
conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
|
|
conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
|
|
defer func() {
|
|
cleanup2()
|
|
cleanup3()
|
|
cleanup4()
|
|
}()
|
|
|
|
seq := 0
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
seq++
|
|
switch seq {
|
|
default:
|
|
return nil, fmt.Errorf("connect error from test")
|
|
case 2:
|
|
return conn2, nil
|
|
case 3:
|
|
return conn3, nil
|
|
case 4:
|
|
return conn4, nil
|
|
}
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
comm := store.RegisterComm(acc)
|
|
defer comm.Unregister()
|
|
|
|
for i := 1; i < 8; i++ {
|
|
if i == 4 {
|
|
resolver.AllAuthentic = true
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
// Non-matching zero CertAssoc, should cause failure.
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
|
|
},
|
|
}
|
|
} else {
|
|
resolver.AllAuthentic = false
|
|
resolver.TLSA = nil
|
|
}
|
|
go deliver(pkglog, resolver, msg)
|
|
<-deliveryResults
|
|
err = DB.Get(ctxbg, &msg)
|
|
tcheck(t, err, "get msg")
|
|
if msg.Attempts != i {
|
|
t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
|
|
}
|
|
if msg.Attempts == 5 {
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trigger final failure.
|
|
go deliver(pkglog, resolver, msg)
|
|
<-deliveryResults
|
|
err = DB.Get(ctxbg, &msg)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
|
|
}
|
|
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
|
|
// We shouldn't have any more work to do.
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages at end of test")
|
|
tcompare(t, len(msgs), 0)
|
|
}
|
|
|
|
func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
|
|
result.Summary.TotalSuccessfulSessionCount += success
|
|
result.Summary.TotalFailureSessionCount += failure
|
|
return result
|
|
}
|
|
|
|
func clearTLSResults(t *testing.T) {
|
|
_, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
|
|
tcheck(t, err, "delete tls results")
|
|
}
|
|
|
|
func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
|
|
t.Helper()
|
|
q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
|
|
q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
|
|
result, err := q.Get()
|
|
tcheck(t, err, "get tls result")
|
|
tcompare(t, result.RecipientDomain, expRecipientDomain)
|
|
tcompare(t, result.IsHost, expIsHost)
|
|
|
|
// Before comparing, compensate for go1.20 vs go1.21 difference.
|
|
for i, r := range result.Results {
|
|
for j, fd := range r.FailureDetails {
|
|
if fd.FailureReasonCode == "tls-remote-alert-70" {
|
|
result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
|
|
}
|
|
}
|
|
}
|
|
tcompare(t, result.Results, expResults)
|
|
}
|
|
|
|
// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
|
|
// retired list if configured, with a proper result, that webhooks are scheduled,
|
|
// and that cleaning up works.
|
|
func TestRetiredHooks(t *testing.T) {
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
|
|
addr, err := smtp.ParseAddress("mjl@mox.example")
|
|
tcheck(t, err, "parse address")
|
|
path := addr.Path()
|
|
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
|
|
testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
|
|
t.Helper()
|
|
|
|
_, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
|
|
tcheck(t, err, "clearing retired messages")
|
|
_, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
|
|
tcheck(t, err, "clearing hooks")
|
|
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
qm.Extra = map[string]string{"a": "123"}
|
|
err = Add(ctxbg, pkglog, account, mf, qm)
|
|
tcheck(t, err, "add to queue")
|
|
|
|
action()
|
|
|
|
// Should be no messages left in queue.
|
|
msgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, len(msgs), 0)
|
|
|
|
retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
|
|
tcheck(t, err, "list hooks")
|
|
if expResult == nil {
|
|
tcompare(t, len(retireds), 0)
|
|
tcompare(t, len(hooks), 0)
|
|
} else {
|
|
tcompare(t, len(retireds), 1)
|
|
mr := retireds[0]
|
|
tcompare(t, len(mr.Results) > 0, true)
|
|
lr := mr.LastResult()
|
|
lr.Start = time.Time{}
|
|
lr.Duration = 0
|
|
tcompare(t, lr.Error == "", expResult.Error == "")
|
|
lr.Error = expResult.Error
|
|
tcompare(t, lr, *expResult)
|
|
|
|
// Compare added webhook.
|
|
tcompare(t, len(hooks), 1)
|
|
h := hooks[0]
|
|
var out webhook.Outgoing
|
|
dec := json.NewDecoder(strings.NewReader(h.Payload))
|
|
dec.DisallowUnknownFields()
|
|
err := dec.Decode(&out)
|
|
tcheck(t, err, "unmarshal outgoing webhook payload")
|
|
tcompare(t, out.Error == "", expResult.Error == "")
|
|
out.WebhookQueued = time.Time{}
|
|
out.Error = ""
|
|
var ecode string
|
|
if expResult.Secode != "" {
|
|
ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
|
|
}
|
|
var code int // Only set for errors.
|
|
if expResult.Code != 250 {
|
|
code = expResult.Code
|
|
}
|
|
expOut := webhook.Outgoing{
|
|
Event: webhook.OutgoingEvent(expEvent),
|
|
Suppressing: expSuppressing,
|
|
QueueMsgID: mr.ID,
|
|
FromID: mr.FromID,
|
|
MessageID: mr.MessageID,
|
|
Subject: mr.Subject,
|
|
SMTPCode: code,
|
|
SMTPEnhancedCode: ecode,
|
|
Extra: mr.Extra,
|
|
}
|
|
tcompare(t, out, expOut)
|
|
h.ID = 0
|
|
h.Payload = ""
|
|
h.Submitted = time.Time{}
|
|
h.NextAttempt = time.Time{}
|
|
exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
|
|
tcompare(t, h, exph)
|
|
}
|
|
}
|
|
|
|
makeLaunchAction := func(handler func(conn net.Conn)) func() {
|
|
return func() {
|
|
server, client := net.Pipe()
|
|
defer server.Close()
|
|
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
go handler(server)
|
|
return client, nil
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
// Trigger delivery attempt.
|
|
n := launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
|
|
// Wait until delivery has finished.
|
|
tm := time.NewTimer(5 * time.Second)
|
|
defer tm.Stop()
|
|
select {
|
|
case <-tm.C:
|
|
t.Fatalf("delivery didn't happen within 5s")
|
|
case <-deliveryResults:
|
|
}
|
|
}
|
|
}
|
|
|
|
smtpAccept := func(conn net.Conn) {
|
|
br := bufio.NewReader(conn)
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(conn, "%s\r\n", s)
|
|
}
|
|
|
|
writeline("220 mail.mox.example")
|
|
readline("ehlo")
|
|
writeline("250 mail.mox.example")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
readline("quit")
|
|
writeline("250 ok")
|
|
}
|
|
smtpReject := func(code int) func(conn net.Conn) {
|
|
return func(conn net.Conn) {
|
|
br := bufio.NewReader(conn)
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(conn, "%s\r\n", s)
|
|
}
|
|
|
|
writeline("220 mail.mox.example")
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 enhancedstatuscodes")
|
|
|
|
readline("mail")
|
|
writeline(fmt.Sprintf("%d 5.1.0 nok", code))
|
|
readline("quit")
|
|
writeline("250 ok")
|
|
}
|
|
}
|
|
|
|
testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
|
|
testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{Code: 250, Success: true}, string(webhook.EventDelivered), false)
|
|
// 554 is generic, doesn't immediately cause suppression.
|
|
testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
|
|
testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
|
|
// 550 causes immediate suppression, check for it in webhook.
|
|
testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
|
|
testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
|
|
// Try to deliver to suppressed addresses.
|
|
launch := func() {
|
|
n := launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
<-deliveryResults
|
|
}
|
|
testAction("mjl", launch, nil, "", false)
|
|
testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
|
|
|
|
queueFail := func() {
|
|
n, err := Fail(ctxbg, pkglog, Filter{})
|
|
tcheck(t, err, "cancel delivery with failure dsn")
|
|
tcompare(t, n, 1)
|
|
}
|
|
queueDrop := func() {
|
|
n, err := Drop(ctxbg, pkglog, Filter{})
|
|
tcheck(t, err, "cancel delivery without failure dsn")
|
|
tcompare(t, n, 1)
|
|
}
|
|
testAction("mjl", queueFail, nil, "", false)
|
|
testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
|
|
testAction("mjl", queueDrop, nil, "", false)
|
|
testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
|
|
|
|
retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
tcompare(t, len(retireds), 1)
|
|
|
|
cleanupMsgRetiredSingle(pkglog)
|
|
retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
tcompare(t, len(retireds), 0)
|
|
}
|
|
|
|
// test Start and that it attempts to deliver.
|
|
func TestQueueStart(t *testing.T) {
|
|
// Override dial function. We'll make connecting fail and check the attempt.
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
dialed := make(chan struct{}, 1)
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
dialed <- struct{}{}
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
|
|
done := make(chan struct{})
|
|
defer func() {
|
|
mox.ShutdownCancel()
|
|
// Wait for message and hooks deliverers and cleaners.
|
|
<-done
|
|
<-done
|
|
<-done
|
|
<-done
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
}()
|
|
Shutdown() // DB was opened already. Start will open it again. Just close it before.
|
|
err := Start(resolver, done)
|
|
tcheck(t, err, "queue start")
|
|
|
|
checkDialed := func(need bool) {
|
|
t.Helper()
|
|
d := time.Second / 10
|
|
if need {
|
|
d = time.Second
|
|
}
|
|
timer := time.NewTimer(d)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-dialed:
|
|
if !need {
|
|
t.Fatalf("unexpected dial attempt")
|
|
}
|
|
case <-timer.C:
|
|
if need {
|
|
t.Fatalf("expected to see a dial attempt")
|
|
}
|
|
}
|
|
}
|
|
|
|
// HoldRule to mark mark all messages sent by mjl on hold, including existing
|
|
// messages.
|
|
hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
|
|
tcheck(t, err, "add hold rule")
|
|
|
|
// All zero HoldRule holds all deliveries, and marks all on hold.
|
|
hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
|
|
tcheck(t, err, "add hold rule")
|
|
|
|
hrl, err := HoldRuleList(ctxbg)
|
|
tcheck(t, err, "listing hold rules")
|
|
tcompare(t, hrl, []HoldRule{hr0, hr1})
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
checkDialed(false) // No delivery attempt yet.
|
|
|
|
n, err := Count(ctxbg)
|
|
tcheck(t, err, "count messages in queue")
|
|
tcompare(t, n, 1)
|
|
|
|
// Take message off hold.
|
|
n, err = HoldSet(ctxbg, Filter{}, false)
|
|
tcheck(t, err, "taking message off hold")
|
|
tcompare(t, n, 1)
|
|
checkDialed(true)
|
|
|
|
// Remove hold rules.
|
|
err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
|
|
tcheck(t, err, "removing hold rule")
|
|
err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
|
|
tcheck(t, err, "removing hold rule")
|
|
// Check it is gone.
|
|
hrl, err = HoldRuleList(ctxbg)
|
|
tcheck(t, err, "listing hold rules")
|
|
tcompare(t, len(hrl), 0)
|
|
|
|
// Don't change message nextattempt time, but kick queue. Message should not be delivered.
|
|
msgqueueKick()
|
|
checkDialed(false)
|
|
|
|
// Set new next attempt, should see another attempt.
|
|
n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
|
|
tcheck(t, err, "kick queue")
|
|
if n != 1 {
|
|
t.Fatalf("kick changed %d messages, expected 1", n)
|
|
}
|
|
checkDialed(true)
|
|
|
|
// Submit another, should be delivered immediately without HoldRule.
|
|
path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf = prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
checkDialed(true) // Immediate.
|
|
}
|
|
|
|
func TestListFilterSort(t *testing.T) {
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
|
|
// insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
now := time.Now().Round(0)
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
|
|
qm.Queued = now
|
|
qm1 := qm
|
|
qm1.Queued = now.Add(-time.Second)
|
|
qm1.NextAttempt = now.Add(time.Minute)
|
|
qml := []Msg{qm, qm, qm, qm, qm, qm1}
|
|
err := Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue")
|
|
qm1 = qml[len(qml)-1]
|
|
|
|
qmlrev := slices.Clone(qml)
|
|
slices.Reverse(qmlrev)
|
|
|
|
// Ascending by nextattempt,id.
|
|
l, err := List(ctxbg, Filter{}, Sort{Asc: true})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, l, qml)
|
|
|
|
// Descending by nextattempt,id.
|
|
l, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, l, qmlrev)
|
|
|
|
// Descending by queued,id.
|
|
l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
|
|
tcheck(t, err, "list messages")
|
|
ql := append(append([]Msg{}, qmlrev[1:]...), qml[5])
|
|
tcompare(t, l, ql)
|
|
|
|
// Filter by all fields to get a single.
|
|
no := false
|
|
allfilters := Filter{
|
|
Max: 2,
|
|
IDs: []int64{qm1.ID},
|
|
Account: "mjl",
|
|
From: path.XString(true),
|
|
To: path.XString(true),
|
|
Hold: &no,
|
|
Submitted: "<1s",
|
|
NextAttempt: ">1s",
|
|
}
|
|
l, err = List(ctxbg, allfilters, Sort{})
|
|
tcheck(t, err, "list single")
|
|
tcompare(t, l, []Msg{qm1})
|
|
|
|
// Paginated NextAttmpt asc.
|
|
var lastID int64
|
|
var last any
|
|
l = nil
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, qml)
|
|
|
|
// Paginated NextAttempt desc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, qmlrev)
|
|
|
|
// Paginated Queued desc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, ql)
|
|
|
|
// Paginated Queued asc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
|
|
}
|
|
qlrev := slices.Clone(ql)
|
|
slices.Reverse(qlrev)
|
|
tcompare(t, l, qlrev)
|
|
|
|
// Retire messages and do similar but more basic tests. The code is similar.
|
|
var mrl []MsgRetired
|
|
err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
|
|
for _, m := range qml {
|
|
mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
|
|
err := tx.Insert(&mr)
|
|
tcheck(t, err, "inserting retired message")
|
|
mrl = append(mrl, mr)
|
|
}
|
|
return nil
|
|
})
|
|
tcheck(t, err, "adding retired messages")
|
|
|
|
// Paginated LastActivity desc.
|
|
var lr []MsgRetired
|
|
lastID = 0
|
|
last = ""
|
|
l = nil
|
|
for {
|
|
nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
lr = append(lr, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
|
|
}
|
|
mrlrev := slices.Clone(mrl)
|
|
slices.Reverse(mrlrev)
|
|
tcompare(t, lr, mrlrev)
|
|
|
|
// Filter by all fields to get a single.
|
|
allretiredfilters := RetiredFilter{
|
|
Max: 2,
|
|
IDs: []int64{mrlrev[0].ID},
|
|
Account: "mjl",
|
|
From: path.XString(true),
|
|
To: path.XString(true),
|
|
Submitted: "<1s",
|
|
LastActivity: ">1s",
|
|
}
|
|
lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
|
|
tcheck(t, err, "list single")
|
|
tcompare(t, lr, []MsgRetired{mrlrev[0]})
|
|
}
|
|
|
|
// Just a cert that appears valid.
|
|
func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
|
|
notAfter := time.Now()
|
|
if expired {
|
|
notAfter = notAfter.Add(-time.Hour)
|
|
} else {
|
|
notAfter = notAfter.Add(time.Hour)
|
|
}
|
|
|
|
privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
|
|
template := &x509.Certificate{
|
|
SerialNumber: big.NewInt(1), // Required field...
|
|
DNSNames: []string{name},
|
|
NotBefore: time.Now().Add(-time.Hour),
|
|
NotAfter: notAfter,
|
|
}
|
|
localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
|
|
if err != nil {
|
|
t.Fatalf("making certificate: %s", err)
|
|
}
|
|
cert, err := x509.ParseCertificate(localCertBuf)
|
|
if err != nil {
|
|
t.Fatalf("parsing generated certificate: %s", err)
|
|
}
|
|
c := tls.Certificate{
|
|
Certificate: [][]byte{localCertBuf},
|
|
PrivateKey: privKey,
|
|
Leaf: cert,
|
|
}
|
|
return c
|
|
}
|