mox/ctl.go
Mechiel Lukkien 28fae96a9b
make mox compile on windows, without "mox serve" but with working "mox localserve"
getting mox to compile required changing code in only a few places where
package "syscall" was used: for accessing file access times and for umask
handling. an open problem is how to start a process as an unprivileged user on
windows.  that's why "mox serve" isn't implemented yet. and just finding a way
to implement it now may not be good enough in the near future: we may want to
start using a more complete privilege separation approach, with a process
handling sensitive tasks (handling private keys, authentication), where we may
want to pass file descriptors between processes. how would that work on
windows?

anyway, getting mox to compile for windows doesn't mean it works properly on
windows. the largest issue: mox would normally open a file, rename or remove
it, and finally close it. this happens during message delivery. that doesn't
work on windows, the rename/remove would fail because the file is still open.
so this commit swaps many "remove" and "close" calls. renames are a longer
story: message delivery had two ways to deliver: with "consuming" the
(temporary) message file (which would rename it to its final destination), and
without consuming (by hardlinking the file, falling back to copying). the last
delivery to a recipient of a message (and the only one in the common case of a
single recipient) would consume the message, and the earlier recipients would
not.  during delivery, the already open message file was used, to parse the
message.  we still want to use that open message file, and the caller now stays
responsible for closing it, but we no longer try to rename (consume) the file.
we always hardlink (or copy) during delivery (this works on windows), and the
caller is responsible for closing and removing (in that order) the original
temporary file. this does cost one syscall more. but it makes the delivery code
(responsibilities) a bit simpler.

there is one more obvious issue: the file system path separator. mox already
used the "filepath" package to join paths in many places, but not everywhere.
and it still used strings with slashes for local file access. with this commit,
the code now uses filepath.FromSlash for path strings with slashes, uses
"filepath" in a few more places where it previously didn't. also switches from
"filepath" to regular "path" package when handling mailbox names in a few
places, because those always use forward slashes, regardless of local file
system conventions.  windows can handle forward slashes when opening files, so
test code that passes path strings with forward slashes straight to go stdlib
file i/o functions is left unchanged to reduce code churn. the regular
non-test code, or test code that uses path strings in places other than
standard i/o functions, does have the paths converted for consistent paths
(otherwise we would end up with paths with mixed forward/backward slashes in
log messages).

windows cannot dup a listening socket. for "mox localserve", it isn't
important, and we can work around the issue. the current approach for "mox
serve" (forking a process and passing file descriptors of listening sockets on
"privileged" ports) won't work on windows. perhaps it isn't needed on windows,
and any user can listen on "privileged" ports? that would be welcome.

on windows, os.Open cannot open a directory, so we cannot call Sync on it after
message delivery. a cursory internet search indicates that directories cannot
be synced on windows. the story is probably much more nuanced than that, with
long deep technical details/discussions/disagreement/confusion, like on unix.
for "mox localserve" we can get away with making syncdir a no-op.
2023-10-14 10:54:07 +02:00

