mirror of https://github.com/mjl-/mox.git synced 2025-04-21 21:40:01 +03:00

rename variables, struct fields and functions to include an "x" when they can panic for handling errors

and document the convention in develop.txt.
spurred by running errcheck again (it has been a while). it still has too many
false positives to enable by default.
Mechiel Lukkien 2025-03-24 16:02:12 +01:00
parent a2c79e25c1
commit 7a87522be0
18 changed files with 797 additions and 800 deletions
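
To make the renaming convention concrete: identifiers that begin with "x" may panic on error, the panic is recovered at the top of a command or connection, and cleanup happens in defers so the code stays panic-safe. Below is a minimal, self-contained sketch of that pattern; xcheckf does exist in mox, but this simplified implementation and the other names (xwriteLine, runCommand) are illustrative, not code from this commit.

package main

import (
	"fmt"
	"os"
)

// xcheckf panics with a wrapped error if err is non-nil (simplified stand-in
// for the helper mox uses).
func xcheckf(err error, format string, args ...any) {
	if err != nil {
		panic(fmt.Errorf("%s: %w", fmt.Sprintf(format, args...), err))
	}
}

// xwriteLine can panic on i/o errors, hence the "x" prefix.
func xwriteLine(f *os.File, line string) {
	_, err := fmt.Fprintln(f, line)
	xcheckf(err, "write line")
}

// runCommand is the "top of the command": it recovers the panic and turns it
// into a logged error. Cleanup runs in a defer, so it happens even on panic.
func runCommand(f *os.File) {
	defer func() {
		if x := recover(); x != nil {
			fmt.Fprintln(os.Stderr, "command failed:", x)
		}
	}()
	defer func() {
		if err := f.Close(); err != nil {
			fmt.Fprintln(os.Stderr, "closing file:", err)
		}
	}()

	xwriteLine(f, "hello")
}

func main() {
	f, err := os.CreateTemp("", "xdemo")
	if err != nil {
		fmt.Fprintln(os.Stderr, "create temp file:", err)
		return
	}
	defer os.Remove(f.Name())
	runCommand(f)
}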


@ -27,7 +27,7 @@ import (
"github.com/mjl-/mox/tlsrptdb"
)
func backupctl(ctx context.Context, ctl *ctl) {
func xbackupctl(ctx context.Context, xctl *ctl) {
/* protocol:
> "backup"
> destdir
@ -41,14 +41,14 @@ func backupctl(ctx context.Context, ctl *ctl) {
// "src" or "dst" are incomplete paths relative to the source or destination data
// directories.
dstDir := ctl.xread()
verbose := ctl.xread() == "verbose"
dstDir := xctl.xread()
verbose := xctl.xread() == "verbose"
// Set when an error is encountered. At the end, we warn if set.
var incomplete bool
// We'll be writing output, and logging both to mox and the ctl stream.
writer := ctl.writer()
xwriter := xctl.writer()
// Format easily readable output for the user.
formatLog := func(prefix, text string, err error, attrs ...slog.Attr) []byte {
@ -67,10 +67,8 @@ func backupctl(ctx context.Context, ctl *ctl) {
// Log an error to both the mox service as the user running "mox backup".
pkglogx := func(prefix, text string, err error, attrs ...slog.Attr) {
ctl.log.Errorx(text, err, attrs...)
_, werr := writer.Write(formatLog(prefix, text, err, attrs...))
ctl.xcheck(werr, "write to ctl")
xctl.log.Errorx(text, err, attrs...)
xwriter.Write(formatLog(prefix, text, err, attrs...))
}
// Log an error but don't mark backup as failed.
@ -87,10 +85,9 @@ func backupctl(ctx context.Context, ctl *ctl) {
// If verbose is enabled, log to the cli command. Always log as info level.
xvlog := func(text string, attrs ...slog.Attr) {
ctl.log.Info(text, attrs...)
xctl.log.Info(text, attrs...)
if verbose {
_, werr := writer.Write(formatLog("", text, nil, attrs...))
ctl.xcheck(werr, "write to ctl")
xwriter.Write(formatLog("", text, nil, attrs...))
}
}
@ -164,12 +161,12 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if df != nil {
err := df.Close()
ctl.log.Check(err, "closing file")
xctl.log.Check(err, "closing file")
}
}()
defer func() {
err := sf.Close()
ctl.log.Check(err, "closing file")
xctl.log.Check(err, "closing file")
}()
if _, err := io.Copy(df, sf); err != nil {
return fmt.Errorf("copying config file %s to %s: %v", srcPath, destPath, err)
@ -213,7 +210,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
}
defer func() {
err := sf.Close()
ctl.log.Check(err, "closing source file")
xctl.log.Check(err, "closing source file")
}()
ensureDestDir(dstpath)
@ -225,7 +222,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if df != nil {
err := df.Close()
ctl.log.Check(err, "closing destination file")
xctl.log.Check(err, "closing destination file")
}
}()
if _, err := io.Copy(df, sf); err != nil {
@ -279,7 +276,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if df != nil {
err := df.Close()
ctl.log.Check(err, "closing destination database file")
xctl.log.Check(err, "closing destination database file")
}
}()
err = db.Read(ctx, func(tx *bstore.Tx) error {
@ -344,7 +341,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
}
defer func() {
err := sf.Close()
ctl.log.Check(err, "closing copied source file")
xctl.log.Check(err, "closing copied source file")
}()
df, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
@ -354,7 +351,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if df != nil {
err := df.Close()
ctl.log.Check(err, "closing partial destination file")
xctl.log.Check(err, "closing partial destination file")
}
}()
if _, err := io.Copy(df, sf); err != nil {
@ -371,7 +368,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
// Start making the backup.
tmStart := time.Now()
ctl.log.Print("making backup", slog.String("destdir", dstDataDir))
xctl.log.Print("making backup", slog.String("destdir", dstDataDir))
if err := os.MkdirAll(dstDataDir, 0770); err != nil {
xerrx("creating destination data directory", err)
@ -405,7 +402,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
}
dstdbpath := filepath.Join(dstDataDir, path)
opts := bstore.Options{MustExist: true, RegisterLogger: ctl.log.Logger}
opts := bstore.Options{MustExist: true, RegisterLogger: xctl.log.Logger}
db, err := bstore.Open(ctx, dstdbpath, &opts, queue.DBTypes...)
if err != nil {
xerrx("open copied queue database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmQueue)))
@ -415,7 +412,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if db != nil {
err := db.Close()
ctl.log.Check(err, "closing new queue db")
xctl.log.Check(err, "closing new queue db")
}
}()
@ -495,7 +492,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
backupAccount := func(acc *store.Account) {
defer func() {
err := acc.Close()
ctl.log.Check(err, "closing account")
xctl.log.Check(err, "closing account")
}()
tmAccount := time.Now()
@ -507,7 +504,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
// todo: should document/check not taking a rlock on account.
// Copy junkfilter files, if configured.
if jf, _, err := acc.OpenJunkFilter(ctx, ctl.log); err != nil {
if jf, _, err := acc.OpenJunkFilter(ctx, xctl.log); err != nil {
if !errors.Is(err, store.ErrNoJunkFilter) {
xerrx("opening junk filter for account (not backed up)", err)
}
@ -518,11 +515,11 @@ func backupctl(ctx context.Context, ctl *ctl) {
bloompath := filepath.Join("accounts", acc.Name, "junkfilter.bloom")
backupFile(bloompath)
err := jf.Close()
ctl.log.Check(err, "closing junkfilter")
xctl.log.Check(err, "closing junkfilter")
}
dstdbpath := filepath.Join(dstDataDir, dbpath)
opts := bstore.Options{MustExist: true, RegisterLogger: ctl.log.Logger}
opts := bstore.Options{MustExist: true, RegisterLogger: xctl.log.Logger}
db, err := bstore.Open(ctx, dstdbpath, &opts, store.DBTypes...)
if err != nil {
xerrx("open copied account database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmAccount)))
@ -532,7 +529,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
defer func() {
if db != nil {
err := db.Close()
ctl.log.Check(err, "close account database")
xctl.log.Check(err, "close account database")
}
}()
@ -635,7 +632,7 @@ func backupctl(ctx context.Context, ctl *ctl) {
// account directories when handling "all other files" below.
accounts := map[string]struct{}{}
for _, accName := range mox.Conf.Accounts() {
acc, err := store.OpenAccount(ctl.log, accName, false)
acc, err := store.OpenAccount(xctl.log, accName, false)
if err != nil {
xerrx("opening account for copying (will try to copy as regular files later)", err, slog.String("account", accName))
continue
@ -691,11 +688,11 @@ func backupctl(ctx context.Context, ctl *ctl) {
xvlog("backup finished", slog.Duration("duration", time.Since(tmStart)))
writer.xclose()
xwriter.xclose()
if incomplete {
ctl.xwrite("errors were encountered during backup")
xctl.xwrite("errors were encountered during backup")
} else {
ctl.xwriteok()
xctl.xwriteok()
}
}

ctl.go (717 changed lines): file diff suppressed because it is too large.


@ -60,54 +60,54 @@ func TestCtl(t *testing.T) {
var cid int64
testctl := func(fn func(clientctl *ctl)) {
testctl := func(fn func(clientxctl *ctl)) {
t.Helper()
cconn, sconn := net.Pipe()
clientctl := ctl{conn: cconn, log: pkglog}
serverctl := ctl{conn: sconn, log: pkglog}
clientxctl := ctl{conn: cconn, log: pkglog}
serverxctl := ctl{conn: sconn, log: pkglog}
done := make(chan struct{})
go func() {
cid++
servectlcmd(ctxbg, &serverctl, cid, func() {})
servectlcmd(ctxbg, &serverxctl, cid, func() {})
close(done)
}()
fn(&clientctl)
fn(&clientxctl)
cconn.Close()
<-done
sconn.Close()
}
// "deliver"
testctl(func(ctl *ctl) {
ctlcmdDeliver(ctl, "mjl@mox.example")
testctl(func(xctl *ctl) {
ctlcmdDeliver(xctl, "mjl@mox.example")
})
// "setaccountpassword"
testctl(func(ctl *ctl) {
ctlcmdSetaccountpassword(ctl, "mjl", "test4321")
testctl(func(xctl *ctl) {
ctlcmdSetaccountpassword(xctl, "mjl", "test4321")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesList(ctl)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesList(xctl)
})
// All messages.
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "", "", "")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "", "", "")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "mjl", "", "")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "mjl", "", "")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "", "☺.mox.example", "")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "", "☺.mox.example", "")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "mox", "☺.mox.example", "example.com")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "mox", "☺.mox.example", "example.com")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesRemove(ctl, 1)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesRemove(xctl, 1)
})
// Queue a message to list/change/dump.
@ -127,252 +127,252 @@ func TestCtl(t *testing.T) {
qmid := qml[0].ID
// Has entries now.
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesList(ctl)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesList(xctl)
})
// "queuelist"
testctl(func(ctl *ctl) {
ctlcmdQueueList(ctl, queue.Filter{}, queue.Sort{})
testctl(func(xctl *ctl) {
ctlcmdQueueList(xctl, queue.Filter{}, queue.Sort{})
})
// "queueholdset"
testctl(func(ctl *ctl) {
ctlcmdQueueHoldSet(ctl, queue.Filter{}, true)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldSet(xctl, queue.Filter{}, true)
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldSet(ctl, queue.Filter{}, false)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldSet(xctl, queue.Filter{}, false)
})
// "queueschedule"
testctl(func(ctl *ctl) {
ctlcmdQueueSchedule(ctl, queue.Filter{}, true, time.Minute)
testctl(func(xctl *ctl) {
ctlcmdQueueSchedule(xctl, queue.Filter{}, true, time.Minute)
})
// "queuetransport"
testctl(func(ctl *ctl) {
ctlcmdQueueTransport(ctl, queue.Filter{}, "socks")
testctl(func(xctl *ctl) {
ctlcmdQueueTransport(xctl, queue.Filter{}, "socks")
})
// "queuerequiretls"
testctl(func(ctl *ctl) {
ctlcmdQueueRequireTLS(ctl, queue.Filter{}, nil)
testctl(func(xctl *ctl) {
ctlcmdQueueRequireTLS(xctl, queue.Filter{}, nil)
})
// "queuedump"
testctl(func(ctl *ctl) {
ctlcmdQueueDump(ctl, fmt.Sprintf("%d", qmid))
testctl(func(xctl *ctl) {
ctlcmdQueueDump(xctl, fmt.Sprintf("%d", qmid))
})
// "queuefail"
testctl(func(ctl *ctl) {
ctlcmdQueueFail(ctl, queue.Filter{})
testctl(func(xctl *ctl) {
ctlcmdQueueFail(xctl, queue.Filter{})
})
// "queuedrop"
testctl(func(ctl *ctl) {
ctlcmdQueueDrop(ctl, queue.Filter{})
testctl(func(xctl *ctl) {
ctlcmdQueueDrop(xctl, queue.Filter{})
})
// "queueholdruleslist"
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesList(ctl)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesList(xctl)
})
// "queueholdrulesadd"
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "mjl", "", "")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "mjl", "", "")
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesAdd(ctl, "mjl", "localhost", "")
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesAdd(xctl, "mjl", "localhost", "")
})
// "queueholdrulesremove"
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesRemove(ctl, 2)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesRemove(xctl, 2)
})
testctl(func(ctl *ctl) {
ctlcmdQueueHoldrulesList(ctl)
testctl(func(xctl *ctl) {
ctlcmdQueueHoldrulesList(xctl)
})
// "queuesuppresslist"
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressList(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressList(xctl, "mjl")
})
// "queuesuppressadd"
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressAdd(ctl, "mjl", "base@localhost")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressAdd(xctl, "mjl", "base@localhost")
})
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressAdd(ctl, "mjl", "other@localhost")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressAdd(xctl, "mjl", "other@localhost")
})
// "queuesuppresslookup"
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressLookup(ctl, "mjl", "base@localhost")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressLookup(xctl, "mjl", "base@localhost")
})
// "queuesuppressremove"
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressRemove(ctl, "mjl", "base@localhost")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressRemove(xctl, "mjl", "base@localhost")
})
testctl(func(ctl *ctl) {
ctlcmdQueueSuppressList(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdQueueSuppressList(xctl, "mjl")
})
// "queueretiredlist"
testctl(func(ctl *ctl) {
ctlcmdQueueRetiredList(ctl, queue.RetiredFilter{}, queue.RetiredSort{})
testctl(func(xctl *ctl) {
ctlcmdQueueRetiredList(xctl, queue.RetiredFilter{}, queue.RetiredSort{})
})
// "queueretiredprint"
testctl(func(ctl *ctl) {
ctlcmdQueueRetiredPrint(ctl, "1")
testctl(func(xctl *ctl) {
ctlcmdQueueRetiredPrint(xctl, "1")
})
// "queuehooklist"
testctl(func(ctl *ctl) {
ctlcmdQueueHookList(ctl, queue.HookFilter{}, queue.HookSort{})
testctl(func(xctl *ctl) {
ctlcmdQueueHookList(xctl, queue.HookFilter{}, queue.HookSort{})
})
// "queuehookschedule"
testctl(func(ctl *ctl) {
ctlcmdQueueHookSchedule(ctl, queue.HookFilter{}, true, time.Minute)
testctl(func(xctl *ctl) {
ctlcmdQueueHookSchedule(xctl, queue.HookFilter{}, true, time.Minute)
})
// "queuehookprint"
testctl(func(ctl *ctl) {
ctlcmdQueueHookPrint(ctl, "1")
testctl(func(xctl *ctl) {
ctlcmdQueueHookPrint(xctl, "1")
})
// "queuehookcancel"
testctl(func(ctl *ctl) {
ctlcmdQueueHookCancel(ctl, queue.HookFilter{})
testctl(func(xctl *ctl) {
ctlcmdQueueHookCancel(xctl, queue.HookFilter{})
})
// "queuehookretiredlist"
testctl(func(ctl *ctl) {
ctlcmdQueueHookRetiredList(ctl, queue.HookRetiredFilter{}, queue.HookRetiredSort{})
testctl(func(xctl *ctl) {
ctlcmdQueueHookRetiredList(xctl, queue.HookRetiredFilter{}, queue.HookRetiredSort{})
})
// "queuehookretiredprint"
testctl(func(ctl *ctl) {
ctlcmdQueueHookRetiredPrint(ctl, "1")
testctl(func(xctl *ctl) {
ctlcmdQueueHookRetiredPrint(xctl, "1")
})
// "importmbox"
testctl(func(ctl *ctl) {
ctlcmdImport(ctl, true, "mjl", "inbox", "testdata/importtest.mbox")
testctl(func(xctl *ctl) {
ctlcmdImport(xctl, true, "mjl", "inbox", "testdata/importtest.mbox")
})
// "importmaildir"
testctl(func(ctl *ctl) {
ctlcmdImport(ctl, false, "mjl", "inbox", "testdata/importtest.maildir")
testctl(func(xctl *ctl) {
ctlcmdImport(xctl, false, "mjl", "inbox", "testdata/importtest.maildir")
})
// "domainadd"
testctl(func(ctl *ctl) {
ctlcmdConfigDomainAdd(ctl, false, dns.Domain{ASCII: "mox2.example"}, "mjl", "")
testctl(func(xctl *ctl) {
ctlcmdConfigDomainAdd(xctl, false, dns.Domain{ASCII: "mox2.example"}, "mjl", "")
})
// "accountadd"
testctl(func(ctl *ctl) {
ctlcmdConfigAccountAdd(ctl, "mjl2", "mjl2@mox2.example")
testctl(func(xctl *ctl) {
ctlcmdConfigAccountAdd(xctl, "mjl2", "mjl2@mox2.example")
})
// "addressadd"
testctl(func(ctl *ctl) {
ctlcmdConfigAddressAdd(ctl, "mjl3@mox2.example", "mjl2")
testctl(func(xctl *ctl) {
ctlcmdConfigAddressAdd(xctl, "mjl3@mox2.example", "mjl2")
})
// Add a message.
testctl(func(ctl *ctl) {
ctlcmdDeliver(ctl, "mjl3@mox2.example")
testctl(func(xctl *ctl) {
ctlcmdDeliver(xctl, "mjl3@mox2.example")
})
// "retrain", retrain junk filter.
testctl(func(ctl *ctl) {
ctlcmdRetrain(ctl, "mjl2")
testctl(func(xctl *ctl) {
ctlcmdRetrain(xctl, "mjl2")
})
// "addressrm"
testctl(func(ctl *ctl) {
ctlcmdConfigAddressRemove(ctl, "mjl3@mox2.example")
testctl(func(xctl *ctl) {
ctlcmdConfigAddressRemove(xctl, "mjl3@mox2.example")
})
// "accountdisabled"
testctl(func(ctl *ctl) {
ctlcmdConfigAccountDisabled(ctl, "mjl2", "testing")
testctl(func(xctl *ctl) {
ctlcmdConfigAccountDisabled(xctl, "mjl2", "testing")
})
testctl(func(ctl *ctl) {
ctlcmdConfigAccountDisabled(ctl, "mjl2", "")
testctl(func(xctl *ctl) {
ctlcmdConfigAccountDisabled(xctl, "mjl2", "")
})
// "accountrm"
testctl(func(ctl *ctl) {
ctlcmdConfigAccountRemove(ctl, "mjl2")
testctl(func(xctl *ctl) {
ctlcmdConfigAccountRemove(xctl, "mjl2")
})
// "domaindisabled"
testctl(func(ctl *ctl) {
ctlcmdConfigDomainDisabled(ctl, dns.Domain{ASCII: "mox2.example"}, true)
testctl(func(xctl *ctl) {
ctlcmdConfigDomainDisabled(xctl, dns.Domain{ASCII: "mox2.example"}, true)
})
testctl(func(ctl *ctl) {
ctlcmdConfigDomainDisabled(ctl, dns.Domain{ASCII: "mox2.example"}, false)
testctl(func(xctl *ctl) {
ctlcmdConfigDomainDisabled(xctl, dns.Domain{ASCII: "mox2.example"}, false)
})
// "domainrm"
testctl(func(ctl *ctl) {
ctlcmdConfigDomainRemove(ctl, dns.Domain{ASCII: "mox2.example"})
testctl(func(xctl *ctl) {
ctlcmdConfigDomainRemove(xctl, dns.Domain{ASCII: "mox2.example"})
})
// "aliasadd"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasAdd(ctl, "support@mox.example", config.Alias{Addresses: []string{"mjl@mox.example"}})
testctl(func(xctl *ctl) {
ctlcmdConfigAliasAdd(xctl, "support@mox.example", config.Alias{Addresses: []string{"mjl@mox.example"}})
})
// "aliaslist"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasList(ctl, "mox.example")
testctl(func(xctl *ctl) {
ctlcmdConfigAliasList(xctl, "mox.example")
})
// "aliasprint"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasPrint(ctl, "support@mox.example")
testctl(func(xctl *ctl) {
ctlcmdConfigAliasPrint(xctl, "support@mox.example")
})
// "aliasupdate"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasUpdate(ctl, "support@mox.example", "true", "true", "true")
testctl(func(xctl *ctl) {
ctlcmdConfigAliasUpdate(xctl, "support@mox.example", "true", "true", "true")
})
// "aliasaddaddr"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasAddaddr(ctl, "support@mox.example", []string{"mjl2@mox.example"})
testctl(func(xctl *ctl) {
ctlcmdConfigAliasAddaddr(xctl, "support@mox.example", []string{"mjl2@mox.example"})
})
// "aliasrmaddr"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasRmaddr(ctl, "support@mox.example", []string{"mjl2@mox.example"})
testctl(func(xctl *ctl) {
ctlcmdConfigAliasRmaddr(xctl, "support@mox.example", []string{"mjl2@mox.example"})
})
// "aliasrm"
testctl(func(ctl *ctl) {
ctlcmdConfigAliasRemove(ctl, "support@mox.example")
testctl(func(xctl *ctl) {
ctlcmdConfigAliasRemove(xctl, "support@mox.example")
})
// accounttlspubkeyadd
certDER := fakeCert(t)
testctl(func(ctl *ctl) {
ctlcmdConfigTlspubkeyAdd(ctl, "mjl@mox.example", "testkey", false, certDER)
testctl(func(xctl *ctl) {
ctlcmdConfigTlspubkeyAdd(xctl, "mjl@mox.example", "testkey", false, certDER)
})
// "accounttlspubkeylist"
testctl(func(ctl *ctl) {
ctlcmdConfigTlspubkeyList(ctl, "")
testctl(func(xctl *ctl) {
ctlcmdConfigTlspubkeyList(xctl, "")
})
testctl(func(ctl *ctl) {
ctlcmdConfigTlspubkeyList(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdConfigTlspubkeyList(xctl, "mjl")
})
tpkl, err := store.TLSPublicKeyList(ctxbg, "")
@ -383,13 +383,13 @@ func TestCtl(t *testing.T) {
fingerprint := tpkl[0].Fingerprint
// "accounttlspubkeyget"
testctl(func(ctl *ctl) {
ctlcmdConfigTlspubkeyGet(ctl, fingerprint)
testctl(func(xctl *ctl) {
ctlcmdConfigTlspubkeyGet(xctl, fingerprint)
})
// "accounttlspubkeyrm"
testctl(func(ctl *ctl) {
ctlcmdConfigTlspubkeyRemove(ctl, fingerprint)
testctl(func(xctl *ctl) {
ctlcmdConfigTlspubkeyRemove(xctl, fingerprint)
})
tpkl, err = store.TLSPublicKeyList(ctxbg, "")
@ -399,39 +399,39 @@ func TestCtl(t *testing.T) {
}
// "loglevels"
testctl(func(ctl *ctl) {
ctlcmdLoglevels(ctl)
testctl(func(xctl *ctl) {
ctlcmdLoglevels(xctl)
})
// "setloglevels"
testctl(func(ctl *ctl) {
ctlcmdSetLoglevels(ctl, "", "debug")
testctl(func(xctl *ctl) {
ctlcmdSetLoglevels(xctl, "", "debug")
})
testctl(func(ctl *ctl) {
ctlcmdSetLoglevels(ctl, "smtpserver", "debug")
testctl(func(xctl *ctl) {
ctlcmdSetLoglevels(xctl, "smtpserver", "debug")
})
// Export data, import it again
xcmdExport(true, false, []string{filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/"), filepath.FromSlash("testdata/ctl/data/accounts/mjl")}, &cmd{log: pkglog})
xcmdExport(false, false, []string{filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/"), filepath.FromSlash("testdata/ctl/data/accounts/mjl")}, &cmd{log: pkglog})
testctl(func(ctl *ctl) {
ctlcmdImport(ctl, true, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/Inbox.mbox"))
testctl(func(xctl *ctl) {
ctlcmdImport(xctl, true, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/Inbox.mbox"))
})
testctl(func(ctl *ctl) {
ctlcmdImport(ctl, false, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/Inbox"))
testctl(func(xctl *ctl) {
ctlcmdImport(xctl, false, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/Inbox"))
})
// "recalculatemailboxcounts"
testctl(func(ctl *ctl) {
ctlcmdRecalculateMailboxCounts(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdRecalculateMailboxCounts(xctl, "mjl")
})
// "fixmsgsize"
testctl(func(ctl *ctl) {
ctlcmdFixmsgsize(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdFixmsgsize(xctl, "mjl")
})
testctl(func(ctl *ctl) {
acc, err := store.OpenAccount(ctl.log, "mjl", false)
testctl(func(xctl *ctl) {
acc, err := store.OpenAccount(xctl.log, "mjl", false)
tcheck(t, err, "open account")
defer func() {
acc.Close()
@ -443,7 +443,7 @@ func TestCtl(t *testing.T) {
deliver := func(m *store.Message) {
t.Helper()
m.Size = int64(len(content))
msgf, err := store.CreateMessageTemp(ctl.log, "ctltest")
msgf, err := store.CreateMessageTemp(xctl.log, "ctltest")
tcheck(t, err, "create temp file")
defer os.Remove(msgf.Name())
defer msgf.Close()
@ -451,7 +451,7 @@ func TestCtl(t *testing.T) {
tcheck(t, err, "write message file")
acc.WithWLock(func() {
err = acc.DeliverMailbox(ctl.log, "Inbox", m, msgf)
err = acc.DeliverMailbox(xctl.log, "Inbox", m, msgf)
tcheck(t, err, "deliver message")
})
}
@ -471,7 +471,7 @@ func TestCtl(t *testing.T) {
tcheck(t, err, "update mailbox size")
// Fix up the size.
ctlcmdFixmsgsize(ctl, "")
ctlcmdFixmsgsize(xctl, "")
err = acc.DB.Get(ctxbg, &msgBadSize)
tcheck(t, err, "get message")
@ -481,19 +481,19 @@ func TestCtl(t *testing.T) {
})
// "reparse"
testctl(func(ctl *ctl) {
ctlcmdReparse(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdReparse(xctl, "mjl")
})
testctl(func(ctl *ctl) {
ctlcmdReparse(ctl, "")
testctl(func(xctl *ctl) {
ctlcmdReparse(xctl, "")
})
// "reassignthreads"
testctl(func(ctl *ctl) {
ctlcmdReassignthreads(ctl, "mjl")
testctl(func(xctl *ctl) {
ctlcmdReassignthreads(xctl, "mjl")
})
testctl(func(ctl *ctl) {
ctlcmdReassignthreads(ctl, "")
testctl(func(xctl *ctl) {
ctlcmdReassignthreads(xctl, "")
})
// "backup", backup account.
@ -506,11 +506,11 @@ func TestCtl(t *testing.T) {
err = tlsrptdb.Init()
tcheck(t, err, "tlsrptdb init")
defer tlsrptdb.Close()
testctl(func(ctl *ctl) {
testctl(func(xctl *ctl) {
os.RemoveAll("testdata/ctl/data/tmp/backup")
err := os.WriteFile("testdata/ctl/data/receivedid.key", make([]byte, 16), 0600)
tcheck(t, err, "writing receivedid.key")
ctlcmdBackup(ctl, filepath.FromSlash("testdata/ctl/data/tmp/backup"), false)
ctlcmdBackup(xctl, filepath.FromSlash("testdata/ctl/data/tmp/backup"), false)
})
// Verify the backup.
@ -521,7 +521,7 @@ func TestCtl(t *testing.T) {
cmdVerifydata(&xcmd)
// IMAP connection.
testctl(func(ctl *ctl) {
testctl(func(xctl *ctl) {
a, b := net.Pipe()
go func() {
client, err := imapclient.New(mox.Cid(), a, true)
@ -530,7 +530,7 @@ func TestCtl(t *testing.T) {
client.Logout()
defer a.Close()
}()
ctlcmdIMAPServe(ctl, "mjl@mox.example", b, b)
ctlcmdIMAPServe(xctl, "mjl@mox.example", b, b)
})
}


@ -47,6 +47,18 @@ instructions below.
standard slog package for logging, not our mlog package. Packages not intended
for reuse do use mlog as it is more convenient. Internally, we always use
mlog.Log to do the logging, wrapping an slog.Logger.
- The code uses panic for error handling in quite a few places, including
smtpserver, imapserver and web API calls. Functions/methods, variables, struct
fields and types that begin with an "x" indicate they can panic on errors. Both
for i/o errors that are fatal for a connection, and also often for user-induced
errors, for example bad IMAP commands or invalid web API requests. These panics
are caught again at the top of a command or top of the connection. Write code
that is panic-safe, using defer to clean up and release resources.
- Try to check all errors, at the minimum using mlog.Log.Check() to log an error
at the appropriate level. Also when just closing a file. Log messages sometimes
unexpectedly point out latent issues. Only when there is no point in logging,
for example when previous writes to stderr failed, can error logging be skipped.
Test code is less strict about checking errors.
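
As an aside (editor's illustration, not part of develop.txt): the "check all errors" rule above typically shows up as a deferred Close whose error goes to mlog.Log.Check, as in the backup and import changes in this commit. A small sketch of that shape, using a hypothetical check helper on top of the standard log/slog package instead of mox's mlog:

package main

import (
	"log/slog"
	"os"
)

// check mimics the mlog.Log.Check pattern: if err is non-nil, log it with a
// short message. Illustrative only; mox itself uses mlog.Log.Check for this.
func check(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, "err", err)
	}
}

func main() {
	log := slog.Default()

	f, err := os.CreateTemp("", "checkdemo")
	if err != nil {
		log.Error("create temp file", "err", err)
		return
	}
	defer os.Remove(f.Name())
	defer func() {
		// Even "just closing a file" gets its error checked and logged.
		check(log, f.Close(), "closing file")
	}()

	_, err = f.WriteString("data\n")
	check(log, err, "writing file")
}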
# Reusable packages


@ -30,17 +30,19 @@ import (
// Conn is an IMAP connection to a server.
type Conn struct {
// Connection, may be original TCP or TLS connection. Reads go through c.br, and
// writes through c.bw. It wraps a tracing reading/writer and may wrap flate
// compression.
conn net.Conn
connBroken bool // If connection is broken, we won't flush (and write) again.
br *bufio.Reader
bw *bufio.Writer
compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer.
flateWriter *moxio.FlateWriter
flateBW *bufio.Writer
tr *moxio.TraceReader
tw *moxio.TraceWriter
// writes through c.xbw. The "x" for the writes indicate that failed writes cause
// an i/o panic, which is either turned into a returned error, or passed on (see
// boolean panic). The reader and writer wrap a tracing reading/writer and may wrap
// flate compression.
conn net.Conn
connBroken bool // If connection is broken, we won't flush (and write) again.
br *bufio.Reader
tr *moxio.TraceReader
xbw *bufio.Writer
compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer.
xflateWriter *moxio.FlateWriter
xflateBW *bufio.Writer
xtw *moxio.TraceWriter
log mlog.Log
panic bool
@ -86,8 +88,8 @@ func New(cid int64, conn net.Conn, xpanic bool) (client *Conn, rerr error) {
c.br = bufio.NewReader(c.tr)
// Writes are buffered and write to Conn, which may panic.
c.tw = moxio.NewTraceWriter(log, "CW: ", &c)
c.bw = bufio.NewWriter(c.tw)
c.xtw = moxio.NewTraceWriter(log, "CW: ", &c)
c.xbw = bufio.NewWriter(c.xtw)
defer c.recover(&rerr)
tag := c.xnonspace()
@ -171,14 +173,14 @@ func (c *Conn) xflush() {
return
}
err := c.bw.Flush()
err := c.xbw.Flush()
c.xcheckf(err, "flush")
// If compression is active, we need to flush the deflate stream.
if c.compress {
err := c.flateWriter.Flush()
err := c.xflateWriter.Flush()
c.xcheckf(err, "flush deflate")
err = c.flateBW.Flush()
err = c.xflateBW.Flush()
c.xcheckf(err, "flush deflate buffer")
}
}
@ -186,11 +188,11 @@ func (c *Conn) xflush() {
func (c *Conn) xtrace(level slog.Level) func() {
c.xflush()
c.tr.SetTrace(level)
c.tw.SetTrace(level)
c.xtw.SetTrace(level)
return func() {
c.xflush()
c.tr.SetTrace(mlog.LevelTrace)
c.tw.SetTrace(mlog.LevelTrace)
c.xtw.SetTrace(mlog.LevelTrace)
}
}
@ -214,13 +216,13 @@ func (c *Conn) Close() (rerr error) {
if c.conn == nil {
return nil
}
if !c.connBroken && c.flateWriter != nil {
err := c.flateWriter.Close()
if !c.connBroken && c.xflateWriter != nil {
err := c.xflateWriter.Close()
c.xcheckf(err, "close deflate writer")
err = c.flateBW.Flush()
err = c.xflateBW.Flush()
c.xcheckf(err, "flush deflate buffer")
c.flateWriter = nil
c.flateBW = nil
c.xflateWriter = nil
c.xflateBW = nil
}
err := c.conn.Close()
c.xcheckf(err, "close connection")
@ -248,8 +250,7 @@ func (c *Conn) Commandf(tag string, format string, args ...any) (rerr error) {
}
c.LastTag = tag
_, err := fmt.Fprintf(c.bw, "%s %s\r\n", tag, fmt.Sprintf(format, args...))
c.xcheckf(err, "write command")
fmt.Fprintf(c.xbw, "%s %s\r\n", tag, fmt.Sprintf(format, args...))
c.xflush()
return
}
@ -337,8 +338,7 @@ func (c *Conn) Writelinef(format string, args ...any) (rerr error) {
defer c.recover(&rerr)
s := fmt.Sprintf(format, args...)
_, err := fmt.Fprintf(c.bw, "%s\r\n", s)
c.xcheckf(err, "writeline")
fmt.Fprintf(c.xbw, "%s\r\n", s)
c.xflush()
return nil
}
@ -348,8 +348,7 @@ func (c *Conn) Writelinef(format string, args ...any) (rerr error) {
func (c *Conn) WriteSyncLiteral(s string) (untagged []Untagged, rerr error) {
defer c.recover(&rerr)
_, err := fmt.Fprintf(c.bw, "{%d}\r\n", len(s))
c.xcheckf(err, "write sync literal size")
fmt.Fprintf(c.xbw, "{%d}\r\n", len(s))
c.xflush()
plus, err := c.br.Peek(1)
@ -358,7 +357,7 @@ func (c *Conn) WriteSyncLiteral(s string) (untagged []Untagged, rerr error) {
_, err = c.Readline()
c.xcheckf(err, "read continuation line")
_, err = c.bw.Write([]byte(s))
_, err = c.xbw.Write([]byte(s))
c.xcheckf(err, "write literal data")
c.xflush()
return nil, nil


@ -58,9 +58,9 @@ func (c *Conn) Login(username, password string) (untagged []Untagged, result Res
defer c.recover(&rerr)
c.LastTag = c.nextTag()
fmt.Fprintf(c.bw, "%s login %s ", c.LastTag, astring(username))
fmt.Fprintf(c.xbw, "%s login %s ", c.LastTag, astring(username))
defer c.xtrace(mlog.LevelTraceauth)()
fmt.Fprintf(c.bw, "%s\r\n", astring(password))
fmt.Fprintf(c.xbw, "%s\r\n", astring(password))
c.xtrace(mlog.LevelTrace) // Restore.
return c.Response()
}
@ -69,18 +69,19 @@ func (c *Conn) Login(username, password string) (untagged []Untagged, result Res
func (c *Conn) AuthenticatePlain(username, password string) (untagged []Untagged, result Result, rerr error) {
defer c.recover(&rerr)
c.Commandf("", "authenticate plain")
err := c.Commandf("", "authenticate plain")
c.xcheckf(err, "writing authenticate command")
_, untagged, result, rerr = c.ReadContinuation()
c.xcheckf(rerr, "reading continuation")
if result.Status != "" {
c.xerrorf("got result status %q, expected continuation", result.Status)
}
defer c.xtrace(mlog.LevelTraceauth)()
xw := base64.NewEncoder(base64.StdEncoding, c.bw)
xw := base64.NewEncoder(base64.StdEncoding, c.xbw)
fmt.Fprintf(xw, "\u0000%s\u0000%s", username, password)
xw.Close()
c.xtrace(mlog.LevelTrace) // Restore.
fmt.Fprintf(c.bw, "\r\n")
fmt.Fprintf(c.xbw, "\r\n")
c.xflush()
return c.Response()
}
@ -153,15 +154,15 @@ func (c *Conn) CompressDeflate() (untagged []Untagged, result Result, rerr error
untagged, result, rerr = c.Transactf("compress deflate")
c.xcheck(rerr)
c.flateBW = bufio.NewWriter(c)
fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression)
c.xflateBW = bufio.NewWriter(c)
fw0, err := flate.NewWriter(c.xflateBW, flate.DefaultCompression)
c.xcheckf(err, "deflate") // Cannot happen.
fw := moxio.NewFlateWriter(fw0)
c.compress = true
c.flateWriter = fw
c.tw = moxio.NewTraceWriter(mlog.New("imapclient", nil), "CW: ", fw)
c.bw = bufio.NewWriter(c.tw)
c.xflateWriter = fw
c.xtw = moxio.NewTraceWriter(mlog.New("imapclient", nil), "CW: ", fw)
c.xbw = bufio.NewWriter(c.xtw)
rc := c.xprefixConn()
fr := flate.NewReaderPartial(rc)
@ -303,8 +304,7 @@ func (c *Conn) Append(mailbox string, message Append, more ...Append) (untagged
tag := c.nextTag()
c.LastTag = tag
_, err := fmt.Fprintf(c.bw, "%s append %s", tag, astring(mailbox))
c.xcheckf(err, "write command")
fmt.Fprintf(c.xbw, "%s append %s", tag, astring(mailbox))
msgs := append([]Append{message}, more...)
for _, m := range msgs {
@ -316,14 +316,14 @@ func (c *Conn) Append(mailbox string, message Append, more ...Append) (untagged
// todo: use literal8 if needed, with "UTF8()" if required.
// todo: for larger messages, use a synchronizing literal.
fmt.Fprintf(c.bw, " (%s)%s {%d+}\r\n", strings.Join(m.Flags, " "), date, m.Size)
fmt.Fprintf(c.xbw, " (%s)%s {%d+}\r\n", strings.Join(m.Flags, " "), date, m.Size)
defer c.xtrace(mlog.LevelTracedata)()
_, err := io.Copy(c.bw, m.Data)
_, err := io.Copy(c.xbw, m.Data)
c.xcheckf(err, "write message data")
c.xtrace(mlog.LevelTrace) // Restore
}
fmt.Fprintf(c.bw, "\r\n")
fmt.Fprintf(c.xbw, "\r\n")
c.xflush()
return c.Response()
}
@ -441,14 +441,15 @@ func (c *Conn) replace(cmd string, num string, mailbox string, msg Append) (unta
}
// todo: only use literal8 if needed, possibly with "UTF8()"
// todo: encode mailbox
c.Commandf("", "%s %s %s (%s)%s ~{%d+}", cmd, num, astring(mailbox), strings.Join(msg.Flags, " "), date, msg.Size)
err := c.Commandf("", "%s %s %s (%s)%s ~{%d+}", cmd, num, astring(mailbox), strings.Join(msg.Flags, " "), date, msg.Size)
c.xcheckf(err, "writing replace command")
defer c.xtrace(mlog.LevelTracedata)()
_, err := io.Copy(c.bw, msg.Data)
_, err = io.Copy(c.xbw, msg.Data)
c.xcheckf(err, "write message data")
c.xtrace(mlog.LevelTrace)
fmt.Fprintf(c.bw, "\r\n")
fmt.Fprintf(c.xbw, "\r\n")
c.xflush()
return c.Response()

View file

@ -856,8 +856,7 @@ func (c *Conn) xliteral() []byte {
c.xerrorf("refusing to read more than 1MB: %d", size)
}
if sync {
_, err := fmt.Fprintf(c.bw, "+ ok\r\n")
c.xcheckf(err, "write continuation")
fmt.Fprintf(c.xbw, "+ ok\r\n")
c.xflush()
}
buf := make([]byte, int(size))


@ -428,9 +428,9 @@ func (cmd *fetchCmd) process(atts []fetchAtt) {
}
// Write errors are turned into panics because we write through c.
fmt.Fprintf(cmd.conn.bw, "* %d FETCH ", cmd.conn.xsequence(cmd.uid))
data.writeTo(cmd.conn, cmd.conn.bw)
cmd.conn.bw.Write([]byte("\r\n"))
fmt.Fprintf(cmd.conn.xbw, "* %d FETCH ", cmd.conn.xsequence(cmd.uid))
data.writeTo(cmd.conn, cmd.conn.xbw)
cmd.conn.xbw.Write([]byte("\r\n"))
}
// result for one attribute. if processing fails, e.g. because data was requested


@ -264,7 +264,7 @@ func (c *conn) cmdList(tag, cmd string, p *parser) {
c.bwritelinef("%s", line)
}
for _, meta := range respMetadata {
meta.writeTo(c, c.bw)
meta.writeTo(c, c.xbw)
c.bwritelinef("")
}
c.ok(tag, cmd)


@ -155,18 +155,18 @@ func (c *conn) cmdGetmetadata(tag, cmd string, p *parser) {
// Response syntax: ../rfc/5464:807 ../rfc/5464:778
// We can only send untagged responses when we have any matches.
if len(annotations) > 0 {
fmt.Fprintf(c.bw, "* METADATA %s (", mailboxt(mailboxName).pack(c))
fmt.Fprintf(c.xbw, "* METADATA %s (", mailboxt(mailboxName).pack(c))
for i, a := range annotations {
if i > 0 {
fmt.Fprint(c.bw, " ")
fmt.Fprint(c.xbw, " ")
}
astring(a.Key).writeTo(c, c.bw)
fmt.Fprint(c.bw, " ")
astring(a.Key).writeTo(c, c.xbw)
fmt.Fprint(c.xbw, " ")
if a.IsString {
string0(string(a.Value)).writeTo(c, c.bw)
string0(string(a.Value)).writeTo(c, c.xbw)
} else {
v := readerSizeSyncliteral{bytes.NewReader(a.Value), int64(len(a.Value)), true}
v.writeTo(c, c.bw)
v.writeTo(c, c.xbw)
}
}
c.bwritelinef(")")


@ -9,7 +9,7 @@ import (
type token interface {
pack(c *conn) string
writeTo(c *conn, w io.Writer)
writeTo(c *conn, xw io.Writer) // Writes to xw panic on error.
}
type bare string
@ -18,8 +18,8 @@ func (t bare) pack(c *conn) string {
return string(t)
}
func (t bare) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t bare) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
type niltoken struct{}
@ -30,8 +30,8 @@ func (t niltoken) pack(c *conn) string {
return "NIL"
}
func (t niltoken) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t niltoken) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
func nilOrString(s string) token {
@ -60,8 +60,8 @@ func (t string0) pack(c *conn) string {
return r
}
func (t string0) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t string0) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
type dquote string
@ -78,8 +78,8 @@ func (t dquote) pack(c *conn) string {
return r
}
func (t dquote) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t dquote) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
type syncliteral string
@ -88,9 +88,9 @@ func (t syncliteral) pack(c *conn) string {
return fmt.Sprintf("{%d}\r\n", len(t)) + string(t)
}
func (t syncliteral) writeTo(c *conn, w io.Writer) {
fmt.Fprintf(w, "{%d}\r\n", len(t))
w.Write([]byte(t))
func (t syncliteral) writeTo(c *conn, xw io.Writer) {
fmt.Fprintf(xw, "{%d}\r\n", len(t))
xw.Write([]byte(t))
}
// data from reader with known size.
@ -112,14 +112,14 @@ func (t readerSizeSyncliteral) pack(c *conn) string {
return fmt.Sprintf("%s{%d}\r\n", lit, t.size) + string(buf)
}
func (t readerSizeSyncliteral) writeTo(c *conn, w io.Writer) {
func (t readerSizeSyncliteral) writeTo(c *conn, xw io.Writer) {
var lit string
if t.lit8 {
lit = "~"
}
fmt.Fprintf(w, "%s{%d}\r\n", lit, t.size)
fmt.Fprintf(xw, "%s{%d}\r\n", lit, t.size)
defer c.xtrace(mlog.LevelTracedata)()
if _, err := io.Copy(w, io.LimitReader(t.r, t.size)); err != nil {
if _, err := io.Copy(xw, io.LimitReader(t.r, t.size)); err != nil {
panic(err)
}
}
@ -137,17 +137,14 @@ func (t readerSyncliteral) pack(c *conn) string {
return fmt.Sprintf("{%d}\r\n", len(buf)) + string(buf)
}
func (t readerSyncliteral) writeTo(c *conn, w io.Writer) {
func (t readerSyncliteral) writeTo(c *conn, xw io.Writer) {
buf, err := io.ReadAll(t.r)
if err != nil {
panic(err)
}
fmt.Fprintf(w, "{%d}\r\n", len(buf))
fmt.Fprintf(xw, "{%d}\r\n", len(buf))
defer c.xtrace(mlog.LevelTracedata)()
_, err = w.Write(buf)
if err != nil {
panic(err)
}
xw.Write(buf)
}
// list with tokens space-separated
@ -165,15 +162,15 @@ func (t listspace) pack(c *conn) string {
return s
}
func (t listspace) writeTo(c *conn, w io.Writer) {
fmt.Fprint(w, "(")
func (t listspace) writeTo(c *conn, xw io.Writer) {
fmt.Fprint(xw, "(")
for i, e := range t {
if i > 0 {
fmt.Fprint(w, " ")
fmt.Fprint(xw, " ")
}
e.writeTo(c, w)
e.writeTo(c, xw)
}
fmt.Fprint(w, ")")
fmt.Fprint(xw, ")")
}
// concatenate tokens space-separated
@ -190,12 +187,12 @@ func (t concatspace) pack(c *conn) string {
return s
}
func (t concatspace) writeTo(c *conn, w io.Writer) {
func (t concatspace) writeTo(c *conn, xw io.Writer) {
for i, e := range t {
if i > 0 {
fmt.Fprint(w, " ")
fmt.Fprint(xw, " ")
}
e.writeTo(c, w)
e.writeTo(c, xw)
}
}
@ -210,9 +207,9 @@ func (t concat) pack(c *conn) string {
return s
}
func (t concat) writeTo(c *conn, w io.Writer) {
func (t concat) writeTo(c *conn, xw io.Writer) {
for _, e := range t {
e.writeTo(c, w)
e.writeTo(c, xw)
}
}
@ -234,8 +231,8 @@ next:
return string(t)
}
func (t astring) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t astring) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
// mailbox with utf7 encoding if connection requires it, or utf8 otherwise.
@ -249,8 +246,8 @@ func (t mailboxt) pack(c *conn) string {
return astring(s).pack(c)
}
func (t mailboxt) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t mailboxt) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}
type number uint32
@ -259,6 +256,6 @@ func (t number) pack(c *conn) string {
return fmt.Sprintf("%d", t)
}
func (t number) writeTo(c *conn, w io.Writer) {
w.Write([]byte(t.pack(c)))
func (t number) writeTo(c *conn, xw io.Writer) {
xw.Write([]byte(t.pack(c)))
}


@ -191,14 +191,16 @@ type conn struct {
tls bool // Whether TLS has been initialized.
viaHTTPS bool // Whether this connection came in via HTTPS (using TLS ALPN).
br *bufio.Reader // From remote, with TLS unwrapped in case of TLS, and possibly wrapping inflate.
tr *moxio.TraceReader // Kept to change trace level when reading/writing cmd/auth/data.
line chan lineErr // If set, instead of reading from br, a line is read from this channel. For reading a line in IDLE while also waiting for mailbox/account updates.
lastLine string // For detecting if syntax error is fatal, i.e. if this ends with a literal. Without crlf.
bw *bufio.Writer // To remote, with TLS added in case of TLS, and possibly wrapping deflate, see conn.flateWriter.
tr *moxio.TraceReader // Kept to change trace level when reading/writing cmd/auth/data.
tw *moxio.TraceWriter
slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy.
lastlog time.Time // For printing time since previous log line.
baseTLSConfig *tls.Config // Base TLS config to use for handshake.
xbw *bufio.Writer // To remote, with TLS added in case of TLS, and possibly wrapping deflate, see conn.xflateWriter. Writes go through xtw to conn.Write, which panics on errors, hence the "x".
xtw *moxio.TraceWriter
xflateWriter *moxio.FlateWriter // For flushing output after flushing conn.xbw, and for closing.
xflateBW *bufio.Writer // Wraps raw connection writes, xflateWriter writes here, also needs flushing.
slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy.
lastlog time.Time // For printing time since previous log line.
baseTLSConfig *tls.Config // Base TLS config to use for handshake.
remoteIP net.IP
noRequireSTARTTLS bool
cmd string // Currently executing, for deciding to applyChanges and logging.
@ -208,8 +210,6 @@ type conn struct {
log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine.
enabled map[capability]bool // All upper-case.
compress bool // Whether compression is enabled, via compress command.
flateWriter *moxio.FlateWriter // For flushing output after flushing conn.bw, and for closing.
flateBW *bufio.Writer // Wraps raw connection writes, flateWriter writes here, also needs flushing.
// Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with
// value "$". When used, UIDs must be verified to still exist, because they may
@ -529,11 +529,11 @@ func (c *conn) Write(buf []byte) (int, error) {
func (c *conn) xtrace(level slog.Level) func() {
c.xflush()
c.tr.SetTrace(level)
c.tw.SetTrace(level)
c.xtw.SetTrace(level)
return func() {
c.xflush()
c.tr.SetTrace(mlog.LevelTrace)
c.tw.SetTrace(mlog.LevelTrace)
c.xtw.SetTrace(mlog.LevelTrace)
}
}
@ -641,7 +641,7 @@ func (c *conn) writelinef(format string, args ...any) {
// Buffer line for write.
func (c *conn) bwritelinef(format string, args ...any) {
format += "\r\n"
fmt.Fprintf(c.bw, format, args...)
fmt.Fprintf(c.xbw, format, args...)
}
func (c *conn) xflush() {
@ -650,7 +650,7 @@ func (c *conn) xflush() {
return
}
err := c.bw.Flush()
err := c.xbw.Flush()
xcheckf(err, "flush") // Should never happen, the Write caused by the Flush should panic on i/o error.
// If compression is enabled, we need to flush its stream.
@ -658,11 +658,11 @@ func (c *conn) xflush() {
// Note: Flush writes a sync message if there is nothing to flush. Ideally we
// wouldn't send that, but we would have to keep track of whether data needs to be
// flushed.
err := c.flateWriter.Flush()
err := c.xflateWriter.Flush()
xcheckf(err, "flush deflate")
// The flate writer writes to a bufio.Writer, we must also flush that.
err = c.flateBW.Flush()
err = c.xflateBW.Flush()
xcheckf(err, "flush deflate writer")
}
}
@ -753,8 +753,8 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x
c.tr = moxio.NewTraceReader(c.log, "C: ", c.conn)
// todo: tracing should be done on whatever comes out of c.br. the remote connection write a command plus data, and bufio can read it in one read, causing a command parser that sets the tracing level to data to have no effect. we are now typically logging sent messages, when mail clients append to the Sent mailbox.
c.br = bufio.NewReader(c.tr)
c.tw = moxio.NewTraceWriter(c.log, "S: ", c)
c.bw = bufio.NewWriter(c.tw)
c.xtw = moxio.NewTraceWriter(c.log, "S: ", c)
c.xbw = bufio.NewWriter(c.xtw)
// Many IMAP connections use IDLE to wait for new incoming messages. We'll enable
// keepalive to get a higher chance of the connection staying alive, or otherwise
@ -1144,9 +1144,9 @@ func (c *conn) command() {
// If compression was enabled, we flush & close the deflate stream.
if c.compress {
// Note: Close and flush can Write and may panic with an i/o error.
if err := c.flateWriter.Close(); err != nil {
if err := c.xflateWriter.Close(); err != nil {
c.log.Debugx("close deflate writer", err)
} else if err := c.flateBW.Flush(); err != nil {
} else if err := c.xflateBW.Flush(); err != nil {
c.log.Debugx("flush deflate buffer", err)
}
}
@ -1870,15 +1870,15 @@ func (c *conn) cmdCompress(tag, cmd string, p *parser) {
c.log.Debug("compression enabled")
c.ok(tag, cmd)
c.flateBW = bufio.NewWriter(c)
fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression)
c.xflateBW = bufio.NewWriter(c)
fw0, err := flate.NewWriter(c.xflateBW, flate.DefaultCompression)
xcheckf(err, "deflate") // Cannot happen.
fw := moxio.NewFlateWriter(fw0)
xfw := moxio.NewFlateWriter(fw0)
c.compress = true
c.flateWriter = fw
c.tw = moxio.NewTraceWriter(c.log, "S: ", c.flateWriter)
c.bw = bufio.NewWriter(c.tw) // The previous c.bw will not have buffered data.
c.xflateWriter = xfw
c.xtw = moxio.NewTraceWriter(c.log, "S: ", c.xflateWriter)
c.xbw = bufio.NewWriter(c.xtw) // The previous c.xbw will not have buffered data.
rc := xprefixConn(c.conn, c.br) // c.br may contain buffered data.
// We use the special partial reader. Some clients write commands and flush the

import.go (134 changed lines)

@ -131,22 +131,22 @@ func xcmdXImport(mbox bool, c *cmd) {
ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
}
func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
func ctlcmdImport(xctl *ctl, mbox bool, account, mailbox, src string) {
if mbox {
ctl.xwrite("importmbox")
xctl.xwrite("importmbox")
} else {
ctl.xwrite("importmaildir")
xctl.xwrite("importmaildir")
}
ctl.xwrite(account)
xctl.xwrite(account)
if strings.EqualFold(mailbox, "Inbox") {
mailbox = "Inbox"
}
ctl.xwrite(mailbox)
ctl.xwrite(src)
ctl.xreadok()
xctl.xwrite(mailbox)
xctl.xwrite(src)
xctl.xreadok()
fmt.Fprintln(os.Stderr, "importing...")
for {
line := ctl.xread()
line := xctl.xread()
if strings.HasPrefix(line, "progress ") {
n := line[len("progress "):]
fmt.Fprintf(os.Stderr, "%s...\n", n)
@ -157,11 +157,11 @@ func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
}
break
}
count := ctl.xread()
count := xctl.xread()
fmt.Fprintf(os.Stderr, "%s imported\n", count)
}
func importctl(ctx context.Context, ctl *ctl, mbox bool) {
func ximportctl(ctx context.Context, xctl *ctl, mbox bool) {
/* protocol:
> "importmaildir" or "importmbox"
> account
@ -172,15 +172,15 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
< "ok" when done, or error
< count (of total imported messages, only if not error)
*/
account := ctl.xread()
mailbox := ctl.xread()
src := ctl.xread()
account := xctl.xread()
mailbox := xctl.xread()
src := xctl.xread()
kind := "maildir"
if mbox {
kind = "mbox"
}
ctl.log.Info("importing messages",
xctl.log.Info("importing messages",
slog.String("kind", kind),
slog.String("account", account),
slog.String("mailbox", mailbox),
@ -194,34 +194,34 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
// Ensure normalized form.
mailbox = norm.NFC.String(mailbox)
mailbox, _, err = store.CheckMailboxName(mailbox, true)
ctl.xcheck(err, "checking mailbox name")
xctl.xcheck(err, "checking mailbox name")
// Open account, creating a database file if it doesn't exist yet. It must be known
// in the configuration file.
a, err := store.OpenAccount(ctl.log, account, false)
ctl.xcheck(err, "opening account")
a, err := store.OpenAccount(xctl.log, account, false)
xctl.xcheck(err, "opening account")
defer func() {
if a != nil {
err := a.Close()
ctl.log.Check(err, "closing account after import")
xctl.log.Check(err, "closing account after import")
}
}()
err = a.ThreadingWait(ctl.log)
ctl.xcheck(err, "waiting for account thread upgrade")
err = a.ThreadingWait(xctl.log)
xctl.xcheck(err, "waiting for account thread upgrade")
defer func() {
if mboxf != nil {
err := mboxf.Close()
ctl.log.Check(err, "closing mbox file after import")
xctl.log.Check(err, "closing mbox file after import")
}
if mdnewf != nil {
err := mdnewf.Close()
ctl.log.Check(err, "closing maildir new after import")
xctl.log.Check(err, "closing maildir new after import")
}
if mdcurf != nil {
err := mdcurf.Close()
ctl.log.Check(err, "closing maildir cur after import")
xctl.log.Check(err, "closing maildir cur after import")
}
}()
@ -233,14 +233,14 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
// may be a different user who can access the files.
if mbox {
mboxf, err = os.Open(src)
ctl.xcheck(err, "open mbox file")
msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
xctl.xcheck(err, "open mbox file")
msgreader = store.NewMboxReader(xctl.log, store.CreateMessageTemp, src, mboxf)
} else {
mdnewf, err = os.Open(filepath.Join(src, "new"))
ctl.xcheck(err, "open subdir new of maildir")
xctl.xcheck(err, "open subdir new of maildir")
mdcurf, err = os.Open(filepath.Join(src, "cur"))
ctl.xcheck(err, "open subdir cur of maildir")
msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
xctl.xcheck(err, "open subdir cur of maildir")
msgreader = store.NewMaildirReader(xctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
}
// todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
@ -249,16 +249,16 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
var changes []store.Change
tx, err := a.DB.Begin(ctx, true)
ctl.xcheck(err, "begin transaction")
xctl.xcheck(err, "begin transaction")
defer func() {
if tx != nil {
err := tx.Rollback()
ctl.log.Check(err, "rolling back transaction")
xctl.log.Check(err, "rolling back transaction")
}
}()
// All preparations done. Good to go.
ctl.xwriteok()
xctl.xwriteok()
// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
var newIDs []int64
@ -268,22 +268,22 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
return
}
if x != ctl.x {
ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
if x != xctl.x {
xctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
debug.PrintStack()
metrics.PanicInc(metrics.Import)
} else {
ctl.log.Error("import error")
xctl.log.Error("import error")
}
for _, id := range newIDs {
p := a.MessagePath(id)
err := os.Remove(p)
ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
xctl.log.Check(err, "closing message file after import error", slog.String("path", p))
}
newIDs = nil
ctl.xerror(fmt.Sprintf("import error: %v", x))
xctl.xerror(fmt.Sprintf("import error: %v", x))
}()
var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
@ -291,18 +291,18 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
// Ensure mailbox exists.
var mb store.Mailbox
mb, changes, err = a.MailboxEnsure(tx, mailbox, true, store.SpecialUse{}, &modseq)
ctl.xcheck(err, "ensuring mailbox exists")
xctl.xcheck(err, "ensuring mailbox exists")
nkeywords := len(mb.Keywords)
jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
jf, _, err := a.OpenJunkFilter(ctx, xctl.log)
if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
ctl.xcheck(err, "open junk filter")
xctl.xcheck(err, "open junk filter")
}
defer func() {
if jf != nil {
err = jf.CloseDiscard()
ctl.xcheck(err, "close junk filter")
xctl.xcheck(err, "close junk filter")
}
}()
@ -312,30 +312,30 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
var addSize int64
du := store.DiskUsage{ID: 1}
err = tx.Get(&du)
ctl.xcheck(err, "get disk usage")
xctl.xcheck(err, "get disk usage")
msgDirs := map[string]struct{}{}
process := func(m *store.Message, msgf *os.File, origPath string) {
defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
defer store.CloseRemoveTempFile(xctl.log, msgf, "message to import")
addSize += m.Size
if maxSize > 0 && du.MessageSize+addSize > maxSize {
ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
xctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
}
// Parse message and store parsed information for later fast retrieval.
p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
p, err := message.EnsurePart(xctl.log.Logger, false, msgf, m.Size)
if err != nil {
ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
xctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
}
m.ParsedBuf, err = json.Marshal(p)
ctl.xcheck(err, "marshal parsed message structure")
xctl.xcheck(err, "marshal parsed message structure")
// Set fields needed for future threading. By doing it now, MessageAdd won't
// have to parse the Part again.
p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
m.PrepareThreading(ctl.log, &p)
m.PrepareThreading(xctl.log, &p)
if m.Received.IsZero() {
if p.Envelope != nil && !p.Envelope.Date.IsZero() {
@ -348,10 +348,10 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
m.JunkFlagsForMailbox(mb, conf)
if jf != nil && m.NeedsTraining() {
if words, err := jf.ParseMessage(p); err != nil {
ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
xctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
} else {
err = jf.Train(ctx, !m.Junk, words)
ctl.xcheck(err, "training junk filter")
xctl.xcheck(err, "training junk filter")
m.TrainedJunk = &m.Junk
}
}
@ -359,7 +359,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
if modseq == 0 {
var err error
modseq, err = a.NextModSeq(tx)
ctl.xcheck(err, "assigning next modseq")
xctl.xcheck(err, "assigning next modseq")
mb.ModSeq = modseq
}
@ -376,8 +376,8 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
SkipUpdateDiskUsage: true, // We do this once at the end.
SkipCheckQuota: true, // We check before.
}
err = a.MessageAdd(ctl.log, tx, &mb, m, msgf, opts)
ctl.xcheck(err, "delivering message")
err = a.MessageAdd(xctl.log, tx, &mb, m, msgf, opts)
xctl.xcheck(err, "delivering message")
newIDs = append(newIDs, m.ID)
changes = append(changes, m.ChangeAddUID())
@ -385,7 +385,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
n++
if n%1000 == 0 {
ctl.xwrite(fmt.Sprintf("progress %d", n))
xctl.xwrite(fmt.Sprintf("progress %d", n))
}
}
@ -394,15 +394,15 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
if err == io.EOF {
break
}
ctl.xcheck(err, "reading next message")
xctl.xcheck(err, "reading next message")
process(m, msgf, origPath)
}
// Match threads.
if len(newIDs) > 0 {
err = a.AssignThreads(ctx, ctl.log, tx, newIDs[0], 0, io.Discard)
ctl.xcheck(err, "assigning messages to threads")
err = a.AssignThreads(ctx, xctl.log, tx, newIDs[0], 0, io.Discard)
xctl.xcheck(err, "assigning messages to threads")
}
changes = append(changes, mb.ChangeCounts())
@ -411,35 +411,35 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
}
err = tx.Update(&mb)
ctl.xcheck(err, "updating message counts and keywords in mailbox")
xctl.xcheck(err, "updating message counts and keywords in mailbox")
err = a.AddMessageSize(ctl.log, tx, addSize)
ctl.xcheck(err, "updating total message size")
err = a.AddMessageSize(xctl.log, tx, addSize)
xctl.xcheck(err, "updating total message size")
for msgDir := range msgDirs {
err := moxio.SyncDir(ctl.log, msgDir)
ctl.xcheck(err, "sync dir")
err := moxio.SyncDir(xctl.log, msgDir)
xctl.xcheck(err, "sync dir")
}
if jf != nil {
err := jf.Close()
ctl.log.Check(err, "close junk filter")
xctl.log.Check(err, "close junk filter")
jf = nil
}
err = tx.Commit()
ctl.xcheck(err, "commit")
xctl.xcheck(err, "commit")
tx = nil
ctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs)))
xctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs)))
newIDs = nil
store.BroadcastChanges(a, changes)
})
err = a.Close()
ctl.xcheck(err, "closing account")
xctl.xcheck(err, "closing account")
a = nil
ctl.xwriteok()
ctl.xwrite(fmt.Sprintf("%d", n))
xctl.xwriteok()
xctl.xwrite(fmt.Sprintf("%d", n))
}


@ -1661,11 +1661,11 @@ new mail deliveries.
}
mustLoadConfig()
ctl := xctl()
ctl.xwrite("stop")
xctl := xctl()
xctl.xwrite("stop")
// Read will hang until remote has shut down.
buf := make([]byte, 128)
n, err := ctl.conn.Read(buf)
n, err := xctl.conn.Read(buf)
if err == nil {
log.Fatalf("expected eof after graceful shutdown, got data %q", buf[:n])
} else if err != io.EOF {


@ -44,20 +44,20 @@ func NewComposer(w io.Writer, maxSize int64, smtputf8 bool) *Composer {
// Write implements io.Writer, but calls panic (that is handled higher up) on
// i/o errors.
func (c *Composer) Write(buf []byte) (int, error) {
if c.maxSize > 0 && c.Size+int64(len(buf)) > c.maxSize {
c.Checkf(ErrMessageSize, "writing message")
func (xc *Composer) Write(buf []byte) (int, error) {
if xc.maxSize > 0 && xc.Size+int64(len(buf)) > xc.maxSize {
xc.Checkf(ErrMessageSize, "writing message")
}
n, err := c.bw.Write(buf)
n, err := xc.bw.Write(buf)
if n > 0 {
c.Size += int64(n)
xc.Size += int64(n)
}
c.Checkf(err, "write")
xc.Checkf(err, "write")
return n, nil
}
// Checkf checks err, panicing with sentinel error value.
func (c *Composer) Checkf(err error, format string, args ...any) {
func (xc *Composer) Checkf(err error, format string, args ...any) {
if err != nil {
// We expose the original error too, needed at least for ErrMessageSize.
panic(fmt.Errorf("%w: %w: %v", ErrCompose, err, fmt.Sprintf(format, args...)))
@ -65,14 +65,14 @@ func (c *Composer) Checkf(err error, format string, args ...any) {
}
// Flush writes any buffered output.
func (c *Composer) Flush() {
err := c.bw.Flush()
c.Checkf(err, "flush")
func (xc *Composer) Flush() {
err := xc.bw.Flush()
xc.Checkf(err, "flush")
}
// Header writes a message header.
func (c *Composer) Header(k, v string) {
fmt.Fprintf(c, "%s: %s\r\n", k, v)
func (xc *Composer) Header(k, v string) {
fmt.Fprintf(xc, "%s: %s\r\n", k, v)
}
// NameAddress holds both an address display name, and an SMTP path address.
@ -82,7 +82,7 @@ type NameAddress struct {
}
// HeaderAddrs writes a message header with addresses.
func (c *Composer) HeaderAddrs(k string, l []NameAddress) {
func (xc *Composer) HeaderAddrs(k string, l []NameAddress) {
if len(l) == 0 {
return
}
@ -93,7 +93,7 @@ func (c *Composer) HeaderAddrs(k string, l []NameAddress) {
v += ","
linelen++
}
addr := mail.Address{Name: a.DisplayName, Address: a.Address.Pack(c.SMTPUTF8)}
addr := mail.Address{Name: a.DisplayName, Address: a.Address.Pack(xc.SMTPUTF8)}
s := addr.String()
if v != "" && linelen+1+len(s) > 77 {
v += "\r\n\t"
@ -105,16 +105,16 @@ func (c *Composer) HeaderAddrs(k string, l []NameAddress) {
v += s
linelen += len(s)
}
fmt.Fprintf(c, "%s: %s\r\n", k, v)
fmt.Fprintf(xc, "%s: %s\r\n", k, v)
}
// Subject writes a subject message header.
func (c *Composer) Subject(subject string) {
func (xc *Composer) Subject(subject string) {
var subjectValue string
subjectLineLen := len("Subject: ")
subjectWord := false
for i, word := range strings.Split(subject, " ") {
if !c.SMTPUTF8 && !isASCII(word) {
if !xc.SMTPUTF8 && !isASCII(word) {
word = mime.QEncoding.Encode("utf-8", word)
}
if i > 0 {
@ -129,19 +129,19 @@ func (c *Composer) Subject(subject string) {
subjectLineLen += len(word)
subjectWord = true
}
c.Header("Subject", subjectValue)
xc.Header("Subject", subjectValue)
}
// Line writes an empty line.
func (c *Composer) Line() {
_, _ = c.Write([]byte("\r\n"))
func (xc *Composer) Line() {
_, _ = xc.Write([]byte("\r\n"))
}
// TextPart prepares a text part to be added. Text should contain lines terminated
// with newlines (lf), which are replaced with crlf. The returned text may be
// quotedprintable, if needed. The returned ct and cte headers are for use with
// Content-Type and Content-Transfer-Encoding headers.
func (c *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte string) {
func (xc *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte string) {
if !strings.HasSuffix(text, "\n") {
text += "\n"
}
@ -153,10 +153,10 @@ func (c *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte stri
if NeedsQuotedPrintable(text) {
var sb strings.Builder
_, err := io.Copy(quotedprintable.NewWriter(&sb), strings.NewReader(text))
c.Checkf(err, "converting text to quoted printable")
xc.Checkf(err, "converting text to quoted printable")
text = sb.String()
cte = "quoted-printable"
} else if c.Has8bit || charset == "utf-8" {
} else if xc.Has8bit || charset == "utf-8" {
cte = "8bit"
} else {
cte = "7bit"
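
Since every Composer method above reports i/o problems by panicking via Checkf, a caller typically wraps composition in a recover that turns the ErrCompose sentinel back into an ordinary error. A rough sketch of such a caller, assuming the Composer shown here lives in github.com/mjl-/mox/message; the composeNote helper and its recover wrapper are illustrative, not part of this commit:

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/mjl-/mox/message"
)

// composeNote writes a tiny text message to stdout, converting Composer
// panics (which wrap message.ErrCompose) back into a returned error.
func composeNote(smtputf8 bool) (rerr error) {
	defer func() {
		if x := recover(); x != nil {
			if err, ok := x.(error); ok && errors.Is(err, message.ErrCompose) {
				rerr = err
				return
			}
			panic(x) // Not a compose error, re-raise.
		}
	}()

	xc := message.NewComposer(os.Stdout, 100*1024, smtputf8)
	xc.Header("MIME-Version", "1.0")
	xc.Subject("hello")
	body, ct, cte := xc.TextPart("plain", "hi there\n")
	xc.Header("Content-Type", ct)
	xc.Header("Content-Transfer-Encoding", cte)
	xc.Line()
	_, _ = xc.Write(body) // Errors panic via Checkf inside Write.
	xc.Flush()
	return nil
}

func main() {
	if err := composeNote(false); err != nil {
		fmt.Println("compose:", err)
	}
}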

View file

@ -313,7 +313,7 @@ binary should be setgid that group:
homedir, err := os.UserHomeDir()
xcheckf(err, "finding homedir for storing message after failed delivery")
maildir := filepath.Join(homedir, "moxsubmit.failures")
os.Mkdir(maildir, 0700)
os.Mkdir(maildir, 0700) // An existing directory is no problem; other failures surface when creating the temp file below.
f, err := os.CreateTemp(maildir, "newmsg.")
xcheckf(err, "creating temp file for storing message after failed delivery")
// note: not removing the partial file if writing/closing below fails.

View file

@ -323,14 +323,16 @@ type conn struct {
origConn net.Conn
conn net.Conn
tls bool
extRequireTLS bool // Whether to announce and allow the REQUIRETLS extension.
viaHTTPS bool // Whether the connection came in via the HTTPS port (using TLS ALPN).
resolver dns.Resolver
r *bufio.Reader
w *bufio.Writer
tr *moxio.TraceReader // Kept for changing trace level during cmd/auth/data.
tw *moxio.TraceWriter
tls bool
extRequireTLS bool // Whether to announce and allow the REQUIRETLS extension.
viaHTTPS bool // Whether the connection came in via the HTTPS port (using TLS ALPN).
resolver dns.Resolver
// The "x" in the reader and writer fields indicates that Read and Write errors
// use panic to propagate the error.
xbr *bufio.Reader
xbw *bufio.Writer
xtr *moxio.TraceReader // Kept for changing trace level during cmd/auth/data.
xtw *moxio.TraceWriter
slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy.
lastlog time.Time // Used for printing the delta time since the previous logging for this connection.
submission bool // ../rfc/6409:19 applies
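
As the new comment says, the buffered reader and writer can be used without per-call error checks only because the conn's own Read and Write panic on i/o errors. A stripped-down illustration of the writer side of that layering; errIO and the xconn type below are stand-ins, not the real smtpserver types:

package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
)

var errIO = errors.New("io error")

// xconn wraps an underlying writer; Write panics on error, so code using the
// buffered writer below never checks write errors inline.
type xconn struct {
	w io.Writer
}

func (c *xconn) Write(buf []byte) (int, error) {
	n, err := c.w.Write(buf)
	if err != nil {
		panic(fmt.Errorf("%w: %v", errIO, err))
	}
	return n, nil
}

func main() {
	var out bytes.Buffer
	c := &xconn{w: &out}
	xbw := bufio.NewWriter(c) // Like conn.xbw: flush errors already panicked in Write.

	defer func() {
		if x := recover(); x != nil {
			if err, ok := x.(error); ok && errors.Is(err, errIO) {
				fmt.Println("connection broken:", err)
				return
			}
			panic(x)
		}
	}()

	fmt.Fprint(xbw, "250 ok\r\n")
	xbw.Flush() // Errors would have caused a panic in Write.
	fmt.Printf("wrote %q\n", out.String())
}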
@ -692,12 +694,12 @@ func (c *conn) xcheckAuth() {
func (c *conn) xtrace(level slog.Level) func() {
c.xflush()
c.tr.SetTrace(level)
c.tw.SetTrace(level)
c.xtr.SetTrace(level)
c.xtw.SetTrace(level)
return func() {
c.xflush()
c.tr.SetTrace(mlog.LevelTrace)
c.tw.SetTrace(mlog.LevelTrace)
c.xtr.SetTrace(mlog.LevelTrace)
c.xtw.SetTrace(mlog.LevelTrace)
}
}
@ -780,7 +782,7 @@ func (c *conn) Read(buf []byte) (int, error) {
var bufpool = moxio.NewBufpool(8, 2*1024)
func (c *conn) readline() string {
line, err := bufpool.Readline(c.log, c.r)
line, err := bufpool.Readline(c.log, c.xbr)
if err != nil && errors.Is(err, moxio.ErrLineTooLong) {
c.writecodeline(smtp.C500BadSyntax, smtp.SeProto5Other0, "line too long, smtp max is 512, we reached 2048", nil)
panic(fmt.Errorf("%s (%w)", err, errIO))
@ -834,12 +836,12 @@ func (c *conn) bwritecodeline(code int, secode string, msg string, err error) {
// Buffered-write a formatted response line to connection.
func (c *conn) bwritelinef(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
fmt.Fprint(c.w, msg+"\r\n")
fmt.Fprint(c.xbw, msg+"\r\n")
}
// Flush pending buffered writes to connection.
func (c *conn) xflush() {
c.w.Flush() // Errors will have caused a panic in Write.
c.xbw.Flush() // Errors will have caused a panic in Write.
}
// Write (with flush) a response line with codes and message. err is not written, used for logging and can be nil.
@ -919,10 +921,10 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C
}
return l
})
c.tr = moxio.NewTraceReader(c.log, "RC: ", c)
c.r = bufio.NewReader(c.tr)
c.tw = moxio.NewTraceWriter(c.log, "LS: ", c)
c.w = bufio.NewWriter(c.tw)
c.xtr = moxio.NewTraceReader(c.log, "RC: ", c)
c.xbr = bufio.NewReader(c.xtr)
c.xtw = moxio.NewTraceWriter(c.log, "LS: ", c)
c.xbw = bufio.NewWriter(c.xtw)
metricConnection.WithLabelValues(c.kind()).Inc()
c.log.Info("new connection",
@ -1007,9 +1009,9 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C
// If another command is present, don't flush our buffered response yet. Holding
// off will cause us to respond with a single packet.
n := c.r.Buffered()
n := c.xbr.Buffered()
if n > 0 {
buf, err := c.r.Peek(n)
buf, err := c.xbr.Peek(n)
if err == nil && bytes.IndexByte(buf, '\n') >= 0 {
continue
}
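
The Buffered/Peek check above holds off flushing when a complete pipelined command is already waiting, so multiple replies can leave in a single packet. A standalone sketch of that check using plain bufio types instead of the server's conn; the commands and replies are made up:

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

func main() {
	// Two pipelined commands arrive in one packet.
	xbr := bufio.NewReader(strings.NewReader("NOOP\r\nQUIT\r\n"))
	var out bytes.Buffer
	xbw := bufio.NewWriter(&out)

	line, _ := xbr.ReadString('\n')
	fmt.Fprintf(xbw, "250 ok (for %q)\r\n", strings.TrimRight(line, "\r\n"))

	// Like the smtpserver loop: if a full next command is already buffered,
	// hold off flushing so both replies go out in a single packet.
	if n := xbr.Buffered(); n > 0 {
		if buf, err := xbr.Peek(n); err == nil && bytes.IndexByte(buf, '\n') >= 0 {
			fmt.Println("next command buffered; delaying flush")
		}
	}
	xbw.Flush()
	fmt.Printf("wrote %q\n", out.String())
}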
@ -1246,9 +1248,9 @@ func (c *conn) cmdStarttls(p *parser) {
// but make sure any bytes already read and in the buffer are used for the TLS
// handshake.
conn := c.conn
if n := c.r.Buffered(); n > 0 {
if n := c.xbr.Buffered(); n > 0 {
conn = &moxio.PrefixConn{
PrefixReader: io.LimitReader(c.r, int64(n)),
PrefixReader: io.LimitReader(c.xbr, int64(n)),
Conn: conn,
}
}
@ -2121,7 +2123,7 @@ func (c *conn) cmdData(p *parser) {
}
defer store.CloseRemoveTempFile(c.log, dataFile, "smtpserver delivered message")
msgWriter := message.NewWriter(dataFile)
dr := smtp.NewDataReader(c.r)
dr := smtp.NewDataReader(c.xbr)
n, err := io.Copy(&limitWriter{maxSize: c.maxMessageSize, w: msgWriter}, dr)
c.xtrace(mlog.LevelTrace) // Restore.
if err != nil {

View file

@ -231,7 +231,7 @@ func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize in
// Does not set Seen flag for muted threads.
//
// Progress is written to progressWriter, every 100k messages.
func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, xprogressWriter io.Writer) error {
// We use a more basic version of the thread-matching algorithm describe in:
// ../rfc/5256:443
// The algorithm assumes you'll select messages, then group into threads. We normally do
@ -240,6 +240,9 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
// soon as we process them. We can handle large number of messages, but not very
// quickly because we make lots of database queries.
// xprogressWriter may panic on write errors when assigning threads through a
// ctl command.
type childMsg struct {
ID int64 // This message will be fetched and updated with the threading fields once the parent is resolved.
MessageID string // Of child message. Once child is resolved, its own children can be resolved too.
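
The new comment notes that xprogressWriter may panic on write errors when AssignThreads runs under a ctl command; with an ordinary writer the Fprintf error paths further down are used instead. A tiny illustration of both behaviours; panicWriter and report are hypothetical stand-ins for mox's ctl writer and the progress writes in this function:

package main

import (
	"errors"
	"fmt"
	"io"
	"os"
)

var errClosed = errors.New("ctl stream closed")

// panicWriter mimics a ctl-stream writer: Write panics on error instead of
// returning it, so the error propagates out of AssignThreads as a panic.
type panicWriter struct {
	w   io.Writer
	err error // Simulated failure.
}

func (x *panicWriter) Write(p []byte) (int, error) {
	if x.err != nil {
		panic(x.err)
	}
	return x.w.Write(p)
}

func report(xprogressWriter io.Writer) error {
	// Mirrors the Fprintf calls in AssignThreads: with a normal writer the
	// error is returned; with panicWriter the panic bypasses this check.
	if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, progress: %d messages\n", 100000); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}
	return nil
}

func main() {
	_ = report(os.Stdout) // Plain writer: errors come back as return values.

	defer func() {
		if x := recover(); x != nil {
			fmt.Println("recovered:", x)
		}
	}()
	_ = report(&panicWriter{w: io.Discard, err: errClosed}) // Panics.
}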
@ -533,18 +536,18 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
nassigned += n
if nassigned%100000 == 0 {
log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
return fmt.Errorf("writing progress: %v", err)
}
}
}
if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
return fmt.Errorf("writing progress: %v", err)
}
log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
return fmt.Errorf("writing progress: %v", err)
}