mirror of
https://github.com/mjl-/mox.git
synced 2024-12-26 16:33:47 +03:00
1cf7477642
instead of skipping on any smtp and delivering messages to accounts. we dial the ip of the smtp listener, which is localhost:1025 by default. the smtp server now uses a mock dns resolver during spf & dkim verification for hosted domains (localhost by default), so they should pass. the advantage is that we get regular full smtp server behaviour for delivering in localserve, including webhooks, and potential first-time sender delays (though this is disabled by default now). incoming deliveries now go through normal address resolution, where before we would always deliver to mox@localhost. we still accept email for unknown recipients to mox@localhost. this will be useful upcoming alias/list functionality. localserve will now generate a dkim key when creating a new config. existing users may wish to reset (remove) their localserve directory, or add a dkim key.
1489 lines
47 KiB
Go
1489 lines
47 KiB
Go
package queue
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/ed25519"
|
|
cryptorand "crypto/rand"
|
|
"crypto/sha256"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math/big"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/mjl-/adns"
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/dns"
|
|
"github.com/mjl-/mox/mlog"
|
|
"github.com/mjl-/mox/mox-"
|
|
"github.com/mjl-/mox/smtp"
|
|
"github.com/mjl-/mox/smtpclient"
|
|
"github.com/mjl-/mox/store"
|
|
"github.com/mjl-/mox/tlsrpt"
|
|
"github.com/mjl-/mox/tlsrptdb"
|
|
"github.com/mjl-/mox/webhook"
|
|
)
|
|
|
|
var ctxbg = context.Background()
|
|
var pkglog = mlog.New("queue", nil)
|
|
|
|
func tcheck(t *testing.T, err error, msg string) {
|
|
if err != nil {
|
|
t.Helper()
|
|
t.Fatalf("%s: %s", msg, err)
|
|
}
|
|
}
|
|
|
|
func tcompare(t *testing.T, got, exp any) {
|
|
t.Helper()
|
|
if !reflect.DeepEqual(got, exp) {
|
|
t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
|
|
}
|
|
}
|
|
|
|
func setup(t *testing.T) (*store.Account, func()) {
|
|
// Prepare config so email can be delivered to mjl@mox.example.
|
|
os.RemoveAll("../testdata/queue/data")
|
|
log := mlog.New("queue", nil)
|
|
mox.Context = ctxbg
|
|
mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
|
|
mox.MustLoadConfig(true, false)
|
|
acc, err := store.OpenAccount(log, "mjl")
|
|
tcheck(t, err, "open account")
|
|
err = acc.SetPassword(log, "testtest")
|
|
tcheck(t, err, "set password")
|
|
switchStop := store.Switchboard()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
return acc, func() {
|
|
acc.Close()
|
|
acc.CheckClosed()
|
|
mox.ShutdownCancel()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
Shutdown()
|
|
switchStop()
|
|
}
|
|
}
|
|
|
|
var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
|
|
To: <mjl@mox.example>
|
|
Subject: test
|
|
|
|
test email
|
|
`, "\n", "\r\n")
|
|
|
|
func prepareFile(t *testing.T) *os.File {
|
|
t.Helper()
|
|
msgFile, err := store.CreateMessageTemp(pkglog, "queue")
|
|
tcheck(t, err, "create temp message for delivery to queue")
|
|
_, err = msgFile.Write([]byte(testmsg))
|
|
tcheck(t, err, "write message file")
|
|
return msgFile
|
|
}
|
|
|
|
func TestQueue(t *testing.T) {
|
|
acc, cleanup := setup(t)
|
|
defer cleanup()
|
|
err := Init()
|
|
tcheck(t, err, "queue init")
|
|
|
|
idfilter := func(msgID int64) Filter {
|
|
return Filter{IDs: []int64{msgID}}
|
|
}
|
|
|
|
kick := func(expn int, id int64) {
|
|
t.Helper()
|
|
n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
|
|
tcheck(t, err, "kick queue")
|
|
if n != expn {
|
|
t.Fatalf("kick changed %d messages, expected %d", n, expn)
|
|
}
|
|
}
|
|
|
|
msgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "listing messages in queue")
|
|
if len(msgs) != 0 {
|
|
t.Fatalf("got %d messages in queue, expected 0", len(msgs))
|
|
}
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
var qm Msg
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "listing queue")
|
|
if len(msgs) != 3 {
|
|
t.Fatalf("got msgs %v, expected 1", msgs)
|
|
}
|
|
|
|
yes := true
|
|
n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
|
|
tcheck(t, err, "requiretlsset")
|
|
tcompare(t, n, 1)
|
|
|
|
msg := msgs[0]
|
|
if msg.Attempts != 0 {
|
|
t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
|
|
}
|
|
n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
|
|
tcheck(t, err, "drop")
|
|
if n != 1 {
|
|
t.Fatalf("dropped %d, expected 1", n)
|
|
}
|
|
if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
|
|
t.Fatalf("dropped message not removed from file system")
|
|
}
|
|
|
|
// Fail a message, check the account has a message afterwards, the DSN.
|
|
n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
|
|
tcheck(t, err, "count messages in account")
|
|
tcompare(t, n, 0)
|
|
n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
|
|
tcheck(t, err, "fail")
|
|
if n != 1 {
|
|
t.Fatalf("failed %d, expected 1", n)
|
|
}
|
|
n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
|
|
tcheck(t, err, "count messages in account")
|
|
tcompare(t, n, 1)
|
|
|
|
// Check filter through various List calls. Other code uses the same filtering function.
|
|
filter := func(f Filter, expn int) {
|
|
t.Helper()
|
|
l, err := List(ctxbg, f, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, len(l), expn)
|
|
}
|
|
filter(Filter{}, 1)
|
|
filter(Filter{Account: "mjl"}, 1)
|
|
filter(Filter{Account: "bogus"}, 0)
|
|
filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
|
|
filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
|
|
filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
|
|
filter(Filter{From: "mjl@"}, 1)
|
|
filter(Filter{From: "bogus@"}, 0)
|
|
filter(Filter{To: "mjl@"}, 1)
|
|
filter(Filter{To: "bogus@"}, 0)
|
|
filter(Filter{Hold: &yes}, 0)
|
|
no := false
|
|
filter(Filter{Hold: &no}, 1)
|
|
filter(Filter{Submitted: "<now"}, 1)
|
|
filter(Filter{Submitted: ">now"}, 0)
|
|
filter(Filter{NextAttempt: "<1m"}, 1)
|
|
filter(Filter{NextAttempt: ">1m"}, 0)
|
|
var empty string
|
|
bogus := "bogus"
|
|
filter(Filter{Transport: &empty}, 1)
|
|
filter(Filter{Transport: &bogus}, 0)
|
|
|
|
next := nextWork(ctxbg, pkglog, nil)
|
|
if next > 0 {
|
|
t.Fatalf("nextWork in %s, should be now", next)
|
|
}
|
|
busy := map[string]struct{}{"mox.example": {}}
|
|
if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
|
|
t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
|
|
}
|
|
if nn := launchWork(pkglog, nil, busy); nn != 0 {
|
|
t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
|
|
}
|
|
|
|
mailDomain := dns.Domain{ASCII: "mox.example"}
|
|
mailHost := dns.Domain{ASCII: "mail.mox.example"}
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{
|
|
"mail.mox.example.": {"127.0.0.1"},
|
|
"submission.example.": {"127.0.0.1"},
|
|
},
|
|
MX: map[string][]*net.MX{
|
|
"mox.example.": {{Host: "mail.mox.example", Pref: 10}},
|
|
"other.example.": {{Host: "mail.mox.example", Pref: 10}},
|
|
},
|
|
}
|
|
|
|
// Try a failing delivery attempt.
|
|
var ndial int
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
ndial++
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
n = launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
|
|
// Wait until we see the dial and the failed attempt.
|
|
timer := time.NewTimer(time.Second)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-deliveryResults:
|
|
tcompare(t, ndial, 1)
|
|
m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
|
|
tcheck(t, err, "get")
|
|
tcompare(t, m.Attempts, 1)
|
|
case <-timer.C:
|
|
t.Fatalf("no delivery within 1s")
|
|
}
|
|
|
|
// OpenMessage.
|
|
_, err = OpenMessage(ctxbg, msg.ID+1)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
|
|
}
|
|
reader, err := OpenMessage(ctxbg, msg.ID)
|
|
tcheck(t, err, "open message")
|
|
defer reader.Close()
|
|
msgbuf, err := io.ReadAll(reader)
|
|
tcheck(t, err, "read message")
|
|
if string(msgbuf) != testmsg {
|
|
t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
|
|
}
|
|
|
|
// Reduce by more than first attempt interval of 7.5 minutes.
|
|
n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
|
|
tcheck(t, err, "kick")
|
|
if n != 0 {
|
|
t.Fatalf("kick %d, expected 0", n)
|
|
}
|
|
n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
|
|
tcheck(t, err, "kick")
|
|
if n != 1 {
|
|
t.Fatalf("kicked %d, expected 1", n)
|
|
}
|
|
|
|
nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
for _, ext := range extensions {
|
|
writeline("250-" + ext)
|
|
}
|
|
writeline("250 pipelining")
|
|
for tx := 0; tx < ntx; tx++ {
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
for i := 0; i < rcpts; i++ {
|
|
readline("rcpt")
|
|
if onercpt && i > 0 {
|
|
writeline("552 ok")
|
|
} else {
|
|
writeline("250 ok")
|
|
}
|
|
}
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
}
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
fakeSMTPServer := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 1, 1, false, nil)
|
|
}
|
|
fakeSMTPServer2Rcpts := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 2, 1, false, nil)
|
|
}
|
|
fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
|
|
nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
|
|
}
|
|
// Server that returns an error after first recipient. We expect another
|
|
// transaction to deliver the second message.
|
|
fakeSMTPServerRcpt1 := func(server net.Conn) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 pipelining")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("552 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader = smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
|
|
moxCert := fakeCert(t, "mail.mox.example", false)
|
|
goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
|
|
makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
|
|
attempt := 0
|
|
return func(server net.Conn) {
|
|
attempt++
|
|
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(server, "%s\r\n", s)
|
|
}
|
|
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 starttls")
|
|
if nstarttls == 0 || attempt <= nstarttls {
|
|
readline("starttls")
|
|
writeline("220 ok")
|
|
tlsConn := tls.Server(server, tlsConfig)
|
|
err := tlsConn.Handshake()
|
|
if err != nil {
|
|
return
|
|
}
|
|
server = tlsConn
|
|
br = bufio.NewReader(server)
|
|
|
|
readline("ehlo")
|
|
if requiretls {
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 requiretls")
|
|
} else {
|
|
writeline("250 mail.mox.example")
|
|
}
|
|
}
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
readline("quit")
|
|
writeline("221 ok")
|
|
}
|
|
}
|
|
|
|
fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
|
|
makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
|
|
return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
|
|
}
|
|
|
|
nfakeSubmitServer := func(server net.Conn, nrcpt int) {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
|
|
// cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mail.mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
br.ReadString('\n') // Should be EHLO.
|
|
fmt.Fprintf(server, "250-localhost\r\n")
|
|
fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
|
|
br.ReadString('\n') // Should be AUTH PLAIN
|
|
fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
|
|
br.ReadString('\n') // Should be MAIL FROM.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
for i := 0; i < nrcpt; i++ {
|
|
br.ReadString('\n') // Should be RCPT TO.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
}
|
|
br.ReadString('\n') // Should be DATA.
|
|
fmt.Fprintf(server, "354 continue\r\n")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be QUIT.
|
|
fmt.Fprintf(server, "221 ok\r\n")
|
|
}
|
|
fakeSubmitServer := func(server net.Conn) {
|
|
nfakeSubmitServer(server, 1)
|
|
}
|
|
fakeSubmitServer2Rcpts := func(server net.Conn) {
|
|
nfakeSubmitServer(server, 2)
|
|
}
|
|
|
|
testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
|
|
t.Helper()
|
|
|
|
var pipes []net.Conn
|
|
defer func() {
|
|
for _, conn := range pipes {
|
|
conn.Close()
|
|
}
|
|
}()
|
|
|
|
var connMu sync.Mutex
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
connMu.Lock()
|
|
defer connMu.Unlock()
|
|
|
|
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
|
|
// client-side to the invocation dial, for the attempted delivery from the queue.
|
|
server, client := net.Pipe()
|
|
pipes = append(pipes, server, client)
|
|
go fakeServer(server)
|
|
|
|
_, wasNetDialer = dialer.(*net.Dialer)
|
|
|
|
return client, nil
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
|
|
tcheck(t, err, "get inbox")
|
|
|
|
inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
|
|
tcheck(t, err, "querying messages in inbox")
|
|
|
|
launchWork(pkglog, resolver, map[string]struct{}{})
|
|
|
|
// Wait for all results.
|
|
timer.Reset(time.Second)
|
|
for i := 0; i < nresults; i++ {
|
|
select {
|
|
case <-deliveryResults:
|
|
case <-timer.C:
|
|
t.Fatalf("no dial within 1s")
|
|
}
|
|
}
|
|
|
|
// Check that queue is now empty.
|
|
xmsgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list queue")
|
|
tcompare(t, len(xmsgs), 0)
|
|
|
|
// And that we possibly got a DSN delivered.
|
|
ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
|
|
tcheck(t, err, "querying messages in inbox")
|
|
if expectDSN && ninbox != inboxCount+1 {
|
|
t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
|
|
} else if !expectDSN && ninbox != inboxCount {
|
|
t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
|
|
}
|
|
|
|
return wasNetDialer
|
|
}
|
|
testDeliver := func(fakeServer func(conn net.Conn)) bool {
|
|
t.Helper()
|
|
return testQueue(false, fakeServer, 1)
|
|
}
|
|
testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
|
|
t.Helper()
|
|
return testQueue(false, fakeServer, nresults)
|
|
}
|
|
testDSN := func(fakeServer func(conn net.Conn)) bool {
|
|
t.Helper()
|
|
return testQueue(true, fakeServer, 1)
|
|
}
|
|
|
|
// Test direct delivery.
|
|
wasNetDialer := testDeliver(fakeSMTPServer)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Single delivery to two recipients at same domain, expecting single connection
|
|
// and single transaction.
|
|
qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
qml := []Msg{qm0, qm0} // Same NextAttempt.
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServer2Rcpts)
|
|
|
|
// Single enqueue to two recipients at different domain, expecting two connections.
|
|
otheraddr, _ := smtp.ParseAddress("mjl@other.example")
|
|
otherpath := otheraddr.Path()
|
|
t0 := time.Now()
|
|
qml = []Msg{
|
|
MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
|
|
MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
|
|
}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
conns := ConnectionCounter()
|
|
testDeliverN(fakeSMTPServer, 2)
|
|
nconns := ConnectionCounter()
|
|
if nconns != conns+2 {
|
|
t.Errorf("saw %d connections, expected 2", nconns-conns)
|
|
}
|
|
|
|
// Single enqueue with two recipients at same domain, but with smtp server that has
|
|
// LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
|
|
qml = []Msg{qm0, qm0}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServerLimitRcpt1)
|
|
|
|
// Single enqueue with two recipients at same domain, but smtp server sends 552 for
|
|
// 2nd recipient, so we expect a single connection with two transactions.
|
|
qml = []Msg{qm0, qm0}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
testDeliver(fakeSMTPServerRcpt1)
|
|
|
|
// Add a message to be delivered with submit because of its route.
|
|
topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
|
|
qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
wasNetDialer = testDeliver(fakeSubmitServer)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Two messages for submission.
|
|
qml = []Msg{qm, qm}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue for delivery")
|
|
wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
transportSubmitTLS := "submittls"
|
|
n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
|
|
tcheck(t, err, "set transport")
|
|
if n != 1 {
|
|
t.Fatalf("TransportSet changed %d messages, expected 1", n)
|
|
}
|
|
// Make fake cert, and make it trusted.
|
|
cert := fakeCert(t, "submission.example", false)
|
|
mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
|
|
mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
|
|
tlsConfig := tls.Config{
|
|
Certificates: []tls.Certificate{cert},
|
|
}
|
|
wasNetDialer = testDeliver(func(conn net.Conn) {
|
|
conn = tls.Server(conn, &tlsConfig)
|
|
fakeSubmitServer(conn)
|
|
})
|
|
if !wasNetDialer {
|
|
t.Fatalf("expected net.Dialer as dialer")
|
|
}
|
|
|
|
// Various failure reasons.
|
|
fdNotTrusted := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultCertificateNotTrusted,
|
|
SendingMTAIP: "", // Missing due to pipe.
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
ReceivingMXHelo: "mail.mox.example",
|
|
ReceivingIP: "", // Missing due to pipe.
|
|
FailedSessionCount: 1,
|
|
FailureReasonCode: "",
|
|
}
|
|
fdTLSAUnusable := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultTLSAInvalid,
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
FailedSessionCount: 0,
|
|
FailureReasonCode: "all-unusable-records+ignored",
|
|
}
|
|
fdBadProtocol := tlsrpt.FailureDetails{
|
|
ResultType: tlsrpt.ResultValidationFailure,
|
|
ReceivingMXHostname: "mail.mox.example",
|
|
ReceivingMXHelo: "mail.mox.example",
|
|
FailedSessionCount: 1,
|
|
FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
|
|
}
|
|
|
|
// Add a message to be delivered with socks.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
|
|
tcheck(t, err, "TransportSet")
|
|
if n != 1 {
|
|
t.Fatalf("TransportSet changed %d messages, expected 1", n)
|
|
}
|
|
kick(1, qml[0].ID)
|
|
wasNetDialer = testDeliver(fakeSMTPServer)
|
|
if wasNetDialer {
|
|
t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
|
|
}
|
|
|
|
// Add message to be delivered with opportunistic TLS verification.
|
|
clearTLSResults(t)
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
|
|
|
|
// Test fallback to plain text with TLS handshake fails.
|
|
clearTLSResults(t)
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
|
|
|
|
// Add message to be delivered with DANE verification.
|
|
clearTLSResults(t)
|
|
resolver.AllAuthentic = true
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
|
|
|
|
// We should know starttls/requiretls by now.
|
|
rdt := store.RecipientDomainTLS{Domain: "mox.example"}
|
|
err = acc.DB.Get(ctxbg, &rdt)
|
|
tcheck(t, err, "get recipientdomaintls")
|
|
tcompare(t, rdt.STARTTLS, true)
|
|
tcompare(t, rdt.RequireTLS, true)
|
|
|
|
// Add message to be delivered with verified TLS and REQUIRETLS.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
|
|
// Check that message is delivered with all unusable DANE records.
|
|
clearTLSResults(t)
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
|
|
|
|
// Check that message is delivered with insecure TLSA records. They should be
|
|
// ignored and regular STARTTLS tried.
|
|
clearTLSResults(t)
|
|
resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
|
|
},
|
|
}
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
resolver.Inauthentic = nil
|
|
checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
|
|
checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
|
|
|
|
// STARTTLS failed, so not known supported.
|
|
rdt = store.RecipientDomainTLS{Domain: "mox.example"}
|
|
err = acc.DB.Get(ctxbg, &rdt)
|
|
tcheck(t, err, "get recipientdomaintls")
|
|
tcompare(t, rdt.STARTTLS, false)
|
|
tcompare(t, rdt.RequireTLS, false)
|
|
|
|
// Check that message is delivered with TLS-Required: No and non-matching DANE record.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(fakeSMTPSTARTTLSServer)
|
|
|
|
// Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
|
|
|
|
// Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
testDSN(makeBadFakeSMTPSTARTTLSServer(false))
|
|
|
|
// Restore pre-DANE behaviour.
|
|
resolver.AllAuthentic = false
|
|
resolver.TLSA = nil
|
|
|
|
// Add message with requiretls that fails immediately due to no verification policy for recipient domain.
|
|
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
kick(1, qml[0].ID)
|
|
// Based on DNS lookups, there won't be any dialing or SMTP connection.
|
|
testDSN(func(conn net.Conn) {})
|
|
|
|
// Add another message that we'll fail to deliver entirely.
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list queue")
|
|
if len(msgs) != 1 {
|
|
t.Fatalf("queue has %d messages, expected 1", len(msgs))
|
|
}
|
|
msg = msgs[0]
|
|
|
|
prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
|
|
server, client := net.Pipe()
|
|
go func() {
|
|
fn(server)
|
|
server.Close()
|
|
}()
|
|
return client, func() {
|
|
server.Close()
|
|
client.Close()
|
|
}
|
|
}
|
|
|
|
conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
|
|
conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
|
|
conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
|
|
defer func() {
|
|
cleanup2()
|
|
cleanup3()
|
|
cleanup4()
|
|
}()
|
|
|
|
seq := 0
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
seq++
|
|
switch seq {
|
|
default:
|
|
return nil, fmt.Errorf("connect error from test")
|
|
case 2:
|
|
return conn2, nil
|
|
case 3:
|
|
return conn3, nil
|
|
case 4:
|
|
return conn4, nil
|
|
}
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
comm := store.RegisterComm(acc)
|
|
defer comm.Unregister()
|
|
|
|
for i := 1; i < 8; i++ {
|
|
if i == 4 {
|
|
resolver.AllAuthentic = true
|
|
resolver.TLSA = map[string][]adns.TLSA{
|
|
"_25._tcp.mail.mox.example.": {
|
|
// Non-matching zero CertAssoc, should cause failure.
|
|
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
|
|
},
|
|
}
|
|
} else {
|
|
resolver.AllAuthentic = false
|
|
resolver.TLSA = nil
|
|
}
|
|
go deliver(pkglog, resolver, msg)
|
|
<-deliveryResults
|
|
err = DB.Get(ctxbg, &msg)
|
|
tcheck(t, err, "get msg")
|
|
if msg.Attempts != i {
|
|
t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
|
|
}
|
|
if msg.Attempts == 5 {
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trigger final failure.
|
|
go deliver(pkglog, resolver, msg)
|
|
<-deliveryResults
|
|
err = DB.Get(ctxbg, &msg)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
|
|
}
|
|
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
|
|
// We shouldn't have any more work to do.
|
|
msgs, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages at end of test")
|
|
tcompare(t, len(msgs), 0)
|
|
}
|
|
|
|
func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
|
|
result.Summary.TotalSuccessfulSessionCount += success
|
|
result.Summary.TotalFailureSessionCount += failure
|
|
return result
|
|
}
|
|
|
|
func clearTLSResults(t *testing.T) {
|
|
_, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
|
|
tcheck(t, err, "delete tls results")
|
|
}
|
|
|
|
func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
|
|
t.Helper()
|
|
q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
|
|
q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
|
|
result, err := q.Get()
|
|
tcheck(t, err, "get tls result")
|
|
tcompare(t, result.RecipientDomain, expRecipientDomain)
|
|
tcompare(t, result.IsHost, expIsHost)
|
|
|
|
// Before comparing, compensate for go1.20 vs go1.21 difference.
|
|
for i, r := range result.Results {
|
|
for j, fd := range r.FailureDetails {
|
|
if fd.FailureReasonCode == "tls-remote-alert-70" {
|
|
result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
|
|
}
|
|
}
|
|
}
|
|
tcompare(t, result.Results, expResults)
|
|
}
|
|
|
|
// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
|
|
// retired list if configured, with a proper result, that webhooks are scheduled,
|
|
// and that cleaning up works.
|
|
func TestRetiredHooks(t *testing.T) {
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
err := Init()
|
|
tcheck(t, err, "queue init")
|
|
|
|
addr, err := smtp.ParseAddress("mjl@mox.example")
|
|
tcheck(t, err, "parse address")
|
|
path := addr.Path()
|
|
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
|
|
testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
|
|
t.Helper()
|
|
|
|
_, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
|
|
tcheck(t, err, "clearing retired messages")
|
|
_, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
|
|
tcheck(t, err, "clearing hooks")
|
|
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
qm.Extra = map[string]string{"a": "123"}
|
|
err = Add(ctxbg, pkglog, account, mf, qm)
|
|
tcheck(t, err, "add to queue")
|
|
|
|
action()
|
|
|
|
// Should be no messages left in queue.
|
|
msgs, err := List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, len(msgs), 0)
|
|
|
|
retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
|
|
tcheck(t, err, "list hooks")
|
|
if expResult == nil {
|
|
tcompare(t, len(retireds), 0)
|
|
tcompare(t, len(hooks), 0)
|
|
} else {
|
|
tcompare(t, len(retireds), 1)
|
|
mr := retireds[0]
|
|
tcompare(t, len(mr.Results) > 0, true)
|
|
lr := mr.LastResult()
|
|
lr.Start = time.Time{}
|
|
lr.Duration = 0
|
|
tcompare(t, lr.Error == "", expResult.Error == "")
|
|
lr.Error = expResult.Error
|
|
tcompare(t, lr, *expResult)
|
|
|
|
// Compare added webhook.
|
|
tcompare(t, len(hooks), 1)
|
|
h := hooks[0]
|
|
var out webhook.Outgoing
|
|
dec := json.NewDecoder(strings.NewReader(h.Payload))
|
|
dec.DisallowUnknownFields()
|
|
err := dec.Decode(&out)
|
|
tcheck(t, err, "unmarshal outgoing webhook payload")
|
|
tcompare(t, out.Error == "", expResult.Error == "")
|
|
out.WebhookQueued = time.Time{}
|
|
out.Error = ""
|
|
var ecode string
|
|
if expResult.Secode != "" {
|
|
ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
|
|
}
|
|
expOut := webhook.Outgoing{
|
|
Event: webhook.OutgoingEvent(expEvent),
|
|
Suppressing: expSuppressing,
|
|
QueueMsgID: mr.ID,
|
|
FromID: mr.FromID,
|
|
MessageID: mr.MessageID,
|
|
Subject: mr.Subject,
|
|
SMTPCode: expResult.Code,
|
|
SMTPEnhancedCode: ecode,
|
|
Extra: mr.Extra,
|
|
}
|
|
tcompare(t, out, expOut)
|
|
h.ID = 0
|
|
h.Payload = ""
|
|
h.Submitted = time.Time{}
|
|
h.NextAttempt = time.Time{}
|
|
exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
|
|
tcompare(t, h, exph)
|
|
}
|
|
}
|
|
|
|
makeLaunchAction := func(handler func(conn net.Conn)) func() {
|
|
return func() {
|
|
server, client := net.Pipe()
|
|
defer server.Close()
|
|
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
go handler(server)
|
|
return client, nil
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
// Trigger delivery attempt.
|
|
n := launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
|
|
// Wait until delivery has finished.
|
|
tm := time.NewTimer(5 * time.Second)
|
|
defer tm.Stop()
|
|
select {
|
|
case <-tm.C:
|
|
t.Fatalf("delivery didn't happen within 5s")
|
|
case <-deliveryResults:
|
|
}
|
|
}
|
|
}
|
|
|
|
smtpAccept := func(conn net.Conn) {
|
|
br := bufio.NewReader(conn)
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(conn, "%s\r\n", s)
|
|
}
|
|
|
|
writeline("220 mail.mox.example")
|
|
readline("ehlo")
|
|
writeline("250 mail.mox.example")
|
|
|
|
readline("mail")
|
|
writeline("250 ok")
|
|
readline("rcpt")
|
|
writeline("250 ok")
|
|
readline("data")
|
|
writeline("354 continue")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
writeline("250 ok")
|
|
readline("quit")
|
|
writeline("250 ok")
|
|
}
|
|
smtpReject := func(code int) func(conn net.Conn) {
|
|
return func(conn net.Conn) {
|
|
br := bufio.NewReader(conn)
|
|
readline := func(cmd string) {
|
|
line, err := br.ReadString('\n')
|
|
if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
|
|
panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
|
|
}
|
|
}
|
|
writeline := func(s string) {
|
|
fmt.Fprintf(conn, "%s\r\n", s)
|
|
}
|
|
|
|
writeline("220 mail.mox.example")
|
|
readline("ehlo")
|
|
writeline("250-mail.mox.example")
|
|
writeline("250 enhancedstatuscodes")
|
|
|
|
readline("mail")
|
|
writeline(fmt.Sprintf("%d 5.1.0 nok", code))
|
|
readline("quit")
|
|
writeline("250 ok")
|
|
}
|
|
}
|
|
|
|
testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
|
|
testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{}, string(webhook.EventDelivered), false)
|
|
// 554 is generic, doesn't immediately cause suppression.
|
|
testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
|
|
testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
|
|
// 550 causes immediate suppression, check for it in webhook.
|
|
testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
|
|
testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
|
|
// Try to deliver to suppressed addresses.
|
|
launch := func() {
|
|
n := launchWork(pkglog, resolver, map[string]struct{}{})
|
|
tcompare(t, n, 1)
|
|
<-deliveryResults
|
|
}
|
|
testAction("mjl", launch, nil, "", false)
|
|
testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
|
|
|
|
queueFail := func() {
|
|
n, err := Fail(ctxbg, pkglog, Filter{})
|
|
tcheck(t, err, "cancel delivery with failure dsn")
|
|
tcompare(t, n, 1)
|
|
}
|
|
queueDrop := func() {
|
|
n, err := Drop(ctxbg, pkglog, Filter{})
|
|
tcheck(t, err, "cancel delivery without failure dsn")
|
|
tcompare(t, n, 1)
|
|
}
|
|
testAction("mjl", queueFail, nil, "", false)
|
|
testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
|
|
testAction("mjl", queueDrop, nil, "", false)
|
|
testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
|
|
|
|
retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
tcompare(t, len(retireds), 1)
|
|
|
|
cleanupMsgRetiredSingle(pkglog)
|
|
retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
|
|
tcheck(t, err, "list retired messages")
|
|
tcompare(t, len(retireds), 0)
|
|
}
|
|
|
|
// test Start and that it attempts to deliver.
|
|
func TestQueueStart(t *testing.T) {
|
|
// Override dial function. We'll make connecting fail and check the attempt.
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
dialed := make(chan struct{}, 1)
|
|
smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
dialed <- struct{}{}
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
defer func() {
|
|
smtpclient.DialHook = nil
|
|
}()
|
|
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
|
|
done := make(chan struct{}, 4)
|
|
defer func() {
|
|
mox.ShutdownCancel()
|
|
// Wait for message and hooks deliverers and cleaners.
|
|
<-done
|
|
<-done
|
|
<-done
|
|
<-done
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
|
|
}()
|
|
err := Start(resolver, done)
|
|
tcheck(t, err, "queue start")
|
|
|
|
checkDialed := func(need bool) {
|
|
t.Helper()
|
|
d := time.Second / 10
|
|
if need {
|
|
d = time.Second
|
|
}
|
|
timer := time.NewTimer(d)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-dialed:
|
|
if !need {
|
|
t.Fatalf("unexpected dial attempt")
|
|
}
|
|
case <-timer.C:
|
|
if need {
|
|
t.Fatalf("expected to see a dial attempt")
|
|
}
|
|
}
|
|
}
|
|
|
|
// HoldRule to mark mark all messages sent by mjl on hold, including existing
|
|
// messages.
|
|
hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
|
|
tcheck(t, err, "add hold rule")
|
|
|
|
// All zero HoldRule holds all deliveries, and marks all on hold.
|
|
hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
|
|
tcheck(t, err, "add hold rule")
|
|
|
|
hrl, err := HoldRuleList(ctxbg)
|
|
tcheck(t, err, "listing hold rules")
|
|
tcompare(t, hrl, []HoldRule{hr0, hr1})
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
checkDialed(false) // No delivery attempt yet.
|
|
|
|
n, err := Count(ctxbg)
|
|
tcheck(t, err, "count messages in queue")
|
|
tcompare(t, n, 1)
|
|
|
|
// Take message off hold.
|
|
n, err = HoldSet(ctxbg, Filter{}, false)
|
|
tcheck(t, err, "taking message off hold")
|
|
tcompare(t, n, 1)
|
|
checkDialed(true)
|
|
|
|
// Remove hold rules.
|
|
err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
|
|
tcheck(t, err, "removing hold rule")
|
|
err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
|
|
tcheck(t, err, "removing hold rule")
|
|
// Check it is gone.
|
|
hrl, err = HoldRuleList(ctxbg)
|
|
tcheck(t, err, "listing hold rules")
|
|
tcompare(t, len(hrl), 0)
|
|
|
|
// Don't change message nextattempt time, but kick queue. Message should not be delivered.
|
|
msgqueueKick()
|
|
checkDialed(false)
|
|
|
|
// Set new next attempt, should see another attempt.
|
|
n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
|
|
tcheck(t, err, "kick queue")
|
|
if n != 1 {
|
|
t.Fatalf("kick changed %d messages, expected 1", n)
|
|
}
|
|
checkDialed(true)
|
|
time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done...
|
|
|
|
// Submit another, should be delivered immediately without HoldRule.
|
|
path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf = prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qm)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
checkDialed(true) // Immediate.
|
|
|
|
time.Sleep(100 * time.Millisecond) // Racy... give time to finish.
|
|
}
|
|
|
|
func TestListFilterSort(t *testing.T) {
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
err := Init()
|
|
tcheck(t, err, "queue init")
|
|
|
|
// insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
mf := prepareFile(t)
|
|
defer os.Remove(mf.Name())
|
|
defer mf.Close()
|
|
|
|
now := time.Now().Round(0)
|
|
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
|
|
qm.Queued = now
|
|
qm1 := qm
|
|
qm1.Queued = now.Add(-time.Second)
|
|
qm1.NextAttempt = now.Add(time.Minute)
|
|
qml := []Msg{qm, qm, qm, qm, qm, qm1}
|
|
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
|
|
tcheck(t, err, "add messages to queue")
|
|
qm1 = qml[len(qml)-1]
|
|
|
|
qmlrev := slices.Clone(qml)
|
|
slices.Reverse(qmlrev)
|
|
|
|
// Ascending by nextattempt,id.
|
|
l, err := List(ctxbg, Filter{}, Sort{Asc: true})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, l, qml)
|
|
|
|
// Descending by nextattempt,id.
|
|
l, err = List(ctxbg, Filter{}, Sort{})
|
|
tcheck(t, err, "list messages")
|
|
tcompare(t, l, qmlrev)
|
|
|
|
// Descending by queued,id.
|
|
l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
|
|
tcheck(t, err, "list messages")
|
|
ql := append(append([]Msg{}, qmlrev[1:]...), qml[5])
|
|
tcompare(t, l, ql)
|
|
|
|
// Filter by all fields to get a single.
|
|
no := false
|
|
allfilters := Filter{
|
|
Max: 2,
|
|
IDs: []int64{qm1.ID},
|
|
Account: "mjl",
|
|
From: path.XString(true),
|
|
To: path.XString(true),
|
|
Hold: &no,
|
|
Submitted: "<1s",
|
|
NextAttempt: ">1s",
|
|
}
|
|
l, err = List(ctxbg, allfilters, Sort{})
|
|
tcheck(t, err, "list single")
|
|
tcompare(t, l, []Msg{qm1})
|
|
|
|
// Paginated NextAttmpt asc.
|
|
var lastID int64
|
|
var last any
|
|
l = nil
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, qml)
|
|
|
|
// Paginated NextAttempt desc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, qmlrev)
|
|
|
|
// Paginated Queued desc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
|
|
}
|
|
tcompare(t, l, ql)
|
|
|
|
// Paginated Queued asc.
|
|
l = nil
|
|
lastID = 0
|
|
last = ""
|
|
for {
|
|
nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
l = append(l, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
|
|
}
|
|
qlrev := slices.Clone(ql)
|
|
slices.Reverse(qlrev)
|
|
tcompare(t, l, qlrev)
|
|
|
|
// Retire messages and do similar but more basic tests. The code is similar.
|
|
var mrl []MsgRetired
|
|
err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
|
|
for _, m := range qml {
|
|
mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
|
|
err := tx.Insert(&mr)
|
|
tcheck(t, err, "inserting retired message")
|
|
mrl = append(mrl, mr)
|
|
}
|
|
return nil
|
|
})
|
|
tcheck(t, err, "adding retired messages")
|
|
|
|
// Paginated LastActivity desc.
|
|
var lr []MsgRetired
|
|
lastID = 0
|
|
last = ""
|
|
l = nil
|
|
for {
|
|
nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
|
|
tcheck(t, err, "list paginated")
|
|
lr = append(lr, nl...)
|
|
if len(nl) == 0 {
|
|
break
|
|
}
|
|
tcompare(t, len(nl), 1)
|
|
lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
|
|
}
|
|
mrlrev := slices.Clone(mrl)
|
|
slices.Reverse(mrlrev)
|
|
tcompare(t, lr, mrlrev)
|
|
|
|
// Filter by all fields to get a single.
|
|
allretiredfilters := RetiredFilter{
|
|
Max: 2,
|
|
IDs: []int64{mrlrev[0].ID},
|
|
Account: "mjl",
|
|
From: path.XString(true),
|
|
To: path.XString(true),
|
|
Submitted: "<1s",
|
|
LastActivity: ">1s",
|
|
}
|
|
lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
|
|
tcheck(t, err, "list single")
|
|
tcompare(t, lr, []MsgRetired{mrlrev[0]})
|
|
}
|
|
|
|
// Just a cert that appears valid.
|
|
func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
|
|
notAfter := time.Now()
|
|
if expired {
|
|
notAfter = notAfter.Add(-time.Hour)
|
|
} else {
|
|
notAfter = notAfter.Add(time.Hour)
|
|
}
|
|
|
|
privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
|
|
template := &x509.Certificate{
|
|
SerialNumber: big.NewInt(1), // Required field...
|
|
DNSNames: []string{name},
|
|
NotBefore: time.Now().Add(-time.Hour),
|
|
NotAfter: notAfter,
|
|
}
|
|
localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
|
|
if err != nil {
|
|
t.Fatalf("making certificate: %s", err)
|
|
}
|
|
cert, err := x509.ParseCertificate(localCertBuf)
|
|
if err != nil {
|
|
t.Fatalf("parsing generated certificate: %s", err)
|
|
}
|
|
c := tls.Certificate{
|
|
Certificate: [][]byte{localCertBuf},
|
|
PrivateKey: privKey,
|
|
Leaf: cert,
|
|
}
|
|
return c
|
|
}
|