mirror of
https://github.com/mjl-/mox.git
synced 2024-12-27 08:53:48 +03:00
40ade995a5
- add option to put messages in the queue "on hold", preventing delivery attempts until taken off hold again. - add "hold rules", to automatically mark some/all submitted messages as "on hold", e.g. from a specific account or to a specific domain. - add operation to "fail" a message, causing a DSN to be delivered to the sender. previously we could only drop a message from the queue. - update admin page & add new cli tools for these operations, with new filtering rules for selecting the messages to operate on. in the admin interface, add filtering and checkboxes to select a set of messages to operate on.
1151 lines
29 KiB
Go
1151 lines
29 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"log/slog"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/dns"
|
|
"github.com/mjl-/mox/message"
|
|
"github.com/mjl-/mox/metrics"
|
|
"github.com/mjl-/mox/mlog"
|
|
"github.com/mjl-/mox/mox-"
|
|
"github.com/mjl-/mox/queue"
|
|
"github.com/mjl-/mox/smtp"
|
|
"github.com/mjl-/mox/store"
|
|
)
|
|
|
|
// ctl represents a connection to the ctl unix domain socket of a running mox instance.
|
|
// ctl provides functions to read/write commands/responses/data streams.
|
|
type ctl struct {
|
|
cmd string // Set for server-side of commands.
|
|
conn net.Conn
|
|
r *bufio.Reader // Set for first reader.
|
|
x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
|
|
log mlog.Log // If set, along with x, logging is done here.
|
|
}
|
|
|
|
// xctl opens a ctl connection.
|
|
func xctl() *ctl {
|
|
p := mox.DataDirPath("ctl")
|
|
conn, err := net.Dial("unix", p)
|
|
if err != nil {
|
|
log.Fatalf("connecting to control socket at %q: %v", p, err)
|
|
}
|
|
ctl := &ctl{conn: conn}
|
|
version := ctl.xread()
|
|
if version != "ctlv0" {
|
|
log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
|
|
}
|
|
return ctl
|
|
}
|
|
|
|
// Interpret msg as an error.
|
|
// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
|
|
func (c *ctl) xerror(msg string) {
|
|
if c.x == nil {
|
|
log.Fatalln(msg)
|
|
}
|
|
c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
|
|
c.xwrite(msg)
|
|
panic(c.x)
|
|
}
|
|
|
|
// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
|
|
// ctl.x is set, the error string is written to ctl, to be interpreted as an error
|
|
// by the command reading from ctl.
|
|
func (c *ctl) xcheck(err error, msg string) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
if c.x == nil {
|
|
log.Fatalf("%s: %s", msg, err)
|
|
}
|
|
c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
|
|
fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
|
|
panic(c.x)
|
|
}
|
|
|
|
// Read a line and return it without trailing newline.
|
|
func (c *ctl) xread() string {
|
|
if c.r == nil {
|
|
c.r = bufio.NewReader(c.conn)
|
|
}
|
|
line, err := c.r.ReadString('\n')
|
|
c.xcheck(err, "read from ctl")
|
|
return strings.TrimSuffix(line, "\n")
|
|
}
|
|
|
|
// Read a line. If not "ok", the string is interpreted as an error.
|
|
func (c *ctl) xreadok() {
|
|
line := c.xread()
|
|
if line != "ok" {
|
|
c.xerror(line)
|
|
}
|
|
}
|
|
|
|
// Write a string, typically a command or parameter.
|
|
func (c *ctl) xwrite(text string) {
|
|
_, err := fmt.Fprintln(c.conn, text)
|
|
c.xcheck(err, "write")
|
|
}
|
|
|
|
// Write "ok" to indicate success.
|
|
func (c *ctl) xwriteok() {
|
|
c.xwrite("ok")
|
|
}
|
|
|
|
// Copy data from a stream from ctl to dst.
|
|
func (c *ctl) xstreamto(dst io.Writer) {
|
|
_, err := io.Copy(dst, c.reader())
|
|
c.xcheck(err, "reading message")
|
|
}
|
|
|
|
// Copy data from src to a stream to ctl.
|
|
func (c *ctl) xstreamfrom(src io.Reader) {
|
|
w := c.writer()
|
|
_, err := io.Copy(w, src)
|
|
c.xcheck(err, "copying")
|
|
w.xclose()
|
|
}
|
|
|
|
// Writer returns an io.Writer for a data stream to ctl.
|
|
// When done writing, caller must call xclose to signal the end of the stream.
|
|
// Behaviour of "x" is copied from ctl.
|
|
func (c *ctl) writer() *ctlwriter {
|
|
return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
|
|
}
|
|
|
|
// Reader returns an io.Reader for a data stream from ctl.
|
|
// Behaviour of "x" is copied from ctl.
|
|
func (c *ctl) reader() *ctlreader {
|
|
if c.r == nil {
|
|
c.r = bufio.NewReader(c.conn)
|
|
}
|
|
return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
|
|
}
|
|
|
|
/*
|
|
Ctlwriter and ctlreader implement the writing and reading a data stream. They
|
|
implement the io.Writer and io.Reader interface. In the protocol below each
|
|
non-data message ends with a newline that is typically stripped when
|
|
interpreting.
|
|
|
|
Zero or more data transactions:
|
|
|
|
> "123" (for data size) or an error message
|
|
> data, 123 bytes
|
|
< "ok" or an error message
|
|
|
|
Followed by a end of stream indicated by zero data bytes message:
|
|
|
|
> "0"
|
|
*/
|
|
|
|
type ctlwriter struct {
|
|
cmd string // Set for server-side of commands.
|
|
conn net.Conn // Ctl socket from which messages are read.
|
|
buf []byte // Scratch buffer, for reading response.
|
|
x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
|
|
log mlog.Log
|
|
}
|
|
|
|
func (s *ctlwriter) Write(buf []byte) (int, error) {
|
|
_, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
|
|
s.xcheck(err, "write count")
|
|
_, err = s.conn.Write(buf)
|
|
s.xcheck(err, "write data")
|
|
if s.buf == nil {
|
|
s.buf = make([]byte, 512)
|
|
}
|
|
n, err := s.conn.Read(s.buf)
|
|
s.xcheck(err, "reading response to write")
|
|
line := strings.TrimSuffix(string(s.buf[:n]), "\n")
|
|
if line != "ok" {
|
|
s.xerror(line)
|
|
}
|
|
return len(buf), nil
|
|
}
|
|
|
|
func (s *ctlwriter) xerror(msg string) {
|
|
if s.x == nil {
|
|
log.Fatalln(msg)
|
|
} else {
|
|
s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
|
|
panic(s.x)
|
|
}
|
|
}
|
|
|
|
func (s *ctlwriter) xcheck(err error, msg string) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
if s.x == nil {
|
|
log.Fatalf("%s: %s", msg, err)
|
|
} else {
|
|
s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
|
|
panic(s.x)
|
|
}
|
|
}
|
|
|
|
func (s *ctlwriter) xclose() {
|
|
_, err := fmt.Fprintf(s.conn, "0\n")
|
|
s.xcheck(err, "write eof")
|
|
}
|
|
|
|
type ctlreader struct {
|
|
cmd string // Set for server-side of command.
|
|
conn net.Conn // For writing "ok" after reading.
|
|
r *bufio.Reader // Buffered ctl socket.
|
|
err error // If set, returned for each read. can also be io.EOF.
|
|
npending int // Number of bytes that can still be read until a new count line must be read.
|
|
x any // If set, errors are handled with panic(x) instead of log.Fatal.
|
|
log mlog.Log // If x is set, logging goes to log.
|
|
}
|
|
|
|
func (s *ctlreader) Read(buf []byte) (N int, Err error) {
|
|
if s.err != nil {
|
|
return 0, s.err
|
|
}
|
|
if s.npending == 0 {
|
|
line, err := s.r.ReadString('\n')
|
|
s.xcheck(err, "reading count")
|
|
line = strings.TrimSuffix(line, "\n")
|
|
n, err := strconv.ParseInt(line, 10, 32)
|
|
if err != nil {
|
|
s.xerror(line)
|
|
}
|
|
if n == 0 {
|
|
s.err = io.EOF
|
|
return 0, s.err
|
|
}
|
|
s.npending = int(n)
|
|
}
|
|
rn := len(buf)
|
|
if rn > s.npending {
|
|
rn = s.npending
|
|
}
|
|
n, err := s.r.Read(buf[:rn])
|
|
s.xcheck(err, "read from ctl")
|
|
s.npending -= n
|
|
if s.npending == 0 {
|
|
_, err = fmt.Fprintln(s.conn, "ok")
|
|
s.xcheck(err, "writing ok after reading")
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (s *ctlreader) xerror(msg string) {
|
|
if s.x == nil {
|
|
log.Fatalln(msg)
|
|
} else {
|
|
s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
|
|
panic(s.x)
|
|
}
|
|
}
|
|
|
|
func (s *ctlreader) xcheck(err error, msg string) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
if s.x == nil {
|
|
log.Fatalf("%s: %s", msg, err)
|
|
} else {
|
|
s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
|
|
panic(s.x)
|
|
}
|
|
}
|
|
|
|
// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
|
|
func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
|
|
log.Debug("ctl connection")
|
|
|
|
var stop = struct{}{} // Sentinel value for panic and recover.
|
|
ctl := &ctl{conn: conn, x: stop, log: log}
|
|
defer func() {
|
|
x := recover()
|
|
if x == nil || x == stop {
|
|
return
|
|
}
|
|
log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
|
|
debug.PrintStack()
|
|
metrics.PanicInc(metrics.Ctl)
|
|
}()
|
|
|
|
defer conn.Close()
|
|
|
|
ctl.xwrite("ctlv0")
|
|
for {
|
|
servectlcmd(ctx, ctl, shutdown)
|
|
}
|
|
}
|
|
|
|
func xparseFilters(ctl *ctl, s string) (f queue.Filter) {
|
|
dec := json.NewDecoder(strings.NewReader(s))
|
|
dec.DisallowUnknownFields()
|
|
err := dec.Decode(&f)
|
|
ctl.xcheck(err, "parsing filters")
|
|
return f
|
|
}
|
|
|
|
func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
|
|
log := ctl.log
|
|
cmd := ctl.xread()
|
|
ctl.cmd = cmd
|
|
log.Info("ctl command", slog.String("cmd", cmd))
|
|
switch cmd {
|
|
case "stop":
|
|
shutdown()
|
|
os.Exit(0)
|
|
|
|
case "deliver":
|
|
/* The protocol, double quoted are literals.
|
|
|
|
> "deliver"
|
|
> address
|
|
< "ok"
|
|
> stream
|
|
< "ok"
|
|
*/
|
|
|
|
to := ctl.xread()
|
|
a, addr, err := store.OpenEmail(log, to)
|
|
ctl.xcheck(err, "lookup destination address")
|
|
|
|
msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
|
|
ctl.xcheck(err, "creating temporary message file")
|
|
defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
|
|
mw := message.NewWriter(msgFile)
|
|
ctl.xwriteok()
|
|
|
|
ctl.xstreamto(mw)
|
|
err = msgFile.Sync()
|
|
ctl.xcheck(err, "syncing message to storage")
|
|
|
|
m := &store.Message{
|
|
Received: time.Now(),
|
|
Size: mw.Size,
|
|
}
|
|
|
|
a.WithWLock(func() {
|
|
err := a.DeliverDestination(log, addr, m, msgFile)
|
|
ctl.xcheck(err, "delivering message")
|
|
log.Info("message delivered through ctl", slog.Any("to", to))
|
|
})
|
|
|
|
err = a.Close()
|
|
ctl.xcheck(err, "closing account")
|
|
ctl.xwriteok()
|
|
|
|
case "setaccountpassword":
|
|
/* protocol:
|
|
> "setaccountpassword"
|
|
> account
|
|
> password
|
|
< "ok" or error
|
|
*/
|
|
|
|
account := ctl.xread()
|
|
pw := ctl.xread()
|
|
|
|
acc, err := store.OpenAccount(log, account)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
if acc != nil {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after setting password")
|
|
}
|
|
}()
|
|
|
|
err = acc.SetPassword(log, pw)
|
|
ctl.xcheck(err, "setting password")
|
|
err = acc.Close()
|
|
ctl.xcheck(err, "closing account")
|
|
acc = nil
|
|
ctl.xwriteok()
|
|
|
|
case "queueholdruleslist":
|
|
/* protocol:
|
|
> "queueholdruleslist"
|
|
< "ok"
|
|
< stream
|
|
*/
|
|
l, err := queue.HoldRuleList(ctx)
|
|
ctl.xcheck(err, "listing hold rules")
|
|
ctl.xwriteok()
|
|
xw := ctl.writer()
|
|
fmt.Fprintln(xw, "hold rules:")
|
|
for _, hr := range l {
|
|
var elems []string
|
|
if hr.Account != "" {
|
|
elems = append(elems, fmt.Sprintf("account %q", hr.Account))
|
|
}
|
|
var zerodom dns.Domain
|
|
if hr.SenderDomain != zerodom {
|
|
elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
|
|
}
|
|
if hr.RecipientDomain != zerodom {
|
|
elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
|
|
}
|
|
if len(elems) == 0 {
|
|
fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
|
|
} else {
|
|
fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
|
|
}
|
|
}
|
|
if len(l) == 0 {
|
|
fmt.Fprint(xw, "(none)\n")
|
|
}
|
|
xw.xclose()
|
|
|
|
case "queueholdrulesadd":
|
|
/* protocol:
|
|
> "queueholdrulesadd"
|
|
> account
|
|
> senderdomainstr
|
|
> recipientdomainstr
|
|
< "ok" or error
|
|
*/
|
|
var hr queue.HoldRule
|
|
hr.Account = ctl.xread()
|
|
senderdomstr := ctl.xread()
|
|
rcptdomstr := ctl.xread()
|
|
var err error
|
|
hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
|
|
ctl.xcheck(err, "parsing sender domain")
|
|
hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
|
|
ctl.xcheck(err, "parsing recipient domain")
|
|
hr, err = queue.HoldRuleAdd(ctx, log, hr)
|
|
ctl.xcheck(err, "add hold rule")
|
|
ctl.xwriteok()
|
|
|
|
case "queueholdrulesremove":
|
|
/* protocol:
|
|
> "queueholdrulesremove"
|
|
> id
|
|
< "ok" or error
|
|
*/
|
|
id, err := strconv.ParseInt(ctl.xread(), 10, 64)
|
|
ctl.xcheck(err, "parsing id")
|
|
err = queue.HoldRuleRemove(ctx, log, id)
|
|
ctl.xcheck(err, "remove hold rule")
|
|
ctl.xwriteok()
|
|
|
|
case "queuelist":
|
|
/* protocol:
|
|
> "queue"
|
|
> queuefilters as json
|
|
< "ok"
|
|
< stream
|
|
*/
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
qmsgs, err := queue.List(ctx, f)
|
|
ctl.xcheck(err, "listing queue")
|
|
ctl.xwriteok()
|
|
|
|
xw := ctl.writer()
|
|
fmt.Fprintln(xw, "messages:")
|
|
for _, qm := range qmsgs {
|
|
var lastAttempt string
|
|
if qm.LastAttempt != nil {
|
|
lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
|
|
}
|
|
fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastError)
|
|
}
|
|
if len(qmsgs) == 0 {
|
|
fmt.Fprint(xw, "(none)\n")
|
|
}
|
|
xw.xclose()
|
|
|
|
case "queueholdset":
|
|
/* protocol:
|
|
> "queueholdset"
|
|
> queuefilters as json
|
|
> "true" or "false"
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
hold := ctl.xread() == "true"
|
|
count, err := queue.HoldSet(ctx, f, hold)
|
|
ctl.xcheck(err, "setting on hold status for messages")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queueschedule":
|
|
/* protocol:
|
|
> "queueschedule"
|
|
> queuefilters as json
|
|
> relative to now
|
|
> duration
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
relnow := ctl.xread()
|
|
d, err := time.ParseDuration(ctl.xread())
|
|
ctl.xcheck(err, "parsing duration for next delivery attempt")
|
|
var count int
|
|
if relnow == "" {
|
|
count, err = queue.NextAttemptAdd(ctx, f, d)
|
|
} else {
|
|
count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
|
|
}
|
|
ctl.xcheck(err, "setting next delivery attempts in queue")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queuetransport":
|
|
/* protocol:
|
|
> "queuetransport"
|
|
> queuefilters as json
|
|
> transport
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
transport := ctl.xread()
|
|
count, err := queue.TransportSet(ctx, f, transport)
|
|
ctl.xcheck(err, "adding to next delivery attempts in queue")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queuerequiretls":
|
|
/* protocol:
|
|
> "queuerequiretls"
|
|
> queuefilters as json
|
|
> reqtls (empty string, "true" or "false")
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
reqtls := ctl.xread()
|
|
var req *bool
|
|
switch reqtls {
|
|
case "":
|
|
case "true":
|
|
v := true
|
|
req = &v
|
|
case "false":
|
|
v := false
|
|
req = &v
|
|
default:
|
|
ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
|
|
}
|
|
count, err := queue.RequireTLSSet(ctx, f, req)
|
|
ctl.xcheck(err, "setting tls requirements on messages in queue")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queuefail":
|
|
/* protocol:
|
|
> "queuefail"
|
|
> queuefilters as json
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
count, err := queue.Fail(ctx, log, f)
|
|
ctl.xcheck(err, "marking messages from queue as failed")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queuedrop":
|
|
/* protocol:
|
|
> "queuedrop"
|
|
> queuefilters as json
|
|
< "ok" or error
|
|
< count
|
|
*/
|
|
|
|
fs := ctl.xread()
|
|
f := xparseFilters(ctl, fs)
|
|
count, err := queue.Drop(ctx, log, f)
|
|
ctl.xcheck(err, "dropping messages from queue")
|
|
ctl.xwriteok()
|
|
ctl.xwrite(fmt.Sprintf("%d", count))
|
|
|
|
case "queuedump":
|
|
/* protocol:
|
|
> "queuedump"
|
|
> id
|
|
< "ok" or error
|
|
< stream
|
|
*/
|
|
|
|
idstr := ctl.xread()
|
|
id, err := strconv.ParseInt(idstr, 10, 64)
|
|
if err != nil {
|
|
ctl.xcheck(err, "parsing id")
|
|
}
|
|
mr, err := queue.OpenMessage(ctx, id)
|
|
ctl.xcheck(err, "opening message")
|
|
defer func() {
|
|
err := mr.Close()
|
|
log.Check(err, "closing message from queue")
|
|
}()
|
|
ctl.xwriteok()
|
|
ctl.xstreamfrom(mr)
|
|
|
|
case "importmaildir", "importmbox":
|
|
mbox := cmd == "importmbox"
|
|
importctl(ctx, ctl, mbox)
|
|
|
|
case "domainadd":
|
|
/* protocol:
|
|
> "domainadd"
|
|
> domain
|
|
> account
|
|
> localpart
|
|
< "ok" or error
|
|
*/
|
|
domain := ctl.xread()
|
|
account := ctl.xread()
|
|
localpart := ctl.xread()
|
|
d, err := dns.ParseDomain(domain)
|
|
ctl.xcheck(err, "parsing domain")
|
|
err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
|
|
ctl.xcheck(err, "adding domain")
|
|
ctl.xwriteok()
|
|
|
|
case "domainrm":
|
|
/* protocol:
|
|
> "domainrm"
|
|
> domain
|
|
< "ok" or error
|
|
*/
|
|
domain := ctl.xread()
|
|
d, err := dns.ParseDomain(domain)
|
|
ctl.xcheck(err, "parsing domain")
|
|
err = mox.DomainRemove(ctx, d)
|
|
ctl.xcheck(err, "removing domain")
|
|
ctl.xwriteok()
|
|
|
|
case "accountadd":
|
|
/* protocol:
|
|
> "accountadd"
|
|
> account
|
|
> address
|
|
< "ok" or error
|
|
*/
|
|
account := ctl.xread()
|
|
address := ctl.xread()
|
|
err := mox.AccountAdd(ctx, account, address)
|
|
ctl.xcheck(err, "adding account")
|
|
ctl.xwriteok()
|
|
|
|
case "accountrm":
|
|
/* protocol:
|
|
> "accountrm"
|
|
> account
|
|
< "ok" or error
|
|
*/
|
|
account := ctl.xread()
|
|
err := mox.AccountRemove(ctx, account)
|
|
ctl.xcheck(err, "removing account")
|
|
ctl.xwriteok()
|
|
|
|
case "addressadd":
|
|
/* protocol:
|
|
> "addressadd"
|
|
> address
|
|
> account
|
|
< "ok" or error
|
|
*/
|
|
address := ctl.xread()
|
|
account := ctl.xread()
|
|
err := mox.AddressAdd(ctx, address, account)
|
|
ctl.xcheck(err, "adding address")
|
|
ctl.xwriteok()
|
|
|
|
case "addressrm":
|
|
/* protocol:
|
|
> "addressrm"
|
|
> address
|
|
< "ok" or error
|
|
*/
|
|
address := ctl.xread()
|
|
err := mox.AddressRemove(ctx, address)
|
|
ctl.xcheck(err, "removing address")
|
|
ctl.xwriteok()
|
|
|
|
case "loglevels":
|
|
/* protocol:
|
|
> "loglevels"
|
|
< "ok"
|
|
< stream
|
|
*/
|
|
ctl.xwriteok()
|
|
l := mox.Conf.LogLevels()
|
|
keys := []string{}
|
|
for k := range l {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Slice(keys, func(i, j int) bool {
|
|
return keys[i] < keys[j]
|
|
})
|
|
s := ""
|
|
for _, k := range keys {
|
|
ks := k
|
|
if ks == "" {
|
|
ks = "(default)"
|
|
}
|
|
s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
|
|
}
|
|
ctl.xstreamfrom(strings.NewReader(s))
|
|
|
|
case "setloglevels":
|
|
/* protocol:
|
|
> "setloglevels"
|
|
> pkg
|
|
> level (if empty, log level for pkg will be unset)
|
|
< "ok" or error
|
|
*/
|
|
pkg := ctl.xread()
|
|
levelstr := ctl.xread()
|
|
if levelstr == "" {
|
|
mox.Conf.LogLevelRemove(log, pkg)
|
|
} else {
|
|
level, ok := mlog.Levels[levelstr]
|
|
if !ok {
|
|
ctl.xerror("bad level")
|
|
}
|
|
mox.Conf.LogLevelSet(log, pkg, level)
|
|
}
|
|
ctl.xwriteok()
|
|
|
|
case "retrain":
|
|
/* protocol:
|
|
> "retrain"
|
|
> account
|
|
< "ok" or error
|
|
*/
|
|
account := ctl.xread()
|
|
acc, err := store.OpenAccount(log, account)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
if acc != nil {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after retraining")
|
|
}
|
|
}()
|
|
|
|
acc.WithWLock(func() {
|
|
conf, _ := acc.Conf()
|
|
if conf.JunkFilter == nil {
|
|
ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
|
|
}
|
|
|
|
// Remove existing junk filter files.
|
|
basePath := mox.DataDirPath("accounts")
|
|
dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
|
|
bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
|
|
err := os.Remove(dbPath)
|
|
log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
|
|
err = os.Remove(bloomPath)
|
|
log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
|
|
|
|
// Open junk filter, this creates new files.
|
|
jf, _, err := acc.OpenJunkFilter(ctx, log)
|
|
ctl.xcheck(err, "open new junk filter")
|
|
defer func() {
|
|
if jf == nil {
|
|
return
|
|
}
|
|
err := jf.Close()
|
|
log.Check(err, "closing junk filter during cleanup")
|
|
}()
|
|
|
|
// Read through messages with junk or nonjunk flag set, and train them.
|
|
var total, trained int
|
|
q := bstore.QueryDB[store.Message](ctx, acc.DB)
|
|
q.FilterEqual("Expunged", false)
|
|
err = q.ForEach(func(m store.Message) error {
|
|
total++
|
|
ok, err := acc.TrainMessage(ctx, log, jf, m)
|
|
if ok {
|
|
trained++
|
|
}
|
|
return err
|
|
})
|
|
ctl.xcheck(err, "training messages")
|
|
log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
|
|
|
|
// Close junk filter, marking success.
|
|
err = jf.Close()
|
|
jf = nil
|
|
ctl.xcheck(err, "closing junk filter")
|
|
})
|
|
ctl.xwriteok()
|
|
|
|
case "recalculatemailboxcounts":
|
|
/* protocol:
|
|
> "recalculatemailboxcounts"
|
|
> account
|
|
< "ok" or error
|
|
< stream
|
|
*/
|
|
account := ctl.xread()
|
|
acc, err := store.OpenAccount(log, account)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
if acc != nil {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after recalculating mailbox counts")
|
|
}
|
|
}()
|
|
ctl.xwriteok()
|
|
|
|
w := ctl.writer()
|
|
|
|
acc.WithWLock(func() {
|
|
var changes []store.Change
|
|
err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
|
|
var totalSize int64
|
|
err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
|
|
mc, err := mb.CalculateCounts(tx)
|
|
if err != nil {
|
|
return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
|
|
}
|
|
totalSize += mc.Size
|
|
|
|
if !mb.HaveCounts || mc != mb.MailboxCounts {
|
|
_, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
|
|
ctl.xcheck(err, "write")
|
|
mb.HaveCounts = true
|
|
mb.MailboxCounts = mc
|
|
if err := tx.Update(&mb); err != nil {
|
|
return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
|
|
}
|
|
changes = append(changes, mb.ChangeCounts())
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
du := store.DiskUsage{ID: 1}
|
|
if err := tx.Get(&du); err != nil {
|
|
return fmt.Errorf("get disk usage: %v", err)
|
|
}
|
|
if du.MessageSize != totalSize {
|
|
_, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
|
|
ctl.xcheck(err, "write")
|
|
du.MessageSize = totalSize
|
|
if err := tx.Update(&du); err != nil {
|
|
return fmt.Errorf("update disk usage: %v", err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
ctl.xcheck(err, "write transaction for mailbox counts")
|
|
|
|
store.BroadcastChanges(acc, changes)
|
|
})
|
|
w.xclose()
|
|
|
|
case "fixmsgsize":
|
|
/* protocol:
|
|
> "fixmsgsize"
|
|
> account or empty
|
|
< "ok" or error
|
|
< stream
|
|
*/
|
|
|
|
accountOpt := ctl.xread()
|
|
ctl.xwriteok()
|
|
w := ctl.writer()
|
|
|
|
var foundProblem bool
|
|
const batchSize = 10000
|
|
|
|
xfixmsgsize := func(accName string) {
|
|
acc, err := store.OpenAccount(log, accName)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after fixing message sizes")
|
|
}()
|
|
|
|
total := 0
|
|
var lastID int64
|
|
for {
|
|
var n int
|
|
|
|
acc.WithRLock(func() {
|
|
mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
|
|
|
|
// Don't process all message in one transaction, we could block the account for too long.
|
|
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
|
|
q := bstore.QueryTx[store.Message](tx)
|
|
q.FilterEqual("Expunged", false)
|
|
q.FilterGreater("ID", lastID)
|
|
q.Limit(batchSize)
|
|
q.SortAsc("ID")
|
|
return q.ForEach(func(m store.Message) error {
|
|
lastID = m.ID
|
|
n++
|
|
|
|
p := acc.MessagePath(m.ID)
|
|
st, err := os.Stat(p)
|
|
if err != nil {
|
|
mb := store.Mailbox{ID: m.MailboxID}
|
|
if xerr := tx.Get(&mb); xerr != nil {
|
|
_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
|
|
ctl.xcheck(werr, "write")
|
|
}
|
|
_, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
|
|
ctl.xcheck(werr, "write")
|
|
return nil
|
|
}
|
|
filesize := st.Size()
|
|
correctSize := int64(len(m.MsgPrefix)) + filesize
|
|
if m.Size == correctSize {
|
|
return nil
|
|
}
|
|
|
|
foundProblem = true
|
|
|
|
mb := store.Mailbox{ID: m.MailboxID}
|
|
if err := tx.Get(&mb); err != nil {
|
|
_, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
|
|
ctl.xcheck(werr, "write")
|
|
}
|
|
_, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
|
|
ctl.xcheck(err, "write")
|
|
|
|
// We assume that the original message size was accounted as stored in the mailbox
|
|
// total size. If this isn't correct, the user can always run
|
|
// recalculatemailboxcounts.
|
|
mb.Size -= m.Size
|
|
mb.Size += correctSize
|
|
if err := tx.Update(&mb); err != nil {
|
|
return fmt.Errorf("update mailbox counts: %v", err)
|
|
}
|
|
mailboxCounts[mb.ID] = mb
|
|
|
|
m.Size = correctSize
|
|
|
|
mr := acc.MessageReader(m)
|
|
part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
|
|
if err != nil {
|
|
_, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
|
|
ctl.xcheck(werr, "write")
|
|
}
|
|
m.ParsedBuf, err = json.Marshal(part)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal parsed message: %v", err)
|
|
}
|
|
total++
|
|
if err := tx.Update(&m); err != nil {
|
|
return fmt.Errorf("update message: %v", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
})
|
|
ctl.xcheck(err, "find and fix wrong message sizes")
|
|
|
|
var changes []store.Change
|
|
for _, mb := range mailboxCounts {
|
|
changes = append(changes, mb.ChangeCounts())
|
|
}
|
|
store.BroadcastChanges(acc, changes)
|
|
})
|
|
if n < batchSize {
|
|
break
|
|
}
|
|
}
|
|
_, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
|
|
ctl.xcheck(err, "write")
|
|
}
|
|
|
|
if accountOpt != "" {
|
|
xfixmsgsize(accountOpt)
|
|
} else {
|
|
for i, accName := range mox.Conf.Accounts() {
|
|
var line string
|
|
if i > 0 {
|
|
line = "\n"
|
|
}
|
|
_, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
|
|
ctl.xcheck(err, "write")
|
|
xfixmsgsize(accName)
|
|
}
|
|
}
|
|
if foundProblem {
|
|
_, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
|
|
ctl.xcheck(err, "write")
|
|
}
|
|
|
|
w.xclose()
|
|
|
|
case "reparse":
|
|
/* protocol:
|
|
> "reparse"
|
|
> account or empty
|
|
< "ok" or error
|
|
< stream
|
|
*/
|
|
|
|
accountOpt := ctl.xread()
|
|
ctl.xwriteok()
|
|
w := ctl.writer()
|
|
|
|
const batchSize = 100
|
|
|
|
xreparseAccount := func(accName string) {
|
|
acc, err := store.OpenAccount(log, accName)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after reparsing messages")
|
|
}()
|
|
|
|
total := 0
|
|
var lastID int64
|
|
for {
|
|
var n int
|
|
// Don't process all message in one transaction, we could block the account for too long.
|
|
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
|
|
q := bstore.QueryTx[store.Message](tx)
|
|
q.FilterEqual("Expunged", false)
|
|
q.FilterGreater("ID", lastID)
|
|
q.Limit(batchSize)
|
|
q.SortAsc("ID")
|
|
return q.ForEach(func(m store.Message) error {
|
|
lastID = m.ID
|
|
mr := acc.MessageReader(m)
|
|
p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
|
|
if err != nil {
|
|
_, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
|
|
ctl.xcheck(err, "write")
|
|
}
|
|
m.ParsedBuf, err = json.Marshal(p)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal parsed message: %v", err)
|
|
}
|
|
total++
|
|
n++
|
|
if err := tx.Update(&m); err != nil {
|
|
return fmt.Errorf("update message: %v", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
})
|
|
ctl.xcheck(err, "update messages with parsed mime structure")
|
|
if n < batchSize {
|
|
break
|
|
}
|
|
}
|
|
_, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
|
|
ctl.xcheck(err, "write")
|
|
}
|
|
|
|
if accountOpt != "" {
|
|
xreparseAccount(accountOpt)
|
|
} else {
|
|
for i, accName := range mox.Conf.Accounts() {
|
|
var line string
|
|
if i > 0 {
|
|
line = "\n"
|
|
}
|
|
_, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
|
|
ctl.xcheck(err, "write")
|
|
xreparseAccount(accName)
|
|
}
|
|
}
|
|
w.xclose()
|
|
|
|
case "reassignthreads":
|
|
/* protocol:
|
|
> "reassignthreads"
|
|
> account or empty
|
|
< "ok" or error
|
|
< stream
|
|
*/
|
|
|
|
accountOpt := ctl.xread()
|
|
ctl.xwriteok()
|
|
w := ctl.writer()
|
|
|
|
xreassignThreads := func(accName string) {
|
|
acc, err := store.OpenAccount(log, accName)
|
|
ctl.xcheck(err, "open account")
|
|
defer func() {
|
|
err := acc.Close()
|
|
log.Check(err, "closing account after reassigning threads")
|
|
}()
|
|
|
|
// We don't want to step on an existing upgrade process.
|
|
err = acc.ThreadingWait(log)
|
|
ctl.xcheck(err, "waiting for threading upgrade to finish")
|
|
// todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
|
|
|
|
// todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
|
|
const batchSize = 50000
|
|
total, err := acc.ResetThreading(ctx, log, batchSize, true)
|
|
ctl.xcheck(err, "resetting threading fields")
|
|
_, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
|
|
ctl.xcheck(err, "write")
|
|
|
|
// Assign threads again. Ideally we would do this in a single transaction, but
|
|
// bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
|
|
err = acc.AssignThreads(ctx, log, nil, 0, 50000, w)
|
|
ctl.xcheck(err, "reassign threads")
|
|
|
|
_, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
|
|
ctl.xcheck(err, "write")
|
|
}
|
|
|
|
if accountOpt != "" {
|
|
xreassignThreads(accountOpt)
|
|
} else {
|
|
for i, accName := range mox.Conf.Accounts() {
|
|
var line string
|
|
if i > 0 {
|
|
line = "\n"
|
|
}
|
|
_, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
|
|
ctl.xcheck(err, "write")
|
|
xreassignThreads(accName)
|
|
}
|
|
}
|
|
w.xclose()
|
|
|
|
case "backup":
|
|
backupctl(ctx, ctl)
|
|
|
|
default:
|
|
log.Info("unrecognized command", slog.String("cmd", cmd))
|
|
ctl.xwrite("unrecognized command")
|
|
return
|
|
}
|
|
}
|