package webmail // todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed. import ( "compress/gzip" "context" cryptrand "crypto/rand" "encoding/base64" "encoding/json" "errors" "fmt" "log/slog" "net/http" "path/filepath" "reflect" "runtime/debug" "slices" "strconv" "strings" "sync" "time" "github.com/mjl-/bstore" "github.com/mjl-/sherpa" "github.com/mjl-/mox/dns" "github.com/mjl-/mox/message" "github.com/mjl-/mox/metrics" "github.com/mjl-/mox/mlog" "github.com/mjl-/mox/mox-" "github.com/mjl-/mox/moxio" "github.com/mjl-/mox/moxvar" "github.com/mjl-/mox/smtp" "github.com/mjl-/mox/store" ) // Request is a request to an SSE connection to send messages, either for a new // view, to continue with an existing view, or to a cancel an ongoing request. type Request struct { ID int64 SSEID int64 // SSE connection. // To indicate a request is a continuation (more results) of the previous view. // Echoed in events, client checks if it is getting results for the latest request. ViewID int64 // If set, this request and its view are canceled. A new view must be started. Cancel bool Query Query Page Page } type ThreadMode string const ( ThreadOff ThreadMode = "off" ThreadOn ThreadMode = "on" ThreadUnread ThreadMode = "unread" ) // Query is a request for messages that match filters, in a given order. type Query struct { OrderAsc bool // Order by received ascending or desending. Threading ThreadMode Filter Filter NotFilter NotFilter } // AttachmentType is for filtering by attachment type. type AttachmentType string const ( AttachmentIndifferent AttachmentType = "" AttachmentNone AttachmentType = "none" AttachmentAny AttachmentType = "any" AttachmentImage AttachmentType = "image" // png, jpg, gif, ... AttachmentPDF AttachmentType = "pdf" AttachmentArchive AttachmentType = "archive" // zip files, tgz, ... AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ... AttachmentDocument AttachmentType = "document" // odt, docx, ... AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ... ) // Filter selects the messages to return. Fields that are set must all match, // for slices each element by match ("and"). type Filter struct { // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0. MailboxID int64 // If true, also submailboxes are included in the search. MailboxChildrenIncluded bool // In case client doesn't know mailboxes and their IDs yet. Only used during sse // connection setup, where it is turned into a MailboxID. Filtering only looks at // MailboxID. MailboxName string Words []string // Case insensitive substring match for each string. From []string To []string // Including Cc and Bcc. Oldest *time.Time Newest *time.Time Subject []string Attachments AttachmentType Labels []string Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value. SizeMin int64 SizeMax int64 } // NotFilter matches messages that don't match these fields. type NotFilter struct { Words []string From []string To []string Subject []string Attachments AttachmentType Labels []string } // Page holds pagination parameters for a request. type Page struct { // Start returning messages after this ID, if > 0. For pagination, fetching the // next set of messages. AnchorMessageID int64 // Number of messages to return, must be >= 1, we never return more than 10000 for // one request. Count int // If > 0, return messages until DestMessageID is found. More than Count messages // can be returned. For long-running searches, it may take a while before this // message if found. DestMessageID int64 } // todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope. // MessageAddress is like message.Address, but with a dns.Domain, with unicode name // included. type MessageAddress struct { Name string // Free-form name for display in mail applications. User string // Localpart, encoded. Domain dns.Domain } // MessageEnvelope is like message.Envelope, as used in message.Part, but including // unicode host names for IDNA names. type MessageEnvelope struct { // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope. Date time.Time Subject string From []MessageAddress Sender []MessageAddress ReplyTo []MessageAddress To []MessageAddress CC []MessageAddress BCC []MessageAddress InReplyTo string MessageID string } // MessageItem is sent by queries, it has derived information analyzed from // message.Part, made for the needs of the message items in the message list. // messages. type MessageItem struct { Message store.Message // Without ParsedBuf and MsgPrefix, for size. Envelope MessageEnvelope Attachments []Attachment IsSigned bool IsEncrypted bool FirstLine string // Of message body, for showing as preview. MatchQuery bool // If message does not match query, it can still be included because of threading. } // ParsedMessage has more parsed/derived information about a message, intended // for rendering the (contents of the) message. Information from MessageItem is // not duplicated. type ParsedMessage struct { ID int64 Part message.Part Headers map[string][]string ViewMode store.ViewMode // Text parts, can be empty. Texts []string // Whether there is an HTML part. The webclient renders HTML message parts through // an iframe and a separate request with strict CSP headers to prevent script // execution and loading of external resources, which isn't possible when loading // in iframe with inline HTML because not all browsers support the iframe csp // attribute. HasHTML bool ListReplyAddress *MessageAddress // From List-Post. // Information used by MessageItem, not exported in this type. envelope MessageEnvelope attachments []Attachment isSigned bool isEncrypted bool firstLine string } // EventStart is the first message sent on an SSE connection, giving the client // basic data to populate its UI. After this event, messages will follow quickly in // an EventViewMsgs event. type EventStart struct { SSEID int64 LoginAddress MessageAddress Addresses []MessageAddress DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config. MailboxName string Mailboxes []store.Mailbox RejectsMailbox string Settings store.Settings AccountPath string // If nonempty, the path on same host to webaccount interface. Version string } // DomainAddressConfig has the address (localpart) configuration for a domain, so // the webmail client can decide if an address matches the addresses of the // account. type DomainAddressConfig struct { LocalpartCatchallSeparator string // Can be empty. LocalpartCaseSensitive bool } // EventViewMsgs contains messages for a view, possibly a continuation of an // earlier list of messages. type EventViewMsgs struct { ViewID int64 RequestID int64 // If empty, this was the last message for the request. If non-empty, a list of // thread messages. Each with the first message being the reason this thread is // included and can be used as AnchorID in followup requests. If the threading mode // is "off" in the query, there will always be only a single message. If a thread // is sent, all messages in the thread are sent, including those that don't match // the query (e.g. from another mailbox). Threads can be displayed based on the // ThreadParentIDs field, with possibly slightly different display based on field // ThreadMissingLink. MessageItems [][]MessageItem // If set, will match the target page.DestMessageID from the request. ParsedMessage *ParsedMessage // If set, there are no more messages in this view at this moment. Messages can be // added, typically via Change messages, e.g. for new deliveries. ViewEnd bool } // EventViewErr indicates an error during a query for messages. The request is // aborted, no more request-related messages will be sent until the next request. type EventViewErr struct { ViewID int64 RequestID int64 Err string // To be displayed in client. err error // Original message, for checking against context.Canceled. } // EventViewReset indicates that a request for the next set of messages in a few // could not be fulfilled, e.g. because the anchor message does not exist anymore. // The client should clear its list of messages. This can happen before // EventViewMsgs events are sent. type EventViewReset struct { ViewID int64 RequestID int64 } // EventViewChanges contain one or more changes relevant for the client, either // with new mailbox total/unseen message counts, or messages added/removed/modified // (flags) for the current view. type EventViewChanges struct { ViewID int64 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below. } // ChangeMsgAdd adds a new message and possibly its thread to the view. type ChangeMsgAdd struct { store.ChangeAddUID MessageItems []MessageItem } // ChangeMsgRemove removes one or more messages from the view. type ChangeMsgRemove struct { store.ChangeRemoveUIDs } // ChangeMsgFlags updates flags for one message. type ChangeMsgFlags struct { store.ChangeFlags } // ChangeMsgThread updates muted/collapsed fields for one message. type ChangeMsgThread struct { store.ChangeThread } // ChangeMailboxRemove indicates a mailbox was removed, including all its messages. type ChangeMailboxRemove struct { store.ChangeRemoveMailbox } // ChangeMailboxAdd indicates a new mailbox was added, initially without any messages. type ChangeMailboxAdd struct { Mailbox store.Mailbox } // ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same. // It could be under a new parent. type ChangeMailboxRename struct { store.ChangeRenameMailbox } // ChangeMailboxCounts set new total and unseen message counts for a mailbox. type ChangeMailboxCounts struct { store.ChangeMailboxCounts } // ChangeMailboxSpecialUse has updated special-use flags for a mailbox. type ChangeMailboxSpecialUse struct { store.ChangeMailboxSpecialUse } // ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after // a message was added with a keyword that wasn't in the mailbox yet. type ChangeMailboxKeywords struct { store.ChangeMailboxKeywords } // View holds the information about the returned data for a query. It is used to // determine whether mailbox changes should be sent to the client, we only send // addition/removal/flag-changes of messages that are in view, or would extend it // if the view is at the end of the results. type view struct { Request Request // Received of last message we sent to the client. We use it to decide if a newly // delivered message is within the view and the client should get a notification. LastMessageReceived time.Time // If set, the last message in the query view has been sent. There is no need to do // another query, it will not return more data. Used to decide if an event for a // new message should be sent. End bool // Whether message must or must not match mailboxIDs. matchMailboxIDs bool // Mailboxes to match, can be multiple, for matching children. If empty, there is // no filter on mailboxes. mailboxIDs map[int64]bool // Threads sent to client. New messages for this thread are also sent, regardless // of regular query matching, so also for other mailboxes. If the user (re)moved // all messages of a thread, they may still receive events for the thread. Only // filled when query with threading not off. threadIDs map[int64]struct{} } // sses tracks all sse connections, and access to them. var sses = struct { sync.Mutex gen int64 m map[int64]sse }{m: map[int64]sse{}} // sse represents an sse connection. type sse struct { ID int64 // Also returned in EventStart and used in Request to identify the request. AccountName string // Used to check the authenticated user has access to the SSE connection. Request chan Request // Goroutine will receive requests from here, coming from API calls. } // called by the goroutine when the connection is closed or breaks. func (sse sse) unregister() { sses.Lock() defer sses.Unlock() delete(sses.m, sse.ID) // Drain any pending requests, preventing blocked goroutines from API calls. for { select { case <-sse.Request: default: return } } } func sseRegister(accountName string) sse { sses.Lock() defer sses.Unlock() sses.gen++ v := sse{sses.gen, accountName, make(chan Request, 1)} sses.m[v.ID] = v return v } // sseGet returns a reference to an existing connection if it exists and user // has access. func sseGet(id int64, accountName string) (sse, bool) { sses.Lock() defer sses.Unlock() s := sses.m[id] if s.AccountName != accountName { return sse{}, false } return s, true } // ssetoken is a temporary token that has not yet been used to start an SSE // connection. Created by Token, consumed by a new SSE connection. type ssetoken struct { token string // Uniquely generated. accName string address string // Address used to authenticate in call that created the token. sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates. validUntil time.Time } // ssetokens maintains unused tokens. We have just one, but it's a type so we // can define methods. type ssetokens struct { sync.Mutex accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new. tokens map[string]ssetoken // Token to details, for finding account for a token. } var sseTokens = ssetokens{ accountTokens: map[string][]ssetoken{}, tokens: map[string]ssetoken{}, } // xgenerate creates and saves a new token. It ensures no more than 10 tokens // per account exist, removing old ones if needed. func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string { buf := make([]byte, 16) _, err := cryptrand.Read(buf) xcheckf(ctx, err, "generating token") st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)} x.Lock() defer x.Unlock() n := len(x.accountTokens[accName]) if n >= 10 { for _, ost := range x.accountTokens[accName][:n-9] { delete(x.tokens, ost.token) } copy(x.accountTokens[accName], x.accountTokens[accName][n-9:]) x.accountTokens[accName] = x.accountTokens[accName][:9] } x.accountTokens[accName] = append(x.accountTokens[accName], st) x.tokens[st.token] = st return st.token } // check verifies a token, and consumes it if valid. func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) { x.Lock() defer x.Unlock() st, ok := x.tokens[token] if !ok { return "", "", "", false, nil } delete(x.tokens, token) if i := slices.Index(x.accountTokens[st.accName], st); i < 0 { return "", "", "", false, errors.New("internal error, could not find token in account") } else { copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:]) x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1] if len(x.accountTokens[st.accName]) == 0 { delete(x.accountTokens, st.accName) } } if time.Now().After(st.validUntil) { return "", "", "", false, nil } return st.accName, st.address, st.sessionToken, true, nil } // ioErr is panicked on i/o errors in serveEvents and handled in a defer. type ioErr struct { err error } // serveEvents serves an SSE connection. Authentication is done through a query // string parameter "singleUseToken", a one-time-use token returned by the Token // API call. func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed) return } flusher, ok := w.(http.Flusher) if !ok { log.Error("internal error: ResponseWriter not a http.Flusher") http.Error(w, "500 - internal error - cannot sync to http connection", 500) return } q := r.URL.Query() token := q.Get("singleUseToken") if token == "" { http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest) return } accName, address, sessionToken, ok, err := sseTokens.check(token) if err != nil { http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError) return } if !ok { http.Error(w, "400 - bad request - bad token", http.StatusBadRequest) return } if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil { http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest) return } // We can simulate a slow SSE connection. It seems firefox doesn't slow down // incoming responses with its slow-network similation. var waitMin, waitMax time.Duration waitMinMsec := q.Get("waitMinMsec") waitMaxMsec := q.Get("waitMaxMsec") if waitMinMsec != "" && waitMaxMsec != "" { if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil { http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest) return } else { waitMin = time.Duration(v) * time.Millisecond } if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil { http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest) return } else { waitMax = time.Duration(v) * time.Millisecond } } // Parse the request with initial mailbox/search criteria. var req Request dec := json.NewDecoder(strings.NewReader(q.Get("request"))) dec.DisallowUnknownFields() if err := dec.Decode(&req); err != nil { http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest) return } else if req.Page.Count <= 0 { http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest) return } if req.Query.Threading == "" { req.Query.Threading = ThreadOff } var writer *eventWriter metricSSEConnections.Inc() defer metricSSEConnections.Dec() // Below here, error handling cause through xcheckf, which panics with // *sherpa.Error, after which we send an error event to the client. We can also get // an *ioErr when the connection is broken. defer func() { x := recover() if x == nil { return } if err, ok := x.(*sherpa.Error); ok { writer.xsendEvent(ctx, log, "fatalErr", err.Message) } else if _, ok := x.(ioErr); ok { return } else { log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x)) debug.PrintStack() metrics.PanicInc(metrics.Webmail) panic(x) } }() h := w.Header() h.Set("Content-Type", "text/event-stream") h.Set("Cache-Control", "no-cache") // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate // keys), so should be quite compressible. var out writeFlusher gz := mox.AcceptsGzip(r) if gz { h.Set("Content-Encoding", "gzip") out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed) } else { out = nopFlusher{w} } out = httpFlusher{out, flusher} // We'll be writing outgoing SSE events through writer. writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken) defer writer.close() // Fetch initial data. acc, err := store.OpenAccount(log, accName) xcheckf(ctx, err, "open account") defer func() { err := acc.Close() log.Check(err, "closing account") }() comm := store.RegisterComm(acc) defer comm.Unregister() // List addresses that the client can use to send email from. accConf, _ := acc.Conf() loginAddr, err := smtp.ParseAddress(address) xcheckf(ctx, err, "parsing login address") _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false) xcheckf(ctx, err, "looking up destination for login address") loginName := accConf.FullName if dest.FullName != "" { loginName = dest.FullName } loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain} var addresses []MessageAddress for a, dest := range accConf.Destinations { name := dest.FullName if name == "" { name = accConf.FullName } var ma MessageAddress if strings.HasPrefix(a, "@") { dom, err := dns.ParseDomain(a[1:]) xcheckf(ctx, err, "parsing destination address for account") ma = MessageAddress{Domain: dom} } else { addr, err := smtp.ParseAddress(a) xcheckf(ctx, err, "parsing destination address for account") ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain} } addresses = append(addresses, ma) } // User is allowed to send using alias address as message From address. Webmail // will choose it when replying to a message sent to that address. aliasAddrs := map[MessageAddress]bool{} for _, a := range accConf.Aliases { if a.Alias.AllowMsgFrom { ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain} if !aliasAddrs[ma] { addresses = append(addresses, ma) } aliasAddrs[ma] = true } } // We implicitly start a query. We use the reqctx for the transaction, because the // transaction is passed to the query, which can be canceled. reqctx, reqctxcancel := context.WithCancel(ctx) defer func() { // We also cancel in cancelDrain later on, but there is a brief window where the // context wouldn't be canceled. if reqctxcancel != nil { reqctxcancel() reqctxcancel = nil } }() // qtx is kept around during connection initialization, until we pass it off to the // goroutine that starts querying for messages. var qtx *bstore.Tx defer func() { if qtx != nil { err := qtx.Rollback() log.Check(err, "rolling back") } }() var mbl []store.Mailbox settings := store.Settings{ID: 1} // We only take the rlock when getting the tx. acc.WithRLock(func() { // Now a read-only transaction we'll use during the query. qtx, err = acc.DB.Begin(reqctx, false) xcheckf(ctx, err, "begin transaction") mbl, err = bstore.QueryTx[store.Mailbox](qtx).List() xcheckf(ctx, err, "list mailboxes") err = qtx.Get(&settings) xcheckf(ctx, err, "get settings") }) // Find the designated mailbox if a mailbox name is set, or there are no filters at all. var zerofilter Filter var zeronotfilter NotFilter var mailbox store.Mailbox var mailboxPrefixes []string var matchMailboxes bool mailboxIDs := map[int64]bool{} mailboxName := req.Query.Filter.MailboxName if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) { if mailboxName == "" { mailboxName = "Inbox" } var inbox store.Mailbox for _, e := range mbl { if e.Name == mailboxName { mailbox = e } if e.Name == "Inbox" { inbox = e } } if mailbox.ID == 0 { mailbox = inbox } if mailbox.ID == 0 { xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox") } req.Query.Filter.MailboxID = mailbox.ID req.Query.Filter.MailboxName = "" mailboxPrefixes = []string{mailbox.Name + "/"} matchMailboxes = true mailboxIDs[mailbox.ID] = true } else { matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox) } if req.Query.Filter.MailboxChildrenIncluded { xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes) } // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent. sse := sseRegister(acc.Name) defer sse.unregister() // Per-domain localpart config so webclient can decide if an address belongs to the account. domainAddressConfigs := map[string]DomainAddressConfig{} for _, a := range addresses { dom, _ := mox.Conf.Domain(a.Domain) domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive} } // Write first event, allowing client to fill its UI with mailboxes. start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version} writer.xsendEvent(ctx, log, "start", start) // The goroutine doing the querying will send messages on these channels, which // result in an event being written on the SSE connection. viewMsgsc := make(chan EventViewMsgs) viewErrc := make(chan EventViewErr) viewResetc := make(chan EventViewReset) donec := make(chan int64) // When request is done. // Start a view, it determines if we send a change to the client. And start an // implicit query for messages, we'll send the messages to the client which can // fill its ui with messages. v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}} go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec) qtx = nil // viewRequestTx closes qtx // When canceling a query, we must drain its messages until it says it is done. // Otherwise the sending goroutine would hang indefinitely on a channel send. cancelDrain := func() { if reqctxcancel != nil { // Cancel the goroutine doing the querying. reqctxcancel() reqctx = nil reqctxcancel = nil } else { return } // Drain events until done. for { select { case <-viewMsgsc: case <-viewErrc: case <-viewResetc: case <-donec: return } } } // If we stop and a query is in progress, we must drain the channel it will send on. defer cancelDrain() // Changes broadcasted by other connections on this account. If applicable for the // connection/view, we send events. xprocessChanges := func(changes []store.Change) { taggedChanges := [][2]any{} // We get a transaction first time we need it. var xtx *bstore.Tx defer func() { if xtx != nil { err := xtx.Rollback() log.Check(err, "rolling back transaction") } }() ensureTx := func() error { if xtx != nil { return nil } acc.RLock() defer acc.RUnlock() var err error xtx, err = acc.DB.Begin(ctx, false) return err } // This getmsg will now only be called mailboxID+UID, not with messageID set. // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch. getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) { if err := ensureTx(); err != nil { return store.Message{}, fmt.Errorf("transaction: %v", err) } return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get() } // Return uids that are within range in view. Because the end has been reached, or // because the UID is not after the last message. xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) { uidsAny := make([]any, len(uids)) for i, uid := range uids { uidsAny[i] = uid } err := ensureTx() xcheckf(ctx, err, "transaction") q := bstore.QueryTx[store.Message](xtx) q.FilterNonzero(store.Message{MailboxID: mailboxID}) q.FilterEqual("UID", uidsAny...) mbOK := v.matchesMailbox(mailboxID) err = q.ForEach(func(m store.Message) error { _, thread := v.threadIDs[m.ThreadID] if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) { changedUIDs = append(changedUIDs, m.UID) } return nil }) xcheckf(ctx, err, "fetching messages for change") return changedUIDs } // Forward changes that are relevant to the current view. for _, change := range changes { switch c := change.(type) { case store.ChangeAddUID: ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg) xcheckf(ctx, err, "matching new message against view") m, err := getmsg(0, c.MailboxID, c.UID) xcheckf(ctx, err, "get message") _, thread := v.threadIDs[m.ThreadID] if !ok && !thread { continue } state := msgState{acc: acc} mi, err := messageItem(log, m, &state) state.clear() xcheckf(ctx, err, "make messageitem") mi.MatchQuery = ok mil := []MessageItem{mi} if !thread && req.Query.Threading != ThreadOff { err := ensureTx() xcheckf(ctx, err, "transaction") more, _, err := gatherThread(log, xtx, acc, v, m, 0, false) xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID) mil = append(mil, more...) v.threadIDs[m.ThreadID] = struct{}{} } taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}}) // If message extends the view, store it as such. if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) { v.LastMessageReceived = m.Received } case store.ChangeRemoveUIDs: // We may send changes for uids the client doesn't know, that's fine. changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true) if len(changedUIDs) == 0 { continue } ch := ChangeMsgRemove{c} ch.UIDs = changedUIDs taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch}) case store.ChangeFlags: // We may send changes for uids the client doesn't know, that's fine. changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false) if len(changedUIDs) == 0 { continue } ch := ChangeMsgFlags{c} ch.UID = changedUIDs[0] taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch}) case store.ChangeThread: // Change in muted/collaped state, just always ship it. taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}}) case store.ChangeRemoveMailbox: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}}) case store.ChangeAddMailbox: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}}) case store.ChangeRenameMailbox: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}}) case store.ChangeMailboxCounts: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}}) case store.ChangeMailboxSpecialUse: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}}) case store.ChangeMailboxKeywords: taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}}) case store.ChangeAddSubscription: // Webmail does not care about subscriptions. default: panic(fmt.Sprintf("missing case for change %T", c)) } } if len(taggedChanges) > 0 { viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges} writer.xsendEvent(ctx, log, "viewChanges", viewChanges) } } timer := time.NewTimer(5 * time.Minute) // For keepalives. defer timer.Stop() for { if writer.wrote { timer.Reset(5 * time.Minute) writer.wrote = false } pending := comm.Pending if reqctx != nil { pending = nil } select { case <-mox.Shutdown.Done(): writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down") // Work around go vet, it doesn't see defer cancelDrain. if reqctxcancel != nil { reqctxcancel() } return case <-timer.C: _, err := fmt.Fprintf(out, ": keepalive\n\n") if err != nil { log.Errorx("write keepalive", err) // Work around go vet, it doesn't see defer cancelDrain. if reqctxcancel != nil { reqctxcancel() } return } out.Flush() writer.wrote = true case vm := <-viewMsgsc: if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID { panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID)) } if vm.ViewEnd { v.End = true } if len(vm.MessageItems) > 0 { v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received } writer.xsendEvent(ctx, log, "viewMsgs", vm) case ve := <-viewErrc: if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID { panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID)) } if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) { // Work around go vet, it doesn't see defer cancelDrain. if reqctxcancel != nil { reqctxcancel() } return } writer.xsendEvent(ctx, log, "viewErr", ve) case vr := <-viewResetc: if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID { panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID)) } writer.xsendEvent(ctx, log, "viewReset", vr) case id := <-donec: if id != v.Request.ID { panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID)) } if reqctxcancel != nil { reqctxcancel() } reqctx = nil reqctxcancel = nil case req := <-sse.Request: if reqctx != nil { cancelDrain() } if req.Cancel { v = view{req, time.Time{}, false, false, nil, nil} continue } reqctx, reqctxcancel = context.WithCancel(ctx) stop := func() (stop bool) { // rtx is handed off viewRequestTx below, but we must clean it up in case of errors. var rtx *bstore.Tx var err error defer func() { if rtx != nil { err = rtx.Rollback() log.Check(err, "rolling back transaction") } }() acc.WithRLock(func() { rtx, err = acc.DB.Begin(reqctx, false) }) if err != nil { reqctxcancel() reqctx = nil reqctxcancel = nil if errors.Is(err, context.Canceled) { return true } err := fmt.Errorf("begin transaction: %v", err) viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err} writer.xsendEvent(ctx, log, "viewErr", viewErr) return false } // Reset view state for new query. if req.ViewID != v.Request.ViewID { matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox) if req.Query.Filter.MailboxChildrenIncluded { xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes) } v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}} } else { v.Request = req } go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec) rtx = nil return false }() if stop { return } case <-pending: xprocessChanges(comm.Get()) case <-ctx.Done(): // Work around go vet, it doesn't see defer cancelDrain. if reqctxcancel != nil { reqctxcancel() } return } } } // xprepareMailboxIDs prepare the first half of filters for mailboxes, based on // f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in // mailboxIDs must or must not match. mailboxPrefixes is for use with // xgatherMailboxIDs to gather children of the mailboxIDs. func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) { matchMailboxes = true mailboxIDs = map[int64]bool{} if f.MailboxID == -1 { matchMailboxes = false // Add the trash, junk and account rejects mailbox. err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error { if mb.Trash || mb.Junk || mb.Name == rejectsMailbox { mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/") mailboxIDs[mb.ID] = true } return nil }) xcheckf(ctx, err, "finding trash/junk/rejects mailbox") } else if f.MailboxID > 0 { mb := store.Mailbox{ID: f.MailboxID} err := tx.Get(&mb) xcheckf(ctx, err, "get mailbox") mailboxIDs[f.MailboxID] = true mailboxPrefixes = []string{mb.Name + "/"} } return } // xgatherMailboxIDs adds all mailboxes with a prefix matching any of // mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes. func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) { // Gather more mailboxes to filter on, based on mailboxPrefixes. if len(mailboxPrefixes) == 0 { return } err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error { for _, p := range mailboxPrefixes { if strings.HasPrefix(mb.Name, p) { mailboxIDs[mb.ID] = true break } } return nil }) xcheckf(ctx, err, "gathering mailboxes") } // matchesMailbox returns whether a mailbox matches the view. func (v view) matchesMailbox(mailboxID int64) bool { return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID] } // inRange returns whether m is within the range for the view, whether a change for // this message should be sent to the client so it can update its state. func (v view) inRange(m store.Message) bool { return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived) } // matches checks if the message, identified by either messageID or mailboxID+UID, // is in the current "view" (i.e. passing the filters, and if checkRange is set // also if within the range of sent messages based on sort order and the last seen // message). getmsg retrieves the message, which may be necessary depending on the // active filters. Used to determine if a store.Change with a new message should be // sent, and for the destination and anchor messages in view requests. func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) { var m store.Message ensureMessage := func() bool { if m.ID == 0 && rerr == nil { m, rerr = getmsg(messageID, mailboxID, uid) } return rerr == nil } q := v.Request.Query // Warning: Filters must be kept in sync between queryMessage and view.matches. // Check filters. if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) { return false, rerr } // note: anchorMessageID is not relevant for matching. flagfilter := q.flagFilterFn() if flagfilter != nil && !flagfilter(flags, keywords) { return false, rerr } if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) { return false, rerr } if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) { return false, rerr } if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) { return false, rerr } if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) { return false, rerr } state := msgState{acc: acc} defer func() { if rerr == nil && state.err != nil { rerr = state.err } state.clear() }() attachmentFilter := q.attachmentFilterFn(log, acc, &state) if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) { return false, rerr } envFilter := q.envFilterFn(log, &state) if envFilter != nil && (!ensureMessage() || !envFilter(m)) { return false, rerr } headerFilter := q.headerFilterFn(log, &state) if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) { return false, rerr } wordsFilter := q.wordsFilterFn(log, &state) if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) { return false, rerr } // Now check that we are either within the sorting order, or "last" was sent. if !checkRange || v.End || ensureMessage() && v.inRange(m) { return true, rerr } return false, rerr } type msgResp struct { err error // If set, an error happened and fields below are not set. reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set. viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set. mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request. pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi. } // viewRequestTx executes a request (query with filters, pagination) by // launching a new goroutine with queryMessages, receiving results as msgResp, // and sending Event* to the SSE connection. // // It always closes tx. func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) { defer func() { err := tx.Rollback() log.Check(err, "rolling back query transaction") donec <- v.Request.ID x := recover() // Should not happen, but don't take program down if it does. if x != nil { log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x)) debug.PrintStack() metrics.PanicInc(metrics.Webmailrequest) } }() var msgitems [][]MessageItem // Gathering for 300ms, then flushing. var parsedMessage *ParsedMessage var viewEnd bool var immediate bool // No waiting, flush immediate. t := time.NewTimer(300 * time.Millisecond) defer t.Stop() sendViewMsgs := func(force bool) { if len(msgitems) == 0 && !force { return } immediate = false msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd} msgitems = nil parsedMessage = nil t.Reset(300 * time.Millisecond) } // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine. mrc := make(chan msgResp, 1) go queryMessages(ctx, log, acc, tx, v, mrc) for { select { case mr, ok := <-mrc: if !ok { sendViewMsgs(false) // Empty message list signals this query is done. msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false} return } if mr.err != nil { sendViewMsgs(false) errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err} return } if mr.reset { resetc <- EventViewReset{v.Request.ViewID, v.Request.ID} continue } if mr.viewEnd { viewEnd = true sendViewMsgs(true) return } msgitems = append(msgitems, mr.mil) if mr.pm != nil { parsedMessage = mr.pm } if immediate { sendViewMsgs(true) } case <-t.C: if len(msgitems) == 0 { // Nothing to send yet. We'll send immediately when the next message comes in. immediate = true } else { sendViewMsgs(false) } } } } // queryMessages executes a query, with filter, pagination, destination message id // to fetch (the message that the client had in view and wants to display again). // It sends on msgc, with several types of messages: errors, whether the view is // reset due to missing AnchorMessageID, and when the end of the view was reached // and/or for a message. func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) { defer func() { x := recover() // Should not happen, but don't take program down if it does. if x != nil { log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x)) debug.PrintStack() mrc <- msgResp{err: fmt.Errorf("query failed")} metrics.PanicInc(metrics.Webmailquery) } close(mrc) }() query := v.Request.Query page := v.Request.Page // Warning: Filters must be kept in sync between queryMessage and view.matches. checkMessage := func(id int64) (valid bool, rerr error) { m := store.Message{ID: id} err := tx.Get(&m) if err == bstore.ErrAbsent || err == nil && m.Expunged { return false, nil } else if err != nil { return false, err } else { return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) { return m, nil }) } } // Check if AnchorMessageID exists and matches filter. If not, we will reset the view. if page.AnchorMessageID > 0 { // Check if message exists and (still) matches the filter. // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages. if valid, err := checkMessage(page.AnchorMessageID); err != nil { mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)} return } else if !valid { mrc <- msgResp{reset: true} page.AnchorMessageID = 0 } } // Check if page.DestMessageID exists and matches filter. If not, we will ignore // it instead of continuing to send message till the end of the view. if page.DestMessageID > 0 { if valid, err := checkMessage(page.DestMessageID); err != nil { mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)} return } else if !valid { page.DestMessageID = 0 } } // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter. q := bstore.QueryTx[store.Message](tx) q.FilterEqual("Expunged", false) if len(v.mailboxIDs) > 0 { if len(v.mailboxIDs) == 1 && v.matchMailboxIDs { // Should result in fast indexed query. for mbID := range v.mailboxIDs { q.FilterNonzero(store.Message{MailboxID: mbID}) } } else { idsAny := make([]any, 0, len(v.mailboxIDs)) for mbID := range v.mailboxIDs { idsAny = append(idsAny, mbID) } if v.matchMailboxIDs { q.FilterEqual("MailboxID", idsAny...) } else { q.FilterNotEqual("MailboxID", idsAny...) } } } // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it. if page.AnchorMessageID > 0 { var seen = false q.FilterFn(func(m store.Message) bool { if seen { return true } seen = m.ID == page.AnchorMessageID return false }) } // We may be added filters the the query below. The FilterFn signature does not // implement reporting errors, or anything else, just a bool. So when making the // filter functions, we give them a place to store parsed message state, and an // error. We check the error during and after query execution. state := msgState{acc: acc} defer state.clear() flagfilter := query.flagFilterFn() if flagfilter != nil { q.FilterFn(func(m store.Message) bool { return flagfilter(m.Flags, m.Keywords) }) } if query.Filter.Oldest != nil { q.FilterGreaterEqual("Received", *query.Filter.Oldest) } if query.Filter.Newest != nil { q.FilterLessEqual("Received", *query.Filter.Newest) } if query.Filter.SizeMin > 0 { q.FilterGreaterEqual("Size", query.Filter.SizeMin) } if query.Filter.SizeMax > 0 { q.FilterLessEqual("Size", query.Filter.SizeMax) } attachmentFilter := query.attachmentFilterFn(log, acc, &state) if attachmentFilter != nil { q.FilterFn(attachmentFilter) } envFilter := query.envFilterFn(log, &state) if envFilter != nil { q.FilterFn(envFilter) } headerFilter := query.headerFilterFn(log, &state) if headerFilter != nil { q.FilterFn(headerFilter) } wordsFilter := query.wordsFilterFn(log, &state) if wordsFilter != nil { q.FilterFn(wordsFilter) } if query.OrderAsc { q.SortAsc("Received") } else { q.SortDesc("Received") } found := page.DestMessageID <= 0 end := true have := 0 err := q.ForEach(func(m store.Message) error { // Check for an error in one of the filters, propagate it. if state.err != nil { return state.err } if have >= page.Count && found || have > 10000 { end = false return bstore.StopForEach } if _, ok := v.threadIDs[m.ThreadID]; ok { // Message was already returned as part of a thread. return nil } var pm *ParsedMessage if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 { // For threads, if there was no DestMessageID, we may be getting the newest // message. For an initial view, this isn't necessarily the first the user is // expected to read first, that would be the first unread, which we'll get below // when gathering the thread. found = true xpm, err := parsedMessage(log, m, &state, true, false) if err != nil && errors.Is(err, message.ErrHeader) { log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err)) } else if err != nil { return fmt.Errorf("parsing message %d: %v", m.ID, err) } else { pm = &xpm } } mi, err := messageItem(log, m, &state) if err != nil { return fmt.Errorf("making messageitem for message %d: %v", m.ID, err) } mil := []MessageItem{mi} if query.Threading != ThreadOff { more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0) if err != nil { return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err) } if xpm != nil { pm = xpm found = true } mil = append(mil, more...) v.threadIDs[m.ThreadID] = struct{}{} // Calculate how many messages the frontend is going to show, and only count those as returned. collapsed := map[int64]bool{} for _, mi := range mil { collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed } unread := map[int64]bool{} // Propagated to thread root. if query.Threading == ThreadUnread { for _, mi := range mil { mm := mi.Message if mm.Seen { continue } unread[mm.ID] = true for _, id := range mm.ThreadParentIDs { unread[id] = true } } } for _, mi := range mil { mm := mi.Message threadRoot := true rootID := mm.ID for _, id := range mm.ThreadParentIDs { if _, ok := collapsed[id]; ok { threadRoot = false rootID = id } } if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) { have++ } } } else { have++ } if pm != nil && len(pm.envelope.From) == 1 { pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0]) if err != nil { return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err) } } mrc <- msgResp{mil: mil, pm: pm} return nil }) // Check for an error in one of the filters again. Check in ForEach would not // trigger if the last message has the error. if err == nil && state.err != nil { err = state.err } if err != nil { mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)} return } if end { mrc <- msgResp{viewEnd: true} } } func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) { if m.ThreadID == 0 { // If we would continue, FilterNonzero would fail because there are no non-zero fields. return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done") } // Fetch other messages for this thread. qt := bstore.QueryTx[store.Message](tx) qt.FilterNonzero(store.Message{ThreadID: m.ThreadID}) qt.FilterEqual("Expunged", false) qt.FilterNotEqual("ID", m.ID) qt.SortAsc("ID") tml, err := qt.List() if err != nil { return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err) } var mil []MessageItem var pm *ParsedMessage var firstUnread bool for _, tm := range tml { err := func() error { xstate := msgState{acc: acc} defer xstate.clear() mi, err := messageItem(log, tm, &xstate) if err != nil { return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err) } mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) { return tm, nil }) if err != nil { return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err) } mil = append(mil, mi) if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) { firstUnread = !tm.Seen xpm, err := parsedMessage(log, tm, &xstate, true, false) if err != nil && errors.Is(err, message.ErrHeader) { log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err)) } else if err != nil { return fmt.Errorf("parsing thread message %d: %v", tm.ID, err) } else { pm = &xpm } } return nil }() if err != nil { return nil, nil, err } } // Finally, the message that caused us to gather this thread (which is likely the // most recent message in the thread) could be the only unread message. if destMessageID == 0 && first && !m.Seen && !firstUnread { xstate := msgState{acc: acc} defer xstate.clear() xpm, err := parsedMessage(log, m, &xstate, true, false) if err != nil && errors.Is(err, message.ErrHeader) { log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err)) } else if err != nil { return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err) } else { pm = &xpm } } return mil, pm, nil } // While checking the filters on a message, we may need to get more message // details as each filter passes. We check the filters that need the basic // information first, and load and cache more details for the next filters. // msgState holds parsed details for a message, it is updated while filtering, // with more information or reset for a next message. type msgState struct { acc *store.Account // Never changes during lifetime. err error // Once set, doesn't get cleared. m store.Message part *message.Part // Will be without Reader when msgr is nil. msgr *store.MsgReader } func (ms *msgState) clear() { if ms.msgr != nil { ms.msgr.Close() ms.msgr = nil } *ms = msgState{acc: ms.acc, err: ms.err} } func (ms *msgState) ensureMsg(m store.Message) { if m.ID != ms.m.ID { ms.clear() } ms.m = m } func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool { ms.ensureMsg(m) if ms.err == nil { if ms.part == nil { if m.ParsedBuf == nil { ms.err = fmt.Errorf("message %d not parsed", m.ID) return false } var p message.Part if err := json.Unmarshal(m.ParsedBuf, &p); err != nil { ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err) return false } ms.part = &p } if withMsgReader && ms.msgr == nil { ms.msgr = ms.acc.MessageReader(m) ms.part.SetReaderAt(ms.msgr) } } return ms.part != nil } // flagFilterFn returns a function that applies the flag/keyword/"label"-related // filters for a query. A nil function is returned if there are no flags to filter // on. func (q Query) flagFilterFn() func(store.Flags, []string) bool { labels := map[string]bool{} for _, k := range q.Filter.Labels { labels[k] = true } for _, k := range q.NotFilter.Labels { labels[k] = false } if len(labels) == 0 { return nil } var mask, flags store.Flags systemflags := map[string][]*bool{ `\answered`: {&mask.Answered, &flags.Answered}, `\flagged`: {&mask.Flagged, &flags.Flagged}, `\deleted`: {&mask.Deleted, &flags.Deleted}, `\seen`: {&mask.Seen, &flags.Seen}, `\draft`: {&mask.Draft, &flags.Draft}, `$junk`: {&mask.Junk, &flags.Junk}, `$notjunk`: {&mask.Notjunk, &flags.Notjunk}, `$forwarded`: {&mask.Forwarded, &flags.Forwarded}, `$phishing`: {&mask.Phishing, &flags.Phishing}, `$mdnsent`: {&mask.MDNSent, &flags.MDNSent}, } keywords := map[string]bool{} for k, v := range labels { k = strings.ToLower(k) if mf, ok := systemflags[k]; ok { *mf[0] = true *mf[1] = v } else { keywords[k] = v } } return func(msgFlags store.Flags, msgKeywords []string) bool { var f store.Flags if f.Set(mask, msgFlags) != flags { return false } for k, v := range keywords { if slices.Contains(msgKeywords, k) != v { return false } } return true } } // attachmentFilterFn returns a function that filters for the attachment-related // filter from the query. A nil function is returned if there are attachment // filters. func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool { if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent { return nil } return func(m store.Message) bool { if !state.ensurePart(m, false) { return false } types, err := attachmentTypes(log, m, state) if err != nil { state.err = err return false } return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments]) } } var attachmentMimetypes = map[string]AttachmentType{ "application/pdf": AttachmentPDF, "application/zip": AttachmentArchive, "application/x-rar-compressed": AttachmentArchive, "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet, "application/vnd.ms-excel": AttachmentSpreadsheet, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet, "application/vnd.oasis.opendocument.text": AttachmentDocument, "application/vnd.oasis.opendocument.presentation": AttachmentPresentation, "application/vnd.ms-powerpoint": AttachmentPresentation, "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation, } var attachmentExtensions = map[string]AttachmentType{ ".pdf": AttachmentPDF, ".zip": AttachmentArchive, ".tar": AttachmentArchive, ".tgz": AttachmentArchive, ".tar.gz": AttachmentArchive, ".tbz2": AttachmentArchive, ".tar.bz2": AttachmentArchive, ".tar.lz": AttachmentArchive, ".tlz": AttachmentArchive, ".tar.xz": AttachmentArchive, ".txz": AttachmentArchive, ".tar.zst": AttachmentArchive, ".tar.lz4": AttachmentArchive, ".7z": AttachmentArchive, ".rar": AttachmentArchive, ".ods": AttachmentSpreadsheet, ".xls": AttachmentSpreadsheet, ".xlsx": AttachmentSpreadsheet, ".odt": AttachmentDocument, ".doc": AttachmentDocument, ".docx": AttachmentDocument, ".odp": AttachmentPresentation, ".ppt": AttachmentPresentation, ".pptx": AttachmentPresentation, } func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) { types := map[AttachmentType]bool{} pm, err := parsedMessage(log, m, state, false, false) if err != nil { return nil, fmt.Errorf("parsing message for attachments: %w", err) } for _, a := range pm.attachments { if a.Part.MediaType == "IMAGE" { types[AttachmentImage] = true continue } mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType) if t, ok := attachmentMimetypes[mt]; ok { types[t] = true continue } _, filename, err := a.Part.DispositionFilename() if err != nil && errors.Is(err, message.ErrParamEncoding) { log.Debugx("parsing disposition/filename", err) } else if err != nil { return nil, fmt.Errorf("reading disposition/filename: %v", err) } if ext := filepath.Ext(filename); ext != "" { if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok { types[t] = true } } } if len(types) == 0 { types[AttachmentNone] = true } else { types[AttachmentAny] = true } return types, nil } // envFilterFn returns a filter function for the "envelope" headers ("envelope" as // used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name // clash with SMTP envelope) for the query. A nil function is returned if no // filtering is needed. func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool { if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 { return nil } lower := func(l []string) []string { if len(l) == 0 { return nil } r := make([]string, len(l)) for i, s := range l { r[i] = strings.ToLower(s) } return r } filterSubject := lower(q.Filter.Subject) notFilterSubject := lower(q.NotFilter.Subject) filterFrom := lower(q.Filter.From) notFilterFrom := lower(q.NotFilter.From) filterTo := lower(q.Filter.To) notFilterTo := lower(q.NotFilter.To) return func(m store.Message) bool { if !state.ensurePart(m, false) { return false } var env message.Envelope if state.part.Envelope != nil { env = *state.part.Envelope } if len(filterSubject) > 0 || len(notFilterSubject) > 0 { subject := strings.ToLower(env.Subject) for _, s := range filterSubject { if !strings.Contains(subject, s) { return false } } for _, s := range notFilterSubject { if strings.Contains(subject, s) { return false } } } contains := func(textLower []string, l []message.Address, all bool) bool { next: for _, s := range textLower { for _, a := range l { name := strings.ToLower(a.Name) addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host)) if strings.Contains(name, s) || strings.Contains(addr, s) { if !all { return true } continue next } } if all { return false } } return all } if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) { return false } if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) { return false } if len(filterTo) > 0 || len(notFilterTo) > 0 { to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...) if len(filterTo) > 0 && !contains(filterTo, to, true) { return false } if len(notFilterTo) > 0 && contains(notFilterTo, to, false) { return false } } return true } } // headerFilterFn returns a function that filters for the header filters in the // query. A nil function is returned if there are no header filters. func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool { if len(q.Filter.Headers) == 0 { return nil } lowerValues := make([]string, len(q.Filter.Headers)) for i, t := range q.Filter.Headers { lowerValues[i] = strings.ToLower(t[1]) } return func(m store.Message) bool { if !state.ensurePart(m, true) { return false } hdr, err := state.part.Header() if err != nil { state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err) return false } next: for i, t := range q.Filter.Headers { k := t[0] v := lowerValues[i] l := hdr.Values(k) if v == "" && len(l) > 0 { continue } for _, e := range l { if strings.Contains(strings.ToLower(e), v) { continue next } } return false } return true } } // wordFiltersFn returns a function that applies the word filters of the query. A // nil function is returned when query does not contain a word filter. func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool { if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 { return nil } ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words) return func(m store.Message) bool { if !state.ensurePart(m, true) { return false } if ok, err := ws.MatchPart(log, state.part, true); err != nil { state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err) return false } else { return ok } } }