995 lines
25 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/queue"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/store"
)
// ctl represents a connection to the ctl unix domain socket of a running mox instance.
// ctl provides functions to read/write commands/responses/data streams.
//
// The type is used for both ends of the connection: xctl creates the client
// side (x and log unset, errors are fatal), servectl creates the server side
// (x and log set, errors are logged and unwind the handler with panic(x)).
type ctl struct {
cmd string // Command being handled, used when logging. Set for server-side of commands.
conn net.Conn // Connection on the ctl socket.
r *bufio.Reader // Buffered reader on conn, created lazily by the first read (xread or reader).
x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
log *mlog.Log // If set, along with x, logging is done here.
}
// xctl connects to the ctl unix domain socket at the path returned by
// mox.DataDirPath("ctl") and checks the protocol version line sent by the
// server. A failure to connect or a version other than "ctlv0" is fatal.
func xctl() *ctl {
	sockPath := mox.DataDirPath("ctl")
	conn, err := net.Dial("unix", sockPath)
	if err != nil {
		log.Fatalf("connecting to control socket at %q: %v", sockPath, err)
	}

	// Client side: x and log stay unset, so errors end the process.
	c := &ctl{conn: conn}
	if version := c.xread(); version != "ctlv0" {
		log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
	}
	return c
}
// xerror treats msg as an error and does not return.
//
// Without ctl.x, msg is passed to log.Fatalln. With ctl.x set, msg is logged,
// written to the ctl connection so the other party can interpret it as an
// error, and the call stack is unwound with panic(ctl.x).
func (c *ctl) xerror(msg string) {
	if c.x != nil {
		c.log.Debugx("ctl error", fmt.Errorf("%s", msg), mlog.Field("cmd", c.cmd))
		c.xwrite(msg)
		panic(c.x)
	}
	log.Fatalln(msg)
}
// xcheck returns immediately when err is nil. Otherwise the error, prefixed
// with msg, is handled without returning: through log.Fatalf when ctl.x is not
// set, or else by logging it, writing it as a line to the ctl connection (for
// the command reading from ctl to interpret as an error) and calling
// panic(ctl.x).
func (c *ctl) xcheck(err error, msg string) {
	switch {
	case err == nil:
		return
	case c.x == nil:
		log.Fatalf("%s: %s", msg, err)
	}

	c.log.Debugx(msg, err, mlog.Field("cmd", c.cmd))
	// Best effort: the result of this write is not checked, we are already
	// handling an error.
	fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
	panic(c.x)
}
// xread reads one line from the ctl connection and returns it with the
// trailing newline removed. The buffered reader is created on first use. A
// read error is handled through xcheck.
func (c *ctl) xread() string {
	rd := c.r
	if rd == nil {
		rd = bufio.NewReader(c.conn)
		c.r = rd
	}

	s, err := rd.ReadString('\n')
	if err != nil {
		c.xcheck(err, "read from ctl")
	}
	return strings.TrimSuffix(s, "\n")
}
// xreadok reads a line and expects it to be "ok". Any other line is
// interpreted as an error message and passed to xerror.
func (c *ctl) xreadok() {
	if resp := c.xread(); resp != "ok" {
		c.xerror(resp)
	}
}
// xwrite writes text followed by a newline to the ctl connection, typically a
// command or parameter. A write error is handled through xcheck.
func (c *ctl) xwrite(text string) {
	if _, err := fmt.Fprintln(c.conn, text); err != nil {
		c.xcheck(err, "write")
	}
}
// xwriteok writes the line "ok" to the ctl connection, indicating success to
// the other party.
func (c *ctl) xwriteok() {
	const ok = "ok"
	c.xwrite(ok)
}
// xstreamto copies a data stream read from ctl to dst, until the end of the
// stream. A copy error is handled through xcheck.
func (c *ctl) xstreamto(dst io.Writer) {
	src := c.reader()
	if _, err := io.Copy(dst, src); err != nil {
		c.xcheck(err, "reading message")
	}
}
// xstreamfrom copies all data from src to ctl as a data stream, and then
// signals the end of the stream. A copy error is handled through xcheck.
func (c *ctl) xstreamfrom(src io.Reader) {
	dst := c.writer()
	if _, err := io.Copy(dst, src); err != nil {
		c.xcheck(err, "copying")
	}
	dst.xclose()
}
// writer returns a new ctlwriter, an io.Writer for sending a data stream to
// ctl. When done writing, the caller must call xclose to signal the end of the
// stream. The command name, connection, "x" behaviour and logger are copied
// from ctl.
func (c *ctl) writer() *ctlwriter {
	w := new(ctlwriter)
	w.cmd, w.conn = c.cmd, c.conn
	w.x, w.log = c.x, c.log
	return w
}
// reader returns a new ctlreader, an io.Reader for receiving a data stream
// from ctl. It reads through the buffered reader of ctl, which is created here
// if it does not exist yet. The command name, connection, "x" behaviour and
// logger are copied from ctl.
func (c *ctl) reader() *ctlreader {
	rd := c.r
	if rd == nil {
		rd = bufio.NewReader(c.conn)
		c.r = rd
	}

	s := new(ctlreader)
	s.cmd, s.conn, s.r = c.cmd, c.conn, rd
	s.x, s.log = c.x, c.log
	return s
}
/*
Ctlwriter and ctlreader implement the writing and reading of a data stream. They
implement the io.Writer and io.Reader interfaces. In the protocol below each
non-data message ends with a newline that is typically stripped when
interpreting.
Zero or more data transactions:
> "123" (for data size) or an error message
> data, 123 bytes
< "ok" or an error message
Followed by an end of stream, indicated by a message announcing zero data bytes:
> "0"
*/
// ctlwriter is the sending side of a data stream, created by ctl.writer.
type ctlwriter struct {
cmd string // Set for server-side of commands.
conn net.Conn // Ctl socket to which data is written and from which the response to each write is read.
buf []byte // Scratch buffer, for reading response.
x any // If not nil, errors in Write and xcheck are handled with panic(x), otherwise with a log.Fatal.
log *mlog.Log // Used for logging errors when x is set.
}
// Write sends buf to the peer as a single data transaction: a line with the
// number of bytes, followed by the data itself. It then waits for the response
// line from the peer. A response other than "ok" is treated as an error
// message from the peer.
//
// A zero-length buf results in no communication at all: a count of "0" is the
// end-of-stream marker (see xclose), so sending it would end the stream
// prematurely and leave this Write waiting for a response the reading side
// does not send for an end of stream.
//
// Errors are handled through xcheck and xerror (log.Fatal, or panic(s.x) when
// x is set). Write therefore only returns on success, with len(buf) and a nil
// error.
func (s *ctlwriter) Write(buf []byte) (int, error) {
	if len(buf) == 0 {
		return 0, nil
	}

	_, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
	s.xcheck(err, "write count")
	_, err = s.conn.Write(buf)
	s.xcheck(err, "write data")

	if s.buf == nil {
		s.buf = make([]byte, 512)
	}
	// The response is a single line. A stream connection may deliver it in
	// pieces, so keep reading until we have the newline or the scratch buffer is
	// full.
	n := 0
	for n < len(s.buf) {
		nn, err := s.conn.Read(s.buf[n:])
		s.xcheck(err, "reading response to write")
		n += nn
		if n > 0 && s.buf[n-1] == '\n' {
			break
		}
	}
	line := strings.TrimSuffix(string(s.buf[:n]), "\n")
	if line != "ok" {
		s.xerror(line)
	}
	return len(buf), nil
}
// xerror treats msg as an error and does not return: without s.x it calls
// log.Fatalln, otherwise it logs the message and calls panic(s.x). Unlike
// ctl.xerror, nothing is written to the connection.
func (s *ctlwriter) xerror(msg string) {
	if s.x != nil {
		s.log.Debugx("error", fmt.Errorf("%s", msg), mlog.Field("cmd", s.cmd))
		panic(s.x)
	}
	log.Fatalln(msg)
}
// xcheck returns immediately when err is nil. Otherwise it does not return:
// without s.x the error is passed to log.Fatalf, prefixed with msg, and with
// s.x set the error is logged, followed by panic(s.x).
func (s *ctlwriter) xcheck(err error, msg string) {
	if err == nil {
		return
	}
	if s.x != nil {
		s.log.Debugx(msg, err, mlog.Field("cmd", s.cmd))
		panic(s.x)
	}
	log.Fatalf("%s: %s", msg, err)
}
// xclose signals the end of the data stream by writing a "0" count line. It
// does not close the underlying connection. A write error is handled through
// xcheck.
func (s *ctlwriter) xclose() {
	if _, err := fmt.Fprintln(s.conn, "0"); err != nil {
		s.xcheck(err, "write eof")
	}
}
// ctlreader is the receiving side of a data stream, created by ctl.reader. It
// implements io.Reader: data arrives in transactions of a count line followed
// by that many bytes, each acknowledged with "ok", and a count of zero ends the
// stream.
type ctlreader struct {
cmd string // Set for server-side of command.
conn net.Conn // For writing "ok" after reading.
r *bufio.Reader // Buffered ctl socket.
err error // If set, returned for each read. can also be io.EOF.
npending int // Number of bytes that can still be read until a new count line must be read.
x any // If set, errors are handled with panic(x) instead of log.Fatal.
log *mlog.Log // If x is set, logging goes to log.
}
// Read implements io.Reader for a data stream from the peer.
//
// When no bytes of the current data transaction are pending, a count line is
// read first. A line that is not a number is an error message from the peer
// and is passed to xerror, as is a negative count, which would otherwise be
// used as a slice bound. A count of zero marks the end of the stream: io.EOF
// is returned for this and all following calls.
//
// At most the number of pending bytes is read into buf. After the last byte of
// a transaction has been read, "ok" is written to the peer.
//
// I/O and protocol errors are handled through xcheck and xerror (log.Fatal, or
// panic(s.x) when x is set), so the only error returned is io.EOF.
func (s *ctlreader) Read(buf []byte) (N int, Err error) {
	if s.err != nil {
		return 0, s.err
	}
	if s.npending == 0 {
		line, err := s.r.ReadString('\n')
		s.xcheck(err, "reading count")
		line = strings.TrimSuffix(line, "\n")
		n, err := strconv.ParseInt(line, 10, 32)
		if err != nil {
			// Not a count, the peer sent an error message instead.
			s.xerror(line)
		}
		if n < 0 {
			s.xerror(fmt.Sprintf("invalid negative data size %d", n))
		}
		if n == 0 {
			s.err = io.EOF
			return 0, s.err
		}
		s.npending = int(n)
	}

	// Never read past the current transaction, the next bytes are a count line.
	rn := len(buf)
	if rn > s.npending {
		rn = s.npending
	}
	n, err := s.r.Read(buf[:rn])
	s.xcheck(err, "read from ctl")
	s.npending -= n
	if s.npending == 0 {
		_, err = fmt.Fprintln(s.conn, "ok")
		s.xcheck(err, "writing ok after reading")
	}
	return n, nil
}
// xerror treats msg as an error and does not return: without s.x it calls
// log.Fatalln, otherwise it logs the message and calls panic(s.x).
func (s *ctlreader) xerror(msg string) {
	if s.x != nil {
		s.log.Debugx("error", fmt.Errorf("%s", msg), mlog.Field("cmd", s.cmd))
		panic(s.x)
	}
	log.Fatalln(msg)
}
// xcheck returns immediately when err is nil. Otherwise it does not return:
// without s.x the error is passed to log.Fatalf, prefixed with msg, and with
// s.x set the error is logged, followed by panic(s.x).
func (s *ctlreader) xcheck(err error, msg string) {
	switch {
	case err == nil:
		return
	case s.x == nil:
		log.Fatalf("%s: %s", msg, err)
	}
	s.log.Debugx(msg, err, mlog.Field("cmd", s.cmd))
	panic(s.x)
}
// servectl handles requests on the unix domain socket "ctl", e.g. for
// graceful shutdown, local mail delivery.
//
// It announces the protocol version and then handles commands in a loop. The
// loop only ends through a panic: errors in the ctl helpers panic with a
// sentinel value, which is recovered silently here. Any other panic is
// logged, along with a stack trace, and counted in the metrics. The connection
// is closed when the function ends.
func servectl(ctx context.Context, log *mlog.Log, conn net.Conn, shutdown func()) {
	log.Debug("ctl connection")

	// Sentinel value for panic and recover.
	var stop = struct{}{}
	c := &ctl{conn: conn, x: stop, log: log}

	defer func() {
		x := recover()
		if x != nil && x != stop {
			log.Error("servectl panic", mlog.Field("err", x), mlog.Field("cmd", c.cmd))
			debug.PrintStack()
			metrics.PanicInc(metrics.Ctl)
		}
	}()

	// Registered last, so the connection is closed before the recover above runs.
	defer conn.Close()

	c.xwrite("ctlv0")
	for {
		servectlcmd(ctx, c, shutdown)
	}
}
// servectlcmd reads a single command line from ctl and executes the command.
//
// The wire protocol of each command is documented at its case. In those
// descriptions, ">" is a line or stream read from the client, "<" is a line or
// stream written to the client.
//
// Errors are handled through ctl.xcheck and ctl.xerror. With ctl.x set, as
// servectl does, these report the error to the client and panic with ctl.x, so
// the cleanup needed on error paths is done in deferred functions. For an
// unrecognized command, an error line is written and the function returns.
func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
	log := ctl.log
	cmd := ctl.xread()
	ctl.cmd = cmd
	log.Info("ctl command", mlog.Field("cmd", cmd))
	switch cmd {
	case "stop":
		shutdown()
		os.Exit(0)

	case "deliver":
		/* The protocol, double quoted are literals.

		> "deliver"
		> address
		< "ok"
		> stream
		< "ok"
		*/

		to := ctl.xread()
		a, addr, err := store.OpenEmail(to)
		ctl.xcheck(err, "lookup destination address")
		// Close the account on the error paths below. On success, the account is
		// closed explicitly so an error from closing can be reported to the client.
		defer func() {
			if a != nil {
				err := a.Close()
				log.Check(err, "closing account after delivery")
			}
		}()

		msgFile, err := store.CreateMessageTemp("ctl-deliver")
		ctl.xcheck(err, "creating temporary message file")
		// Close before remove: removing a file that is still open fails on windows.
		defer func() {
			name := msgFile.Name()
			err := msgFile.Close()
			log.Check(err, "closing temporary message file")
			err = os.Remove(name)
			log.Check(err, "removing temporary message file", mlog.Field("path", name))
		}()
		mw := message.NewWriter(msgFile)
		ctl.xwriteok()

		ctl.xstreamto(mw)
		err = msgFile.Sync()
		ctl.xcheck(err, "syncing message to storage")

		m := &store.Message{
			Received: time.Now(),
			Size:     mw.Size,
		}

		a.WithWLock(func() {
			err := a.DeliverDestination(log, addr, m, msgFile)
			ctl.xcheck(err, "delivering message")
			log.Info("message delivered through ctl", mlog.Field("to", to))
		})

		err = a.Close()
		a = nil // Prevent a second close by the deferred function.
		ctl.xcheck(err, "closing account")
		ctl.xwriteok()

	case "setaccountpassword":
		/* protocol:
		> "setaccountpassword"
		> account
		> password
		< "ok" or error
		*/

		account := ctl.xread()
		pw := ctl.xread()

		acc, err := store.OpenAccount(account)
		ctl.xcheck(err, "open account")
		defer func() {
			if acc != nil {
				err := acc.Close()
				log.Check(err, "closing account after setting password")
			}
		}()

		err = acc.SetPassword(pw)
		ctl.xcheck(err, "setting password")
		err = acc.Close()
		ctl.xcheck(err, "closing account")
		acc = nil
		ctl.xwriteok()

	case "queue":
		/* protocol:
		> "queue"
		< "ok"
		< stream
		*/
		qmsgs, err := queue.List(ctx)
		ctl.xcheck(err, "listing queue")
		ctl.xwriteok()

		xw := ctl.writer()
		fmt.Fprintln(xw, "queue:")
		for _, qm := range qmsgs {
			var lastAttempt string
			if qm.LastAttempt != nil {
				lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
			}
			fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastError)
		}
		if len(qmsgs) == 0 {
			fmt.Fprint(xw, "(empty)\n")
		}
		xw.xclose()

	case "queuekick":
		/* protocol:
		> "queuekick"
		> id
		> todomain
		> recipient
		> transport // if empty, transport is left unchanged; in future, we may want to differentiate between "leave unchanged" and "set to empty string".
		< count
		< "ok" or error
		*/

		idstr := ctl.xread()
		todomain := ctl.xread()
		recipient := ctl.xread()
		transport := ctl.xread()
		id, err := strconv.ParseInt(idstr, 10, 64)
		if err != nil {
			// Write the count line before xcheck writes the error line.
			ctl.xwrite("0")
			ctl.xcheck(err, "parsing id")
		}

		var xtransport *string
		if transport != "" {
			xtransport = &transport
		}

		count, err := queue.Kick(ctx, id, todomain, recipient, xtransport)
		ctl.xcheck(err, "kicking queue")
		ctl.xwrite(fmt.Sprintf("%d", count))
		ctl.xwriteok()

	case "queuedrop":
		/* protocol:
		> "queuedrop"
		> id
		> todomain
		> recipient
		< count
		< "ok" or error
		*/

		idstr := ctl.xread()
		todomain := ctl.xread()
		recipient := ctl.xread()
		id, err := strconv.ParseInt(idstr, 10, 64)
		if err != nil {
			// Write the count line before xcheck writes the error line.
			ctl.xwrite("0")
			ctl.xcheck(err, "parsing id")
		}

		count, err := queue.Drop(ctx, id, todomain, recipient)
		ctl.xcheck(err, "dropping messages from queue")
		ctl.xwrite(fmt.Sprintf("%d", count))
		ctl.xwriteok()

	case "queuedump":
		/* protocol:
		> "queuedump"
		> id
		< "ok" or error
		< stream
		*/

		idstr := ctl.xread()
		id, err := strconv.ParseInt(idstr, 10, 64)
		ctl.xcheck(err, "parsing id")
		mr, err := queue.OpenMessage(ctx, id)
		ctl.xcheck(err, "opening message")
		defer func() {
			err := mr.Close()
			log.Check(err, "closing message from queue")
		}()
		ctl.xwriteok()
		ctl.xstreamfrom(mr)

	case "importmaildir", "importmbox":
		mbox := cmd == "importmbox"
		importctl(ctx, ctl, mbox)

	case "domainadd":
		/* protocol:
		> "domainadd"
		> domain
		> account
		> localpart
		< "ok" or error
		*/
		domain := ctl.xread()
		account := ctl.xread()
		localpart := ctl.xread()
		d, err := dns.ParseDomain(domain)
		ctl.xcheck(err, "parsing domain")
		err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
		ctl.xcheck(err, "adding domain")
		ctl.xwriteok()

	case "domainrm":
		/* protocol:
		> "domainrm"
		> domain
		< "ok" or error
		*/
		domain := ctl.xread()
		d, err := dns.ParseDomain(domain)
		ctl.xcheck(err, "parsing domain")
		err = mox.DomainRemove(ctx, d)
		ctl.xcheck(err, "removing domain")
		ctl.xwriteok()

	case "accountadd":
		/* protocol:
		> "accountadd"
		> account
		> address
		< "ok" or error
		*/
		account := ctl.xread()
		address := ctl.xread()
		err := mox.AccountAdd(ctx, account, address)
		ctl.xcheck(err, "adding account")
		ctl.xwriteok()

	case "accountrm":
		/* protocol:
		> "accountrm"
		> account
		< "ok" or error
		*/
		account := ctl.xread()
		err := mox.AccountRemove(ctx, account)
		ctl.xcheck(err, "removing account")
		ctl.xwriteok()

	case "addressadd":
		/* protocol:
		> "addressadd"
		> address
		> account
		< "ok" or error
		*/
		address := ctl.xread()
		account := ctl.xread()
		err := mox.AddressAdd(ctx, address, account)
		ctl.xcheck(err, "adding address")
		ctl.xwriteok()

	case "addressrm":
		/* protocol:
		> "addressrm"
		> address
		< "ok" or error
		*/
		address := ctl.xread()
		err := mox.AddressRemove(ctx, address)
		ctl.xcheck(err, "removing address")
		ctl.xwriteok()

	case "loglevels":
		/* protocol:
		> "loglevels"
		< "ok"
		< stream
		*/
		ctl.xwriteok()
		l := mox.Conf.LogLevels()
		keys := make([]string, 0, len(l))
		for k := range l {
			keys = append(keys, k)
		}
		// Sorted, for stable output.
		sort.Strings(keys)
		var b strings.Builder
		for _, k := range keys {
			ks := k
			if ks == "" {
				ks = "(default)"
			}
			b.WriteString(ks + ": " + mlog.LevelStrings[l[k]] + "\n")
		}
		ctl.xstreamfrom(strings.NewReader(b.String()))

	case "setloglevels":
		/* protocol:
		> "setloglevels"
		> pkg
		> level (if empty, log level for pkg will be unset)
		< "ok" or error
		*/
		pkg := ctl.xread()
		levelstr := ctl.xread()
		if levelstr == "" {
			mox.Conf.LogLevelRemove(pkg)
		} else {
			level, ok := mlog.Levels[levelstr]
			if !ok {
				ctl.xerror("bad level")
			}
			mox.Conf.LogLevelSet(pkg, level)
		}
		ctl.xwriteok()

	case "retrain":
		/* protocol:
		> "retrain"
		> account
		< "ok" or error
		*/
		account := ctl.xread()
		acc, err := store.OpenAccount(account)
		ctl.xcheck(err, "open account")
		defer func() {
			if acc != nil {
				err := acc.Close()
				log.Check(err, "closing account after retraining")
			}
		}()

		acc.WithWLock(func() {
			conf, _ := acc.Conf()
			if conf.JunkFilter == nil {
				ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
			}

			// Remove existing junk filter files.
			basePath := mox.DataDirPath("accounts")
			dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
			bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
			err := os.Remove(dbPath)
			log.Check(err, "removing old junkfilter database file", mlog.Field("path", dbPath))
			err = os.Remove(bloomPath)
			log.Check(err, "removing old junkfilter bloom filter file", mlog.Field("path", bloomPath))

			// Open junk filter, this creates new files.
			jf, _, err := acc.OpenJunkFilter(ctx, ctl.log)
			ctl.xcheck(err, "open new junk filter")
			defer func() {
				if jf == nil {
					return
				}
				err := jf.Close()
				log.Check(err, "closing junk filter during cleanup")
			}()

			// Read through messages with junk or nonjunk flag set, and train them.
			var total, trained int
			q := bstore.QueryDB[store.Message](ctx, acc.DB)
			q.FilterEqual("Expunged", false)
			err = q.ForEach(func(m store.Message) error {
				total++
				ok, err := acc.TrainMessage(ctx, ctl.log, jf, m)
				if ok {
					trained++
				}
				return err
			})
			ctl.xcheck(err, "training messages")
			ctl.log.Info("retrained messages", mlog.Field("total", total), mlog.Field("trained", trained))

			// Close junk filter, marking success.
			err = jf.Close()
			jf = nil
			ctl.xcheck(err, "closing junk filter")
		})
		ctl.xwriteok()

	case "recalculatemailboxcounts":
		/* protocol:
		> "recalculatemailboxcounts"
		> account
		< "ok" or error
		< stream
		*/
		account := ctl.xread()
		acc, err := store.OpenAccount(account)
		ctl.xcheck(err, "open account")
		defer func() {
			if acc != nil {
				err := acc.Close()
				log.Check(err, "closing account after recalculating mailbox counts")
			}
		}()
		ctl.xwriteok()

		w := ctl.writer()

		acc.WithWLock(func() {
			var changes []store.Change
			err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
				return bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
					mc, err := mb.CalculateCounts(tx)
					if err != nil {
						return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
					}

					if !mb.HaveCounts || mc != mb.MailboxCounts {
						_, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
						ctl.xcheck(err, "write")
						mb.HaveCounts = true
						mb.MailboxCounts = mc
						if err := tx.Update(&mb); err != nil {
							return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
						}
						changes = append(changes, mb.ChangeCounts())
					}
					return nil
				})
			})
			ctl.xcheck(err, "write transaction for mailbox counts")

			store.BroadcastChanges(acc, changes)
		})
		w.xclose()

	case "fixmsgsize":
		/* protocol:
		> "fixmsgsize"
		> account or empty
		< "ok" or error
		< stream
		*/

		accountOpt := ctl.xread()
		ctl.xwriteok()
		w := ctl.writer()

		var foundProblem bool
		const batchSize = 10000

		xfixmsgsize := func(accName string) {
			acc, err := store.OpenAccount(accName)
			ctl.xcheck(err, "open account")
			defer func() {
				err := acc.Close()
				log.Check(err, "closing account after fixing message sizes")
			}()

			total := 0
			var lastID int64
			for {
				var n int

				acc.WithRLock(func() {
					mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.

					// Don't process all message in one transaction, we could block the account for too long.
					err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
						q := bstore.QueryTx[store.Message](tx)
						q.FilterEqual("Expunged", false)
						q.FilterGreater("ID", lastID)
						q.Limit(batchSize)
						q.SortAsc("ID")
						return q.ForEach(func(m store.Message) error {
							lastID = m.ID
							n++

							p := acc.MessagePath(m.ID)
							st, err := os.Stat(p)
							if err != nil {
								mb := store.Mailbox{ID: m.MailboxID}
								if xerr := tx.Get(&mb); xerr != nil {
									_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
									ctl.xcheck(werr, "write")
								}
								_, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
								ctl.xcheck(werr, "write")
								return nil
							}
							filesize := st.Size()
							correctSize := int64(len(m.MsgPrefix)) + filesize
							if m.Size == correctSize {
								return nil
							}

							foundProblem = true

							mb := store.Mailbox{ID: m.MailboxID}
							if err := tx.Get(&mb); err != nil {
								_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
								ctl.xcheck(werr, "write")
							}
							_, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
							ctl.xcheck(err, "write")

							// We assume that the original message size was accounted as stored in the mailbox
							// total size. If this isn't correct, the user can always run
							// recalculatemailboxcounts.
							mb.Size -= m.Size
							mb.Size += correctSize
							if err := tx.Update(&mb); err != nil {
								return fmt.Errorf("update mailbox counts: %v", err)
							}
							mailboxCounts[mb.ID] = mb

							m.Size = correctSize

							mr := acc.MessageReader(m)
							part, err := message.EnsurePart(log, false, mr, m.Size)
							if err != nil {
								_, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
								ctl.xcheck(werr, "write")
							}
							m.ParsedBuf, err = json.Marshal(part)
							if err != nil {
								return fmt.Errorf("marshal parsed message: %v", err)
							}
							total++
							if err := tx.Update(&m); err != nil {
								return fmt.Errorf("update message: %v", err)
							}
							return nil
						})
					})
					ctl.xcheck(err, "find and fix wrong message sizes")

					var changes []store.Change
					for _, mb := range mailboxCounts {
						changes = append(changes, mb.ChangeCounts())
					}
					store.BroadcastChanges(acc, changes)
				})
				// A batch that is not full means we have seen all messages.
				if n < batchSize {
					break
				}
			}
			_, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
			ctl.xcheck(err, "write")
		}

		if accountOpt != "" {
			xfixmsgsize(accountOpt)
		} else {
			for i, accName := range mox.Conf.Accounts() {
				var line string
				if i > 0 {
					line = "\n"
				}
				_, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
				ctl.xcheck(err, "write")
				xfixmsgsize(accName)
			}
		}
		if foundProblem {
			_, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
			ctl.xcheck(err, "write")
		}

		w.xclose()

	case "reparse":
		/* protocol:
		> "reparse"
		> account or empty
		< "ok" or error
		< stream
		*/

		accountOpt := ctl.xread()
		ctl.xwriteok()
		w := ctl.writer()

		const batchSize = 100

		xreparseAccount := func(accName string) {
			acc, err := store.OpenAccount(accName)
			ctl.xcheck(err, "open account")
			defer func() {
				err := acc.Close()
				log.Check(err, "closing account after reparsing messages")
			}()

			total := 0
			var lastID int64
			for {
				var n int
				// Don't process all message in one transaction, we could block the account for too long.
				err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
					q := bstore.QueryTx[store.Message](tx)
					q.FilterEqual("Expunged", false)
					q.FilterGreater("ID", lastID)
					q.Limit(batchSize)
					q.SortAsc("ID")
					return q.ForEach(func(m store.Message) error {
						lastID = m.ID
						mr := acc.MessageReader(m)
						p, err := message.EnsurePart(log, false, mr, m.Size)
						if err != nil {
							_, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
							ctl.xcheck(err, "write")
						}
						m.ParsedBuf, err = json.Marshal(p)
						if err != nil {
							return fmt.Errorf("marshal parsed message: %v", err)
						}
						total++
						n++
						if err := tx.Update(&m); err != nil {
							return fmt.Errorf("update message: %v", err)
						}
						return nil
					})
				})
				ctl.xcheck(err, "update messages with parsed mime structure")
				// A batch that is not full means we have seen all messages.
				if n < batchSize {
					break
				}
			}
			_, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
			ctl.xcheck(err, "write")
		}

		if accountOpt != "" {
			xreparseAccount(accountOpt)
		} else {
			for i, accName := range mox.Conf.Accounts() {
				var line string
				if i > 0 {
					line = "\n"
				}
				_, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
				ctl.xcheck(err, "write")
				xreparseAccount(accName)
			}
		}
		w.xclose()

	case "reassignthreads":
		/* protocol:
		> "reassignthreads"
		> account or empty
		< "ok" or error
		< stream
		*/

		accountOpt := ctl.xread()
		ctl.xwriteok()
		w := ctl.writer()

		xreassignThreads := func(accName string) {
			acc, err := store.OpenAccount(accName)
			ctl.xcheck(err, "open account")
			defer func() {
				err := acc.Close()
				log.Check(err, "closing account after reassigning threads")
			}()

			// We don't want to step on an existing upgrade process.
			err = acc.ThreadingWait(ctl.log)
			ctl.xcheck(err, "waiting for threading upgrade to finish")
			// todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...

			// todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
			const batchSize = 50000
			total, err := acc.ResetThreading(ctx, ctl.log, batchSize, true)
			ctl.xcheck(err, "resetting threading fields")
			_, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
			ctl.xcheck(err, "write")

			// Assign threads again. Ideally we would do this in a single transaction, but
			// bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
			err = acc.AssignThreads(ctx, ctl.log, nil, 0, batchSize, w)
			ctl.xcheck(err, "reassign threads")

			_, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
			ctl.xcheck(err, "write")
		}

		if accountOpt != "" {
			xreassignThreads(accountOpt)
		} else {
			for i, accName := range mox.Conf.Accounts() {
				var line string
				if i > 0 {
					line = "\n"
				}
				_, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
				ctl.xcheck(err, "write")
				xreassignThreads(accName)
			}
		}
		w.xclose()

	case "backup":
		backupctl(ctx, ctl)

	default:
		log.Info("unrecognized command", mlog.Field("cmd", cmd))
		ctl.xwrite("unrecognized command")
		return
	}
}