From 8c3c12d96a55b4624f03939887e87c8a3422ce5e Mon Sep 17 00:00:00 2001 From: Mechiel Lukkien Date: Tue, 8 Aug 2023 22:10:53 +0200 Subject: [PATCH] add message size consistency check the bulk of a message is stored on disk. a message prefix is stored in the database (for prefixed headers like "Received:"). this adds a check to ensure Size = prefix length + on-disk file size. verifydata also checks for this now. and one older and one new (since yesterday) bug was found. the first when appending a message without a header/body section (uncommon). the second when sending messages from webmail with localserve (uncommon). --- ctl.go | 148 ++++++++++++++++++++++++++++++++++++-- ctl_test.go | 50 +++++++++++++ imapserver/server.go | 2 +- imapserver/status_test.go | 4 +- main.go | 32 +++++++++ queue/queue.go | 2 +- smtpserver/server_test.go | 4 +- store/account.go | 22 +++++- verifydata.go | 12 ++-- 9 files changed, 259 insertions(+), 17 deletions(-) diff --git a/ctl.go b/ctl.go index 6d9f20d..1eacaae 100644 --- a/ctl.go +++ b/ctl.go @@ -723,6 +723,142 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) { }) w.xclose() + case "fixmsgsize": + /* protocol: + > "fixmsgsize" + > account or empty + < "ok" or error + < stream + */ + + accountOpt := ctl.xread() + ctl.xwriteok() + w := ctl.writer() + + var foundProblem bool + const batchSize = 10000 + + xfixmsgsize := func(accName string) { + acc, err := store.OpenAccount(accName) + ctl.xcheck(err, "open account") + defer func() { + err := acc.Close() + log.Check(err, "closing account after fixing message sizes") + }() + + total := 0 + var lastID int64 + for { + var n int + + acc.WithRLock(func() { + mailboxCounts := map[int64]store.Mailbox{} // For broadcasting. + + // Don't process all message in one transaction, we could block the account for too long. + err := acc.DB.Write(ctx, func(tx *bstore.Tx) error { + q := bstore.QueryTx[store.Message](tx) + q.FilterEqual("Expunged", false) + q.FilterGreater("ID", lastID) + q.Limit(batchSize) + q.SortAsc("ID") + return q.ForEach(func(m store.Message) error { + lastID = m.ID + + p := acc.MessagePath(m.ID) + st, err := os.Stat(p) + if err != nil { + mb := store.Mailbox{ID: m.MailboxID} + if xerr := tx.Get(&mb); xerr != nil { + _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr) + ctl.xcheck(werr, "write") + } + _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err) + ctl.xcheck(werr, "write") + return nil + } + filesize := st.Size() + correctSize := int64(len(m.MsgPrefix)) + filesize + if m.Size == correctSize { + return nil + } + + foundProblem = true + + mb := store.Mailbox{ID: m.MailboxID} + if err := tx.Get(&mb); err != nil { + _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err) + ctl.xcheck(werr, "write") + } + _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize) + ctl.xcheck(err, "write") + + // We assume that the original message size was accounted as stored in the mailbox + // total size. If this isn't correct, the user can always run + // recalculatemailboxcounts. + mb.Size -= m.Size + mb.Size += correctSize + if err := tx.Update(&mb); err != nil { + return fmt.Errorf("update mailbox counts: %v", err) + } + mailboxCounts[mb.ID] = mb + + m.Size = correctSize + + mr := acc.MessageReader(m) + part, err := message.EnsurePart(mr, m.Size) + if err != nil { + _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err) + ctl.xcheck(werr, "write") + } + m.ParsedBuf, err = json.Marshal(part) + if err != nil { + return fmt.Errorf("marshal parsed message: %v", err) + } + total++ + n++ + if err := tx.Update(&m); err != nil { + return fmt.Errorf("update message: %v", err) + } + return nil + }) + + }) + ctl.xcheck(err, "find and fix wrong message sizes") + + var changes []store.Change + for _, mb := range mailboxCounts { + changes = append(changes, mb.ChangeCounts()) + } + store.BroadcastChanges(acc, changes) + }) + if n < batchSize { + break + } + } + _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName) + ctl.xcheck(err, "write") + } + + if accountOpt != "" { + xfixmsgsize(accountOpt) + } else { + for i, accName := range mox.Conf.Accounts() { + var line string + if i > 0 { + line = "\n" + } + _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName) + ctl.xcheck(err, "write") + xfixmsgsize(accName) + } + } + if foundProblem { + _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n") + ctl.xcheck(err, "write") + } + + w.xclose() + case "reparse": /* protocol: > "reparse" @@ -735,6 +871,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) { ctl.xwriteok() w := ctl.writer() + const batchSize = 100 + xreparseAccount := func(accName string) { acc, err := store.OpenAccount(accName) ctl.xcheck(err, "open account") @@ -747,12 +885,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) { var lastID int64 for { var n int - // Batch in transactions of 100 messages, so we don't block the account too long. + // Don't process all message in one transaction, we could block the account for too long. err := acc.DB.Write(ctx, func(tx *bstore.Tx) error { q := bstore.QueryTx[store.Message](tx) q.FilterEqual("Expunged", false) q.FilterGreater("ID", lastID) - q.Limit(100) + q.Limit(batchSize) q.SortAsc("ID") return q.ForEach(func(m store.Message) error { lastID = m.ID @@ -776,11 +914,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) { }) ctl.xcheck(err, "update messages with parsed mime structure") - if n < 100 { + if n < batchSize { break } } - _, err = fmt.Fprintf(w, "%d messages reparsed for account %s\n", total, accName) + _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName) ctl.xcheck(err, "write") } @@ -792,7 +930,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) { if i > 0 { line = "\n" } - _, err := fmt.Fprintf(w, "%sreparsing account %s\n", line, accName) + _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName) ctl.xcheck(err, "write") xreparseAccount(accName) } diff --git a/ctl_test.go b/ctl_test.go index 4aa18ae..3f957e7 100644 --- a/ctl_test.go +++ b/ctl_test.go @@ -156,10 +156,60 @@ func TestCtl(t *testing.T) { ctlcmdImport(ctl, false, "mjl", "inbox", "testdata/ctl/data/tmp/export/maildir/Inbox") }) + // "recalculatemailboxcounts" testctl(func(ctl *ctl) { ctlcmdRecalculateMailboxCounts(ctl, "mjl") }) + // "fixmsgsize" + testctl(func(ctl *ctl) { + ctlcmdFixmsgsize(ctl, "mjl") + }) + testctl(func(ctl *ctl) { + acc, err := store.OpenAccount("mjl") + tcheck(t, err, "open account") + defer acc.Close() + + content := []byte("Subject: hi\r\n\r\nbody\r\n") + + deliver := func(m *store.Message) { + t.Helper() + m.Size = int64(len(content)) + msgf, err := store.CreateMessageTemp("ctltest") + tcheck(t, err, "create temp file") + _, err = msgf.Write(content) + tcheck(t, err, "write message file") + err = acc.DeliverMailbox(xlog, "Inbox", m, msgf, true) + tcheck(t, err, "deliver message") + err = msgf.Close() + tcheck(t, err, "close message file") + } + + var msgBadSize store.Message + deliver(&msgBadSize) + + msgBadSize.Size = 1 + err = acc.DB.Update(ctxbg, &msgBadSize) + tcheck(t, err, "update message to bad size") + mb := store.Mailbox{ID: msgBadSize.MailboxID} + err = acc.DB.Get(ctxbg, &mb) + tcheck(t, err, "get db") + mb.Size -= int64(len(content)) + mb.Size += 1 + err = acc.DB.Update(ctxbg, &mb) + tcheck(t, err, "update mailbox size") + + // Fix up the size. + ctlcmdFixmsgsize(ctl, "") + + err = acc.DB.Get(ctxbg, &msgBadSize) + tcheck(t, err, "get message") + if msgBadSize.Size != int64(len(content)) { + t.Fatalf("after fixing, message size is %d, should be %d", msgBadSize.Size, len(content)) + } + }) + + // "reparse" testctl(func(ctl *ctl) { ctlcmdReparse(ctl, "mjl") }) diff --git a/imapserver/server.go b/imapserver/server.go index 76855ea..7569f1d 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -2731,7 +2731,7 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) { Received: tm, Flags: storeFlags, Keywords: keywords, - Size: size, + Size: size + int64(len(msgPrefix)), MsgPrefix: msgPrefix, } diff --git a/imapserver/status_test.go b/imapserver/status_test.go index 1ba4b49..f1abdcc 100644 --- a/imapserver/status_test.go +++ b/imapserver/status_test.go @@ -25,10 +25,10 @@ func TestStatus(t *testing.T) { // Again, now with a message in the mailbox. tc.transactf("ok", "append inbox {4+}\r\ntest") tc.transactf("ok", "status inbox (messages uidnext uidvalidity unseen deleted size recent appendlimit)") - tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 0, "SIZE": 4, "RECENT": 0, "APPENDLIMIT": 0}}) + tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 0, "SIZE": 6, "RECENT": 0, "APPENDLIMIT": 0}}) tc.client.Select("inbox") tc.client.StoreFlagsSet("1", true, `\Deleted`) tc.transactf("ok", "status inbox (messages uidnext uidvalidity unseen deleted size recent appendlimit)") - tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 1, "SIZE": 4, "RECENT": 0, "APPENDLIMIT": 0}}) + tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 1, "SIZE": 6, "RECENT": 0, "APPENDLIMIT": 0}}) } diff --git a/main.go b/main.go index 42786af..d7cdfe2 100644 --- a/main.go +++ b/main.go @@ -143,6 +143,7 @@ var commands = []struct { {"reassignuids", cmdReassignUIDs}, {"fixuidmeta", cmdFixUIDMeta}, {"dmarcdb addreport", cmdDMARCDBAddReport}, + {"fixmsgsize", cmdFixmsgsize}, {"reparse", cmdReparse}, {"ensureparsed", cmdEnsureParsed}, {"message parse", cmdMessageParse}, @@ -1988,6 +1989,37 @@ func cmdVersion(c *cmd) { fmt.Println(moxvar.Version) } +func cmdFixmsgsize(c *cmd) { + c.unlisted = true + c.params = "[account]" + c.help = `Ensure message sizes in the database matching the sum of the message prefix length and on-disk file size. + +Messages with an inconsistent size are also parsed again. + +If an inconsistency is found, you should probably also run "mox +bumpuidvalidity" on the mailboxes or entire account to force IMAP clients to +refetch messages. +` + args := c.Parse() + if len(args) > 1 { + c.Usage() + } + + mustLoadConfig() + var account string + if len(args) == 1 { + account = args[0] + } + ctlcmdFixmsgsize(xctl(), account) +} + +func ctlcmdFixmsgsize(ctl *ctl, account string) { + ctl.xwrite("fixmsgsize") + ctl.xwrite(account) + ctl.xreadok() + ctl.xstreamto(os.Stdout) +} + func cmdReparse(c *cmd) { c.unlisted = true c.params = "[account]" diff --git a/queue/queue.go b/queue/queue.go index 87a8f4a..c72aaa6 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -215,7 +215,7 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp err := acc.Close() log.Check(err, "closing account") }() - m := store.Message{Size: size} + m := store.Message{Size: size, MsgPrefix: msgPrefix} conf, _ := acc.Conf() dest := conf.Destinations[mailFrom.String()] acc.WithWLock(func() { diff --git a/smtpserver/server_test.go b/smtpserver/server_test.go index 5949e18..06b0ca4 100644 --- a/smtpserver/server_test.go +++ b/smtpserver/server_test.go @@ -404,6 +404,7 @@ func TestSpam(t *testing.T) { MsgFromValidated: true, MsgFromValidation: store.ValidationStrict, Flags: store.Flags{Seen: true, Junk: true}, + Size: int64(len(deliverMessage)), } for i := 0; i < 3; i++ { nm := m @@ -503,6 +504,7 @@ func TestDMARCSent(t *testing.T) { RcptToLocalpart: smtp.Localpart("mjl"), RcptToDomain: "mox.example", Flags: store.Flags{Seen: true, Junk: true}, + Size: int64(len(deliverMessage)), } for i := 0; i < 3; i++ { nm := m @@ -524,7 +526,7 @@ func TestDMARCSent(t *testing.T) { }) // Insert a message that we sent to the address that is about to send to us. - var sentMsg store.Message + sentMsg := store.Message{Size: int64(len(deliverMessage))} tinsertmsg(t, ts.acc, "Sent", &sentMsg, deliverMessage) err := ts.acc.DB.Insert(ctxbg, &store.Recipient{MessageID: sentMsg.ID, Localpart: "remote", Domain: "example.org", OrgDomain: "example.org", Sent: time.Now()}) tcheck(t, err, "inserting message recipient") diff --git a/store/account.go b/store/account.go index 582c162..6f10544 100644 --- a/store/account.go +++ b/store/account.go @@ -759,7 +759,7 @@ func initAccount(db *bstore.DB) error { // it was the last user. func (a *Account) Close() error { if CheckConsistencyOnClose { - xerr := a.checkConsistency() + xerr := a.CheckConsistency() err := closeAccount(a) if xerr != nil { panic(xerr) @@ -769,17 +769,20 @@ func (a *Account) Close() error { return closeAccount(a) } -// checkConsistency checks the consistency of the database and returns a non-nil +// CheckConsistency checks the consistency of the database and returns a non-nil // error for these cases: // +// - Missing on-disk file for message. +// - Mismatch between message size and length of MsgPrefix and on-disk file. // - Missing HaveCounts. // - Incorrect mailbox counts. // - Message with UID >= mailbox uid next. // - Mailbox uidvalidity >= account uid validity. // - ModSeq > 0, CreateSeq > 0, CreateSeq <= ModSeq. -func (a *Account) checkConsistency() error { +func (a *Account) CheckConsistency() error { var uiderrors []string // With a limit, could be many. var modseqerrors []string // With limit. + var fileerrors []string // With limit. var errors []string err := a.DB.Read(context.Background(), func(tx *bstore.Tx) error { @@ -819,6 +822,18 @@ func (a *Account) checkConsistency() error { uiderr := fmt.Sprintf("message %d in mailbox %q (id %d) has uid %d >= mailbox uidnext %d", m.ID, mb.Name, mb.ID, m.UID, mb.UIDNext) uiderrors = append(uiderrors, uiderr) } + if m.Expunged { + return nil + } + p := a.MessagePath(m.ID) + st, err := os.Stat(p) + if err != nil { + existserr := fmt.Sprintf("message %d in mailbox %q (id %d) on-disk file %s: %v", m.ID, mb.Name, mb.ID, p, err) + fileerrors = append(fileerrors, existserr) + } else if len(fileerrors) < 20 && m.Size != int64(len(m.MsgPrefix))+st.Size() { + sizeerr := fmt.Sprintf("message %d in mailbox %q (id %d) has size %d != len msgprefix %d + on-disk file size %d = %d", m.ID, mb.Name, mb.ID, m.Size, len(m.MsgPrefix), st.Size(), int64(len(m.MsgPrefix))+st.Size()) + fileerrors = append(fileerrors, sizeerr) + } return nil }) if err != nil { @@ -842,6 +857,7 @@ func (a *Account) checkConsistency() error { } errors = append(errors, uiderrors...) errors = append(errors, modseqerrors...) + errors = append(errors, fileerrors...) if len(errors) > 0 { return fmt.Errorf("%s", strings.Join(errors, "; ")) } diff --git a/verifydata.go b/verifydata.go index db4284a..b1357b9 100644 --- a/verifydata.go +++ b/verifydata.go @@ -137,9 +137,13 @@ possibly making them potentially no longer readable by the previous version. checkf(err, path, "checking database file") } - checkFile := func(path string) { - _, err := os.Stat(path) + checkFile := func(dbpath, path string, prefixSize int, size int64) { + st, err := os.Stat(path) checkf(err, path, "checking if file exists") + if err == nil && int64(prefixSize)+st.Size() != size { + filesize := st.Size() + checkf(fmt.Errorf("%s: message size is %d, should be %d (length of MsgPrefix %d + file size %d), see \"mox fixmsgsize\"", path, size, int64(prefixSize)+st.Size(), prefixSize, filesize), dbpath, "checking message size") + } } checkQueue := func() { @@ -155,7 +159,7 @@ possibly making them potentially no longer readable by the previous version. mp := store.MessagePath(m.ID) seen[mp] = struct{}{} p := filepath.Join(dataDir, "queue", mp) - checkFile(p) + checkFile(dbpath, p, len(m.MsgPrefix), m.Size) return nil }) checkf(err, dbpath, "reading messages in queue database to check files") @@ -263,7 +267,7 @@ possibly making them potentially no longer readable by the previous version. mp := store.MessagePath(m.ID) seen[mp] = struct{}{} p := filepath.Join(accdir, "msg", mp) - checkFile(p) + checkFile(dbpath, p, len(m.MsgPrefix), m.Size) return nil }) checkf(err, dbpath, "reading messages in account database to check files")