diff --git a/dmarcdb/eval.go b/dmarcdb/eval.go index b293b07..2f175ba 100644 --- a/dmarcdb/eval.go +++ b/dmarcdb/eval.go @@ -1,17 +1,13 @@ package dmarcdb import ( - "bufio" "compress/gzip" "context" "encoding/xml" - "errors" "fmt" "io" "mime" "mime/multipart" - "mime/quotedprintable" - "net/mail" "net/textproto" "net/url" "os" @@ -342,9 +338,12 @@ func Start(resolver dns.Resolver) { _, err := bstore.QueryDB[Evaluation](ctx, db).FilterLess("Evaluated", nextEnd.Add(-48*time.Hour)).Delete() log.Check(err, "removing stale dmarc evaluations from database") + log.Info("sending dmarc aggregate reports", mlog.Field("end", nextEnd.UTC()), mlog.Field("intervals", intervals)) if err := sendReports(ctx, log.WithCid(mox.Cid()), resolver, db, nextEnd, intervals); err != nil { log.Errorx("sending dmarc aggregate reports", err) metricReportError.Inc() + } else { + log.Info("finding sending dmarc aggregate reports") } } }() @@ -361,15 +360,20 @@ func nextWholeHour(now time.Time) time.Time { // perhaps faster and in parallel when there are lots of reports. Perhaps also // depending on reporting interval (faster for 1h, slower for 24h). // Replaced by tests. -var sleepBetween = func() { - time.Sleep(3 * time.Second) +var sleepBetween = func(ctx context.Context, between time.Duration) (ok bool) { + t := time.NewTimer(between) + select { + case <-ctx.Done(): + t.Stop() + return false + case <-t.C: + return true + } } // sendReports gathers all policy domains that have evaluations that should // receive a DMARC report and sends a report to each. func sendReports(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db *bstore.DB, endTime time.Time, intervals []int) error { - log.Info("sending dmarc aggregate reports", mlog.Field("end", endTime.UTC()), mlog.Field("intervals", intervals)) - ivals := make([]any, len(intervals)) for i, v := range intervals { ivals[i] = v @@ -378,10 +382,14 @@ func sendReports(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db * destDomains := map[string]bool{} // Gather all domains that we plan to send to. + nsend := 0 q := bstore.QueryDB[Evaluation](ctx, db) q.FilterLess("Evaluated", endTime) q.FilterEqual("IntervalHours", ivals...) err := q.ForEach(func(e Evaluation) error { + if !e.Optional && !destDomains[e.PolicyPublished.Domain] { + nsend++ + } destDomains[e.PolicyPublished.Domain] = destDomains[e.PolicyPublished.Domain] || !e.Optional return nil }) @@ -389,7 +397,20 @@ func sendReports(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db * return fmt.Errorf("looking for domains to send reports to: %v", err) } + var wg sync.WaitGroup + + // Sleep in between sending reports. We process hourly, and spread the reports over + // the hour, but with max 5 minute interval. + between := 45 * time.Minute + if nsend > 0 { + between /= time.Duration(nsend) + } + if between > 5*time.Minute { + between = 5 * time.Minute + } + // Attempt to send report to each domain. + n := 0 for d, send := range destDomains { // Cleanup evaluations for domain with only optionals. if !send { @@ -397,15 +418,37 @@ func sendReports(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db * continue } - rlog := log.WithCid(mox.Cid()).Fields(mlog.Field("domain", d)) - if _, err := sendReportDomain(ctx, rlog, resolver, db, endTime, d); err != nil { - rlog.Errorx("sending dmarc aggregate report to domain", err) - metricReportError.Inc() + if n > 0 { + if ok := sleepBetween(ctx, between); !ok { + return nil + } } + n++ - sleepBetween() + // Send in goroutine, so a slow process doesn't block progress. + wg.Add(1) + go func(domain string) { + defer func() { + // In case of panic don't take the whole program down. + x := recover() + if x != nil { + log.Error("unhandled panic in dmarcdb sendReports", mlog.Field("panic", x)) + debug.PrintStack() + metrics.PanicInc(metrics.Dmarcdb) + } + }() + defer wg.Done() + + rlog := log.WithCid(mox.Cid()).Fields(mlog.Field("domain", domain)) + if _, err := sendReportDomain(ctx, rlog, resolver, db, endTime, domain); err != nil { + rlog.Errorx("sending dmarc aggregate report to domain", err) + metricReportError.Inc() + } + }(d) } + wg.Wait() + return nil } @@ -508,7 +551,7 @@ func sendReportDomain(ctx context.Context, log *mlog.Log, resolver dns.Resolver, // todo future: we could perhaps still send this report, assuming the values we know. in case of temporary error, we could also schedule again regardless of next interval hour (we would now only retry a 24h-interval report after 24h passed). // Remove records unless it was a temporary error. We'll try again next round. cleanup = status != dmarc.StatusTemperror - return cleanup, fmt.Errorf("looking up current dmarc record for reporting address") + return cleanup, fmt.Errorf("looking up current dmarc record for reporting address: %v", err) } var recipients []recipient @@ -717,7 +760,7 @@ func sendReportDomain(ctx context.Context, log *mlog.Log, resolver dns.Resolver, from := smtp.Address{Localpart: "postmaster", Domain: mox.Conf.Static.HostnameDomain} // Subject follows the form in RFC. ../rfc/7489:1871 - subject := fmt.Sprintf("Report Domain: %s Submitter: %s Report-ID: %s", dom.ASCII, mox.Conf.Static.HostnameDomain.ASCII, report.ReportMetadata.ReportID) + subject := fmt.Sprintf("Report Domain: %s Submitter: %s Report-ID: <%s>", dom.ASCII, mox.Conf.Static.HostnameDomain.ASCII, report.ReportMetadata.ReportID) // Human-readable part for convenience. ../rfc/7489:1803 text := fmt.Sprintf(`Attached is an aggregate DMARC report with results of evaluations of the DMARC @@ -732,7 +775,7 @@ Period: %s - %s UTC `, dom, mox.Conf.Static.HostnameDomain, report.ReportMetadata.ReportID, beginTime.UTC().Format(time.DateTime), endTime.UTC().Format(time.DateTime)) // The attached file follows the naming convention from the RFC. ../rfc/7489:1812 - reportFilename := fmt.Sprintf("%s!%s!%d!%d!%s.xml.gz", mox.Conf.Static.HostnameDomain.ASCII, dom.ASCII, beginTime.Unix(), endTime.Add(-time.Second).Unix(), report.ReportMetadata.ReportID) + reportFilename := fmt.Sprintf("%s!%s!%d!%d.xml.gz", mox.Conf.Static.HostnameDomain.ASCII, dom.ASCII, beginTime.Unix(), endTime.Add(-time.Second).Unix()) var addrs []smtp.Address for _, rcpt := range recipients { @@ -796,175 +839,41 @@ Period: %s - %s UTC return true, nil } -// todo future: factor out common code for composing messages, between webmail/api.go and the two compose functions below - -// xcomposer is a helper to compose a message. Operations on xcomposer panic -// with "err", which xcomposer.recover recovers. -type xcomposer struct { - w *bufio.Writer - err error - has8bit bool - smtputf8 bool -} - -// Write implements io.Writer, but calls panic (that is handled higher up) on -// i/o errors. -func (xc *xcomposer) Write(buf []byte) (int, error) { - n, err := xc.w.Write(buf) - xc.checkf(err, "write") - return n, nil -} - -// recover sentical error, storing it into rerr. -func (xc *xcomposer) recover(rerr *error) { - x := recover() - if x == nil { - return - } - if err, ok := x.(error); ok && errors.Is(err, xc.err) { - *rerr = err - } else { - panic(x) - } -} - -// check error, panicing with sentinal error value. -func (xc *xcomposer) checkf(err error, format string, args ...any) { - if err != nil { - panic(fmt.Errorf("%w: %s: %v", xc.err, err, fmt.Sprintf(format, args...))) - } -} - -// write a message header. -func (xc *xcomposer) header(k, v string) { - fmt.Fprintf(xc, "%s: %s\r\n", k, v) -} - -// write a message header with addresses. -func (xc *xcomposer) headerAddrs(k string, l []smtp.Address) { - if len(l) == 0 { - return - } - v := "" - linelen := len(k) + len(": ") - for _, a := range l { - if v != "" { - v += "," - linelen++ - } - addr := mail.Address{Address: a.Pack(xc.smtputf8)} - s := addr.String() - if v != "" && linelen+1+len(s) > 77 { - v += "\r\n\t" - linelen = 1 - } else if v != "" { - v += " " - linelen++ - } - v += s - linelen += len(s) - } - fmt.Fprintf(xc, "%s: %s\r\n", k, v) -} - -// write a subject message header. -func (xc *xcomposer) subject(subject string) { - var subjectValue string - subjectLineLen := len("Subject: ") - subjectWord := false - for i, word := range strings.Split(subject, " ") { - if !xc.smtputf8 && !isASCII(word) { - word = mime.QEncoding.Encode("utf-8", word) - } - if i > 0 { - subjectValue += " " - subjectLineLen++ - } - if subjectWord && subjectLineLen+len(word) > 77 { - subjectValue += "\r\n\t" - subjectLineLen = 1 - } - subjectValue += word - subjectLineLen += len(word) - subjectWord = true - } - xc.header("Subject", subjectValue) -} - -// write an empty line. -func (xc *xcomposer) line() { - _, _ = xc.Write([]byte("\r\n")) -} - -func (xc *xcomposer) textPart(text string) (textBody []byte, ct, cte string) { - if !strings.HasSuffix(text, "\n") { - text += "\n" - } - text = strings.ReplaceAll(text, "\n", "\r\n") - charset := "us-ascii" - if !isASCII(text) { - charset = "utf-8" - } - if message.NeedsQuotedPrintable(text) { - var sb strings.Builder - _, err := io.Copy(quotedprintable.NewWriter(&sb), strings.NewReader(text)) - xc.checkf(err, "converting text to quoted printable") - text = sb.String() - cte = "quoted-printable" - } else if xc.has8bit || charset == "utf-8" { - cte = "8bit" - } else { - cte = "7bit" - } - - ct = mime.FormatMediaType("text/plain", map[string]string{"charset": charset}) - return []byte(text), ct, cte -} - -func isASCII(s string) bool { - for _, c := range s { - if c >= 0x80 { - return false - } - } - return true -} - func composeAggregateReport(ctx context.Context, log *mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []smtp.Address, subject, text, filename string, reportXMLGzipFile *os.File) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) { - xc := &xcomposer{bufio.NewWriter(mf), errors.New("compose"), false, false} - defer xc.recover(&rerr) + xc := message.NewComposer(mf) + defer xc.Recover(&rerr) // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains. for _, a := range recipients { if a.Localpart.IsInternational() { - xc.smtputf8 = true + xc.SMTPUTF8 = true break } } - xc.headerAddrs("From", []smtp.Address{fromAddr}) - xc.headerAddrs("To", recipients) - xc.subject(subject) - messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(smtputf8)) - xc.header("Message-Id", messageID) - xc.header("Date", time.Now().Format(message.RFC5322Z)) - xc.header("User-Agent", "mox/"+moxvar.Version) - xc.header("MIME-Version", "1.0") + xc.HeaderAddrs("From", []smtp.Address{fromAddr}) + xc.HeaderAddrs("To", recipients) + xc.Subject(subject) + messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8)) + xc.Header("Message-Id", messageID) + xc.Header("Date", time.Now().Format(message.RFC5322Z)) + xc.Header("User-Agent", "mox/"+moxvar.Version) + xc.Header("MIME-Version", "1.0") // Multipart message, with a text/plain and the report attached. mp := multipart.NewWriter(xc) - xc.header("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, mp.Boundary())) - xc.line() + xc.Header("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, mp.Boundary())) + xc.Line() // Textual part, just mentioning this is a DMARC report. - textBody, ct, cte := xc.textPart(text) + textBody, ct, cte := xc.TextPart(text) textHdr := textproto.MIMEHeader{} textHdr.Set("Content-Type", ct) textHdr.Set("Content-Transfer-Encoding", cte) textp, err := mp.CreatePart(textHdr) - xc.checkf(err, "adding text part to message") + xc.Checkf(err, "adding text part to message") _, err = textp.Write(textBody) - xc.checkf(err, "writing text part") + xc.Checkf(err, "writing text part") // DMARC report as attachment. ahdr := textproto.MIMEHeader{} @@ -973,21 +882,21 @@ func composeAggregateReport(ctx context.Context, log *mlog.Log, mf *os.File, fro cd := mime.FormatMediaType("attachment", map[string]string{"filename": filename}) ahdr.Set("Content-Disposition", cd) ap, err := mp.CreatePart(ahdr) - xc.checkf(err, "adding dmarc aggregate report to message") + xc.Checkf(err, "adding dmarc aggregate report to message") wc := moxio.Base64Writer(ap) _, err = io.Copy(wc, &moxio.AtReader{R: reportXMLGzipFile}) - xc.checkf(err, "adding attachment") + xc.Checkf(err, "adding attachment") err = wc.Close() - xc.checkf(err, "flushing attachment") + xc.Checkf(err, "flushing attachment") err = mp.Close() - xc.checkf(err, "closing multipart") + xc.Checkf(err, "closing multipart") - xc.w.Flush() + xc.Flush() - msgPrefix = dkimSign(ctx, log, fromAddr, smtputf8, mf) + msgPrefix = dkimSign(ctx, log, fromAddr, xc.SMTPUTF8, mf) - return msgPrefix, xc.has8bit, xc.smtputf8, messageID, nil + return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil } // Though this functionality is quite underspecified, we'll do our best to send our @@ -1048,38 +957,38 @@ Submitting-URI: %s } func composeErrorReport(ctx context.Context, log *mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []smtp.Address, subject, text string) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) { - xc := &xcomposer{bufio.NewWriter(mf), errors.New("compose"), false, false} - defer xc.recover(&rerr) + xc := message.NewComposer(mf) + defer xc.Recover(&rerr) // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains. for _, a := range recipients { if a.Localpart.IsInternational() { - xc.smtputf8 = true + xc.SMTPUTF8 = true break } } - xc.headerAddrs("From", []smtp.Address{fromAddr}) - xc.headerAddrs("To", recipients) - xc.header("Subject", subject) - messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(smtputf8)) - xc.header("Message-Id", messageID) - xc.header("Date", time.Now().Format(message.RFC5322Z)) - xc.header("User-Agent", "mox/"+moxvar.Version) - xc.header("MIME-Version", "1.0") + xc.HeaderAddrs("From", []smtp.Address{fromAddr}) + xc.HeaderAddrs("To", recipients) + xc.Header("Subject", subject) + messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8)) + xc.Header("Message-Id", messageID) + xc.Header("Date", time.Now().Format(message.RFC5322Z)) + xc.Header("User-Agent", "mox/"+moxvar.Version) + xc.Header("MIME-Version", "1.0") - textBody, ct, cte := xc.textPart(text) - xc.header("Content-Type", ct) - xc.header("Content-Transfer-Encoding", cte) - xc.line() + textBody, ct, cte := xc.TextPart(text) + xc.Header("Content-Type", ct) + xc.Header("Content-Transfer-Encoding", cte) + xc.Line() _, err := xc.Write(textBody) - xc.checkf(err, "writing text") + xc.Checkf(err, "writing text") - xc.w.Flush() + xc.Flush() msgPrefix = dkimSign(ctx, log, fromAddr, smtputf8, mf) - return msgPrefix, xc.has8bit, xc.smtputf8, messageID, nil + return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil } func dkimSign(ctx context.Context, log *mlog.Log, fromAddr smtp.Address, smtputf8 bool, mf *os.File) string { diff --git a/dmarcdb/eval_test.go b/dmarcdb/eval_test.go index 147e593..8b13ef8 100644 --- a/dmarcdb/eval_test.go +++ b/dmarcdb/eval_test.go @@ -276,7 +276,7 @@ func TestSendReports(t *testing.T) { return <-step } - sleepBetween = func() {} + sleepBetween = func(ctx context.Context, between time.Duration) (ok bool) { return true } test := func(evals []Evaluation, expAggrAddrs map[string]struct{}, expErrorAddrs map[string]struct{}, optExpReport *dmarcrpt.Feedback) { t.Helper() diff --git a/message/compose.go b/message/compose.go new file mode 100644 index 0000000..a196778 --- /dev/null +++ b/message/compose.go @@ -0,0 +1,162 @@ +package message + +import ( + "bufio" + "errors" + "fmt" + "io" + "mime" + "mime/quotedprintable" + "net/mail" + "strings" + + "github.com/mjl-/mox/smtp" +) + +var errCompose = errors.New("compose") + +// Composer helps compose a message. Operations that fail call panic, which can be +// caught with Composer.Recover. Writes are buffered. +type Composer struct { + Has8bit bool // Whether message contains 8bit data. + SMTPUTF8 bool // Whether message needs to be sent with SMTPUTF8 extension. + + bw *bufio.Writer +} + +func NewComposer(w io.Writer) *Composer { + return &Composer{bw: bufio.NewWriter(w)} +} + +// Write implements io.Writer, but calls panic (that is handled higher up) on +// i/o errors. +func (c *Composer) Write(buf []byte) (int, error) { + n, err := c.bw.Write(buf) + c.Checkf(err, "write") + return n, nil +} + +// Recover recovers the sentinel panic error value, storing it into rerr. +func (c *Composer) Recover(rerr *error) { + x := recover() + if x == nil { + return + } + if err, ok := x.(error); ok && errors.Is(err, errCompose) { + *rerr = err + } else { + panic(x) + } +} + +// Checkf checks err, panicing with sentinel error value. +func (c *Composer) Checkf(err error, format string, args ...any) { + if err != nil { + panic(fmt.Errorf("%w: %s: %v", errCompose, err, fmt.Sprintf(format, args...))) + } +} + +// Flush writes any buffered output. +func (c *Composer) Flush() { + err := c.bw.Flush() + c.Checkf(err, "flush") +} + +// Header writes a message header. +func (c *Composer) Header(k, v string) { + fmt.Fprintf(c, "%s: %s\r\n", k, v) +} + +// HeaderAddrs writes a message header with addresses. +func (c *Composer) HeaderAddrs(k string, l []smtp.Address) { + if len(l) == 0 { + return + } + v := "" + linelen := len(k) + len(": ") + for _, a := range l { + if v != "" { + v += "," + linelen++ + } + addr := mail.Address{Address: a.Pack(c.SMTPUTF8)} + s := addr.String() + if v != "" && linelen+1+len(s) > 77 { + v += "\r\n\t" + linelen = 1 + } else if v != "" { + v += " " + linelen++ + } + v += s + linelen += len(s) + } + fmt.Fprintf(c, "%s: %s\r\n", k, v) +} + +// Subject writes a subject message header. +func (c *Composer) Subject(subject string) { + var subjectValue string + subjectLineLen := len("Subject: ") + subjectWord := false + for i, word := range strings.Split(subject, " ") { + if !c.SMTPUTF8 && !isASCII(word) { + word = mime.QEncoding.Encode("utf-8", word) + } + if i > 0 { + subjectValue += " " + subjectLineLen++ + } + if subjectWord && subjectLineLen+len(word) > 77 { + subjectValue += "\r\n\t" + subjectLineLen = 1 + } + subjectValue += word + subjectLineLen += len(word) + subjectWord = true + } + c.Header("Subject", subjectValue) +} + +// Line writes an empty line. +func (c *Composer) Line() { + _, _ = c.Write([]byte("\r\n")) +} + +// TextPart prepares a text part to be added. Text should contain lines terminated +// with newlines (lf), which are replaced with crlf. The returned text may be +// quotedprintable, if needed. The returned ct and cte headers are for use with +// Content-Type and Content-Transfer-Encoding headers. +func (c *Composer) TextPart(text string) (textBody []byte, ct, cte string) { + if !strings.HasSuffix(text, "\n") { + text += "\n" + } + text = strings.ReplaceAll(text, "\n", "\r\n") + charset := "us-ascii" + if !isASCII(text) { + charset = "utf-8" + } + if NeedsQuotedPrintable(text) { + var sb strings.Builder + _, err := io.Copy(quotedprintable.NewWriter(&sb), strings.NewReader(text)) + c.Checkf(err, "converting text to quoted printable") + text = sb.String() + cte = "quoted-printable" + } else if c.Has8bit || charset == "utf-8" { + cte = "8bit" + } else { + cte = "7bit" + } + + ct = mime.FormatMediaType("text/plain", map[string]string{"charset": charset}) + return []byte(text), ct, cte +} + +func isASCII(s string) bool { + for _, c := range s { + if c >= 0x80 { + return false + } + } + return true +}