mirror of
https://github.com/mjl-/mox.git
synced 2024-12-26 00:13:47 +03:00
in store/, change functions from calling panic to returning errors
this is a library package, errors should be explicit. callers had to be careful when calling these "X" functions. now it's explicit.
This commit is contained in:
parent
936a0d5afe
commit
08eb1a5472
9 changed files with 173 additions and 165 deletions
|
@ -467,11 +467,6 @@ func importMessages(ctx context.Context, log *mlog.Log, token string, acc *store
|
|||
err = f.Close()
|
||||
log.Check(err, "closing temporary message file for delivery")
|
||||
}
|
||||
x := recover()
|
||||
if x != nil {
|
||||
// todo: get a variant of DeliverX that returns an error instead of panicking.
|
||||
log.Error("delivery panic", mlog.Field("err", x))
|
||||
}
|
||||
}()
|
||||
m.MailboxID = mb.ID
|
||||
m.MailboxOrigID = mb.ID
|
||||
|
@ -503,7 +498,10 @@ func importMessages(ctx context.Context, log *mlog.Log, token string, acc *store
|
|||
const consumeFile = true
|
||||
const sync = false
|
||||
const notrain = true
|
||||
acc.DeliverX(log, tx, m, f, consumeFile, mb.Sent, sync, notrain) // todo: need a deliver that returns an error.
|
||||
if err := acc.DeliverMessage(log, tx, m, f, consumeFile, mb.Sent, sync, notrain); err != nil {
|
||||
problemf("delivering message %s: %s (continuing)", pos, err)
|
||||
return
|
||||
}
|
||||
deliveredIDs = append(deliveredIDs, m.ID)
|
||||
changes = append(changes, store.ChangeAddUID{MailboxID: m.MailboxID, UID: m.UID, Flags: m.Flags})
|
||||
messages[mb.Name]++
|
||||
|
|
|
@ -654,10 +654,10 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x
|
|||
x := recover()
|
||||
if x == nil || x == cleanClose {
|
||||
c.log.Info("connection closed")
|
||||
} else if err, ok := x.(error); ok || isClosed(err) {
|
||||
} else if err, ok := x.(error); ok && isClosed(err) {
|
||||
c.log.Infox("connection closed", err)
|
||||
} else {
|
||||
c.log.Error("unhandled error", mlog.Field("err", x))
|
||||
c.log.Error("unhandled panic", mlog.Field("err", x))
|
||||
debug.PrintStack()
|
||||
metrics.PanicInc("imapserver")
|
||||
}
|
||||
|
@ -739,6 +739,9 @@ func (c *conn) command() {
|
|||
panic(x)
|
||||
}
|
||||
|
||||
var sxerr syntaxError
|
||||
var uerr userError
|
||||
var serr serverError
|
||||
if isClosed(err) {
|
||||
c.log.Infox("imap command ioerror", err, logFields...)
|
||||
result = "ioerror"
|
||||
|
@ -746,12 +749,7 @@ func (c *conn) command() {
|
|||
debug.PrintStack()
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var sxerr syntaxError
|
||||
var uerr userError
|
||||
var serr serverError
|
||||
if errors.As(err, &sxerr) {
|
||||
} else if errors.As(err, &sxerr) {
|
||||
result = "badsyntax"
|
||||
if c.ncmds == 0 {
|
||||
// Other side is likely speaking something else than IMAP, send error message and
|
||||
|
@ -793,11 +791,10 @@ func (c *conn) command() {
|
|||
c.bwriteresultf("%s NO %s %v", tag, cmd, err)
|
||||
}
|
||||
} else {
|
||||
result = "error"
|
||||
c.log.Infox("imap command error", err, logFields...)
|
||||
// todo: introduce a store.Error, and check for that, don't blindly pass on errors?
|
||||
debug.PrintStack()
|
||||
c.bwriteresultf("%s NO %s %v", tag, cmd, err)
|
||||
// Other type of panic, we pass it on, aborting the connection.
|
||||
result = "panic"
|
||||
c.log.Errorx("imap command panic", err, logFields...)
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -1148,7 +1145,8 @@ func xcheckmailboxname(name string, allowInbox bool) string {
|
|||
// If the mailbox does not exist, panic is called with a user error.
|
||||
// Must be called with account rlock held.
|
||||
func (c *conn) xmailbox(tx *bstore.Tx, name string, missingErrCode string) store.Mailbox {
|
||||
mb := c.account.MailboxFindX(tx, name)
|
||||
mb, err := c.account.MailboxFind(tx, name)
|
||||
xcheckf(err, "finding mailbox")
|
||||
if mb == nil {
|
||||
// missingErrCode can be empty, or e.g. TRYCREATE or ALREADYEXISTS.
|
||||
xusercodeErrorf(missingErrCode, "%w", store.ErrUnknownMailbox)
|
||||
|
@ -1909,14 +1907,17 @@ func (c *conn) cmdCreate(tag, cmd string, p *parser) {
|
|||
p += "/"
|
||||
}
|
||||
p += elem
|
||||
if c.account.MailboxExistsX(tx, p) {
|
||||
exists, err := c.account.MailboxExists(tx, p)
|
||||
xcheckf(err, "checking if mailbox exists")
|
||||
if exists {
|
||||
if i == len(elems)-1 {
|
||||
// ../rfc/9051:1914
|
||||
xuserErrorf("mailbox already exists")
|
||||
}
|
||||
continue
|
||||
}
|
||||
_, nchanges := c.account.MailboxEnsureX(tx, p, true)
|
||||
_, nchanges, err := c.account.MailboxEnsure(tx, p, true)
|
||||
xcheckf(err, "ensuring mailbox exists")
|
||||
changes = append(changes, nchanges...)
|
||||
created = append(created, p)
|
||||
}
|
||||
|
@ -2050,10 +2051,13 @@ func (c *conn) cmdRename(tag, cmd string, p *parser) {
|
|||
// We do indeed create a new destination mailbox and actually move the messages.
|
||||
// ../rfc/9051:2101
|
||||
if src == "Inbox" {
|
||||
if c.account.MailboxExistsX(tx, dst) {
|
||||
exists, err := c.account.MailboxExists(tx, dst)
|
||||
xcheckf(err, "checking if destination mailbox exists")
|
||||
if exists {
|
||||
xusercodeErrorf("ALREADYEXISTS", "destination mailbox %q already exists", dst)
|
||||
}
|
||||
srcMB := c.account.MailboxFindX(tx, src)
|
||||
srcMB, err := c.account.MailboxFind(tx, src)
|
||||
xcheckf(err, "finding source mailbox")
|
||||
if srcMB == nil {
|
||||
xserverErrorf("inbox not found")
|
||||
}
|
||||
|
@ -2066,7 +2070,7 @@ func (c *conn) cmdRename(tag, cmd string, p *parser) {
|
|||
UIDValidity: uidval,
|
||||
UIDNext: 1,
|
||||
}
|
||||
err := tx.Insert(&dstMB)
|
||||
err = tx.Insert(&dstMB)
|
||||
xcheckf(err, "create new destination mailbox")
|
||||
|
||||
var messages []store.Message
|
||||
|
@ -2207,7 +2211,9 @@ func (c *conn) cmdSubscribe(tag, cmd string, p *parser) {
|
|||
var changes []store.Change
|
||||
|
||||
c.xdbwrite(func(tx *bstore.Tx) {
|
||||
changes = c.account.SubscriptionEnsureX(tx, name)
|
||||
var err error
|
||||
changes, err = c.account.SubscriptionEnsure(tx, name)
|
||||
xcheckf(err, "ensuring subscription")
|
||||
})
|
||||
|
||||
c.broadcast(changes)
|
||||
|
@ -2235,7 +2241,9 @@ func (c *conn) cmdUnsubscribe(tag, cmd string, p *parser) {
|
|||
// It's OK if not currently subscribed, ../rfc/9051:2215
|
||||
err := tx.Delete(&store.Subscription{Name: name})
|
||||
if err == bstore.ErrAbsent {
|
||||
if !c.account.MailboxExistsX(tx, name) {
|
||||
exists, err := c.account.MailboxExists(tx, name)
|
||||
xcheckf(err, "checking if mailbox exists")
|
||||
if !exists {
|
||||
xuserErrorf("mailbox does not exist")
|
||||
}
|
||||
return
|
||||
|
@ -2561,7 +2569,8 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) {
|
|||
MsgPrefix: msgPrefix,
|
||||
}
|
||||
isSent := name == "Sent"
|
||||
c.account.DeliverX(c.log, tx, &msg, msgFile, true, isSent, true, false)
|
||||
err := c.account.DeliverMessage(c.log, tx, &msg, msgFile, true, isSent, true, false)
|
||||
xcheckf(err, "delivering message")
|
||||
})
|
||||
|
||||
// Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
|
||||
|
|
|
@ -192,7 +192,6 @@ func importctl(ctl *ctl, mbox bool) {
|
|||
// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
|
||||
var deliveredIDs []int64
|
||||
|
||||
// Handle errors from store.*X calls.
|
||||
defer func() {
|
||||
x := recover()
|
||||
if x == nil {
|
||||
|
@ -225,7 +224,8 @@ func importctl(ctl *ctl, mbox bool) {
|
|||
isSent := mailbox == "Sent"
|
||||
const sync = false
|
||||
const notrain = true
|
||||
a.DeliverX(ctl.log, tx, m, mf, consumeFile, isSent, sync, notrain)
|
||||
err := a.DeliverMessage(ctl.log, tx, m, mf, consumeFile, isSent, sync, notrain)
|
||||
ctl.xcheck(err, "delivering message")
|
||||
deliveredIDs = append(deliveredIDs, m.ID)
|
||||
ctl.log.Debug("delivered message", mlog.Field("id", m.ID))
|
||||
changes = append(changes, store.ChangeAddUID{MailboxID: m.MailboxID, UID: m.UID, Flags: m.Flags})
|
||||
|
@ -236,7 +236,8 @@ func importctl(ctl *ctl, mbox bool) {
|
|||
a.WithWLock(func() {
|
||||
// Ensure mailbox exists.
|
||||
var mb store.Mailbox
|
||||
mb, changes = a.MailboxEnsureX(tx, mailbox, true)
|
||||
mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
|
||||
ctl.xcheck(err, "ensuring mailbox exists")
|
||||
|
||||
jf, _, err := a.OpenJunkFilter(ctl.log)
|
||||
if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
|
||||
|
|
|
@ -2,6 +2,7 @@ package smtpserver
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
@ -172,7 +173,10 @@ func analyze(ctx context.Context, log *mlog.Log, resolver dns.Resolver, d delive
|
|||
if rs != nil {
|
||||
mailbox = rs.Mailbox
|
||||
}
|
||||
mb := d.acc.MailboxFindX(tx, mailbox)
|
||||
mb, err := d.acc.MailboxFind(tx, mailbox)
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding destination mailbox: %w", err)
|
||||
}
|
||||
if mb != nil {
|
||||
// We want to deliver to mb.ID, but this message may be rejected and sent to the
|
||||
// Rejects mailbox instead, which MailboxID overwritten. Record the ID in
|
||||
|
@ -187,7 +191,6 @@ func analyze(ctx context.Context, log *mlog.Log, resolver dns.Resolver, d delive
|
|||
log.Debug("mailbox not found in database", mlog.Field("mailbox", mailbox))
|
||||
}
|
||||
|
||||
var err error
|
||||
isjunk, conclusive, method, err = reputation(tx, log, d.m)
|
||||
reason = string(method)
|
||||
return err
|
||||
|
|
|
@ -577,7 +577,7 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C
|
|||
} else if err, ok := x.(error); ok && isClosed(err) {
|
||||
c.log.Infox("connection closed", err)
|
||||
} else {
|
||||
c.log.Error("unhandled error", mlog.Field("err", x))
|
||||
c.log.Error("unhandled panic", mlog.Field("err", x))
|
||||
debug.PrintStack()
|
||||
metrics.PanicInc("smtpserver")
|
||||
}
|
||||
|
@ -679,7 +679,7 @@ func command(c *conn) {
|
|||
} else {
|
||||
// Other type of panic, we pass it on, aborting the connection.
|
||||
c.log.Errorx("command panic", err)
|
||||
panic(x)
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
195
store/account.go
195
store/account.go
|
@ -20,7 +20,6 @@ database.
|
|||
package store
|
||||
|
||||
// todo: make up a function naming scheme that indicates whether caller should broadcast changes.
|
||||
// todo: fewer (no?) "X" functions, but only explicit error handling.
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -394,13 +393,6 @@ type Account struct {
|
|||
nused int // Reference count, while >0, this account is alive and shared.
|
||||
}
|
||||
|
||||
func xcheckf(err error, format string, args ...any) {
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
panic(fmt.Errorf("%s: %w", msg, err))
|
||||
}
|
||||
}
|
||||
|
||||
// InitialUIDValidity returns a UIDValidity used for initializing an account.
|
||||
// It can be replaced during tests with a predictable value.
|
||||
var InitialUIDValidity = func() uint32 {
|
||||
|
@ -576,7 +568,7 @@ func (a *Account) WithRLock(fn func()) {
|
|||
fn()
|
||||
}
|
||||
|
||||
// DeliverX delivers a mail message to the account.
|
||||
// DeliverMessage delivers a mail message to the account.
|
||||
//
|
||||
// If consumeFile is set, the original msgFile is moved/renamed or copied and
|
||||
// removed as part of delivery.
|
||||
|
@ -594,14 +586,16 @@ func (a *Account) WithRLock(fn func()) {
|
|||
// Must be called with account rlock or wlock.
|
||||
//
|
||||
// Caller must broadcast new message.
|
||||
func (a *Account) DeliverX(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, consumeFile, isSent, sync, notrain bool) {
|
||||
func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, consumeFile, isSent, sync, notrain bool) error {
|
||||
mb := Mailbox{ID: m.MailboxID}
|
||||
err := tx.Get(&mb)
|
||||
xcheckf(err, "get mailbox")
|
||||
if err := tx.Get(&mb); err != nil {
|
||||
return fmt.Errorf("get mailbox: %w", err)
|
||||
}
|
||||
m.UID = mb.UIDNext
|
||||
mb.UIDNext++
|
||||
err = tx.Update(&mb)
|
||||
xcheckf(err, "updating mailbox nextuid")
|
||||
if err := tx.Update(&mb); err != nil {
|
||||
return fmt.Errorf("updating mailbox nextuid: %w", err)
|
||||
}
|
||||
|
||||
conf, _ := a.Conf()
|
||||
m.JunkFlagsForMailbox(mb.Name, conf)
|
||||
|
@ -616,7 +610,9 @@ func (a *Account) DeliverX(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os
|
|||
}
|
||||
part = &p
|
||||
buf, err := json.Marshal(part)
|
||||
xcheckf(err, "marshal parsed message")
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal parsed message: %w", err)
|
||||
}
|
||||
m.ParsedBuf = buf
|
||||
}
|
||||
|
||||
|
@ -625,8 +621,9 @@ func (a *Account) DeliverX(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os
|
|||
m.MailboxDestinedID = 0
|
||||
}
|
||||
|
||||
err = tx.Insert(m)
|
||||
xcheckf(err, "inserting message")
|
||||
if err := tx.Insert(m); err != nil {
|
||||
return fmt.Errorf("inserting message: %w", err)
|
||||
}
|
||||
|
||||
if isSent {
|
||||
// Attempt to parse the message for its To/Cc/Bcc headers, which we insert into Recipient.
|
||||
|
@ -666,8 +663,9 @@ func (a *Account) DeliverX(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os
|
|||
OrgDomain: publicsuffix.Lookup(context.TODO(), d).Name(),
|
||||
Sent: sent,
|
||||
}
|
||||
err = tx.Insert(&mr)
|
||||
xcheckf(err, "inserting sent message recipients")
|
||||
if err := tx.Insert(&mr); err != nil {
|
||||
return fmt.Errorf("inserting sent message recipients: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -678,30 +676,37 @@ func (a *Account) DeliverX(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os
|
|||
|
||||
// Sync file data to disk.
|
||||
if sync {
|
||||
err = msgFile.Sync()
|
||||
xcheckf(err, "fsync message file")
|
||||
if err := msgFile.Sync(); err != nil {
|
||||
return fmt.Errorf("fsync message file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if consumeFile {
|
||||
err := os.Rename(msgFile.Name(), msgPath)
|
||||
xcheckf(err, "moving msg file to destination directory")
|
||||
if err := os.Rename(msgFile.Name(), msgPath); err != nil {
|
||||
return fmt.Errorf("moving msg file to destination directory: %w", err)
|
||||
}
|
||||
} else if err := os.Link(msgFile.Name(), msgPath); err != nil {
|
||||
// Assume file system does not support hardlinks. Copy it instead.
|
||||
err := writeFile(msgPath, &moxio.AtReader{R: msgFile})
|
||||
xcheckf(err, "copying message to new file")
|
||||
if err := writeFile(msgPath, &moxio.AtReader{R: msgFile}); err != nil {
|
||||
return fmt.Errorf("copying message to new file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if sync {
|
||||
err = moxio.SyncDir(msgDir)
|
||||
xcheckf(err, "sync directory")
|
||||
if err := moxio.SyncDir(msgDir); err != nil {
|
||||
return fmt.Errorf("sync directory: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !notrain && m.NeedsTraining() {
|
||||
l := []Message{*m}
|
||||
err = a.RetrainMessages(log, tx, l, false)
|
||||
xcheckf(err, "training junkfilter")
|
||||
if err := a.RetrainMessages(log, tx, l, false); err != nil {
|
||||
return fmt.Errorf("training junkfilter: %w", err)
|
||||
}
|
||||
*m = l[0]
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// write contents of r to new file dst, for delivering a message.
|
||||
|
@ -877,65 +882,50 @@ func (a *Account) MailboxEnsure(tx *bstore.Tx, name string, subscribe bool) (mb
|
|||
return mb, changes, nil
|
||||
}
|
||||
|
||||
// MailboxEnsureX calls MailboxEnsure, panicing with the error if it is not nil.
|
||||
func (a *Account) MailboxEnsureX(tx *bstore.Tx, name string, subscribe bool) (Mailbox, []Change) {
|
||||
mb, changes, err := a.MailboxEnsure(tx, name, subscribe)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return mb, changes
|
||||
}
|
||||
|
||||
// Check if mailbox exists.
|
||||
// MailboxExists checks if mailbox exists.
|
||||
// Caller must hold account rlock.
|
||||
func (a *Account) MailboxExistsX(tx *bstore.Tx, name string) bool {
|
||||
func (a *Account) MailboxExists(tx *bstore.Tx, name string) (bool, error) {
|
||||
q := bstore.QueryTx[Mailbox](tx)
|
||||
q.FilterEqual("Name", name)
|
||||
exists, err := q.Exists()
|
||||
xcheckf(err, "checking existence")
|
||||
return exists
|
||||
return q.Exists()
|
||||
}
|
||||
|
||||
// MailboxFindX finds a mailbox by name.
|
||||
func (a *Account) MailboxFindX(tx *bstore.Tx, name string) *Mailbox {
|
||||
// MailboxFind finds a mailbox by name, returning a nil mailbox and nil error if mailbox does not exist.
|
||||
func (a *Account) MailboxFind(tx *bstore.Tx, name string) (*Mailbox, error) {
|
||||
q := bstore.QueryTx[Mailbox](tx)
|
||||
q.FilterEqual("Name", name)
|
||||
mb, err := q.Get()
|
||||
if err == bstore.ErrAbsent {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
xcheckf(err, "lookup mailbox")
|
||||
return &mb
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("looking up mailbox: %w", err)
|
||||
}
|
||||
return &mb, nil
|
||||
}
|
||||
|
||||
// SubscriptionEnsureX ensures a subscription for name exists. The mailbox does not
|
||||
// SubscriptionEnsure ensures a subscription for name exists. The mailbox does not
|
||||
// have to exist. Any parents are not automatically subscribed.
|
||||
// Changes are broadcasted.
|
||||
func (a *Account) SubscriptionEnsureX(tx *bstore.Tx, name string) []Change {
|
||||
err := tx.Get(&Subscription{name})
|
||||
if err == nil {
|
||||
return nil
|
||||
// Changes are returned and must be broadcasted by the caller.
|
||||
func (a *Account) SubscriptionEnsure(tx *bstore.Tx, name string) ([]Change, error) {
|
||||
if err := tx.Get(&Subscription{name}); err == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
err = tx.Insert(&Subscription{name})
|
||||
xcheckf(err, "inserting subscription")
|
||||
if err := tx.Insert(&Subscription{name}); err != nil {
|
||||
return nil, fmt.Errorf("inserting subscription: %w", err)
|
||||
}
|
||||
|
||||
q := bstore.QueryTx[Mailbox](tx)
|
||||
q.FilterEqual("Name", name)
|
||||
exists, err := q.Exists()
|
||||
xcheckf(err, "looking up mailbox for subscription")
|
||||
if exists {
|
||||
return []Change{ChangeAddSubscription{name}}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("looking up mailbox for subscription: %w", err)
|
||||
}
|
||||
return []Change{ChangeAddMailbox{Name: name, Flags: []string{`\Subscribed`, `\NonExistent`}}}
|
||||
}
|
||||
|
||||
// List mailboxes. Only those that exist, so names with only a subscription are not returned.
|
||||
// Caller must have account rlock held.
|
||||
func (a *Account) MailboxesX(tx *bstore.Tx) []Mailbox {
|
||||
l, err := bstore.QueryTx[Mailbox](tx).List()
|
||||
xcheckf(err, "fetching mailboxes")
|
||||
return l
|
||||
if exists {
|
||||
return []Change{ChangeAddSubscription{name}}, nil
|
||||
}
|
||||
return []Change{ChangeAddMailbox{Name: name, Flags: []string{`\Subscribed`, `\NonExistent`}}}, nil
|
||||
}
|
||||
|
||||
// MessageRuleset returns the first ruleset (if any) that message the message
|
||||
|
@ -1046,14 +1036,16 @@ func (a *Account) Deliver(log *mlog.Log, dest config.Destination, m *Message, ms
|
|||
// Message delivery and possible mailbox creation are broadcasted.
|
||||
func (a *Account) DeliverMailbox(log *mlog.Log, mailbox string, m *Message, msgFile *os.File, consumeFile bool) error {
|
||||
var changes []Change
|
||||
err := extransact(a.DB, true, func(tx *bstore.Tx) error {
|
||||
mb, chl := a.MailboxEnsureX(tx, mailbox, true)
|
||||
err := a.DB.Write(func(tx *bstore.Tx) error {
|
||||
mb, chl, err := a.MailboxEnsure(tx, mailbox, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ensuring mailbox: %w", err)
|
||||
}
|
||||
m.MailboxID = mb.ID
|
||||
m.MailboxOrigID = mb.ID
|
||||
changes = append(changes, chl...)
|
||||
|
||||
a.DeliverX(log, tx, m, msgFile, consumeFile, mb.Sent, true, false)
|
||||
return nil
|
||||
return a.DeliverMessage(log, tx, m, msgFile, consumeFile, mb.Sent, true, false)
|
||||
})
|
||||
// todo: if rename succeeded but transaction failed, we should remove the file.
|
||||
if err != nil {
|
||||
|
@ -1083,8 +1075,11 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS
|
|||
}
|
||||
}()
|
||||
|
||||
err := extransact(a.DB, true, func(tx *bstore.Tx) error {
|
||||
mb := a.MailboxFindX(tx, rejectsMailbox)
|
||||
err := a.DB.Write(func(tx *bstore.Tx) error {
|
||||
mb, err := a.MailboxFind(tx, rejectsMailbox)
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding mailbox: %w", err)
|
||||
}
|
||||
if mb == nil {
|
||||
// No messages have been delivered yet.
|
||||
hasSpace = true
|
||||
|
@ -1096,18 +1091,24 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS
|
|||
qdel := bstore.QueryTx[Message](tx)
|
||||
qdel.FilterNonzero(Message{MailboxID: mb.ID})
|
||||
qdel.FilterLess("Received", old)
|
||||
var err error
|
||||
remove, err = qdel.List()
|
||||
xcheckf(err, "listing old messages")
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing old messages: %w", err)
|
||||
}
|
||||
|
||||
changes = a.xremoveMessages(log, tx, mb, remove)
|
||||
changes, err = a.removeMessages(log, tx, mb, remove)
|
||||
if err != nil {
|
||||
return fmt.Errorf("removing messages: %w", err)
|
||||
}
|
||||
|
||||
// We allow up to n messages.
|
||||
qcount := bstore.QueryTx[Message](tx)
|
||||
qcount.FilterNonzero(Message{MailboxID: mb.ID})
|
||||
qcount.Limit(1000)
|
||||
n, err := qcount.Count()
|
||||
xcheckf(err, "counting rejects")
|
||||
if err != nil {
|
||||
return fmt.Errorf("counting rejects: %w", err)
|
||||
}
|
||||
hasSpace = n < 1000
|
||||
|
||||
return nil
|
||||
|
@ -1124,9 +1125,9 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS
|
|||
return hasSpace, nil
|
||||
}
|
||||
|
||||
func (a *Account) xremoveMessages(log *mlog.Log, tx *bstore.Tx, mb *Mailbox, l []Message) []Change {
|
||||
func (a *Account) removeMessages(log *mlog.Log, tx *bstore.Tx, mb *Mailbox, l []Message) ([]Change, error) {
|
||||
if len(l) == 0 {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
ids := make([]int64, len(l))
|
||||
anyids := make([]any, len(l))
|
||||
|
@ -1139,30 +1140,33 @@ func (a *Account) xremoveMessages(log *mlog.Log, tx *bstore.Tx, mb *Mailbox, l [
|
|||
// from a Sent mailbox to the rejects mailbox...
|
||||
qdmr := bstore.QueryTx[Recipient](tx)
|
||||
qdmr.FilterEqual("MessageID", anyids...)
|
||||
_, err := qdmr.Delete()
|
||||
xcheckf(err, "deleting from message recipient")
|
||||
if _, err := qdmr.Delete(); err != nil {
|
||||
return nil, fmt.Errorf("deleting from message recipient: %w", err)
|
||||
}
|
||||
|
||||
// Actually remove the messages.
|
||||
qdm := bstore.QueryTx[Message](tx)
|
||||
qdm.FilterIDs(ids)
|
||||
var deleted []Message
|
||||
qdm.Gather(&deleted)
|
||||
_, err = qdm.Delete()
|
||||
xcheckf(err, "deleting from messages")
|
||||
if _, err := qdm.Delete(); err != nil {
|
||||
return nil, fmt.Errorf("deleting from messages: %w", err)
|
||||
}
|
||||
|
||||
// Mark as neutral and train so junk filter gets untrained with these (junk) messages.
|
||||
for i := range deleted {
|
||||
deleted[i].Junk = false
|
||||
deleted[i].Notjunk = false
|
||||
}
|
||||
err = a.RetrainMessages(log, tx, deleted, true)
|
||||
xcheckf(err, "training deleted messages")
|
||||
if err := a.RetrainMessages(log, tx, deleted, true); err != nil {
|
||||
return nil, fmt.Errorf("training deleted messages: %w", err)
|
||||
}
|
||||
|
||||
changes := make([]Change, len(l))
|
||||
for i, m := range l {
|
||||
changes[i] = ChangeRemoveUIDs{mb.ID, []UID{m.UID}}
|
||||
}
|
||||
return changes
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
// RejectsRemove removes a message from the rejects mailbox if present.
|
||||
|
@ -1180,19 +1184,26 @@ func (a *Account) RejectsRemove(log *mlog.Log, rejectsMailbox, messageID string)
|
|||
}
|
||||
}()
|
||||
|
||||
err := extransact(a.DB, true, func(tx *bstore.Tx) error {
|
||||
mb := a.MailboxFindX(tx, rejectsMailbox)
|
||||
err := a.DB.Write(func(tx *bstore.Tx) error {
|
||||
mb, err := a.MailboxFind(tx, rejectsMailbox)
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding mailbox: %w", err)
|
||||
}
|
||||
if mb == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
q := bstore.QueryTx[Message](tx)
|
||||
q.FilterNonzero(Message{MailboxID: mb.ID, MessageID: messageID})
|
||||
var err error
|
||||
remove, err = q.List()
|
||||
xcheckf(err, "listing messages to remove")
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing messages to remove: %w", err)
|
||||
}
|
||||
|
||||
changes = a.xremoveMessages(log, tx, mb, remove)
|
||||
changes, err = a.removeMessages(log, tx, mb, remove)
|
||||
if err != nil {
|
||||
return fmt.Errorf("removing messages: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -72,13 +72,15 @@ func TestMailbox(t *testing.T) {
|
|||
tcheck(t, err, "sent mailbox")
|
||||
msent.MailboxID = mbsent.ID
|
||||
msent.MailboxOrigID = mbsent.ID
|
||||
acc.DeliverX(xlog, tx, &msent, msgFile, false, true, true, false)
|
||||
err = acc.DeliverMessage(xlog, tx, &msent, msgFile, false, true, true, false)
|
||||
tcheck(t, err, "deliver message")
|
||||
|
||||
err = tx.Insert(&mbrejects)
|
||||
tcheck(t, err, "insert rejects mailbox")
|
||||
mreject.MailboxID = mbrejects.ID
|
||||
mreject.MailboxOrigID = mbrejects.ID
|
||||
acc.DeliverX(xlog, tx, &mreject, msgFile, false, false, true, false)
|
||||
err = acc.DeliverMessage(xlog, tx, &mreject, msgFile, false, false, true, false)
|
||||
tcheck(t, err, "deliver message")
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -133,44 +135,50 @@ func TestMailbox(t *testing.T) {
|
|||
|
||||
acc.WithWLock(func() {
|
||||
err := acc.DB.Write(func(tx *bstore.Tx) error {
|
||||
acc.MailboxEnsureX(tx, "Testbox", true)
|
||||
return nil
|
||||
_, _, err := acc.MailboxEnsure(tx, "Testbox", true)
|
||||
return err
|
||||
})
|
||||
tcheck(t, err, "ensure mailbox exists")
|
||||
err = acc.DB.Read(func(tx *bstore.Tx) error {
|
||||
acc.MailboxEnsureX(tx, "Testbox", true)
|
||||
return nil
|
||||
_, _, err := acc.MailboxEnsure(tx, "Testbox", true)
|
||||
return err
|
||||
})
|
||||
tcheck(t, err, "ensure mailbox exists")
|
||||
|
||||
err = acc.DB.Write(func(tx *bstore.Tx) error {
|
||||
acc.MailboxEnsureX(tx, "Testbox2", false)
|
||||
_, _, err := acc.MailboxEnsure(tx, "Testbox2", false)
|
||||
tcheck(t, err, "create mailbox")
|
||||
|
||||
exists := acc.MailboxExistsX(tx, "Testbox2")
|
||||
exists, err := acc.MailboxExists(tx, "Testbox2")
|
||||
tcheck(t, err, "checking that mailbox exists")
|
||||
if !exists {
|
||||
t.Fatalf("mailbox does not exist")
|
||||
}
|
||||
|
||||
exists = acc.MailboxExistsX(tx, "Testbox3")
|
||||
exists, err = acc.MailboxExists(tx, "Testbox3")
|
||||
tcheck(t, err, "checking that mailbox does not exist")
|
||||
if exists {
|
||||
t.Fatalf("mailbox does exist")
|
||||
}
|
||||
|
||||
xmb := acc.MailboxFindX(tx, "Testbox3")
|
||||
xmb, err := acc.MailboxFind(tx, "Testbox3")
|
||||
tcheck(t, err, "finding non-existing mailbox")
|
||||
if xmb != nil {
|
||||
t.Fatalf("did find Testbox3: %v", xmb)
|
||||
}
|
||||
xmb = acc.MailboxFindX(tx, "Testbox2")
|
||||
xmb, err = acc.MailboxFind(tx, "Testbox2")
|
||||
tcheck(t, err, "finding existing mailbox")
|
||||
if xmb == nil {
|
||||
t.Fatalf("did not find Testbox2")
|
||||
}
|
||||
|
||||
changes := acc.SubscriptionEnsureX(tx, "Testbox2")
|
||||
changes, err := acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
tcheck(t, err, "ensuring new subscription")
|
||||
if len(changes) == 0 {
|
||||
t.Fatalf("new subscription did not result in changes")
|
||||
}
|
||||
changes = acc.SubscriptionEnsureX(tx, "Testbox2")
|
||||
changes, err = acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
tcheck(t, err, "ensuring already present subscription")
|
||||
if len(changes) != 0 {
|
||||
t.Fatalf("already present subscription resulted in changes")
|
||||
}
|
||||
|
|
|
@ -128,7 +128,9 @@ func ExportMessages(log *mlog.Log, db *bstore.DB, accountDir string, archiver Ar
|
|||
name2id := map[string]int64{}
|
||||
|
||||
mailboxes, err := bstore.QueryTx[Mailbox](tx).List()
|
||||
xcheckf(err, "query mailboxes")
|
||||
if err != nil {
|
||||
return fmt.Errorf("query mailboxes: %w", err)
|
||||
}
|
||||
for _, mb := range mailboxes {
|
||||
id2name[mb.ID] = mb.Name
|
||||
name2id[mb.Name] = mb.ID
|
||||
|
|
|
@ -1,24 +0,0 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"github.com/mjl-/bstore"
|
||||
)
|
||||
|
||||
// todo: get rid of this. it's a bad idea to indiscriminately turn all panics into an error.
|
||||
func extransact(db *bstore.DB, write bool, fn func(tx *bstore.Tx) error) (rerr error) {
|
||||
defer func() {
|
||||
x := recover()
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
if err, ok := x.(error); ok {
|
||||
rerr = err
|
||||
} else {
|
||||
panic(x)
|
||||
}
|
||||
}()
|
||||
if write {
|
||||
return db.Write(fn)
|
||||
}
|
||||
return db.Read(fn)
|
||||
}
|
Loading…
Reference in a new issue