From 47ebfa81526cb87bd44f4452fcb080fda1be515a Mon Sep 17 00:00:00 2001 From: Mechiel Lukkien Date: Tue, 5 Mar 2024 20:10:28 +0100 Subject: [PATCH] queue: implement adding a message to the queue that gets sent to multiple recipients and in a way that allows us to send that message to multiple recipients in a single smtp transaction. --- dmarcdb/eval.go | 8 +- dmarcdb/eval_test.go | 7 +- gentestdata.go | 4 +- queue/queue.go | 102 +++++++++++++++------- queue/queue_test.go | 92 ++++++++++---------- rfc/index.txt | 1 + smtpserver/dsn.go | 4 +- smtpserver/server.go | 59 ++++++++----- tlsrptsend/send.go | 4 +- tlsrptsend/send_test.go | 10 ++- webadmin/admin.js | 122 +++++++++++++-------------- webadmin/admin.ts | 182 ++++++++++++++++++++-------------------- webadmin/api.json | 7 ++ webadmin/api.ts | 3 +- webmail/api.go | 17 ++-- 15 files changed, 346 insertions(+), 276 deletions(-) diff --git a/dmarcdb/eval.go b/dmarcdb/eval.go index 35c8722..0d920d7 100644 --- a/dmarcdb/eval.go +++ b/dmarcdb/eval.go @@ -842,13 +842,13 @@ Period: %s - %s UTC continue } - qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) + qm := queue.MakeMsg(from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) // Don't try as long as regular deliveries, and stop before we would send the // delayed DSN. Though we also won't send that due to IsDMARCReport. qm.MaxAttempts = 5 qm.IsDMARCReport = true - err = queueAdd(ctx, log, &qm, msgf) + err = queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm) if err != nil { tempError = true log.Errorx("queueing message with dmarc aggregate report", err) @@ -997,13 +997,13 @@ Submitting-URI: %s continue } - qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) + qm := queue.MakeMsg(fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) // Don't try as long as regular deliveries, and stop before we would send the // delayed DSN. Though we also won't send that due to IsDMARCReport. qm.MaxAttempts = 5 qm.IsDMARCReport = true - if err := queueAdd(ctx, log, &qm, msgf); err != nil { + if err := queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm); err != nil { log.Errorx("queueing message with dmarc error report", err) metricReportError.Inc() } else { diff --git a/dmarcdb/eval_test.go b/dmarcdb/eval_test.go index f37c340..2db8c9a 100644 --- a/dmarcdb/eval_test.go +++ b/dmarcdb/eval_test.go @@ -295,7 +295,12 @@ func TestSendReports(t *testing.T) { aggrAddrs := map[string]struct{}{} errorAddrs := map[string]struct{}{} - queueAdd = func(ctx context.Context, log mlog.Log, qm *queue.Msg, msgFile *os.File) error { + queueAdd = func(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...queue.Msg) error { + if len(qml) != 1 { + return fmt.Errorf("queued %d messages, expected 1", len(qml)) + } + qm := qml[0] + // Read message file. Also write copy to disk for inspection. buf, err := io.ReadAll(&moxio.AtReader{R: msgFile}) tcheckf(t, err, "read report message") diff --git a/gentestdata.go b/gentestdata.go index f1d9441..71b90fd 100644 --- a/gentestdata.go +++ b/gentestdata.go @@ -233,8 +233,8 @@ Accounts: const qmsg = "From: \r\nTo: \r\nSubject: test\r\n\r\nthe message...\r\n" _, err = fmt.Fprint(mf, qmsg) xcheckf(err, "writing message") - qm := queue.MakeMsg("test0", mailfrom, rcptto, false, false, int64(len(qmsg)), "", prefix, nil) - err = queue.Add(ctxbg, c.log, &qm, mf) + qm := queue.MakeMsg(mailfrom, rcptto, false, false, int64(len(qmsg)), "", prefix, nil) + err = queue.Add(ctxbg, c.log, "test0", mf, qm) xcheckf(err, "enqueue message") // Create three accounts. diff --git a/queue/queue.go b/queue/queue.go index d476315..1f3d559 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -78,7 +78,14 @@ var Localserve bool // Use MakeMsg to make a message with fields that Add needs. Add will further set // queueing related fields. type Msg struct { - ID int64 + ID int64 + + // A message for multiple recipients will get a BaseID that is identical to the + // first Msg.ID queued. They may be delivered in a single SMTP transaction if they + // are going to the same mail server. For messages with a single recipient, this + // field will be 0. + BaseID int64 `bstore:"index"` + Queued time.Time `bstore:"default now"` SenderAccount string // Failures are delivered back to this local account. Also used for routing. SenderLocalpart smtp.Localpart // Should be a local user and domain. @@ -208,10 +215,9 @@ func Count(ctx context.Context) (int, error) { } // MakeMsg is a convenience function that sets the commonly used fields for a Msg. -func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg { +func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg { now := time.Now() return Msg{ - SenderAccount: senderAccount, SenderLocalpart: sender.Localpart, SenderDomain: sender.IPDomain, RecipientLocalpart: recipient.Localpart, @@ -228,25 +234,31 @@ func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf } } -// Add a new message to the queue. The queue is kicked immediately to start a -// first delivery attempt. +// Add one or more new messages to the queue. They'll get the same BaseID, so they +// can be delivered in a single SMTP transaction, with a single DATA command, but +// may be split into multiple transactions if errors/limits are encountered. The +// queue is kicked immediately to start a first delivery attempt. // -// ID must be 0 and will be set after inserting in the queue. +// ID of the messagse must be 0 and will be set after inserting in the queue. // // Add sets derived fields like RecipientDomainStr, and fields related to queueing, // such as Queued, NextAttempt, LastAttempt, LastError. -func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error { - // todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759 +func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error { + if len(qml) == 0 { + return fmt.Errorf("must queue at least one message") + } - if qm.ID != 0 { - return fmt.Errorf("id of queued message must be 0") + for _, qm := range qml { + if qm.ID != 0 { + return fmt.Errorf("id of queued messages must be 0") + } } if Localserve { - if qm.SenderAccount == "" { + if senderAccount == "" { return fmt.Errorf("cannot queue with localserve without local account") } - acc, err := store.OpenAccount(log, qm.SenderAccount) + acc, err := store.OpenAccount(log, senderAccount) if err != nil { return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err) } @@ -254,17 +266,24 @@ func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error { err := acc.Close() log.Check(err, "closing account") }() - m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix} conf, _ := acc.Conf() - dest := conf.Destinations[qm.Sender().String()] + err = nil acc.WithWLock(func() { - err = acc.DeliverDestination(log, dest, &m, msgFile) + for i, qm := range qml { + qml[i].SenderAccount = senderAccount + m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix} + dest := conf.Destinations[qm.Sender().String()] + err = acc.DeliverDestination(log, dest, &m, msgFile) + if err != nil { + err = fmt.Errorf("delivering message: %v", err) + return // Returned again outside WithWLock. + } + } }) - if err != nil { - return fmt.Errorf("delivering message: %v", err) + if err == nil { + log.Debug("immediately delivered from queue to sender") } - log.Debug("immediately delivered from queue to sender") - return nil + return err } tx, err := DB.Begin(ctx, true) @@ -279,30 +298,49 @@ func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error { } }() - if err := tx.Insert(qm); err != nil { - return err + // Insert messages into queue. If there are multiple messages, they all get a + // non-zero BaseID that is the Msg.ID of the first message inserted. + var baseID int64 + for i := range qml { + qml[i].SenderAccount = senderAccount + qml[i].BaseID = baseID + if err := tx.Insert(&qml[i]); err != nil { + return err + } + if i == 0 && len(qml) > 1 { + baseID = qml[i].ID + qml[i].BaseID = baseID + if err := tx.Update(&qml[i]); err != nil { + return err + } + } } - dst := qm.MessagePath() + var paths []string defer func() { - if dst != "" { - err := os.Remove(dst) - log.Check(err, "removing destination message file for queue", slog.String("path", dst)) + for _, p := range paths { + err := os.Remove(p) + log.Check(err, "removing destination message file for queue", slog.String("path", p)) } }() - dstDir := filepath.Dir(dst) - os.MkdirAll(dstDir, 0770) - if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil { - return fmt.Errorf("linking/copying message to new file: %s", err) - } else if err := moxio.SyncDir(log, dstDir); err != nil { - return fmt.Errorf("sync directory: %v", err) + + for _, qm := range qml { + dst := qm.MessagePath() + paths = append(paths, dst) + dstDir := filepath.Dir(dst) + os.MkdirAll(dstDir, 0770) + if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil { + return fmt.Errorf("linking/copying message to new file: %s", err) + } else if err := moxio.SyncDir(log, dstDir); err != nil { + return fmt.Errorf("sync directory: %v", err) + } } if err := tx.Commit(); err != nil { return fmt.Errorf("commit transaction: %s", err) } tx = nil - dst = "" + paths = nil queuekick() return nil diff --git a/queue/queue_test.go b/queue/queue_test.go index d5ccc1f..f28e151 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -117,12 +117,12 @@ func TestQueue(t *testing.T) { var qm Msg - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil) + err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil) + err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") msgs, err = List(ctxbg) @@ -451,8 +451,8 @@ func TestQueue(t *testing.T) { // Add a message to be delivered with submit because of its route. topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}} - qm = MakeMsg("mjl", path, topath, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "", nil, nil) + err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") wasNetDialer = testDeliver(fakeSubmitServer) if !wasNetDialer { @@ -460,11 +460,11 @@ func TestQueue(t *testing.T) { } // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml := []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") transportSubmitTLS := "submittls" - n, err = Kick(ctxbg, qm.ID, "", "", &transportSubmitTLS) + n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSubmitTLS) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -509,11 +509,11 @@ func TestQueue(t *testing.T) { } // Add a message to be delivered with socks. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") transportSocks := "socks" - n, err = Kick(ctxbg, qm.ID, "", "", &transportSocks) + n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSocks) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -525,10 +525,10 @@ func TestQueue(t *testing.T) { // Add message to be delivered with opportunistic TLS verification. clearTLSResults(t) - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -539,10 +539,10 @@ func TestQueue(t *testing.T) { // Test fallback to plain text with TLS handshake fails. clearTLSResults(t) - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -559,10 +559,10 @@ func TestQueue(t *testing.T) { {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo}, }, } - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -580,10 +580,10 @@ func TestQueue(t *testing.T) { // Add message to be delivered with verified TLS and REQUIRETLS. yes := true - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, &yes) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, &yes)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -597,10 +597,10 @@ func TestQueue(t *testing.T) { {}, }, } - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -618,10 +618,10 @@ func TestQueue(t *testing.T) { {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)}, }, } - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -640,10 +640,10 @@ func TestQueue(t *testing.T) { // Check that message is delivered with TLS-Required: No and non-matching DANE record. no := false - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, &no) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, &no)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -651,10 +651,10 @@ func TestQueue(t *testing.T) { testDeliver(fakeSMTPSTARTTLSServer) // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, &no) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, &no)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -662,10 +662,10 @@ func TestQueue(t *testing.T) { testDeliver(makeBadFakeSMTPSTARTTLSServer(true)) // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, &yes) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, &yes)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -677,10 +677,10 @@ func TestQueue(t *testing.T) { resolver.TLSA = nil // Add message with requiretls that fails immediately due to no verification policy for recipient domain. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, &yes) - err = Add(ctxbg, pkglog, &qm, mf) + qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, &yes)} + err = Add(ctxbg, pkglog, "mjl", mf, qml...) tcheck(t, err, "add message to queue for delivery") - n, err = Kick(ctxbg, qm.ID, "", "", nil) + n, err = Kick(ctxbg, qml[0].ID, "", "", nil) tcheck(t, err, "kick queue") if n != 1 { t.Fatalf("kick changed %d messages, expected 1", n) @@ -692,8 +692,8 @@ func TestQueue(t *testing.T) { }) // Add another message that we'll fail to deliver entirely. - qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil) + err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") msgs, err = List(ctxbg) @@ -883,8 +883,8 @@ func TestQueueStart(t *testing.T) { mf := prepareFile(t) defer os.Remove(mf.Name()) defer mf.Close() - qm := MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "", nil, nil) - err = Add(ctxbg, pkglog, &qm, mf) + qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "", nil, nil) + err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") checkDialed(true) diff --git a/rfc/index.txt b/rfc/index.txt index 15f97cf..5a04ccc 100644 --- a/rfc/index.txt +++ b/rfc/index.txt @@ -97,6 +97,7 @@ https://www.iana.org/assignments/message-headers/message-headers.xhtml 8601 Yes - Message Header Field for Indicating Message Authentication Status 8689 Yes - SMTP Require TLS Option 8904 No - DNS Whitelist (DNSWL) Email Authentication Method Extension +9422 Partial - The LIMITS SMTP Service Extension # SPF 4408 Yes Obs (by RFC 7208) Sender Policy Framework (SPF) for Authorizing Use of Domains in E-Mail, Version 1 diff --git a/smtpserver/dsn.go b/smtpserver/dsn.go index 3dd798a..672eed4 100644 --- a/smtpserver/dsn.go +++ b/smtpserver/dsn.go @@ -53,9 +53,9 @@ func queueDSN(ctx context.Context, log mlog.Log, c *conn, rcptTo smtp.Path, m ds if requireTLS { reqTLS = &requireTLS } - qm := queue.MakeMsg("", smtp.Path{}, rcptTo, has8bit, smtputf8, int64(len(buf)), m.MessageID, nil, reqTLS) + qm := queue.MakeMsg(smtp.Path{}, rcptTo, has8bit, smtputf8, int64(len(buf)), m.MessageID, nil, reqTLS) qm.DSNUTF8 = bufUTF8 - if err := queue.Add(ctx, c.log, &qm, f); err != nil { + if err := queue.Add(ctx, c.log, "", f, qm); err != nil { return err } return nil diff --git a/smtpserver/server.go b/smtpserver/server.go index 5314908..c095633 100644 --- a/smtpserver/server.go +++ b/smtpserver/server.go @@ -72,6 +72,10 @@ var limiterConnectionRate, limiterConnections *ratelimit.Limiter var limitIPMasked1MessagesPerMinute int = 500 var limitIPMasked1SizePerMinute int64 = 1000 * 1024 * 1024 +// Maximum number of RCPT TO commands (i.e. recipients) for a single message +// delivery. Must be at least 100. Announced in LIMIT extension. +const rcptToLimit = 1000 + func init() { // Also called by tests, so they don't trigger the rate limiter. limitersInit() @@ -888,8 +892,9 @@ func (c *conn) cmdHello(p *parser, ehlo bool) { } c.bwritelinef("250-ENHANCEDSTATUSCODES") // ../rfc/2034:71 // todo future? c.writelinef("250-DSN") - c.bwritelinef("250-8BITMIME") // ../rfc/6152:86 - c.bwritecodeline(250, "", "SMTPUTF8", nil) // ../rfc/6531:201 + c.bwritelinef("250-8BITMIME") // ../rfc/6152:86 + c.bwritelinef("250-LIMITS RCPTMAX=%d", rcptToLimit) // rfc/9422:301 + c.bwritecodeline(250, "", "SMTPUTF8", nil) // ../rfc/6531:201 c.xflush() } @@ -1556,9 +1561,9 @@ func (c *conn) cmdRcpt(p *parser) { // todo future: for submission, should we do explicit verification that domains are fully qualified? also for mail from. ../rfc/6409:420 - if len(c.recipients) >= 100 { + if len(c.recipients) >= rcptToLimit { // ../rfc/5321:3535 ../rfc/5321:3571 - xsmtpUserErrorf(smtp.C452StorageFull, smtp.SeProto5TooManyRcpts3, "max of 100 recipients reached") + xsmtpUserErrorf(smtp.C452StorageFull, smtp.SeProto5TooManyRcpts3, "max of %d recipients reached", rcptToLimit) } // We don't want to allow delivery to multiple recipients with a null reverse path. @@ -1974,7 +1979,8 @@ func (c *conn) submit(ctx context.Context, recvHdrFor func(string) string, msgWr // We always deliver through the queue. It would be more efficient to deliver // directly, but we don't want to circumvent all the anti-spam measures. Accounts // on a single mox instance should be allowed to block each other. - for _, rcptAcc := range c.recipients { + qml := make([]queue.Msg, len(c.recipients)) + for i, rcptAcc := range c.recipients { if Localserve { code, timeout := localserveNeedsError(rcptAcc.rcptTo.Localpart) if timeout { @@ -1988,32 +1994,43 @@ func (c *conn) submit(ctx context.Context, recvHdrFor func(string) string, msgWr } xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...) - msgSize := int64(len(xmsgPrefix)) + msgWriter.Size - qm := queue.MakeMsg(c.account.Name, *c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, messageID, xmsgPrefix, c.requireTLS) + qm := queue.MakeMsg(*c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, messageID, xmsgPrefix, c.requireTLS) if !c.futureRelease.IsZero() { qm.NextAttempt = c.futureRelease qm.FutureReleaseRequest = c.futureReleaseRequest } - // todo: it would be good to have a limit on messages (count and total size) a user has in the queue. also/especially with futurerelease. ../rfc/4865:387 - if err := queue.Add(ctx, c.log, &qm, dataFile); err != nil { - // Aborting the transaction is not great. But continuing and generating DSNs will - // probably result in errors as well... - metricSubmission.WithLabelValues("queueerror").Inc() - c.log.Errorx("queuing message", err) - xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err) - } - metricSubmission.WithLabelValues("ok").Inc() - c.log.Info("message queued for delivery", + qml[i] = qm + } + + // todo: it would be good to have a limit on messages (count and total size) a user has in the queue. also/especially with futurerelease. ../rfc/4865:387 + if err := queue.Add(ctx, c.log, c.account.Name, dataFile, qml...); err != nil { + // Aborting the transaction is not great. But continuing and generating DSNs will + // probably result in errors as well... + metricSubmission.WithLabelValues("queueerror").Inc() + c.log.Errorx("queuing message", err) + xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err) + } + metricSubmission.WithLabelValues("ok").Inc() + for i, rcptAcc := range c.recipients { + c.log.Info("messages queued for delivery", slog.Any("mailfrom", *c.mailFrom), slog.Any("rcptto", rcptAcc.rcptTo), slog.Bool("smtputf8", c.smtputf8), - slog.Int64("msgsize", msgSize)) - - err := c.account.DB.Insert(ctx, &store.Outgoing{Recipient: rcptAcc.rcptTo.XString(true)}) - xcheckf(err, "adding outgoing message") + slog.Int64("msgsize", qml[i].Size)) } + err = c.account.DB.Write(ctx, func(tx *bstore.Tx) error { + for _, rcptAcc := range c.recipients { + outgoing := store.Outgoing{Recipient: rcptAcc.rcptTo.XString(true)} + if err := tx.Insert(&outgoing); err != nil { + return fmt.Errorf("adding outgoing message: %v", err) + } + } + return nil + }) + xcheckf(err, "adding outgoing messages") + c.transactionGood++ c.transactionBad-- // Compensate for early earlier pessimistic increase. diff --git a/tlsrptsend/send.go b/tlsrptsend/send.go index a8bb0b2..172e60b 100644 --- a/tlsrptsend/send.go +++ b/tlsrptsend/send.go @@ -589,7 +589,7 @@ Period: %s - %s UTC continue } - qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, from.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) + qm := queue.MakeMsg(from.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil) // Don't try as long as regular deliveries, and stop before we would send the // delayed DSN. Though we also won't send that due to IsTLSReport. // ../rfc/8460:1077 @@ -599,7 +599,7 @@ Period: %s - %s UTC no := false qm.RequireTLS = &no - err = queueAdd(ctx, log, &qm, msgf) + err = queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm) if err != nil { tempError = !queued log.Errorx("queueing message with tls report", err) diff --git a/tlsrptsend/send_test.go b/tlsrptsend/send_test.go index 08839de..f2da06c 100644 --- a/tlsrptsend/send_test.go +++ b/tlsrptsend/send_test.go @@ -412,7 +412,11 @@ func TestSendReports(t *testing.T) { var mutex sync.Mutex var index int - queueAdd = func(ctx context.Context, log mlog.Log, qm *queue.Msg, msgFile *os.File) error { + queueAdd = func(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...queue.Msg) error { + if len(qml) != 1 { + return fmt.Errorf("queued %d messages, expect 1", len(qml)) + } + mutex.Lock() defer mutex.Unlock() @@ -421,13 +425,13 @@ func TestSendReports(t *testing.T) { tcheckf(t, err, "read report message") p := fmt.Sprintf("../testdata/tlsrptsend/data/report%d.eml", index) index++ - err = os.WriteFile(p, append(append([]byte{}, qm.MsgPrefix...), buf...), 0600) + err = os.WriteFile(p, append(append([]byte{}, qml[0].MsgPrefix...), buf...), 0600) tcheckf(t, err, "write report message") reportJSON, err := tlsrpt.ParseMessage(log.Logger, msgFile) tcheckf(t, err, "parsing generated report message") - addr := qm.Recipient().String() + addr := qml[0].Recipient().String() haveReports[addr] = append(haveReports[addr], reportJSON.Convert()) return nil diff --git a/webadmin/admin.js b/webadmin/admin.js index 37b43eb..286b649 100644 --- a/webadmin/admin.js +++ b/webadmin/admin.js @@ -393,7 +393,7 @@ var api; "Reverse": { "Name": "Reverse", "Docs": "", "Fields": [{ "Name": "Hostnames", "Docs": "", "Typewords": ["[]", "string"] }] }, "ClientConfigs": { "Name": "ClientConfigs", "Docs": "", "Fields": [{ "Name": "Entries", "Docs": "", "Typewords": ["[]", "ClientConfigsEntry"] }] }, "ClientConfigsEntry": { "Name": "ClientConfigsEntry", "Docs": "", "Fields": [{ "Name": "Protocol", "Docs": "", "Typewords": ["string"] }, { "Name": "Host", "Docs": "", "Typewords": ["Domain"] }, { "Name": "Port", "Docs": "", "Typewords": ["int32"] }, { "Name": "Listener", "Docs": "", "Typewords": ["string"] }, { "Name": "Note", "Docs": "", "Typewords": ["string"] }] }, - "Msg": { "Name": "Msg", "Docs": "", "Fields": [{ "Name": "ID", "Docs": "", "Typewords": ["int64"] }, { "Name": "Queued", "Docs": "", "Typewords": ["timestamp"] }, { "Name": "SenderAccount", "Docs": "", "Typewords": ["string"] }, { "Name": "SenderLocalpart", "Docs": "", "Typewords": ["Localpart"] }, { "Name": "SenderDomain", "Docs": "", "Typewords": ["IPDomain"] }, { "Name": "RecipientLocalpart", "Docs": "", "Typewords": ["Localpart"] }, { "Name": "RecipientDomain", "Docs": "", "Typewords": ["IPDomain"] }, { "Name": "RecipientDomainStr", "Docs": "", "Typewords": ["string"] }, { "Name": "Attempts", "Docs": "", "Typewords": ["int32"] }, { "Name": "MaxAttempts", "Docs": "", "Typewords": ["int32"] }, { "Name": "DialedIPs", "Docs": "", "Typewords": ["{}", "[]", "IP"] }, { "Name": "NextAttempt", "Docs": "", "Typewords": ["timestamp"] }, { "Name": "LastAttempt", "Docs": "", "Typewords": ["nullable", "timestamp"] }, { "Name": "LastError", "Docs": "", "Typewords": ["string"] }, { "Name": "Has8bit", "Docs": "", "Typewords": ["bool"] }, { "Name": "SMTPUTF8", "Docs": "", "Typewords": ["bool"] }, { "Name": "IsDMARCReport", "Docs": "", "Typewords": ["bool"] }, { "Name": "IsTLSReport", "Docs": "", "Typewords": ["bool"] }, { "Name": "Size", "Docs": "", "Typewords": ["int64"] }, { "Name": "MessageID", "Docs": "", "Typewords": ["string"] }, { "Name": "MsgPrefix", "Docs": "", "Typewords": ["nullable", "string"] }, { "Name": "DSNUTF8", "Docs": "", "Typewords": ["nullable", "string"] }, { "Name": "Transport", "Docs": "", "Typewords": ["string"] }, { "Name": "RequireTLS", "Docs": "", "Typewords": ["nullable", "bool"] }, { "Name": "FutureReleaseRequest", "Docs": "", "Typewords": ["string"] }] }, + "Msg": { "Name": "Msg", "Docs": "", "Fields": [{ "Name": "ID", "Docs": "", "Typewords": ["int64"] }, { "Name": "BaseID", "Docs": "", "Typewords": ["int64"] }, { "Name": "Queued", "Docs": "", "Typewords": ["timestamp"] }, { "Name": "SenderAccount", "Docs": "", "Typewords": ["string"] }, { "Name": "SenderLocalpart", "Docs": "", "Typewords": ["Localpart"] }, { "Name": "SenderDomain", "Docs": "", "Typewords": ["IPDomain"] }, { "Name": "RecipientLocalpart", "Docs": "", "Typewords": ["Localpart"] }, { "Name": "RecipientDomain", "Docs": "", "Typewords": ["IPDomain"] }, { "Name": "RecipientDomainStr", "Docs": "", "Typewords": ["string"] }, { "Name": "Attempts", "Docs": "", "Typewords": ["int32"] }, { "Name": "MaxAttempts", "Docs": "", "Typewords": ["int32"] }, { "Name": "DialedIPs", "Docs": "", "Typewords": ["{}", "[]", "IP"] }, { "Name": "NextAttempt", "Docs": "", "Typewords": ["timestamp"] }, { "Name": "LastAttempt", "Docs": "", "Typewords": ["nullable", "timestamp"] }, { "Name": "LastError", "Docs": "", "Typewords": ["string"] }, { "Name": "Has8bit", "Docs": "", "Typewords": ["bool"] }, { "Name": "SMTPUTF8", "Docs": "", "Typewords": ["bool"] }, { "Name": "IsDMARCReport", "Docs": "", "Typewords": ["bool"] }, { "Name": "IsTLSReport", "Docs": "", "Typewords": ["bool"] }, { "Name": "Size", "Docs": "", "Typewords": ["int64"] }, { "Name": "MessageID", "Docs": "", "Typewords": ["string"] }, { "Name": "MsgPrefix", "Docs": "", "Typewords": ["nullable", "string"] }, { "Name": "DSNUTF8", "Docs": "", "Typewords": ["nullable", "string"] }, { "Name": "Transport", "Docs": "", "Typewords": ["string"] }, { "Name": "RequireTLS", "Docs": "", "Typewords": ["nullable", "bool"] }, { "Name": "FutureReleaseRequest", "Docs": "", "Typewords": ["string"] }] }, "IPDomain": { "Name": "IPDomain", "Docs": "", "Fields": [{ "Name": "IP", "Docs": "", "Typewords": ["IP"] }, { "Name": "Domain", "Docs": "", "Typewords": ["Domain"] }] }, "WebserverConfig": { "Name": "WebserverConfig", "Docs": "", "Fields": [{ "Name": "WebDNSDomainRedirects", "Docs": "", "Typewords": ["[]", "[]", "Domain"] }, { "Name": "WebDomainRedirects", "Docs": "", "Typewords": ["[]", "[]", "string"] }, { "Name": "WebHandlers", "Docs": "", "Typewords": ["[]", "WebHandler"] }] }, "WebHandler": { "Name": "WebHandler", "Docs": "", "Fields": [{ "Name": "LogName", "Docs": "", "Typewords": ["string"] }, { "Name": "Domain", "Docs": "", "Typewords": ["string"] }, { "Name": "PathRegexp", "Docs": "", "Typewords": ["string"] }, { "Name": "DontRedirectPlainHTTP", "Docs": "", "Typewords": ["bool"] }, { "Name": "Compress", "Docs": "", "Typewords": ["bool"] }, { "Name": "WebStatic", "Docs": "", "Typewords": ["nullable", "WebStatic"] }, { "Name": "WebRedirect", "Docs": "", "Typewords": ["nullable", "WebRedirect"] }, { "Name": "WebForward", "Docs": "", "Typewords": ["nullable", "WebForward"] }, { "Name": "Name", "Docs": "", "Typewords": ["string"] }, { "Name": "DNSDomain", "Docs": "", "Typewords": ["Domain"] }] }, @@ -2692,67 +2692,65 @@ const queueList = async () => { client.Transports(), ]); const nowSecs = new Date().getTime() / 1000; - dom._kids(page, crumbs(crumblink('Mox Admin', '#'), 'Queue'), (msgs || []).length === 0 ? 'Currently no messages in the queue.' : [ - dom.p('The messages below are currently in the queue.'), - // todo: sorting by address/timestamps/attempts. perhaps filtering. - dom.table(dom._class('hover'), dom.thead(dom.tr(dom.th('ID'), dom.th('Submitted'), dom.th('From'), dom.th('To'), dom.th('Size'), dom.th('Attempts'), dom.th('Next attempt'), dom.th('Last attempt'), dom.th('Last error'), dom.th('Require TLS'), dom.th('Transport/Retry'), dom.th('Remove'))), dom.tbody((msgs || []).map(m => { - let requiretlsFieldset; - let requiretls; - let transport; - return dom.tr(dom.td('' + m.ID), dom.td(age(new Date(m.Queued), false, nowSecs)), dom.td(m.SenderLocalpart + "@" + ipdomainString(m.SenderDomain)), // todo: escaping of localpart - dom.td(m.RecipientLocalpart + "@" + ipdomainString(m.RecipientDomain)), // todo: escaping of localpart - dom.td(formatSize(m.Size)), dom.td('' + m.Attempts), dom.td(age(new Date(m.NextAttempt), true, nowSecs)), dom.td(m.LastAttempt ? age(new Date(m.LastAttempt), false, nowSecs) : '-'), dom.td(m.LastError || '-'), dom.td(dom.form(requiretlsFieldset = dom.fieldset(requiretls = dom.select(attr.title('How to use TLS for message delivery over SMTP:\n\nDefault: Delivery attempts follow the policies published by the recipient domain: Verification with MTA-STS and/or DANE, or optional opportunistic unverified STARTTLS if the domain does not specify a policy.\n\nWith RequireTLS: For sensitive messages, you may want to require verified TLS. The recipient destination domain SMTP server must support the REQUIRETLS SMTP extension for delivery to succeed. It is automatically chosen when the destination domain mail servers of all recipients are known to support it.\n\nFallback to insecure: If delivery fails due to MTA-STS and/or DANE policies specified by the recipient domain, and the content is not sensitive, you may choose to ignore the recipient domain TLS policies so delivery can succeed.'), dom.option('Default', attr.value('')), dom.option('With RequireTLS', attr.value('yes'), m.RequireTLS === true ? attr.selected('') : []), dom.option('Fallback to insecure', attr.value('no'), m.RequireTLS === false ? attr.selected('') : [])), ' ', dom.submitbutton('Save')), async function submit(e) { - e.preventDefault(); - try { - requiretlsFieldset.disabled = true; - await client.QueueSaveRequireTLS(m.ID, requiretls.value === '' ? null : requiretls.value === 'yes'); - } - catch (err) { - console.log({ err }); - window.alert('Error: ' + errmsg(err)); - return; - } - finally { - requiretlsFieldset.disabled = false; - } - })), dom.td(dom.form(transport = dom.select(attr.title('Transport to use for delivery attempts. The default is direct delivery, connecting to the MX hosts of the domain.'), dom.option('(default)', attr.value('')), Object.keys(transports || []).sort().map(t => dom.option(t, m.Transport === t ? attr.checked('') : []))), ' ', dom.submitbutton('Retry now'), async function submit(e) { - e.preventDefault(); - const target = e.target; - try { - target.disabled = true; - await client.QueueKick(m.ID, transport.value); - } - catch (err) { - console.log({ err }); - window.alert('Error: ' + errmsg(err)); - return; - } - finally { - target.disabled = false; - } - window.location.reload(); // todo: only refresh the list - })), dom.td(dom.clickbutton('Remove', async function click(e) { - e.preventDefault(); - if (!window.confirm('Are you sure you want to remove this message? It will be removed completely.')) { - return; - } - const target = e.target; - try { - target.disabled = true; - await client.QueueDrop(m.ID); - } - catch (err) { - console.log({ err }); - window.alert('Error: ' + errmsg(err)); - return; - } - finally { - target.disabled = false; - } - window.location.reload(); // todo: only refresh the list - }))); - }))), - ]); + dom._kids(page, crumbs(crumblink('Mox Admin', '#'), 'Queue'), + // todo: sorting by address/timestamps/attempts. perhaps filtering. + dom.table(dom._class('hover'), dom.thead(dom.tr(dom.th('ID'), dom.th('Submitted'), dom.th('From'), dom.th('To'), dom.th('Size'), dom.th('Attempts'), dom.th('Next attempt'), dom.th('Last attempt'), dom.th('Last error'), dom.th('Require TLS'), dom.th('Transport/Retry'), dom.th('Remove'))), dom.tbody((msgs || []).length === 0 ? dom.tr(dom.td(attr.colspan('12'), 'Currently no messages in the queue.')) : [], (msgs || []).map(m => { + let requiretlsFieldset; + let requiretls; + let transport; + return dom.tr(dom.td('' + m.ID + (m.BaseID > 0 ? '/' + m.BaseID : '')), dom.td(age(new Date(m.Queued), false, nowSecs)), dom.td(m.SenderLocalpart + "@" + ipdomainString(m.SenderDomain)), // todo: escaping of localpart + dom.td(m.RecipientLocalpart + "@" + ipdomainString(m.RecipientDomain)), // todo: escaping of localpart + dom.td(formatSize(m.Size)), dom.td('' + m.Attempts), dom.td(age(new Date(m.NextAttempt), true, nowSecs)), dom.td(m.LastAttempt ? age(new Date(m.LastAttempt), false, nowSecs) : '-'), dom.td(m.LastError || '-'), dom.td(dom.form(requiretlsFieldset = dom.fieldset(requiretls = dom.select(attr.title('How to use TLS for message delivery over SMTP:\n\nDefault: Delivery attempts follow the policies published by the recipient domain: Verification with MTA-STS and/or DANE, or optional opportunistic unverified STARTTLS if the domain does not specify a policy.\n\nWith RequireTLS: For sensitive messages, you may want to require verified TLS. The recipient destination domain SMTP server must support the REQUIRETLS SMTP extension for delivery to succeed. It is automatically chosen when the destination domain mail servers of all recipients are known to support it.\n\nFallback to insecure: If delivery fails due to MTA-STS and/or DANE policies specified by the recipient domain, and the content is not sensitive, you may choose to ignore the recipient domain TLS policies so delivery can succeed.'), dom.option('Default', attr.value('')), dom.option('With RequireTLS', attr.value('yes'), m.RequireTLS === true ? attr.selected('') : []), dom.option('Fallback to insecure', attr.value('no'), m.RequireTLS === false ? attr.selected('') : [])), ' ', dom.submitbutton('Save')), async function submit(e) { + e.preventDefault(); + try { + requiretlsFieldset.disabled = true; + await client.QueueSaveRequireTLS(m.ID, requiretls.value === '' ? null : requiretls.value === 'yes'); + } + catch (err) { + console.log({ err }); + window.alert('Error: ' + errmsg(err)); + return; + } + finally { + requiretlsFieldset.disabled = false; + } + })), dom.td(dom.form(transport = dom.select(attr.title('Transport to use for delivery attempts. The default is direct delivery, connecting to the MX hosts of the domain.'), dom.option('(default)', attr.value('')), Object.keys(transports || []).sort().map(t => dom.option(t, m.Transport === t ? attr.checked('') : []))), ' ', dom.submitbutton('Retry now'), async function submit(e) { + e.preventDefault(); + const target = e.target; + try { + target.disabled = true; + await client.QueueKick(m.ID, transport.value); + } + catch (err) { + console.log({ err }); + window.alert('Error: ' + errmsg(err)); + return; + } + finally { + target.disabled = false; + } + window.location.reload(); // todo: only refresh the list + })), dom.td(dom.clickbutton('Remove', async function click(e) { + e.preventDefault(); + if (!window.confirm('Are you sure you want to remove this message? It will be removed completely.')) { + return; + } + const target = e.target; + try { + target.disabled = true; + await client.QueueDrop(m.ID); + } + catch (err) { + console.log({ err }); + window.alert('Error: ' + errmsg(err)); + return; + } + finally { + target.disabled = false; + } + window.location.reload(); // todo: only refresh the list + }))); + })))); }; const webserver = async () => { let conf = await client.WebserverConfig(); diff --git a/webadmin/admin.ts b/webadmin/admin.ts index 1ff9cdc..54d3d2f 100644 --- a/webadmin/admin.ts +++ b/webadmin/admin.ts @@ -2300,104 +2300,82 @@ const queueList = async () => { crumblink('Mox Admin', '#'), 'Queue', ), - (msgs || []).length === 0 ? 'Currently no messages in the queue.' : [ - dom.p('The messages below are currently in the queue.'), // todo: sorting by address/timestamps/attempts. perhaps filtering. - dom.table(dom._class('hover'), - dom.thead( - dom.tr( - dom.th('ID'), - dom.th('Submitted'), - dom.th('From'), - dom.th('To'), - dom.th('Size'), - dom.th('Attempts'), - dom.th('Next attempt'), - dom.th('Last attempt'), - dom.th('Last error'), - dom.th('Require TLS'), - dom.th('Transport/Retry'), - dom.th('Remove'), - ), + dom.table(dom._class('hover'), + dom.thead( + dom.tr( + dom.th('ID'), + dom.th('Submitted'), + dom.th('From'), + dom.th('To'), + dom.th('Size'), + dom.th('Attempts'), + dom.th('Next attempt'), + dom.th('Last attempt'), + dom.th('Last error'), + dom.th('Require TLS'), + dom.th('Transport/Retry'), + dom.th('Remove'), ), - dom.tbody( - (msgs || []).map(m => { - let requiretlsFieldset: HTMLFieldSetElement - let requiretls: HTMLSelectElement - let transport: HTMLSelectElement - return dom.tr( - dom.td(''+m.ID), - dom.td(age(new Date(m.Queued), false, nowSecs)), - dom.td(m.SenderLocalpart+"@"+ipdomainString(m.SenderDomain)), // todo: escaping of localpart - dom.td(m.RecipientLocalpart+"@"+ipdomainString(m.RecipientDomain)), // todo: escaping of localpart - dom.td(formatSize(m.Size)), - dom.td(''+m.Attempts), - dom.td(age(new Date(m.NextAttempt), true, nowSecs)), - dom.td(m.LastAttempt ? age(new Date(m.LastAttempt), false, nowSecs) : '-'), - dom.td(m.LastError || '-'), - dom.td( - dom.form( - requiretlsFieldset=dom.fieldset( - requiretls=dom.select( - attr.title('How to use TLS for message delivery over SMTP:\n\nDefault: Delivery attempts follow the policies published by the recipient domain: Verification with MTA-STS and/or DANE, or optional opportunistic unverified STARTTLS if the domain does not specify a policy.\n\nWith RequireTLS: For sensitive messages, you may want to require verified TLS. The recipient destination domain SMTP server must support the REQUIRETLS SMTP extension for delivery to succeed. It is automatically chosen when the destination domain mail servers of all recipients are known to support it.\n\nFallback to insecure: If delivery fails due to MTA-STS and/or DANE policies specified by the recipient domain, and the content is not sensitive, you may choose to ignore the recipient domain TLS policies so delivery can succeed.'), - dom.option('Default', attr.value('')), - dom.option('With RequireTLS', attr.value('yes'), m.RequireTLS === true ? attr.selected('') : []), - dom.option('Fallback to insecure', attr.value('no'), m.RequireTLS === false ? attr.selected('') : []), - ), - ' ', - dom.submitbutton('Save'), - ), - async function submit(e: SubmitEvent) { - e.preventDefault() - try { - requiretlsFieldset.disabled = true - await client.QueueSaveRequireTLS(m.ID, requiretls.value === '' ? null : requiretls.value === 'yes') - } catch (err) { - console.log({err}) - window.alert('Error: ' + errmsg(err)) - return - } finally { - requiretlsFieldset.disabled = false - } - } - ), - ), - dom.td( - dom.form( - transport=dom.select( - attr.title('Transport to use for delivery attempts. The default is direct delivery, connecting to the MX hosts of the domain.'), - dom.option('(default)', attr.value('')), - Object.keys(transports || []).sort().map(t => dom.option(t, m.Transport === t ? attr.checked('') : [])), + ), + dom.tbody( + (msgs || []).length === 0 ? dom.tr(dom.td(attr.colspan('12'), 'Currently no messages in the queue.')) : [], + (msgs || []).map(m => { + let requiretlsFieldset: HTMLFieldSetElement + let requiretls: HTMLSelectElement + let transport: HTMLSelectElement + return dom.tr( + dom.td(''+m.ID + (m.BaseID > 0 ? '/'+m.BaseID : '')), + dom.td(age(new Date(m.Queued), false, nowSecs)), + dom.td(m.SenderLocalpart+"@"+ipdomainString(m.SenderDomain)), // todo: escaping of localpart + dom.td(m.RecipientLocalpart+"@"+ipdomainString(m.RecipientDomain)), // todo: escaping of localpart + dom.td(formatSize(m.Size)), + dom.td(''+m.Attempts), + dom.td(age(new Date(m.NextAttempt), true, nowSecs)), + dom.td(m.LastAttempt ? age(new Date(m.LastAttempt), false, nowSecs) : '-'), + dom.td(m.LastError || '-'), + dom.td( + dom.form( + requiretlsFieldset=dom.fieldset( + requiretls=dom.select( + attr.title('How to use TLS for message delivery over SMTP:\n\nDefault: Delivery attempts follow the policies published by the recipient domain: Verification with MTA-STS and/or DANE, or optional opportunistic unverified STARTTLS if the domain does not specify a policy.\n\nWith RequireTLS: For sensitive messages, you may want to require verified TLS. The recipient destination domain SMTP server must support the REQUIRETLS SMTP extension for delivery to succeed. It is automatically chosen when the destination domain mail servers of all recipients are known to support it.\n\nFallback to insecure: If delivery fails due to MTA-STS and/or DANE policies specified by the recipient domain, and the content is not sensitive, you may choose to ignore the recipient domain TLS policies so delivery can succeed.'), + dom.option('Default', attr.value('')), + dom.option('With RequireTLS', attr.value('yes'), m.RequireTLS === true ? attr.selected('') : []), + dom.option('Fallback to insecure', attr.value('no'), m.RequireTLS === false ? attr.selected('') : []), ), ' ', - dom.submitbutton('Retry now'), - async function submit(e: SubmitEvent) { - e.preventDefault() - const target = e.target! as HTMLButtonElement - try { - target.disabled = true - await client.QueueKick(m.ID, transport.value) - } catch (err) { - console.log({err}) - window.alert('Error: ' + errmsg(err)) - return - } finally { - target.disabled = false - } - window.location.reload() // todo: only refresh the list - } + dom.submitbutton('Save'), ), - ), - dom.td( - dom.clickbutton('Remove', async function click(e: MouseEvent) { + async function submit(e: SubmitEvent) { e.preventDefault() - if (!window.confirm('Are you sure you want to remove this message? It will be removed completely.')) { + try { + requiretlsFieldset.disabled = true + await client.QueueSaveRequireTLS(m.ID, requiretls.value === '' ? null : requiretls.value === 'yes') + } catch (err) { + console.log({err}) + window.alert('Error: ' + errmsg(err)) return + } finally { + requiretlsFieldset.disabled = false } + } + ), + ), + dom.td( + dom.form( + transport=dom.select( + attr.title('Transport to use for delivery attempts. The default is direct delivery, connecting to the MX hosts of the domain.'), + dom.option('(default)', attr.value('')), + Object.keys(transports || []).sort().map(t => dom.option(t, m.Transport === t ? attr.checked('') : [])), + ), + ' ', + dom.submitbutton('Retry now'), + async function submit(e: SubmitEvent) { + e.preventDefault() const target = e.target! as HTMLButtonElement try { target.disabled = true - await client.QueueDrop(m.ID) + await client.QueueKick(m.ID, transport.value) } catch (err) { console.log({err}) window.alert('Error: ' + errmsg(err)) @@ -2406,13 +2384,33 @@ const queueList = async () => { target.disabled = false } window.location.reload() // todo: only refresh the list - }), + } ), - ) - }) - ), + ), + dom.td( + dom.clickbutton('Remove', async function click(e: MouseEvent) { + e.preventDefault() + if (!window.confirm('Are you sure you want to remove this message? It will be removed completely.')) { + return + } + const target = e.target! as HTMLButtonElement + try { + target.disabled = true + await client.QueueDrop(m.ID) + } catch (err) { + console.log({err}) + window.alert('Error: ' + errmsg(err)) + return + } finally { + target.disabled = false + } + window.location.reload() // todo: only refresh the list + }), + ), + ) + }) ), - ], + ), ) } diff --git a/webadmin/api.json b/webadmin/api.json index fec2d3b..430c179 100644 --- a/webadmin/api.json +++ b/webadmin/api.json @@ -3370,6 +3370,13 @@ "int64" ] }, + { + "Name": "BaseID", + "Docs": "A message for multiple recipients will get a BaseID that is identical to the first Msg.ID queued. They may be delivered in a single SMTP transaction if they are going to the same mail server. For messages with a single recipient, this field will be 0.", + "Typewords": [ + "int64" + ] + }, { "Name": "Queued", "Docs": "", diff --git a/webadmin/api.ts b/webadmin/api.ts index c0fd642..6fc55c0 100644 --- a/webadmin/api.ts +++ b/webadmin/api.ts @@ -471,6 +471,7 @@ export interface ClientConfigsEntry { // queueing related fields. export interface Msg { ID: number + BaseID: number // A message for multiple recipients will get a BaseID that is identical to the first Msg.ID queued. They may be delivered in a single SMTP transaction if they are going to the same mail server. For messages with a single recipient, this field will be 0. Queued: Date SenderAccount: string // Failures are delivered back to this local account. Also used for routing. SenderLocalpart: Localpart // Should be a local user and domain. @@ -837,7 +838,7 @@ export const types: TypenameMap = { "Reverse": {"Name":"Reverse","Docs":"","Fields":[{"Name":"Hostnames","Docs":"","Typewords":["[]","string"]}]}, "ClientConfigs": {"Name":"ClientConfigs","Docs":"","Fields":[{"Name":"Entries","Docs":"","Typewords":["[]","ClientConfigsEntry"]}]}, "ClientConfigsEntry": {"Name":"ClientConfigsEntry","Docs":"","Fields":[{"Name":"Protocol","Docs":"","Typewords":["string"]},{"Name":"Host","Docs":"","Typewords":["Domain"]},{"Name":"Port","Docs":"","Typewords":["int32"]},{"Name":"Listener","Docs":"","Typewords":["string"]},{"Name":"Note","Docs":"","Typewords":["string"]}]}, - "Msg": {"Name":"Msg","Docs":"","Fields":[{"Name":"ID","Docs":"","Typewords":["int64"]},{"Name":"Queued","Docs":"","Typewords":["timestamp"]},{"Name":"SenderAccount","Docs":"","Typewords":["string"]},{"Name":"SenderLocalpart","Docs":"","Typewords":["Localpart"]},{"Name":"SenderDomain","Docs":"","Typewords":["IPDomain"]},{"Name":"RecipientLocalpart","Docs":"","Typewords":["Localpart"]},{"Name":"RecipientDomain","Docs":"","Typewords":["IPDomain"]},{"Name":"RecipientDomainStr","Docs":"","Typewords":["string"]},{"Name":"Attempts","Docs":"","Typewords":["int32"]},{"Name":"MaxAttempts","Docs":"","Typewords":["int32"]},{"Name":"DialedIPs","Docs":"","Typewords":["{}","[]","IP"]},{"Name":"NextAttempt","Docs":"","Typewords":["timestamp"]},{"Name":"LastAttempt","Docs":"","Typewords":["nullable","timestamp"]},{"Name":"LastError","Docs":"","Typewords":["string"]},{"Name":"Has8bit","Docs":"","Typewords":["bool"]},{"Name":"SMTPUTF8","Docs":"","Typewords":["bool"]},{"Name":"IsDMARCReport","Docs":"","Typewords":["bool"]},{"Name":"IsTLSReport","Docs":"","Typewords":["bool"]},{"Name":"Size","Docs":"","Typewords":["int64"]},{"Name":"MessageID","Docs":"","Typewords":["string"]},{"Name":"MsgPrefix","Docs":"","Typewords":["nullable","string"]},{"Name":"DSNUTF8","Docs":"","Typewords":["nullable","string"]},{"Name":"Transport","Docs":"","Typewords":["string"]},{"Name":"RequireTLS","Docs":"","Typewords":["nullable","bool"]},{"Name":"FutureReleaseRequest","Docs":"","Typewords":["string"]}]}, + "Msg": {"Name":"Msg","Docs":"","Fields":[{"Name":"ID","Docs":"","Typewords":["int64"]},{"Name":"BaseID","Docs":"","Typewords":["int64"]},{"Name":"Queued","Docs":"","Typewords":["timestamp"]},{"Name":"SenderAccount","Docs":"","Typewords":["string"]},{"Name":"SenderLocalpart","Docs":"","Typewords":["Localpart"]},{"Name":"SenderDomain","Docs":"","Typewords":["IPDomain"]},{"Name":"RecipientLocalpart","Docs":"","Typewords":["Localpart"]},{"Name":"RecipientDomain","Docs":"","Typewords":["IPDomain"]},{"Name":"RecipientDomainStr","Docs":"","Typewords":["string"]},{"Name":"Attempts","Docs":"","Typewords":["int32"]},{"Name":"MaxAttempts","Docs":"","Typewords":["int32"]},{"Name":"DialedIPs","Docs":"","Typewords":["{}","[]","IP"]},{"Name":"NextAttempt","Docs":"","Typewords":["timestamp"]},{"Name":"LastAttempt","Docs":"","Typewords":["nullable","timestamp"]},{"Name":"LastError","Docs":"","Typewords":["string"]},{"Name":"Has8bit","Docs":"","Typewords":["bool"]},{"Name":"SMTPUTF8","Docs":"","Typewords":["bool"]},{"Name":"IsDMARCReport","Docs":"","Typewords":["bool"]},{"Name":"IsTLSReport","Docs":"","Typewords":["bool"]},{"Name":"Size","Docs":"","Typewords":["int64"]},{"Name":"MessageID","Docs":"","Typewords":["string"]},{"Name":"MsgPrefix","Docs":"","Typewords":["nullable","string"]},{"Name":"DSNUTF8","Docs":"","Typewords":["nullable","string"]},{"Name":"Transport","Docs":"","Typewords":["string"]},{"Name":"RequireTLS","Docs":"","Typewords":["nullable","bool"]},{"Name":"FutureReleaseRequest","Docs":"","Typewords":["string"]}]}, "IPDomain": {"Name":"IPDomain","Docs":"","Fields":[{"Name":"IP","Docs":"","Typewords":["IP"]},{"Name":"Domain","Docs":"","Typewords":["Domain"]}]}, "WebserverConfig": {"Name":"WebserverConfig","Docs":"","Fields":[{"Name":"WebDNSDomainRedirects","Docs":"","Typewords":["[]","[]","Domain"]},{"Name":"WebDomainRedirects","Docs":"","Typewords":["[]","[]","string"]},{"Name":"WebHandlers","Docs":"","Typewords":["[]","WebHandler"]}]}, "WebHandler": {"Name":"WebHandler","Docs":"","Fields":[{"Name":"LogName","Docs":"","Typewords":["string"]},{"Name":"Domain","Docs":"","Typewords":["string"]},{"Name":"PathRegexp","Docs":"","Typewords":["string"]},{"Name":"DontRedirectPlainHTTP","Docs":"","Typewords":["bool"]},{"Name":"Compress","Docs":"","Typewords":["bool"]},{"Name":"WebStatic","Docs":"","Typewords":["nullable","WebStatic"]},{"Name":"WebRedirect","Docs":"","Typewords":["nullable","WebRedirect"]},{"Name":"WebForward","Docs":"","Typewords":["nullable","WebForward"]},{"Name":"Name","Docs":"","Typewords":["string"]},{"Name":"DNSDomain","Docs":"","Typewords":["Domain"]}]}, diff --git a/webmail/api.go b/webmail/api.go index c9ecaba..cbaa4d7 100644 --- a/webmail/api.go +++ b/webmail/api.go @@ -628,14 +628,15 @@ func (w Webmail) MessageSubmit(ctx context.Context, m SubmitMessage) { Localpart: fromAddr.Address.Localpart, IPDomain: dns.IPDomain{Domain: fromAddr.Address.Domain}, } - for _, rcpt := range recipients { + qml := make([]queue.Msg, len(recipients)) + for i, rcpt := range recipients { rcptMsgPrefix := recvHdrFor(rcpt.Pack(smtputf8)) + msgPrefix msgSize := int64(len(rcptMsgPrefix)) + xc.Size toPath := smtp.Path{ Localpart: rcpt.Localpart, IPDomain: dns.IPDomain{Domain: rcpt.Domain}, } - qm := queue.MakeMsg(reqInfo.AccountName, fromPath, toPath, has8bit, smtputf8, msgSize, messageID, []byte(rcptMsgPrefix), m.RequireTLS) + qm := queue.MakeMsg(fromPath, toPath, has8bit, smtputf8, msgSize, messageID, []byte(rcptMsgPrefix), m.RequireTLS) if m.FutureRelease != nil { ival := time.Until(*m.FutureRelease) if ival < 0 { @@ -647,13 +648,13 @@ func (w Webmail) MessageSubmit(ctx context.Context, m SubmitMessage) { qm.FutureReleaseRequest = "until;" + m.FutureRelease.Format(time.RFC3339) // todo: possibly add a header to the message stored in the Sent mailbox to indicate it was scheduled for later delivery. } - err := queue.Add(ctx, log, &qm, dataFile) - if err != nil { - metricSubmission.WithLabelValues("queueerror").Inc() - } - xcheckf(ctx, err, "adding message to the delivery queue") - metricSubmission.WithLabelValues("ok").Inc() + qml[i] = qm } + if err := queue.Add(ctx, log, reqInfo.AccountName, dataFile, qml...); err != nil { + metricSubmission.WithLabelValues("queueerror").Inc() + } + xcheckf(ctx, err, "adding messages to the delivery queue") + metricSubmission.WithLabelValues("ok").Inc() var modseq store.ModSeq // Only set if needed.