mirror of
https://github.com/mjl-/mox.git
synced 2025-01-14 01:06:27 +03:00
895 lines
24 KiB
Go
895 lines
24 KiB
Go
package webaccount
|
|
|
|
import (
|
|
"archive/tar"
|
|
"archive/zip"
|
|
"bufio"
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
cryptrand "crypto/rand"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"golang.org/x/exp/maps"
|
|
"golang.org/x/exp/slog"
|
|
"golang.org/x/text/unicode/norm"
|
|
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/message"
|
|
"github.com/mjl-/mox/metrics"
|
|
"github.com/mjl-/mox/mlog"
|
|
"github.com/mjl-/mox/mox-"
|
|
"github.com/mjl-/mox/store"
|
|
)
|
|
|
|
type importListener struct {
|
|
Token string
|
|
Events chan importEvent
|
|
Register chan bool // Whether register is successful.
|
|
}
|
|
|
|
type importEvent struct {
|
|
Token string
|
|
SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
|
|
Event any // nil, importCount, importProblem, importDone, importAborted
|
|
Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
|
|
}
|
|
|
|
type importAbortRequest struct {
|
|
Token string
|
|
Response chan error
|
|
}
|
|
|
|
var importers = struct {
|
|
Register chan *importListener
|
|
Unregister chan *importListener
|
|
Events chan importEvent
|
|
Abort chan importAbortRequest
|
|
}{
|
|
make(chan *importListener, 1),
|
|
make(chan *importListener, 1),
|
|
make(chan importEvent),
|
|
make(chan importAbortRequest),
|
|
}
|
|
|
|
// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
|
|
func ImportManage() {
|
|
log := mlog.New("httpimport", nil)
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
log.Error("import manage panic", slog.Any("err", x))
|
|
debug.PrintStack()
|
|
metrics.PanicInc(metrics.Importmanage)
|
|
}
|
|
}()
|
|
|
|
type state struct {
|
|
MailboxCounts map[string]int
|
|
Problems []string
|
|
Done *time.Time
|
|
Aborted *time.Time
|
|
Listeners map[*importListener]struct{}
|
|
Cancel func()
|
|
}
|
|
|
|
imports := map[string]state{} // Token to state.
|
|
for {
|
|
select {
|
|
case l := <-importers.Register:
|
|
// If we have state, send it so the client is up to date.
|
|
if s, ok := imports[l.Token]; ok {
|
|
l.Register <- true
|
|
s.Listeners[l] = struct{}{}
|
|
|
|
sendEvent := func(kind string, v any) {
|
|
buf, err := json.Marshal(v)
|
|
if err != nil {
|
|
log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
|
|
return
|
|
}
|
|
ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
|
|
|
|
select {
|
|
case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
|
|
default:
|
|
log.Debug("dropped initial import event to slow consumer")
|
|
}
|
|
}
|
|
|
|
for m, c := range s.MailboxCounts {
|
|
sendEvent("count", importCount{m, c})
|
|
}
|
|
for _, p := range s.Problems {
|
|
sendEvent("problem", importProblem{p})
|
|
}
|
|
if s.Done != nil {
|
|
sendEvent("done", importDone{})
|
|
} else if s.Aborted != nil {
|
|
sendEvent("aborted", importAborted{})
|
|
}
|
|
} else {
|
|
l.Register <- false
|
|
}
|
|
|
|
case l := <-importers.Unregister:
|
|
delete(imports[l.Token].Listeners, l)
|
|
|
|
case e := <-importers.Events:
|
|
s, ok := imports[e.Token]
|
|
if !ok {
|
|
s = state{
|
|
MailboxCounts: map[string]int{},
|
|
Listeners: map[*importListener]struct{}{},
|
|
Cancel: e.Cancel,
|
|
}
|
|
imports[e.Token] = s
|
|
}
|
|
for l := range s.Listeners {
|
|
select {
|
|
case l.Events <- e:
|
|
default:
|
|
log.Debug("dropped import event to slow consumer")
|
|
}
|
|
}
|
|
if e.Event != nil {
|
|
s := imports[e.Token]
|
|
switch x := e.Event.(type) {
|
|
case importCount:
|
|
s.MailboxCounts[x.Mailbox] = x.Count
|
|
case importProblem:
|
|
s.Problems = append(s.Problems, x.Message)
|
|
case importDone:
|
|
now := time.Now()
|
|
s.Done = &now
|
|
case importAborted:
|
|
now := time.Now()
|
|
s.Aborted = &now
|
|
}
|
|
imports[e.Token] = s
|
|
}
|
|
|
|
case a := <-importers.Abort:
|
|
s, ok := imports[a.Token]
|
|
if !ok {
|
|
a.Response <- errors.New("import not found")
|
|
return
|
|
}
|
|
if s.Done != nil {
|
|
a.Response <- errors.New("import already finished")
|
|
return
|
|
}
|
|
s.Cancel()
|
|
a.Response <- nil
|
|
}
|
|
|
|
// Cleanup old state.
|
|
for t, s := range imports {
|
|
if len(s.Listeners) > 0 {
|
|
continue
|
|
}
|
|
if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
|
|
delete(imports, t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type importCount struct {
|
|
Mailbox string
|
|
Count int
|
|
}
|
|
type importProblem struct {
|
|
Message string
|
|
}
|
|
type importDone struct{}
|
|
type importAborted struct{}
|
|
type importStep struct {
|
|
Title string
|
|
}
|
|
|
|
// importStart prepare the import and launches the goroutine to actually import.
|
|
// importStart is responsible for closing f and removing f.
|
|
func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
|
|
defer func() {
|
|
if f != nil {
|
|
store.CloseRemoveTempFile(log, f, "upload for import")
|
|
}
|
|
}()
|
|
|
|
buf := make([]byte, 16)
|
|
if _, err := cryptrand.Read(buf); err != nil {
|
|
return "", false, err
|
|
}
|
|
token := fmt.Sprintf("%x", buf)
|
|
|
|
if _, err := f.Seek(0, 0); err != nil {
|
|
return "", false, fmt.Errorf("seek to start of file: %v", err)
|
|
}
|
|
|
|
// Recognize file format.
|
|
var iszip bool
|
|
magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
|
|
magicGzip := []byte{0x1f, 0x8b}
|
|
magic := make([]byte, 4)
|
|
if _, err := f.ReadAt(magic, 0); err != nil {
|
|
return "", true, fmt.Errorf("detecting file format: %v", err)
|
|
}
|
|
if bytes.Equal(magic, magicZip) {
|
|
iszip = true
|
|
} else if !bytes.Equal(magic[:2], magicGzip) {
|
|
return "", true, fmt.Errorf("file is not a zip or gzip file")
|
|
}
|
|
|
|
var zr *zip.Reader
|
|
var tr *tar.Reader
|
|
if iszip {
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
|
|
}
|
|
zr, err = zip.NewReader(f, fi.Size())
|
|
if err != nil {
|
|
return "", true, fmt.Errorf("opening zip file: %v", err)
|
|
}
|
|
} else {
|
|
gzr, err := gzip.NewReader(f)
|
|
if err != nil {
|
|
return "", true, fmt.Errorf("gunzip: %v", err)
|
|
}
|
|
tr = tar.NewReader(gzr)
|
|
}
|
|
|
|
acc, err := store.OpenAccount(log, accName)
|
|
if err != nil {
|
|
return "", false, fmt.Errorf("open acount: %v", err)
|
|
}
|
|
acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
|
|
|
|
tx, err := acc.DB.Begin(context.Background(), true)
|
|
if err != nil {
|
|
acc.Unlock()
|
|
xerr := acc.Close()
|
|
log.Check(xerr, "closing account")
|
|
return "", false, fmt.Errorf("start transaction: %v", err)
|
|
}
|
|
|
|
// Ensure token is registered before returning, with context that can be canceled.
|
|
ctx, cancel := context.WithCancel(mox.Shutdown)
|
|
importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
|
|
|
|
log.Info("starting import")
|
|
go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
|
|
f = nil // importMessages is now responsible for closing and removing.
|
|
|
|
return token, false, nil
|
|
}
|
|
|
|
// importMessages imports the messages from zip/tgz file f.
|
|
// importMessages is responsible for unlocking and closing acc, and closing tx and f.
|
|
func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
|
|
// If a fatal processing error occurs, we panic with this type.
|
|
type importError struct{ Err error }
|
|
|
|
// During import we collect all changes and broadcast them at the end, when successful.
|
|
var changes []store.Change
|
|
|
|
// ID's of delivered messages. If we have to rollback, we have to remove this files.
|
|
var deliveredIDs []int64
|
|
|
|
sendEvent := func(kind string, v any) {
|
|
buf, err := json.Marshal(v)
|
|
if err != nil {
|
|
log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
|
|
return
|
|
}
|
|
ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
|
|
importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
|
|
}
|
|
|
|
canceled := func() bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
sendEvent("aborted", importAborted{})
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
problemf := func(format string, args ...any) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
sendEvent("problem", importProblem{Message: msg})
|
|
}
|
|
|
|
defer func() {
|
|
store.CloseRemoveTempFile(log, f, "uploaded messages")
|
|
|
|
for _, id := range deliveredIDs {
|
|
p := acc.MessagePath(id)
|
|
err := os.Remove(p)
|
|
log.Check(err, "closing message file after import error", slog.String("path", p))
|
|
}
|
|
if tx != nil {
|
|
err := tx.Rollback()
|
|
log.Check(err, "rolling back transaction")
|
|
}
|
|
if acc != nil {
|
|
acc.Unlock()
|
|
err := acc.Close()
|
|
log.Check(err, "closing account")
|
|
}
|
|
|
|
x := recover()
|
|
if x == nil {
|
|
return
|
|
}
|
|
if err, ok := x.(importError); ok {
|
|
log.Errorx("import error", err.Err)
|
|
problemf("%s (aborting)", err.Err)
|
|
sendEvent("aborted", importAborted{})
|
|
} else {
|
|
log.Error("import panic", slog.Any("err", x))
|
|
debug.PrintStack()
|
|
metrics.PanicInc(metrics.Importmessages)
|
|
}
|
|
}()
|
|
|
|
ximportcheckf := func(err error, format string, args ...any) {
|
|
if err != nil {
|
|
panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
|
|
}
|
|
}
|
|
|
|
err := acc.ThreadingWait(log)
|
|
ximportcheckf(err, "waiting for account thread upgrade")
|
|
|
|
conf, _ := acc.Conf()
|
|
|
|
jf, _, err := acc.OpenJunkFilter(ctx, log)
|
|
if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
|
|
ximportcheckf(err, "open junk filter")
|
|
}
|
|
defer func() {
|
|
if jf != nil {
|
|
err := jf.CloseDiscard()
|
|
log.Check(err, "closing junk filter")
|
|
}
|
|
}()
|
|
|
|
// Mailboxes we imported, and message counts.
|
|
mailboxes := map[string]store.Mailbox{}
|
|
messages := map[string]int{}
|
|
|
|
maxSize := acc.QuotaMessageSize()
|
|
du := store.DiskUsage{ID: 1}
|
|
err = tx.Get(&du)
|
|
ximportcheckf(err, "get disk usage")
|
|
var addSize int64
|
|
|
|
// For maildirs, we are likely to get a possible dovecot-keywords file after having
|
|
// imported the messages. Once we see the keywords, we use them. But before that
|
|
// time we remember which messages miss a keywords. Once the keywords become
|
|
// available, we'll fix up the flags for the unknown messages
|
|
mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
|
|
mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
|
|
|
|
// We keep the mailboxes we deliver to up to date with count and keywords (non-system flags).
|
|
destMailboxCounts := map[int64]store.MailboxCounts{}
|
|
destMailboxKeywords := map[int64]map[string]bool{}
|
|
|
|
// Previous mailbox an event was sent for. We send an event for new mailboxes, when
|
|
// another 100 messages were added, when adding a message to another mailbox, and
|
|
// finally at the end as a closing statement.
|
|
var prevMailbox string
|
|
|
|
var modseq store.ModSeq // Assigned on first message, used for all messages.
|
|
|
|
trainMessage := func(m *store.Message, p message.Part, pos string) {
|
|
words, err := jf.ParseMessage(p)
|
|
if err != nil {
|
|
problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
|
|
return
|
|
}
|
|
err = jf.Train(ctx, !m.Junk, words)
|
|
if err != nil {
|
|
problemf("training junk filter for message %s: %v (continuing)", pos, err)
|
|
return
|
|
}
|
|
m.TrainedJunk = &m.Junk
|
|
}
|
|
|
|
openTrainMessage := func(m *store.Message) {
|
|
path := acc.MessagePath(m.ID)
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
problemf("opening message again for training junk filter: %v (continuing)", err)
|
|
return
|
|
}
|
|
defer func() {
|
|
err := f.Close()
|
|
log.Check(err, "closing file after training junkfilter")
|
|
}()
|
|
p, err := m.LoadPart(f)
|
|
if err != nil {
|
|
problemf("loading parsed message again for training junk filter: %v (continuing)", err)
|
|
return
|
|
}
|
|
trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
|
|
}
|
|
|
|
xensureMailbox := func(name string) store.Mailbox {
|
|
name = norm.NFC.String(name)
|
|
if strings.ToLower(name) == "inbox" {
|
|
name = "Inbox"
|
|
}
|
|
|
|
if mb, ok := mailboxes[name]; ok {
|
|
return mb
|
|
}
|
|
|
|
var p string
|
|
var mb store.Mailbox
|
|
for i, e := range strings.Split(name, "/") {
|
|
if i == 0 {
|
|
p = e
|
|
} else {
|
|
p = path.Join(p, e)
|
|
}
|
|
if _, ok := mailboxes[p]; ok {
|
|
continue
|
|
}
|
|
|
|
q := bstore.QueryTx[store.Mailbox](tx)
|
|
q.FilterNonzero(store.Mailbox{Name: p})
|
|
var err error
|
|
mb, err = q.Get()
|
|
if err == bstore.ErrAbsent {
|
|
uidvalidity, err := acc.NextUIDValidity(tx)
|
|
ximportcheckf(err, "finding next uid validity")
|
|
mb = store.Mailbox{
|
|
Name: p,
|
|
UIDValidity: uidvalidity,
|
|
UIDNext: 1,
|
|
HaveCounts: true,
|
|
// Do not assign special-use flags. This existing account probably already has such mailboxes.
|
|
}
|
|
err = tx.Insert(&mb)
|
|
ximportcheckf(err, "inserting mailbox in database")
|
|
|
|
if tx.Get(&store.Subscription{Name: p}) != nil {
|
|
err := tx.Insert(&store.Subscription{Name: p})
|
|
ximportcheckf(err, "subscribing to imported mailbox")
|
|
}
|
|
changes = append(changes, store.ChangeAddMailbox{Mailbox: mb, Flags: []string{`\Subscribed`}})
|
|
} else if err != nil {
|
|
ximportcheckf(err, "creating mailbox %s (aborting)", p)
|
|
}
|
|
if prevMailbox != "" && mb.Name != prevMailbox {
|
|
sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
|
|
}
|
|
mailboxes[mb.Name] = mb
|
|
sendEvent("count", importCount{mb.Name, 0})
|
|
prevMailbox = mb.Name
|
|
}
|
|
return mb
|
|
}
|
|
|
|
xdeliver := func(mb store.Mailbox, m *store.Message, f *os.File, pos string) {
|
|
defer func() {
|
|
name := f.Name()
|
|
err = f.Close()
|
|
log.Check(err, "closing temporary message file for delivery")
|
|
err := os.Remove(name)
|
|
log.Check(err, "removing temporary message file for delivery")
|
|
}()
|
|
m.MailboxID = mb.ID
|
|
m.MailboxOrigID = mb.ID
|
|
|
|
addSize += m.Size
|
|
if maxSize > 0 && du.MessageSize+addSize > maxSize {
|
|
ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
|
|
}
|
|
|
|
if modseq == 0 {
|
|
var err error
|
|
modseq, err = acc.NextModSeq(tx)
|
|
ximportcheckf(err, "assigning next modseq")
|
|
}
|
|
m.CreateSeq = modseq
|
|
m.ModSeq = modseq
|
|
|
|
mc := destMailboxCounts[mb.ID]
|
|
mc.Add(m.MailboxCounts())
|
|
destMailboxCounts[mb.ID] = mc
|
|
|
|
if len(m.Keywords) > 0 {
|
|
if destMailboxKeywords[mb.ID] == nil {
|
|
destMailboxKeywords[mb.ID] = map[string]bool{}
|
|
}
|
|
for _, k := range m.Keywords {
|
|
destMailboxKeywords[mb.ID][k] = true
|
|
}
|
|
}
|
|
|
|
// Parse message and store parsed information for later fast retrieval.
|
|
p, err := message.EnsurePart(log.Logger, false, f, m.Size)
|
|
if err != nil {
|
|
problemf("parsing message %s: %s (continuing)", pos, err)
|
|
}
|
|
m.ParsedBuf, err = json.Marshal(p)
|
|
ximportcheckf(err, "marshal parsed message structure")
|
|
|
|
// Set fields needed for future threading. By doing it now, DeliverMessage won't
|
|
// have to parse the Part again.
|
|
p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
|
|
m.PrepareThreading(log, &p)
|
|
|
|
if m.Received.IsZero() {
|
|
if p.Envelope != nil && !p.Envelope.Date.IsZero() {
|
|
m.Received = p.Envelope.Date
|
|
} else {
|
|
m.Received = time.Now()
|
|
}
|
|
}
|
|
|
|
// We set the flags that Deliver would set now and train ourselves. This prevents
|
|
// Deliver from training, which would open the junk filter, change it, and write it
|
|
// back to disk, for each message (slow).
|
|
m.JunkFlagsForMailbox(mb, conf)
|
|
if jf != nil && m.NeedsTraining() {
|
|
trainMessage(m, p, pos)
|
|
}
|
|
|
|
const sync = false
|
|
const notrain = true
|
|
const nothreads = true
|
|
const updateDiskUsage = false
|
|
if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
|
|
problemf("delivering message %s: %s (continuing)", pos, err)
|
|
return
|
|
}
|
|
deliveredIDs = append(deliveredIDs, m.ID)
|
|
changes = append(changes, m.ChangeAddUID())
|
|
messages[mb.Name]++
|
|
if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
|
|
prevMailbox = mb.Name
|
|
sendEvent("count", importCount{mb.Name, messages[mb.Name]})
|
|
}
|
|
}
|
|
|
|
ximportMbox := func(mailbox, filename string, r io.Reader) {
|
|
if mailbox == "" {
|
|
problemf("empty mailbox name for mbox file %s (skipping)", filename)
|
|
return
|
|
}
|
|
mb := xensureMailbox(mailbox)
|
|
|
|
mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
|
|
for {
|
|
m, mf, pos, err := mr.Next()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
ximportcheckf(err, "next message in mbox file")
|
|
}
|
|
|
|
xdeliver(mb, m, mf, pos)
|
|
}
|
|
}
|
|
|
|
ximportMaildir := func(mailbox, filename string, r io.Reader) {
|
|
if mailbox == "" {
|
|
problemf("empty mailbox name for maildir file %s (skipping)", filename)
|
|
return
|
|
}
|
|
mb := xensureMailbox(mailbox)
|
|
|
|
f, err := store.CreateMessageTemp(log, "import")
|
|
ximportcheckf(err, "creating temp message")
|
|
defer func() {
|
|
if f != nil {
|
|
store.CloseRemoveTempFile(log, f, "message to import")
|
|
}
|
|
}()
|
|
|
|
// Copy data, changing bare \n into \r\n.
|
|
br := bufio.NewReader(r)
|
|
w := bufio.NewWriter(f)
|
|
var size int64
|
|
for {
|
|
line, err := br.ReadBytes('\n')
|
|
if err != nil && err != io.EOF {
|
|
ximportcheckf(err, "reading message")
|
|
}
|
|
if len(line) > 0 {
|
|
if !bytes.HasSuffix(line, []byte("\r\n")) {
|
|
line = append(line[:len(line)-1], "\r\n"...)
|
|
}
|
|
|
|
n, err := w.Write(line)
|
|
ximportcheckf(err, "writing message")
|
|
size += int64(n)
|
|
}
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
}
|
|
err = w.Flush()
|
|
ximportcheckf(err, "writing message")
|
|
|
|
var received time.Time
|
|
t := strings.SplitN(path.Base(filename), ".", 2)
|
|
if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
|
|
received = time.Unix(v, 0)
|
|
}
|
|
|
|
// Parse flags. See https://cr.yp.to/proto/maildir.html.
|
|
var keepFlags string
|
|
var flags store.Flags
|
|
keywords := map[string]bool{}
|
|
t = strings.SplitN(path.Base(filename), ":2,", 2)
|
|
if len(t) == 2 {
|
|
for _, c := range t[1] {
|
|
switch c {
|
|
case 'P':
|
|
// Passed, doesn't map to a common IMAP flag.
|
|
case 'R':
|
|
flags.Answered = true
|
|
case 'S':
|
|
flags.Seen = true
|
|
case 'T':
|
|
flags.Deleted = true
|
|
case 'D':
|
|
flags.Draft = true
|
|
case 'F':
|
|
flags.Flagged = true
|
|
default:
|
|
if c >= 'a' && c <= 'z' {
|
|
dovecotKeywords, ok := mailboxKeywords[mailbox]
|
|
if !ok {
|
|
// No keywords file seen yet, we'll try later if it comes in.
|
|
keepFlags += string(c)
|
|
} else if kw, ok := dovecotKeywords[c]; ok {
|
|
flagSet(&flags, keywords, kw)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
m := store.Message{
|
|
Received: received,
|
|
Flags: flags,
|
|
Keywords: maps.Keys(keywords),
|
|
Size: size,
|
|
}
|
|
xdeliver(mb, &m, f, filename)
|
|
f = nil
|
|
if keepFlags != "" {
|
|
if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
|
|
mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
|
|
}
|
|
mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
|
|
}
|
|
}
|
|
|
|
importFile := func(name string, r io.Reader) {
|
|
origName := name
|
|
|
|
if strings.HasPrefix(name, skipMailboxPrefix) {
|
|
name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
|
|
}
|
|
|
|
if strings.HasSuffix(name, "/") {
|
|
name = strings.TrimSuffix(name, "/")
|
|
dir := path.Dir(name)
|
|
switch path.Base(dir) {
|
|
case "new", "cur", "tmp":
|
|
// Maildir, ensure it exists.
|
|
mailbox := path.Dir(dir)
|
|
xensureMailbox(mailbox)
|
|
}
|
|
// Otherwise, this is just a directory that probably holds mbox files and maildirs.
|
|
return
|
|
}
|
|
|
|
if strings.HasSuffix(path.Base(name), ".mbox") {
|
|
mailbox := name[:len(name)-len(".mbox")]
|
|
ximportMbox(mailbox, origName, r)
|
|
return
|
|
}
|
|
dir := path.Dir(name)
|
|
dirbase := path.Base(dir)
|
|
switch dirbase {
|
|
case "new", "cur", "tmp":
|
|
mailbox := path.Dir(dir)
|
|
ximportMaildir(mailbox, origName, r)
|
|
default:
|
|
if path.Base(name) == "dovecot-keywords" {
|
|
mailbox := path.Dir(name)
|
|
dovecotKeywords := map[rune]string{}
|
|
words, err := store.ParseDovecotKeywordsFlags(r, log)
|
|
log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
|
|
for i, kw := range words {
|
|
dovecotKeywords['a'+rune(i)] = kw
|
|
}
|
|
mailboxKeywords[mailbox] = dovecotKeywords
|
|
|
|
for id, chars := range mailboxMissingKeywordMessages[mailbox] {
|
|
var flags, zeroflags store.Flags
|
|
keywords := map[string]bool{}
|
|
for _, c := range chars {
|
|
kw, ok := dovecotKeywords[c]
|
|
if !ok {
|
|
problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
|
|
continue
|
|
}
|
|
flagSet(&flags, keywords, kw)
|
|
}
|
|
if flags == zeroflags && len(keywords) == 0 {
|
|
continue
|
|
}
|
|
|
|
m := store.Message{ID: id}
|
|
err := tx.Get(&m)
|
|
ximportcheckf(err, "get imported message for flag update")
|
|
|
|
mc := destMailboxCounts[m.MailboxID]
|
|
mc.Sub(m.MailboxCounts())
|
|
|
|
oflags := m.Flags
|
|
m.Flags = m.Flags.Set(flags, flags)
|
|
m.Keywords = maps.Keys(keywords)
|
|
sort.Strings(m.Keywords)
|
|
|
|
mc.Add(m.MailboxCounts())
|
|
destMailboxCounts[m.MailboxID] = mc
|
|
|
|
if len(m.Keywords) > 0 {
|
|
if destMailboxKeywords[m.MailboxID] == nil {
|
|
destMailboxKeywords[m.MailboxID] = map[string]bool{}
|
|
}
|
|
for _, k := range m.Keywords {
|
|
destMailboxKeywords[m.MailboxID][k] = true
|
|
}
|
|
}
|
|
|
|
// We train before updating, training may set m.TrainedJunk.
|
|
if jf != nil && m.NeedsTraining() {
|
|
openTrainMessage(&m)
|
|
}
|
|
err = tx.Update(&m)
|
|
ximportcheckf(err, "updating message after flag update")
|
|
changes = append(changes, m.ChangeFlags(oflags))
|
|
}
|
|
delete(mailboxMissingKeywordMessages, mailbox)
|
|
} else {
|
|
problemf("unrecognized file %s (skipping)", origName)
|
|
}
|
|
}
|
|
}
|
|
|
|
if zr != nil {
|
|
for _, f := range zr.File {
|
|
if canceled() {
|
|
return
|
|
}
|
|
zf, err := f.Open()
|
|
if err != nil {
|
|
problemf("opening file %s in zip: %v", f.Name, err)
|
|
continue
|
|
}
|
|
importFile(f.Name, zf)
|
|
err = zf.Close()
|
|
log.Check(err, "closing file from zip")
|
|
}
|
|
} else {
|
|
for {
|
|
if canceled() {
|
|
return
|
|
}
|
|
h, err := tr.Next()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
problemf("reading next tar header: %v (aborting)", err)
|
|
return
|
|
}
|
|
importFile(h.Name, tr)
|
|
}
|
|
}
|
|
|
|
total := 0
|
|
for _, count := range messages {
|
|
total += count
|
|
}
|
|
log.Debug("messages imported", slog.Int("total", total))
|
|
|
|
// Send final update for count of last-imported mailbox.
|
|
if prevMailbox != "" {
|
|
sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
|
|
}
|
|
|
|
// Match threads.
|
|
if len(deliveredIDs) > 0 {
|
|
sendEvent("step", importStep{"matching messages with threads"})
|
|
err = acc.AssignThreads(ctx, log, tx, deliveredIDs[0], 0, io.Discard)
|
|
ximportcheckf(err, "assigning messages to threads")
|
|
}
|
|
|
|
// Update mailboxes with counts and keywords.
|
|
for mbID, mc := range destMailboxCounts {
|
|
mb := store.Mailbox{ID: mbID}
|
|
err := tx.Get(&mb)
|
|
ximportcheckf(err, "loading mailbox for counts and keywords")
|
|
|
|
if mb.MailboxCounts != mc {
|
|
mb.MailboxCounts = mc
|
|
changes = append(changes, mb.ChangeCounts())
|
|
}
|
|
|
|
keywords := destMailboxKeywords[mb.ID]
|
|
var mbKwChanged bool
|
|
mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(keywords))
|
|
|
|
err = tx.Update(&mb)
|
|
ximportcheckf(err, "updating mailbox count and keywords")
|
|
if mbKwChanged {
|
|
changes = append(changes, mb.ChangeKeywords())
|
|
}
|
|
}
|
|
|
|
err = acc.AddMessageSize(log, tx, addSize)
|
|
ximportcheckf(err, "updating disk usage after import")
|
|
|
|
err = tx.Commit()
|
|
tx = nil
|
|
ximportcheckf(err, "commit")
|
|
deliveredIDs = nil
|
|
|
|
if jf != nil {
|
|
if err := jf.Close(); err != nil {
|
|
problemf("saving changes of training junk filter: %v (continuing)", err)
|
|
log.Errorx("saving changes of training junk filter", err)
|
|
}
|
|
jf = nil
|
|
}
|
|
|
|
store.BroadcastChanges(acc, changes)
|
|
acc.Unlock()
|
|
err = acc.Close()
|
|
log.Check(err, "closing account after import")
|
|
acc = nil
|
|
|
|
sendEvent("done", importDone{})
|
|
}
|
|
|
|
func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
|
|
switch word {
|
|
case "forwarded", "$forwarded":
|
|
flags.Forwarded = true
|
|
case "junk", "$junk":
|
|
flags.Junk = true
|
|
case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
|
|
flags.Notjunk = true
|
|
case "phishing", "$phishing":
|
|
flags.Phishing = true
|
|
case "mdnsent", "$mdnsent":
|
|
flags.MDNSent = true
|
|
default:
|
|
if err := store.CheckKeyword(word); err == nil {
|
|
keywords[word] = true
|
|
}
|
|
}
|
|
}
|