add message size consistency check

the bulk of a message is stored on disk. a message prefix is stored in the
database (for prefixed headers like "Received:"). this adds a check to ensure
Size = prefix length + on-disk file size.

verifydata also checks for this now.

and one older and one new (since yesterday) bug was found. the first when
appending a message without a header/body section (uncommon). the second when
sending messages from webmail with localserve (uncommon).
This commit is contained in:
Mechiel Lukkien 2023-08-08 22:10:53 +02:00
parent 49cf16d3f2
commit 8c3c12d96a
No known key found for this signature in database
9 changed files with 259 additions and 17 deletions

148
ctl.go
View file

@ -723,6 +723,142 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
})
w.xclose()
case "fixmsgsize":
/* protocol:
> "fixmsgsize"
> account or empty
< "ok" or error
< stream
*/
accountOpt := ctl.xread()
ctl.xwriteok()
w := ctl.writer()
var foundProblem bool
const batchSize = 10000
xfixmsgsize := func(accName string) {
acc, err := store.OpenAccount(accName)
ctl.xcheck(err, "open account")
defer func() {
err := acc.Close()
log.Check(err, "closing account after fixing message sizes")
}()
total := 0
var lastID int64
for {
var n int
acc.WithRLock(func() {
mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
// Don't process all message in one transaction, we could block the account for too long.
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
q := bstore.QueryTx[store.Message](tx)
q.FilterEqual("Expunged", false)
q.FilterGreater("ID", lastID)
q.Limit(batchSize)
q.SortAsc("ID")
return q.ForEach(func(m store.Message) error {
lastID = m.ID
p := acc.MessagePath(m.ID)
st, err := os.Stat(p)
if err != nil {
mb := store.Mailbox{ID: m.MailboxID}
if xerr := tx.Get(&mb); xerr != nil {
_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
ctl.xcheck(werr, "write")
}
_, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
ctl.xcheck(werr, "write")
return nil
}
filesize := st.Size()
correctSize := int64(len(m.MsgPrefix)) + filesize
if m.Size == correctSize {
return nil
}
foundProblem = true
mb := store.Mailbox{ID: m.MailboxID}
if err := tx.Get(&mb); err != nil {
_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
ctl.xcheck(werr, "write")
}
_, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
ctl.xcheck(err, "write")
// We assume that the original message size was accounted as stored in the mailbox
// total size. If this isn't correct, the user can always run
// recalculatemailboxcounts.
mb.Size -= m.Size
mb.Size += correctSize
if err := tx.Update(&mb); err != nil {
return fmt.Errorf("update mailbox counts: %v", err)
}
mailboxCounts[mb.ID] = mb
m.Size = correctSize
mr := acc.MessageReader(m)
part, err := message.EnsurePart(mr, m.Size)
if err != nil {
_, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
ctl.xcheck(werr, "write")
}
m.ParsedBuf, err = json.Marshal(part)
if err != nil {
return fmt.Errorf("marshal parsed message: %v", err)
}
total++
n++
if err := tx.Update(&m); err != nil {
return fmt.Errorf("update message: %v", err)
}
return nil
})
})
ctl.xcheck(err, "find and fix wrong message sizes")
var changes []store.Change
for _, mb := range mailboxCounts {
changes = append(changes, mb.ChangeCounts())
}
store.BroadcastChanges(acc, changes)
})
if n < batchSize {
break
}
}
_, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
ctl.xcheck(err, "write")
}
if accountOpt != "" {
xfixmsgsize(accountOpt)
} else {
for i, accName := range mox.Conf.Accounts() {
var line string
if i > 0 {
line = "\n"
}
_, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
ctl.xcheck(err, "write")
xfixmsgsize(accName)
}
}
if foundProblem {
_, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
ctl.xcheck(err, "write")
}
w.xclose()
case "reparse":
/* protocol:
> "reparse"
@ -735,6 +871,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
ctl.xwriteok()
w := ctl.writer()
const batchSize = 100
xreparseAccount := func(accName string) {
acc, err := store.OpenAccount(accName)
ctl.xcheck(err, "open account")
@ -747,12 +885,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
var lastID int64
for {
var n int
// Batch in transactions of 100 messages, so we don't block the account too long.
// Don't process all message in one transaction, we could block the account for too long.
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
q := bstore.QueryTx[store.Message](tx)
q.FilterEqual("Expunged", false)
q.FilterGreater("ID", lastID)
q.Limit(100)
q.Limit(batchSize)
q.SortAsc("ID")
return q.ForEach(func(m store.Message) error {
lastID = m.ID
@ -776,11 +914,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
})
ctl.xcheck(err, "update messages with parsed mime structure")
if n < 100 {
if n < batchSize {
break
}
}
_, err = fmt.Fprintf(w, "%d messages reparsed for account %s\n", total, accName)
_, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
ctl.xcheck(err, "write")
}
@ -792,7 +930,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
if i > 0 {
line = "\n"
}
_, err := fmt.Fprintf(w, "%sreparsing account %s\n", line, accName)
_, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
ctl.xcheck(err, "write")
xreparseAccount(accName)
}

View file

@ -156,10 +156,60 @@ func TestCtl(t *testing.T) {
ctlcmdImport(ctl, false, "mjl", "inbox", "testdata/ctl/data/tmp/export/maildir/Inbox")
})
// "recalculatemailboxcounts"
testctl(func(ctl *ctl) {
ctlcmdRecalculateMailboxCounts(ctl, "mjl")
})
// "fixmsgsize"
testctl(func(ctl *ctl) {
ctlcmdFixmsgsize(ctl, "mjl")
})
testctl(func(ctl *ctl) {
acc, err := store.OpenAccount("mjl")
tcheck(t, err, "open account")
defer acc.Close()
content := []byte("Subject: hi\r\n\r\nbody\r\n")
deliver := func(m *store.Message) {
t.Helper()
m.Size = int64(len(content))
msgf, err := store.CreateMessageTemp("ctltest")
tcheck(t, err, "create temp file")
_, err = msgf.Write(content)
tcheck(t, err, "write message file")
err = acc.DeliverMailbox(xlog, "Inbox", m, msgf, true)
tcheck(t, err, "deliver message")
err = msgf.Close()
tcheck(t, err, "close message file")
}
var msgBadSize store.Message
deliver(&msgBadSize)
msgBadSize.Size = 1
err = acc.DB.Update(ctxbg, &msgBadSize)
tcheck(t, err, "update message to bad size")
mb := store.Mailbox{ID: msgBadSize.MailboxID}
err = acc.DB.Get(ctxbg, &mb)
tcheck(t, err, "get db")
mb.Size -= int64(len(content))
mb.Size += 1
err = acc.DB.Update(ctxbg, &mb)
tcheck(t, err, "update mailbox size")
// Fix up the size.
ctlcmdFixmsgsize(ctl, "")
err = acc.DB.Get(ctxbg, &msgBadSize)
tcheck(t, err, "get message")
if msgBadSize.Size != int64(len(content)) {
t.Fatalf("after fixing, message size is %d, should be %d", msgBadSize.Size, len(content))
}
})
// "reparse"
testctl(func(ctl *ctl) {
ctlcmdReparse(ctl, "mjl")
})

View file

@ -2731,7 +2731,7 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) {
Received: tm,
Flags: storeFlags,
Keywords: keywords,
Size: size,
Size: size + int64(len(msgPrefix)),
MsgPrefix: msgPrefix,
}

View file

@ -25,10 +25,10 @@ func TestStatus(t *testing.T) {
// Again, now with a message in the mailbox.
tc.transactf("ok", "append inbox {4+}\r\ntest")
tc.transactf("ok", "status inbox (messages uidnext uidvalidity unseen deleted size recent appendlimit)")
tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 0, "SIZE": 4, "RECENT": 0, "APPENDLIMIT": 0}})
tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 0, "SIZE": 6, "RECENT": 0, "APPENDLIMIT": 0}})
tc.client.Select("inbox")
tc.client.StoreFlagsSet("1", true, `\Deleted`)
tc.transactf("ok", "status inbox (messages uidnext uidvalidity unseen deleted size recent appendlimit)")
tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 1, "SIZE": 4, "RECENT": 0, "APPENDLIMIT": 0}})
tc.xuntagged(imapclient.UntaggedStatus{Mailbox: "Inbox", Attrs: map[string]int64{"MESSAGES": 1, "UIDVALIDITY": 1, "UIDNEXT": 2, "UNSEEN": 1, "DELETED": 1, "SIZE": 6, "RECENT": 0, "APPENDLIMIT": 0}})
}

32
main.go
View file

@ -143,6 +143,7 @@ var commands = []struct {
{"reassignuids", cmdReassignUIDs},
{"fixuidmeta", cmdFixUIDMeta},
{"dmarcdb addreport", cmdDMARCDBAddReport},
{"fixmsgsize", cmdFixmsgsize},
{"reparse", cmdReparse},
{"ensureparsed", cmdEnsureParsed},
{"message parse", cmdMessageParse},
@ -1988,6 +1989,37 @@ func cmdVersion(c *cmd) {
fmt.Println(moxvar.Version)
}
func cmdFixmsgsize(c *cmd) {
c.unlisted = true
c.params = "[account]"
c.help = `Ensure message sizes in the database matching the sum of the message prefix length and on-disk file size.
Messages with an inconsistent size are also parsed again.
If an inconsistency is found, you should probably also run "mox
bumpuidvalidity" on the mailboxes or entire account to force IMAP clients to
refetch messages.
`
args := c.Parse()
if len(args) > 1 {
c.Usage()
}
mustLoadConfig()
var account string
if len(args) == 1 {
account = args[0]
}
ctlcmdFixmsgsize(xctl(), account)
}
func ctlcmdFixmsgsize(ctl *ctl, account string) {
ctl.xwrite("fixmsgsize")
ctl.xwrite(account)
ctl.xreadok()
ctl.xstreamto(os.Stdout)
}
func cmdReparse(c *cmd) {
c.unlisted = true
c.params = "[account]"

View file

@ -215,7 +215,7 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
err := acc.Close()
log.Check(err, "closing account")
}()
m := store.Message{Size: size}
m := store.Message{Size: size, MsgPrefix: msgPrefix}
conf, _ := acc.Conf()
dest := conf.Destinations[mailFrom.String()]
acc.WithWLock(func() {

View file

@ -404,6 +404,7 @@ func TestSpam(t *testing.T) {
MsgFromValidated: true,
MsgFromValidation: store.ValidationStrict,
Flags: store.Flags{Seen: true, Junk: true},
Size: int64(len(deliverMessage)),
}
for i := 0; i < 3; i++ {
nm := m
@ -503,6 +504,7 @@ func TestDMARCSent(t *testing.T) {
RcptToLocalpart: smtp.Localpart("mjl"),
RcptToDomain: "mox.example",
Flags: store.Flags{Seen: true, Junk: true},
Size: int64(len(deliverMessage)),
}
for i := 0; i < 3; i++ {
nm := m
@ -524,7 +526,7 @@ func TestDMARCSent(t *testing.T) {
})
// Insert a message that we sent to the address that is about to send to us.
var sentMsg store.Message
sentMsg := store.Message{Size: int64(len(deliverMessage))}
tinsertmsg(t, ts.acc, "Sent", &sentMsg, deliverMessage)
err := ts.acc.DB.Insert(ctxbg, &store.Recipient{MessageID: sentMsg.ID, Localpart: "remote", Domain: "example.org", OrgDomain: "example.org", Sent: time.Now()})
tcheck(t, err, "inserting message recipient")

View file

@ -759,7 +759,7 @@ func initAccount(db *bstore.DB) error {
// it was the last user.
func (a *Account) Close() error {
if CheckConsistencyOnClose {
xerr := a.checkConsistency()
xerr := a.CheckConsistency()
err := closeAccount(a)
if xerr != nil {
panic(xerr)
@ -769,17 +769,20 @@ func (a *Account) Close() error {
return closeAccount(a)
}
// checkConsistency checks the consistency of the database and returns a non-nil
// CheckConsistency checks the consistency of the database and returns a non-nil
// error for these cases:
//
// - Missing on-disk file for message.
// - Mismatch between message size and length of MsgPrefix and on-disk file.
// - Missing HaveCounts.
// - Incorrect mailbox counts.
// - Message with UID >= mailbox uid next.
// - Mailbox uidvalidity >= account uid validity.
// - ModSeq > 0, CreateSeq > 0, CreateSeq <= ModSeq.
func (a *Account) checkConsistency() error {
func (a *Account) CheckConsistency() error {
var uiderrors []string // With a limit, could be many.
var modseqerrors []string // With limit.
var fileerrors []string // With limit.
var errors []string
err := a.DB.Read(context.Background(), func(tx *bstore.Tx) error {
@ -819,6 +822,18 @@ func (a *Account) checkConsistency() error {
uiderr := fmt.Sprintf("message %d in mailbox %q (id %d) has uid %d >= mailbox uidnext %d", m.ID, mb.Name, mb.ID, m.UID, mb.UIDNext)
uiderrors = append(uiderrors, uiderr)
}
if m.Expunged {
return nil
}
p := a.MessagePath(m.ID)
st, err := os.Stat(p)
if err != nil {
existserr := fmt.Sprintf("message %d in mailbox %q (id %d) on-disk file %s: %v", m.ID, mb.Name, mb.ID, p, err)
fileerrors = append(fileerrors, existserr)
} else if len(fileerrors) < 20 && m.Size != int64(len(m.MsgPrefix))+st.Size() {
sizeerr := fmt.Sprintf("message %d in mailbox %q (id %d) has size %d != len msgprefix %d + on-disk file size %d = %d", m.ID, mb.Name, mb.ID, m.Size, len(m.MsgPrefix), st.Size(), int64(len(m.MsgPrefix))+st.Size())
fileerrors = append(fileerrors, sizeerr)
}
return nil
})
if err != nil {
@ -842,6 +857,7 @@ func (a *Account) checkConsistency() error {
}
errors = append(errors, uiderrors...)
errors = append(errors, modseqerrors...)
errors = append(errors, fileerrors...)
if len(errors) > 0 {
return fmt.Errorf("%s", strings.Join(errors, "; "))
}

View file

@ -137,9 +137,13 @@ possibly making them potentially no longer readable by the previous version.
checkf(err, path, "checking database file")
}
checkFile := func(path string) {
_, err := os.Stat(path)
checkFile := func(dbpath, path string, prefixSize int, size int64) {
st, err := os.Stat(path)
checkf(err, path, "checking if file exists")
if err == nil && int64(prefixSize)+st.Size() != size {
filesize := st.Size()
checkf(fmt.Errorf("%s: message size is %d, should be %d (length of MsgPrefix %d + file size %d), see \"mox fixmsgsize\"", path, size, int64(prefixSize)+st.Size(), prefixSize, filesize), dbpath, "checking message size")
}
}
checkQueue := func() {
@ -155,7 +159,7 @@ possibly making them potentially no longer readable by the previous version.
mp := store.MessagePath(m.ID)
seen[mp] = struct{}{}
p := filepath.Join(dataDir, "queue", mp)
checkFile(p)
checkFile(dbpath, p, len(m.MsgPrefix), m.Size)
return nil
})
checkf(err, dbpath, "reading messages in queue database to check files")
@ -263,7 +267,7 @@ possibly making them potentially no longer readable by the previous version.
mp := store.MessagePath(m.ID)
seen[mp] = struct{}{}
p := filepath.Join(accdir, "msg", mp)
checkFile(p)
checkFile(dbpath, p, len(m.MsgPrefix), m.Size)
return nil
})
checkf(err, dbpath, "reading messages in account database to check files")