mox/webmail/view.go
Mechiel Lukkien d1b87cdb0d
replace packages slog and slices from golang.org/x/exp with stdlib
since we are now at go1.21 as minimum.
2024-02-08 14:49:01 +01:00

1966 lines
62 KiB
Go

package webmail
// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
import (
"compress/gzip"
"context"
cryptrand "crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"path/filepath"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/sherpa"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
"github.com/mjl-/mox/moxvar"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/store"
)
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
	// ID identifies this request. The events sent for a request carry it as their
	// RequestID.
	ID int64
	SSEID int64 // SSE connection.
	// To indicate a request is a continuation (more results) of the previous view.
	// Echoed in events, client checks if it is getting results for the latest request.
	ViewID int64
	// If set, this request and its view are canceled. A new view must be started.
	Cancel bool
	// Query selects and orders the messages, Page says which part of the results to
	// return.
	Query Query
	Page Page
}
// ThreadMode is the threading setting of a Query. An empty value in the initial
// request of an SSE connection is replaced by ThreadOff.
type ThreadMode string
// Values for ThreadMode.
const (
ThreadOff ThreadMode = "off"
ThreadOn ThreadMode = "on"
ThreadUnread ThreadMode = "unread"
)
// Query is a request for messages that match filters, in a given order.
type Query struct {
	OrderAsc bool // Order by received ascending or descending.
	Threading ThreadMode
	Filter Filter // Conditions a message must match.
	NotFilter NotFilter // Conditions a message must not match.
}
// AttachmentType is for filtering by attachment type.
type AttachmentType string
// Values for AttachmentType. The empty string means attachments are not part of
// the filter.
const (
AttachmentIndifferent AttachmentType = ""
AttachmentNone AttachmentType = "none"
AttachmentAny AttachmentType = "any"
AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
AttachmentPDF AttachmentType = "pdf"
AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
AttachmentDocument AttachmentType = "document" // odt, docx, ...
AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
// Filter selects the messages to return. Fields that are set must all match,
// for slices each element must match ("and").
type Filter struct {
	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
	MailboxID int64
	// If true, also submailboxes are included in the search.
	MailboxChildrenIncluded bool
	// In case client doesn't know mailboxes and their IDs yet. Only used during sse
	// connection setup, where it is turned into a MailboxID. Filtering only looks at
	// MailboxID.
	MailboxName string
	Words []string // Case insensitive substring match for each string.
	From []string
	To []string // Including Cc and Bcc.
	Oldest *time.Time
	Newest *time.Time
	Subject []string
	Attachments AttachmentType
	Labels []string
	Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
	SizeMin int64
	SizeMax int64
}
// NotFilter matches messages that don't match these fields. Its fields are a
// subset of those in Filter.
type NotFilter struct {
	Words []string
	From []string
	To []string
	Subject []string
	Attachments AttachmentType
	Labels []string
}
// Page holds pagination parameters for a request.
type Page struct {
	// Start returning messages after this ID, if > 0. For pagination, fetching the
	// next set of messages.
	AnchorMessageID int64
	// Number of messages to return, must be >= 1, we never return more than 10000 for
	// one request.
	Count int
	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
	DestMessageID int64
}
// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included. A value with only Domain set is used for an account destination that
// starts with "@", i.e. one that names a whole domain.
type MessageAddress struct {
	Name string // Free-form name for display in mail applications.
	User string // Localpart, encoded.
	Domain dns.Domain
}
// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names: addresses are MessageAddress values.
type MessageEnvelope struct {
	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
	Date time.Time
	Subject string
	From []MessageAddress
	Sender []MessageAddress
	ReplyTo []MessageAddress
	To []MessageAddress
	CC []MessageAddress
	BCC []MessageAddress
	InReplyTo string
	MessageID string
}
// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
	Message store.Message // Without ParsedBuf and MsgPrefix, for size.
	Envelope MessageEnvelope
	Attachments []Attachment
	IsSigned bool
	IsEncrypted bool
	FirstLine string // Of message body, for showing as preview.
	MatchQuery bool // If message does not match query, it can still be included because of threading.
}
// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
	ID int64
	Part message.Part
	Headers map[string][]string
	// Text parts, can be empty.
	Texts []string
	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// in iframe with inline HTML because not all browsers support the iframe csp
	// attribute.
	HasHTML bool
	ListReplyAddress *MessageAddress // From List-Post.
	// Information used by MessageItem, not exported in this type. These fields are
	// unexported, so encoding/json does not include them in the output.
	envelope MessageEnvelope
	attachments []Attachment
	isSigned bool
	isEncrypted bool
	firstLine string
}
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
//
// serveEvents builds this value with a positional (unkeyed) composite literal, so
// the order of the fields must not change.
type EventStart struct {
	SSEID int64 // ID of the SSE connection, to be used as Request.SSEID.
	LoginAddress MessageAddress // Address that was used to log in.
	Addresses []MessageAddress // Destinations configured for the account.
	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
	MailboxName string // Name of the mailbox of the initial view, empty if the initial query is not for a single named mailbox.
	Mailboxes []store.Mailbox // All mailboxes of the account.
	RejectsMailbox string // From the account configuration.
	Version string // Set to moxvar.Version.
}
// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
//
// serveEvents builds this value with a positional composite literal, keep the
// field order.
type DomainAddressConfig struct {
	LocalpartCatchallSeparator string // Can be empty.
	LocalpartCaseSensitive bool
}
// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
	// ViewID and RequestID are those of the request these messages are for.
	ViewID int64
	RequestID int64
	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. Each with the first message being the reason this thread is
	// included and can be used as AnchorID in followup requests. If the threading mode
	// is "off" in the query, there will always be only a single message. If a thread
	// is sent, all messages in the thread are sent, including those that don't match
	// the query (e.g. from another mailbox). Threads can be displayed based on the
	// ThreadParentIDs field, with possibly slightly different display based on field
	// ThreadMissingLink.
	MessageItems [][]MessageItem
	// If set, will match the target page.DestMessageID from the request.
	ParsedMessage *ParsedMessage
	// If set, there are no more messages in this view at this moment. Messages can be
	// added, typically via Change messages, e.g. for new deliveries.
	ViewEnd bool
}
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
//
// serveEvents builds this value with a positional composite literal, keep the
// field order.
type EventViewErr struct {
	ViewID int64
	RequestID int64
	Err string // To be displayed in client.
	err error // Original error, for checking against context.Canceled. Unexported, so not part of the JSON sent to the client.
}
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
	ViewID int64
	RequestID int64
}
// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
//
// serveEvents builds this value with a positional composite literal, keep the
// field order.
type EventViewChanges struct {
	ViewID int64
	// The first field of [2]any is a string with the name of the Change type, e.g.
	// "ChangeMsgAdd", the second is a value of that Change type, see below.
	Changes [][2]any
}
// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
	store.ChangeAddUID
	// The new message comes first. If its thread was gathered, the other messages of
	// the thread follow.
	MessageItems []MessageItem
}
// ChangeMsgRemove removes one or more messages from the view. The UIDs are
// reduced to those relevant to the view before the change is sent.
type ChangeMsgRemove struct {
	store.ChangeRemoveUIDs
}
// ChangeMsgFlags updates flags for one message. It wraps store.ChangeFlags so it
// can be sent to the client under its own type name.
type ChangeMsgFlags struct {
	store.ChangeFlags
}
// ChangeMsgThread updates muted/collapsed fields for one message. These changes
// are always forwarded to the client, without checking them against the view.
type ChangeMsgThread struct {
	store.ChangeThread
}
// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
// Always forwarded to the client.
type ChangeMailboxRemove struct {
	store.ChangeRemoveMailbox
}
// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
// Only the Mailbox of the store.ChangeAddMailbox is passed on.
type ChangeMailboxAdd struct {
	Mailbox store.Mailbox
}
// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent. Always forwarded to the client.
type ChangeMailboxRename struct {
	store.ChangeRenameMailbox
}
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
// Always forwarded to the client.
type ChangeMailboxCounts struct {
	store.ChangeMailboxCounts
}
// ChangeMailboxSpecialUse has updated special-use flags for a mailbox. Always
// forwarded to the client.
type ChangeMailboxSpecialUse struct {
	store.ChangeMailboxSpecialUse
}
// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet. Always
// forwarded to the client.
type ChangeMailboxKeywords struct {
	store.ChangeMailboxKeywords
}
// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client, we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
//
// Values are created with positional composite literals in serveEvents, so the
// order of the fields must not change.
type view struct {
	Request Request
	// Received of last message we sent to the client. We use it to decide if a newly
	// delivered message is within the view and the client should get a notification.
	LastMessageReceived time.Time
	// If set, the last message in the query view has been sent. There is no need to do
	// another query, it will not return more data. Used to decide if an event for a
	// new message should be sent.
	End bool
	// Whether message must or must not match mailboxIDs.
	matchMailboxIDs bool
	// Mailboxes to match, can be multiple, for matching children. If empty, there is
	// no filter on mailboxes.
	mailboxIDs map[int64]bool
	// Threads sent to client. New messages for this thread are also sent, regardless
	// of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when query with threading not off.
	threadIDs map[int64]struct{}
}
// sses tracks all sse connections, and access to them. The embedded mutex must
// be held when reading or writing gen and m.
var sses = struct {
	sync.Mutex
	gen int64 // ID of the most recently registered connection, incremented for each new one.
	m map[int64]sse // Registered connections, by ID.
}{m: map[int64]sse{}}
// sse represents an sse connection. Values are copied around (stored in and
// returned from the sses map), the Request channel is what they share.
//
// sseRegister builds this value with a positional composite literal, keep the
// field order.
type sse struct {
	ID int64 // Also returned in EventStart and used in Request to identify the request.
	AccountName string // Used to check the authenticated user has access to the SSE connection.
	Request chan Request // Goroutine will receive requests from here, coming from API calls.
}
// unregister removes the connection from sses. It is called by the goroutine
// serving the connection when the connection is closed or breaks.
func (sse sse) unregister() {
	sses.Lock()
	defer sses.Unlock()

	delete(sses.m, sse.ID)

	// Empty the request channel without blocking, so API calls that are sending a
	// request to this connection don't stay blocked.
	for drained := false; !drained; {
		select {
		case <-sse.Request:
		default:
			drained = true
		}
	}
}
// sseRegister creates a new sse connection for the account, with the next unused
// ID and a request channel that buffers a single request, and adds it to sses.
func sseRegister(accountName string) sse {
	sses.Lock()
	defer sses.Unlock()

	sses.gen++
	conn := sse{
		ID:          sses.gen,
		AccountName: accountName,
		Request:     make(chan Request, 1),
	}
	sses.m[conn.ID] = conn
	return conn
}
// sseGet returns the registered sse connection with the given id. The boolean
// is false if no such connection is registered, or if it belongs to another
// account than accountName.
func sseGet(id int64, accountName string) (sse, bool) {
	sses.Lock()
	defer sses.Unlock()
	// The presence check is needed: a missing entry yields a zero sse with an empty
	// AccountName, which must never be treated as a match, not even for an empty
	// accountName.
	s, ok := sses.m[id]
	if !ok || s.AccountName != accountName {
		return sse{}, false
	}
	return s, true
}
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
//
// All fields are comparable: check looks a token up in a slice by comparing the
// whole struct. xgenerate builds the value with a positional composite literal,
// keep the field order.
type ssetoken struct {
	token string // Uniquely generated.
	accName string
	address string // Address used to authenticate in call that created the token.
	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
	validUntil time.Time // After this time the token is rejected by check.
}
// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods. The embedded mutex guards both maps, which hold the same
// tokens under different keys.
type ssetokens struct {
	sync.Mutex
	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
	tokens map[string]ssetoken // Token to details, for finding account for a token.
}
// sseTokens holds the tokens that have been generated but not yet used to start
// an SSE connection. serveEvents consumes tokens from it.
var sseTokens = ssetokens{
	accountTokens: map[string][]ssetoken{},
	tokens: map[string]ssetoken{},
}
// xgenerate makes a new random token for the account, valid for one minute, and
// stores it. An account has at most 10 unused tokens: if the account is at that
// limit, its oldest tokens are dropped so the new one fits. A failure to read
// random data is raised through xcheckf.
func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
	var raw [16]byte
	_, err := cryptrand.Read(raw[:])
	xcheckf(ctx, err, "generating token")
	st := ssetoken{
		token:        base64.RawURLEncoding.EncodeToString(raw[:]),
		accName:      accName,
		address:      address,
		sessionToken: sessionToken,
		validUntil:   time.Now().Add(time.Minute),
	}

	x.Lock()
	defer x.Unlock()

	// Number of existing tokens we keep when making room for the new one.
	const keep = 9

	l := x.accountTokens[accName]
	if drop := len(l) - keep; drop > 0 {
		// Forget the oldest tokens, they are at the front.
		for _, old := range l[:drop] {
			delete(x.tokens, old.token)
		}
		copy(l, l[drop:])
		l = l[:keep]
	}
	x.accountTokens[accName] = append(l, st)
	x.tokens[st.token] = st
	return st.token
}
// check looks up a token and consumes it: a known token is removed, whether it
// is still valid or has expired. For a valid token, the account name, login
// address and session token it was created with are returned, and the boolean
// is true. For an unknown or expired token the boolean is false and the error
// is nil. An error is only returned when the internal administration is
// inconsistent.
func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
	x.Lock()
	defer x.Unlock()

	st, known := x.tokens[token]
	if !known {
		return "", "", "", false, nil
	}
	delete(x.tokens, token)

	// Also remove the token from the list of its account.
	l := x.accountTokens[st.accName]
	i := slices.Index(l, st)
	if i < 0 {
		return "", "", "", false, errors.New("internal error, could not find token in account")
	}
	l = append(l[:i], l[i+1:]...)
	if len(l) > 0 {
		x.accountTokens[st.accName] = l
	} else {
		delete(x.accountTokens, st.accName)
	}

	if expired := time.Now().After(st.validUntil); expired {
		return "", "", "", false, nil
	}
	return st.accName, st.address, st.sessionToken, true, nil
}
// ioErr is panicked on i/o errors in serveEvents and handled in a defer: the
// deferred recover in serveEvents recognizes an ioErr value and returns without
// logging or re-panicking.
type ioErr struct {
	err error
}
// serveEvents serves an SSE connection. Authentication is done through a query
// string parameter "token", a one-time-use token returned by the Token API call.
//
// Query string parameters:
//   - token: required, consumed by this call.
//   - request: required, a JSON-encoded Request with the initial query. Its
//     Page.Count must be positive.
//   - waitMinMsec, waitMaxMsec: optional, only used when both are set, passed to
//     the event writer to simulate a slow connection.
//
// After validation, an EventStart is written, and a goroutine (viewRequestTx) is
// started for the initial query. The main loop then forwards the results of the
// query goroutine, handles new requests arriving through the sse.Request channel
// (from API calls), forwards changes to the account that are relevant to the
// current view, and writes a keepalive comment after 5 minutes without writes.
//
// Errors after the response has started are raised by panicking with a
// *sherpa.Error (through xcheckf), which is sent to the client as a "fatalErr"
// event, or with an ioErr when the connection is broken, which just ends the
// connection.
//
// At most one query goroutine is active at a time. reqctx and reqctxcancel are
// non-nil exactly while a query goroutine may still be sending: whoever cancels
// it must keep receiving from its channels until it signals done (cancelDrain),
// otherwise the goroutine would block forever on a send.
func serveEvents(ctx context.Context, log mlog.Log, w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error("internal error: ResponseWriter not a http.Flusher")
		http.Error(w, "500 - internal error - cannot sync to http connection", 500)
		return
	}

	q := r.URL.Query()
	token := q.Get("token")
	if token == "" {
		http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
		return
	}
	accName, address, sessionToken, ok, err := sseTokens.check(token)
	if err != nil {
		http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
		return
	}
	if !ok {
		http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
		return
	}
	if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
		http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
		return
	}

	// We can simulate a slow SSE connection. It seems firefox doesn't slow down
	// incoming responses with its slow-network simulation.
	var waitMin, waitMax time.Duration
	waitMinMsec := q.Get("waitMinMsec")
	waitMaxMsec := q.Get("waitMaxMsec")
	if waitMinMsec != "" && waitMaxMsec != "" {
		if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
			http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
			return
		} else {
			waitMin = time.Duration(v) * time.Millisecond
		}

		if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
			http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
			return
		} else {
			waitMax = time.Duration(v) * time.Millisecond
		}
	}

	// Parse the request with initial mailbox/search criteria.
	var req Request
	dec := json.NewDecoder(strings.NewReader(q.Get("request")))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&req); err != nil {
		http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
		return
	} else if req.Page.Count <= 0 {
		http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
		return
	}
	if req.Query.Threading == "" {
		req.Query.Threading = ThreadOff
	}

	var writer *eventWriter

	metricSSEConnections.Inc()
	defer metricSSEConnections.Dec()

	// Below here, error handling goes through xcheckf, which panics with
	// *sherpa.Error, after which we send an error event to the client. We can also get
	// an ioErr when the connection is broken.
	defer func() {
		x := recover()
		if x == nil {
			return
		}
		if err, ok := x.(*sherpa.Error); ok {
			writer.xsendEvent(ctx, log, "fatalErr", err.Message)
		} else if _, ok := x.(ioErr); ok {
			return
		} else {
			log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmail)
			panic(x)
		}
	}()

	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")

	// We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
	// keys), so should be quite compressible.
	var out writeFlusher
	gz := mox.AcceptsGzip(r)
	if gz {
		h.Set("Content-Encoding", "gzip")
		// NewWriterLevel only returns an error for an invalid level, BestSpeed is valid.
		out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
	} else {
		out = nopFlusher{w}
	}
	out = httpFlusher{out, flusher}

	// We'll be writing outgoing SSE events through writer.
	writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
	defer writer.close()

	// Fetch initial data.
	acc, err := store.OpenAccount(log, accName)
	xcheckf(ctx, err, "open account")
	defer func() {
		err := acc.Close()
		log.Check(err, "closing account")
	}()
	comm := store.RegisterComm(acc)
	defer comm.Unregister()

	// List addresses that the client can use to send email from.
	accConf, _ := acc.Conf()
	loginAddr, err := smtp.ParseAddress(address)
	xcheckf(ctx, err, "parsing login address")
	_, _, dest, err := mox.FindAccount(loginAddr.Localpart, loginAddr.Domain, false)
	xcheckf(ctx, err, "looking up destination for login address")
	loginName := accConf.FullName
	if dest.FullName != "" {
		loginName = dest.FullName
	}
	loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
	var addresses []MessageAddress
	for a, dest := range accConf.Destinations {
		name := dest.FullName
		if name == "" {
			name = accConf.FullName
		}
		var ma MessageAddress
		if strings.HasPrefix(a, "@") {
			// Destination for a whole domain, there is no localpart.
			dom, err := dns.ParseDomain(a[1:])
			xcheckf(ctx, err, "parsing destination address for account")
			ma = MessageAddress{Domain: dom}
		} else {
			addr, err := smtp.ParseAddress(a)
			xcheckf(ctx, err, "parsing destination address for account")
			ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
		}
		addresses = append(addresses, ma)
	}

	// We implicitly start a query. We use the reqctx for the transaction, because the
	// transaction is passed to the query, which can be canceled.
	reqctx, reqctxcancel := context.WithCancel(ctx)
	defer func() {
		// We also cancel in cancelDrain later on, but there is a brief window where the
		// context wouldn't be canceled.
		if reqctxcancel != nil {
			reqctxcancel()
			reqctxcancel = nil
		}
	}()

	// qtx is kept around during connection initialization, until we pass it off to the
	// goroutine that starts querying for messages.
	var qtx *bstore.Tx
	defer func() {
		if qtx != nil {
			err := qtx.Rollback()
			log.Check(err, "rolling back")
		}
	}()

	var mbl []store.Mailbox

	// We only take the rlock when getting the tx.
	acc.WithRLock(func() {
		// Now a read-only transaction we'll use during the query.
		qtx, err = acc.DB.Begin(reqctx, false)
		xcheckf(ctx, err, "begin transaction")

		mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
		xcheckf(ctx, err, "list mailboxes")
	})

	// Find the designated mailbox if a mailbox name is set, or there are no filters at all.
	var zerofilter Filter
	var zeronotfilter NotFilter
	var mailbox store.Mailbox
	var mailboxPrefixes []string
	var matchMailboxes bool
	mailboxIDs := map[int64]bool{}
	mailboxName := req.Query.Filter.MailboxName
	if mailboxName != "" || (reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter)) {
		if mailboxName == "" {
			mailboxName = "Inbox"
		}

		// Fall back to the inbox if the named mailbox does not exist.
		var inbox store.Mailbox
		for _, e := range mbl {
			if e.Name == mailboxName {
				mailbox = e
			}
			if e.Name == "Inbox" {
				inbox = e
			}
		}
		if mailbox.ID == 0 {
			mailbox = inbox
		}
		if mailbox.ID == 0 {
			xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
		}
		req.Query.Filter.MailboxID = mailbox.ID
		req.Query.Filter.MailboxName = ""
		mailboxPrefixes = []string{mailbox.Name + "/"}
		matchMailboxes = true
		mailboxIDs[mailbox.ID] = true
	} else {
		matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
	}
	if req.Query.Filter.MailboxChildrenIncluded {
		xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
	}

	// todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.

	sse := sseRegister(acc.Name)
	defer sse.unregister()

	// Per-domain localpart config so webclient can decide if an address belongs to the account.
	domainAddressConfigs := map[string]DomainAddressConfig{}
	for _, a := range addresses {
		dom, _ := mox.Conf.Domain(a.Domain)
		domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
	}

	// Write first event, allowing client to fill its UI with mailboxes.
	start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, moxvar.Version}
	writer.xsendEvent(ctx, log, "start", start)

	// The goroutine doing the querying will send messages on these channels, which
	// result in an event being written on the SSE connection.
	viewMsgsc := make(chan EventViewMsgs)
	viewErrc := make(chan EventViewErr)
	viewResetc := make(chan EventViewReset)
	donec := make(chan int64) // When request is done.

	// Start a view, it determines if we send a change to the client. And start an
	// implicit query for messages, we'll send the messages to the client which can
	// fill its ui with messages.
	v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
	go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
	qtx = nil // viewRequestTx closes qtx

	// When canceling a query, we must drain its messages until it says it is done.
	// Otherwise the sending goroutine would hang indefinitely on a channel send.
	// Must only be called with reqctxcancel set while a query goroutine is running:
	// without a goroutine, nothing would ever arrive on donec.
	cancelDrain := func() {
		if reqctxcancel != nil {
			// Cancel the goroutine doing the querying.
			reqctxcancel()
			reqctx = nil
			reqctxcancel = nil
		} else {
			return
		}

		// Drain events until done.
		for {
			select {
			case <-viewMsgsc:
			case <-viewErrc:
			case <-viewResetc:
			case <-donec:
				return
			}
		}
	}

	// If we stop and a query is in progress, we must drain the channel it will send on.
	defer cancelDrain()

	// Changes broadcasted by other connections on this account. If applicable for the
	// connection/view, we send events.
	xprocessChanges := func(changes []store.Change) {
		taggedChanges := [][2]any{}

		// We get a transaction first time we need it.
		var xtx *bstore.Tx
		defer func() {
			if xtx != nil {
				err := xtx.Rollback()
				log.Check(err, "rolling back transaction")
			}
		}()
		ensureTx := func() error {
			if xtx != nil {
				return nil
			}
			acc.RLock()
			defer acc.RUnlock()
			var err error
			xtx, err = acc.DB.Begin(ctx, false)
			return err
		}

		// This getmsg will now only be called mailboxID+UID, not with messageID set.
		// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
		getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
			if err := ensureTx(); err != nil {
				return store.Message{}, fmt.Errorf("transaction: %v", err)
			}
			return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
		}

		// Return uids that are within range in view. Because the end has been reached, or
		// because the UID is not after the last message.
		xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
			uidsAny := make([]any, len(uids))
			for i, uid := range uids {
				uidsAny[i] = uid
			}
			err := ensureTx()
			xcheckf(ctx, err, "transaction")
			q := bstore.QueryTx[store.Message](xtx)
			q.FilterNonzero(store.Message{MailboxID: mailboxID})
			q.FilterEqual("UID", uidsAny...)
			mbOK := v.matchesMailbox(mailboxID)
			err = q.ForEach(func(m store.Message) error {
				_, thread := v.threadIDs[m.ThreadID]
				if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
					changedUIDs = append(changedUIDs, m.UID)
				}
				return nil
			})
			xcheckf(ctx, err, "fetching messages for change")
			return changedUIDs
		}

		// Forward changes that are relevant to the current view.
		for _, change := range changes {
			switch c := change.(type) {
			case store.ChangeAddUID:
				ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
				xcheckf(ctx, err, "matching new message against view")
				m, err := getmsg(0, c.MailboxID, c.UID)
				xcheckf(ctx, err, "get message")
				_, thread := v.threadIDs[m.ThreadID]
				if !ok && !thread {
					continue
				}

				state := msgState{acc: acc}
				mi, err := messageItem(log, m, &state)
				state.clear()
				xcheckf(ctx, err, "make messageitem")
				mi.MatchQuery = ok

				mil := []MessageItem{mi}
				// Threading is decided by the query of the current view, not by the initial
				// request of the connection: the client may have started a new view with
				// another threading mode since. An empty mode is treated as off.
				threading := v.Request.Query.Threading
				if !thread && threading != ThreadOff && threading != "" {
					err := ensureTx()
					xcheckf(ctx, err, "transaction")
					more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
					xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
					mil = append(mil, more...)
					v.threadIDs[m.ThreadID] = struct{}{}
				}

				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})

				// If message extends the view, store it as such.
				if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
					v.LastMessageReceived = m.Received
				}

			case store.ChangeRemoveUIDs:
				// We may send changes for uids the client doesn't know, that's fine.
				changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
				if len(changedUIDs) == 0 {
					continue
				}
				ch := ChangeMsgRemove{c}
				ch.UIDs = changedUIDs
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})

			case store.ChangeFlags:
				// We may send changes for uids the client doesn't know, that's fine.
				changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
				if len(changedUIDs) == 0 {
					continue
				}
				ch := ChangeMsgFlags{c}
				ch.UID = changedUIDs[0]
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})

			case store.ChangeThread:
				// Change in muted/collapsed state, just always ship it.
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})

			case store.ChangeRemoveMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})

			case store.ChangeAddMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})

			case store.ChangeRenameMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})

			case store.ChangeMailboxCounts:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})

			case store.ChangeMailboxSpecialUse:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})

			case store.ChangeMailboxKeywords:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})

			case store.ChangeAddSubscription:
				// Webmail does not care about subscriptions.

			default:
				panic(fmt.Sprintf("missing case for change %T", c))
			}
		}

		if len(taggedChanges) > 0 {
			viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
			writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
		}
	}

	timer := time.NewTimer(5 * time.Minute) // For keepalives.
	defer timer.Stop()
	for {
		if writer.wrote {
			timer.Reset(5 * time.Minute)
			writer.wrote = false
		}

		// While a query is in progress, we don't process changes: receiving from a nil
		// channel blocks forever, so that case is never selected.
		pending := comm.Pending
		if reqctx != nil {
			pending = nil
		}

		select {
		case <-mox.Shutdown.Done():
			writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
			// Work around go vet, it doesn't see defer cancelDrain.
			if reqctxcancel != nil {
				reqctxcancel()
			}
			return

		case <-timer.C:
			_, err := fmt.Fprintf(out, ": keepalive\n\n")
			if err != nil {
				log.Errorx("write keepalive", err)
				// Work around go vet, it doesn't see defer cancelDrain.
				if reqctxcancel != nil {
					reqctxcancel()
				}
				return
			}
			out.Flush()
			writer.wrote = true

		case vm := <-viewMsgsc:
			if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
			}
			if vm.ViewEnd {
				v.End = true
			}
			if len(vm.MessageItems) > 0 {
				// The first message of the last thread determines how far the view reaches.
				v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
			}
			writer.xsendEvent(ctx, log, "viewMsgs", vm)

		case ve := <-viewErrc:
			if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
			}
			if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
				// Work around go vet, it doesn't see defer cancelDrain.
				if reqctxcancel != nil {
					reqctxcancel()
				}
				return
			}
			writer.xsendEvent(ctx, log, "viewErr", ve)

		case vr := <-viewResetc:
			if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
			}
			writer.xsendEvent(ctx, log, "viewReset", vr)

		case id := <-donec:
			if id != v.Request.ID {
				panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
			}
			// The query goroutine is finished, release its context. From now on, pending
			// changes are processed again.
			if reqctxcancel != nil {
				reqctxcancel()
			}
			reqctx = nil
			reqctxcancel = nil

		case req := <-sse.Request:
			if reqctx != nil {
				cancelDrain()
			}
			if req.Cancel {
				// Keep threadIDs a non-nil map: processing changes may add thread IDs to the
				// view, and assigning to an entry of a nil map panics.
				v = view{req, time.Time{}, false, false, nil, map[int64]struct{}{}}
				continue
			}

			reqctx, reqctxcancel = context.WithCancel(ctx)

			stop := func() (stop bool) {
				// rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
				var rtx *bstore.Tx
				var err error
				defer func() {
					if rtx == nil {
						return
					}
					err = rtx.Rollback()
					log.Check(err, "rolling back transaction")

					// We only get here with a transaction when we are panicking before the query
					// goroutine was started. Release the request context and mark that no query is
					// in progress, otherwise the deferred cancelDrain would wait forever for a
					// "done" that no goroutine is going to send.
					if reqctxcancel != nil {
						reqctxcancel()
					}
					reqctx = nil
					reqctxcancel = nil
				}()
				acc.WithRLock(func() {
					rtx, err = acc.DB.Begin(reqctx, false)
				})
				if err != nil {
					reqctxcancel()
					reqctx = nil
					reqctxcancel = nil

					if errors.Is(err, context.Canceled) {
						return true
					}
					err := fmt.Errorf("begin transaction: %v", err)
					viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
					writer.xsendEvent(ctx, log, "viewErr", viewErr)
					return false
				}

				// Reset view state for new query.
				if req.ViewID != v.Request.ViewID {
					matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
					if req.Query.Filter.MailboxChildrenIncluded {
						xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
					}
					v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
				} else {
					v.Request = req
				}
				go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
				rtx = nil
				return false
			}()
			if stop {
				return
			}

		case <-pending:
			xprocessChanges(comm.Get())

		case <-ctx.Done():
			// Work around go vet, it doesn't see defer cancelDrain.
			if reqctxcancel != nil {
				reqctxcancel()
			}
			return
		}
	}
}
// xprepareMailboxIDs prepares the first half of the mailbox filter for a view,
// based on f.MailboxID:
//
//   - -1: messages must NOT be in the returned mailboxes, which are the trash,
//     junk and account rejects (rejectsMailbox) mailboxes. matchMailboxes is false.
//   - > 0: messages must be in that single mailbox. matchMailboxes is true.
//   - otherwise: no mailbox restriction, mailboxIDs is empty.
//
// mailboxPrefixes holds the "name/" prefix of each selected mailbox, for use with
// xgatherMailboxIDs to add their children. Database errors are raised through
// xcheckf.
func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	mailboxIDs = map[int64]bool{}

	switch {
	case f.MailboxID == -1:
		// Collect the trash, junk and account rejects mailboxes, to be excluded.
		collect := func(mb store.Mailbox) error {
			if !(mb.Trash || mb.Junk || mb.Name == rejectsMailbox) {
				return nil
			}
			mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
			mailboxIDs[mb.ID] = true
			return nil
		}
		err := bstore.QueryTx[store.Mailbox](tx).ForEach(collect)
		xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
		return false, mailboxIDs, mailboxPrefixes

	case f.MailboxID > 0:
		// A single explicit mailbox, it must exist.
		mb := store.Mailbox{ID: f.MailboxID}
		err := tx.Get(&mb)
		xcheckf(ctx, err, "get mailbox")
		mailboxIDs[f.MailboxID] = true
		return true, mailboxIDs, []string{mb.Name + "/"}
	}

	return true, mailboxIDs, nil
}
// xgatherMailboxIDs extends mailboxIDs with every mailbox whose name starts with
// one of mailboxPrefixes, so filtering also applies to children of the mailboxes
// already selected. It is a no-op without prefixes. Database errors are raised
// through xcheckf.
func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	if len(mailboxPrefixes) == 0 {
		return
	}

	hasKnownPrefix := func(name string) bool {
		return slices.ContainsFunc(mailboxPrefixes, func(prefix string) bool {
			return strings.HasPrefix(name, prefix)
		})
	}

	err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
		if hasKnownPrefix(mb.Name) {
			mailboxIDs[mb.ID] = true
		}
		return nil
	})
	xcheckf(ctx, err, "gathering mailboxes")
}
// matchesMailbox returns whether a mailbox matches the view: always when the view
// has no mailbox restriction, otherwise when membership of mailboxID in the view's
// mailbox set agrees with whether the set is an include list (matchMailboxIDs) or
// an exclude list.
func (v view) matchesMailbox(mailboxID int64) bool {
	if len(v.mailboxIDs) == 0 {
		return true
	}
	listed := v.mailboxIDs[mailboxID]
	return listed == v.matchMailboxIDs
}
// inRange returns whether m is within the range of messages already sent for the
// view, i.e. whether a change for this message should be sent to the client so it
// can update its state. Once the end of the view was sent, every message is in
// range. Otherwise m must not be beyond the last message sent, in sort order.
func (v view) inRange(m store.Message) bool {
	if v.End {
		return true
	}
	if v.Request.Query.OrderAsc {
		return !m.Received.After(v.LastMessageReceived)
	}
	return !m.Received.Before(v.LastMessageReceived)
}
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters; it is called at most once successfully, and only when a filter
// needs more than flags and keywords. Used to determine if a store.Change with a
// new message should be sent, and for the destination and anchor messages in view
// requests.
//
// An error from getmsg, or from one of the filters that parse the message, is
// returned as rerr together with match false.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	var m store.Message
	// ensureMessage lazily fetches the message, returning whether it is available.
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)
		}
		return rerr == nil
	}

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// Check filters.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
		return false, rerr
	}
	// note: anchorMessageID is not relevant for matching.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
		return false, rerr
	}

	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
		return false, rerr
	}
	// Newest is inclusive, like the FilterLessEqual on Received in queryMessages. A
	// message received exactly at Newest is part of the view.
	if q.Filter.Newest != nil && (!ensureMessage() || m.Received.After(*q.Filter.Newest)) {
		return false, rerr
	}

	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
		return false, rerr
	}
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
		return false, rerr
	}

	// The remaining filters may parse the message. They report errors through state,
	// which we propagate (unless we already have an error) and clean up on return.
	state := msgState{acc: acc}
	defer func() {
		if rerr == nil && state.err != nil {
			rerr = state.err
		}
		state.clear()
	}()

	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
		return false, rerr
	}

	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
		return false, rerr
	}

	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
		return false, rerr
	}

	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
		return false, rerr
	}

	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
		return true, rerr
	}
	return false, rerr
}
// msgResp is a single response sent by queryMessages on its response channel.
// Each response carries exactly one of: an error, a view reset, the end of the
// view, or a matching message (with its thread, when threading is enabled).
type msgResp struct {
err error // If set, an error happened and fields below are not set.
reset bool // If set, the anchor message does not exist (anymore?) or no longer matches, and we are sending messages from the start, fields below not set.
viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorMessageID in a followup request.
pm *ParsedMessage // If one of the messages in mil was the target page.DestMessageID, or this is the first match, this is the parsed message to display.
}
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// Results are batched: message items are gathered and flushed to msgc when the
// 300ms timer fires, or immediately once the timer has fired without anything
// to send. Errors go to errc, view resets to resetc. On return, the request ID
// is always sent on donec.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
defer func() {
// Release the transaction and signal completion before handling a possible
// panic, so the receiver of donec is notified in both cases.
err := tx.Rollback()
log.Check(err, "rolling back query transaction")
donec <- v.Request.ID
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
debug.PrintStack()
metrics.PanicInc(metrics.Webmailrequest)
}
}()
var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
var parsedMessage *ParsedMessage
var viewEnd bool
var immediate bool // No waiting, flush immediate.
t := time.NewTimer(300 * time.Millisecond)
defer t.Stop()
// sendViewMsgs flushes the gathered message items (and parsed message) to msgc
// and restarts the gather timer. Without force, nothing is sent when no items
// were gathered.
sendViewMsgs := func(force bool) {
if len(msgitems) == 0 && !force {
return
}
immediate = false
msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
msgitems = nil
parsedMessage = nil
t.Reset(300 * time.Millisecond)
}
// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
// Buffer of one, so queryMessages can have one response pending while we are
// busy sending.
mrc := make(chan msgResp, 1)
go queryMessages(ctx, log, acc, tx, v, mrc)
for {
select {
case mr, ok := <-mrc:
if !ok {
// queryMessages closed the channel: flush what we have.
sendViewMsgs(false)
// Empty message list signals this query is done.
msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
return
}
if mr.err != nil {
// Deliver messages gathered so far before reporting the error.
sendViewMsgs(false)
errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
return
}
if mr.reset {
resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
continue
}
if mr.viewEnd {
// Force sending, also without messages, so the ViewEnd flag is delivered.
viewEnd = true
sendViewMsgs(true)
return
}
msgitems = append(msgitems, mr.mil)
if mr.pm != nil {
parsedMessage = mr.pm
}
if immediate {
sendViewMsgs(true)
}
case <-t.C:
if len(msgitems) == 0 {
// Nothing to send yet. We'll send immediately when the next message comes in.
immediate = true
} else {
sendViewMsgs(false)
}
}
}
}
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on mrc, with several types of messages: errors, whether the view is
// reset due to a missing or no longer matching AnchorMessageID, when the end of
// the view was reached, and/or a message (with its thread when threading is
// enabled). mrc is always closed on return, also after a panic.
//
// Errors are wrapped with %w, so the receiver can recognize the underlying cause,
// e.g. context.Canceled, with errors.Is.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
	defer func() {
		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
			debug.PrintStack()
			mrc <- msgResp{err: fmt.Errorf("query failed")}
			metrics.PanicInc(metrics.Webmailquery)
		}
		close(mrc)
	}()

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// checkMessage returns whether the message with the given id exists, is not
	// expunged and matches the filters of the view.
	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		err := tx.Get(&m)
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
			return false, nil
		} else if err != nil {
			return false, err
		}
		return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
			return m, nil
		})
	}

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %w", err)}
			return
		} else if !valid {
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0
		}
	}

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send message till the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %w", err)}
			return
		} else if !valid {
			page.DestMessageID = 0
		}
	}

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			}
		} else {
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			}
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
			} else {
				q.FilterNotEqual("MailboxID", idsAny...)
			}
		}
	}

	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		var seen = false
		q.FilterFn(func(m store.Message) bool {
			if seen {
				return true
			}
			seen = m.ID == page.AnchorMessageID
			return false
		})
	}

	// We may be adding filters to the query below. The FilterFn signature does not
	// implement reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
	state := msgState{acc: acc}
	defer state.clear()

	flagfilter := query.flagFilterFn()
	if flagfilter != nil {
		q.FilterFn(func(m store.Message) bool {
			return flagfilter(m.Flags, m.Keywords)
		})
	}

	if query.Filter.Oldest != nil {
		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
	}
	if query.Filter.Newest != nil {
		q.FilterLessEqual("Received", *query.Filter.Newest)
	}

	if query.Filter.SizeMin > 0 {
		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
	}
	if query.Filter.SizeMax > 0 {
		q.FilterLessEqual("Size", query.Filter.SizeMax)
	}

	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil {
		q.FilterFn(attachmentFilter)
	}

	envFilter := query.envFilterFn(log, &state)
	if envFilter != nil {
		q.FilterFn(envFilter)
	}

	headerFilter := query.headerFilterFn(log, &state)
	if headerFilter != nil {
		q.FilterFn(headerFilter)
	}

	wordsFilter := query.wordsFilterFn(log, &state)
	if wordsFilter != nil {
		q.FilterFn(wordsFilter)
	}

	if query.OrderAsc {
		q.SortAsc("Received")
	} else {
		q.SortDesc("Received")
	}
	found := page.DestMessageID <= 0 // Whether the destination message, if any, was sent.
	end := true                      // Cleared when we stop before running out of messages.
	have := 0                        // Number of messages the frontend will display for what we sent.
	err := q.ForEach(func(m store.Message) error {
		// Check for an error in one of the filters, propagate it.
		if state.err != nil {
			return state.err
		}

		if have >= page.Count && found || have > 10000 {
			end = false
			return bstore.StopForEach
		}

		if _, ok := v.threadIDs[m.ThreadID]; ok {
			// Message was already returned as part of a thread.
			return nil
		}

		var pm *ParsedMessage
		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the message the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
			found = true
			xpm, err := parsedMessage(log, m, &state, true, false)
			if err != nil {
				return fmt.Errorf("parsing message %d: %w", m.ID, err)
			}
			pm = &xpm
		}

		mi, err := messageItem(log, m, &state)
		if err != nil {
			return fmt.Errorf("making messageitem for message %d: %w", m.ID, err)
		}
		mil := []MessageItem{mi}
		if query.Threading != ThreadOff {
			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
			if err != nil {
				return fmt.Errorf("gathering thread messages for id %d, thread %d: %w", m.ID, m.ThreadID, err)
			}
			if xpm != nil {
				pm = xpm
				found = true
			}
			mil = append(mil, more...)
			v.threadIDs[m.ThreadID] = struct{}{}

			// Calculate how many messages the frontend is going to show, and only count those as returned.
			collapsed := map[int64]bool{}
			for _, mi := range mil {
				collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
			}
			unread := map[int64]bool{} // Propagated to thread root.
			if query.Threading == ThreadUnread {
				for _, mi := range mil {
					m := mi.Message
					if m.Seen {
						continue
					}
					unread[m.ID] = true
					for _, id := range m.ThreadParentIDs {
						unread[id] = true
					}
				}
			}
			for _, mi := range mil {
				m := mi.Message
				threadRoot := true
				rootID := m.ID
				for _, id := range m.ThreadParentIDs {
					if _, ok := collapsed[id]; ok {
						threadRoot = false
						rootID = id
					}
				}
				if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
					have++
				}
			}
		} else {
			have++
		}
		mrc <- msgResp{mil: mil, pm: pm}
		return nil
	})
	// Check for an error in one of the filters again. Check in ForEach would not
	// trigger if the last message has the error.
	if err == nil && state.err != nil {
		err = state.err
	}
	if err != nil {
		mrc <- msgResp{err: fmt.Errorf("querying messages: %w", err)}
		return
	}
	if end {
		mrc <- msgResp{viewEnd: true}
	}
}
// gatherThread returns the other non-expunged messages in the thread of m (m
// itself is not included), ordered by ascending message ID, as message items with
// MatchQuery indicating whether each matches the filters of view v.
//
// The returned parsed message, if not nil, is the message to display:
//   - the thread message with ID destMessageID, if present among the others;
//   - otherwise, only if destMessageID is 0 and first is set: the first unread
//     message of the others, or else the first of the others, except that m is
//     used when m is unread and none of the others is.
//
// Errors are wrapped with %w so callers can inspect the cause with errors.Is.
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
	if m.ThreadID == 0 {
		// If we would continue, FilterNonzero would fail because there are no non-zero fields.
		return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
	}

	// Fetch other messages for this thread.
	qt := bstore.QueryTx[store.Message](tx)
	qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
	qt.FilterEqual("Expunged", false)
	qt.FilterNotEqual("ID", m.ID)
	qt.SortAsc("ID")
	tml, err := qt.List()
	if err != nil {
		return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %w", m.ID, m.ThreadID, err)
	}

	var mil []MessageItem
	var pm *ParsedMessage
	var firstUnread bool // Whether pm is for an unread message.
	for _, tm := range tml {
		// Handle each message in a function, so its parse state is released promptly.
		err := func() error {
			xstate := msgState{acc: acc}
			defer xstate.clear()

			mi, err := messageItem(log, tm, &xstate)
			if err != nil {
				return fmt.Errorf("making messageitem for message %d, for thread %d: %w", tm.ID, m.ThreadID, err)
			}
			mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return tm, nil
			})
			if err != nil {
				return fmt.Errorf("matching thread message %d against view query: %w", tm.ID, err)
			}
			mil = append(mil, mi)

			if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
				firstUnread = !tm.Seen
				xpm, err := parsedMessage(log, tm, &xstate, true, false)
				if err != nil {
					return fmt.Errorf("parsing thread message %d: %w", tm.ID, err)
				}
				pm = &xpm
			}
			return nil
		}()
		if err != nil {
			return nil, nil, err
		}
	}

	// Finally, the message that caused us to gather this thread (which is likely the
	// most recent message in the thread) could be the only unread message.
	if destMessageID == 0 && first && !m.Seen && !firstUnread {
		xstate := msgState{acc: acc}
		defer xstate.clear()
		xpm, err := parsedMessage(log, m, &xstate, true, false)
		if err != nil {
			return nil, nil, fmt.Errorf("parsing thread message %d: %w", m.ID, err)
		}
		pm = &xpm
	}

	return mil, pm, nil
}
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message, it is updated while filtering,
// with more information or reset for a next message.
//
// Filter functions report failures by setting err, since they can only return a
// bool. Callers must call clear when done, to close an opened message reader.
type msgState struct {
acc *store.Account // Never changes during lifetime.
err error // Once set, doesn't get cleared.
m store.Message // Message the cached fields below belong to.
part *message.Part // Will be without Reader when msgr is nil.
msgr *store.MsgReader // Opened on demand by ensurePart, closed by clear.
}
// clear closes the message reader, if one was opened, and drops all cached
// per-message state. The account and a previously recorded error are kept.
func (ms *msgState) clear() {
	if r := ms.msgr; r != nil {
		r.Close()
	}
	acc, err := ms.acc, ms.err
	*ms = msgState{acc: acc, err: err}
}
// ensureMsg makes m the current message of the state. Cached details are dropped
// if they belong to a message with a different ID.
func (ms *msgState) ensureMsg(m store.Message) {
	sameMessage := ms.m.ID == m.ID
	if !sameMessage {
		ms.clear()
	}
	ms.m = m
}
// ensurePart makes sure the parsed message structure (ms.part) for m is loaded
// from m.ParsedBuf, and, if withMsgReader is set, that a message reader is opened
// and attached to the part so its contents can be read.
//
// It returns whether ms.part is available. If loading fails, the error is
// recorded in ms.err and false is returned. If an error was recorded earlier,
// nothing new is loaded.
func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
	ms.ensureMsg(m)

	if ms.err != nil {
		// Don't load anything after a failure, but a part loaded earlier is still usable.
		return ms.part != nil
	}

	if ms.part == nil {
		if m.ParsedBuf == nil {
			ms.err = fmt.Errorf("message %d not parsed", m.ID)
			return false
		}
		part := new(message.Part)
		if err := json.Unmarshal(m.ParsedBuf, part); err != nil {
			ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
			return false
		}
		ms.part = part
	}

	if withMsgReader && ms.msgr == nil {
		ms.msgr = ms.acc.MessageReader(m)
		ms.part.SetReaderAt(ms.msgr)
	}
	return true
}
// flagFilterFn returns a function that applies the flag/keyword/"label"-related
// filters for a query. A nil function is returned if there are no flags to filter
// on.
//
// Labels are compared case-insensitively. Labels naming a system flag (e.g.
// `\seen`, `$junk`) are matched against the message flags, all others against the
// message keywords. Labels from Filter must be present, labels from NotFilter
// must be absent. If a label occurs in both, also when differing only in case,
// NotFilter takes precedence.
func (q Query) flagFilterFn() func(store.Flags, []string) bool {
	// Normalize case when merging, so a label present in both lists with different
	// casing resolves deterministically instead of depending on map iteration order.
	labels := map[string]bool{}
	for _, k := range q.Filter.Labels {
		labels[strings.ToLower(k)] = true
	}
	for _, k := range q.NotFilter.Labels {
		labels[strings.ToLower(k)] = false
	}

	if len(labels) == 0 {
		return nil
	}

	// For system flags, mask holds the flags we care about, flags their required values.
	var mask, flags store.Flags
	systemflags := map[string][]*bool{
		`\answered`:  {&mask.Answered, &flags.Answered},
		`\flagged`:   {&mask.Flagged, &flags.Flagged},
		`\deleted`:   {&mask.Deleted, &flags.Deleted},
		`\seen`:      {&mask.Seen, &flags.Seen},
		`\draft`:     {&mask.Draft, &flags.Draft},
		`$junk`:      {&mask.Junk, &flags.Junk},
		`$notjunk`:   {&mask.Notjunk, &flags.Notjunk},
		`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
		`$phishing`:  {&mask.Phishing, &flags.Phishing},
		`$mdnsent`:   {&mask.MDNSent, &flags.MDNSent},
	}
	keywords := map[string]bool{}
	for k, v := range labels {
		if mf, ok := systemflags[k]; ok {
			*mf[0] = true
			*mf[1] = v
		} else {
			keywords[k] = v
		}
	}
	return func(msgFlags store.Flags, msgKeywords []string) bool {
		var f store.Flags
		if f.Set(mask, msgFlags) != flags {
			return false
		}
		for k, v := range keywords {
			if slices.Contains(msgKeywords, k) != v {
				return false
			}
		}
		return true
	}
}
// attachmentFilterFn returns a function that filters for the attachment-related
// filter from the query. A nil function is returned if there are no attachment
// filters.
//
// The returned function reports whether the message has the attachment type
// required by Filter, if any, and lacks the type excluded by NotFilter, if any.
// Errors determining the attachment types are stored in state.err, and the
// message is then reported as not matching.
func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
	required := q.Filter.Attachments
	excluded := q.NotFilter.Attachments
	if required == AttachmentIndifferent && excluded == AttachmentIndifferent {
		return nil
	}

	return func(m store.Message) bool {
		if !state.ensurePart(m, false) {
			return false
		}
		types, err := attachmentTypes(log, m, state)
		if err != nil {
			state.err = err
			return false
		}
		if required != AttachmentIndifferent && !types[required] {
			return false
		}
		return excluded == AttachmentIndifferent || !types[excluded]
	}
}
// attachmentMimetypes maps a lower-case "type/subtype" media type of an
// attachment to the attachment type used for filtering. Consulted by
// attachmentTypes before falling back to the file name extension.
var attachmentMimetypes = map[string]AttachmentType{
"application/pdf": AttachmentPDF,
"application/zip": AttachmentArchive,
"application/x-rar-compressed": AttachmentArchive,
"application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
"application/vnd.ms-excel": AttachmentSpreadsheet,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
"application/vnd.oasis.opendocument.text": AttachmentDocument,
"application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
"application/vnd.ms-powerpoint": AttachmentPresentation,
"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}
// attachmentExtensions maps a lower-case file name extension, including the
// leading dot, to the attachment type used for filtering. Consulted by
// attachmentTypes for attachments whose media type is not in
// attachmentMimetypes.
var attachmentExtensions = map[string]AttachmentType{
".pdf": AttachmentPDF,
".zip": AttachmentArchive,
".tar": AttachmentArchive,
".tgz": AttachmentArchive,
".tar.gz": AttachmentArchive,
".tbz2": AttachmentArchive,
".tar.bz2": AttachmentArchive,
".tar.lz": AttachmentArchive,
".tlz": AttachmentArchive,
".tar.xz": AttachmentArchive,
".txz": AttachmentArchive,
".tar.zst": AttachmentArchive,
".tar.lz4": AttachmentArchive,
".7z": AttachmentArchive,
".rar": AttachmentArchive,
".ods": AttachmentSpreadsheet,
".xls": AttachmentSpreadsheet,
".xlsx": AttachmentSpreadsheet,
".odt": AttachmentDocument,
".doc": AttachmentDocument,
".docx": AttachmentDocument,
".odp": AttachmentPresentation,
".ppt": AttachmentPresentation,
".pptx": AttachmentPresentation,
}
// attachmentTypes returns the set of attachment types present in message m, for
// use by the attachment filter. Each attachment is classified by its media type:
// images by their main type, others through attachmentMimetypes. If the media
// type is not recognized, the extension of the "name" content-type parameter is
// looked up in attachmentExtensions, trying a compound extension such as
// ".tar.gz" before the plain last extension.
//
// If no attachment was recognized, the result has only AttachmentNone set.
// Otherwise AttachmentAny is set in addition to the recognized types.
func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
	types := map[AttachmentType]bool{}

	pm, err := parsedMessage(log, m, state, false, false)
	if err != nil {
		return nil, fmt.Errorf("parsing message for attachments: %w", err)
	}
	for _, a := range pm.attachments {
		if a.Part.MediaType == "IMAGE" {
			types[AttachmentImage] = true
			continue
		}
		mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
		if t, ok := attachmentMimetypes[mt]; ok {
			types[t] = true
			continue
		}

		name := strings.ToLower(tryDecodeParam(log, a.Part.ContentTypeParams["name"]))
		ext := filepath.Ext(name)
		if ext == "" {
			continue
		}
		// filepath.Ext only returns the last extension, e.g. ".gz" for "x.tar.gz". Try
		// the last two extensions combined first, so compound extensions are recognized.
		if inner := filepath.Ext(strings.TrimSuffix(name, ext)); inner != "" {
			if t, ok := attachmentExtensions[inner+ext]; ok {
				types[t] = true
				continue
			}
		}
		if t, ok := attachmentExtensions[ext]; ok {
			types[t] = true
		}
	}

	// NOTE(review): a message with only unrecognized attachments is reported as
	// AttachmentNone, confirm that is intended.
	if len(types) == 0 {
		types[AttachmentNone] = true
	} else {
		types[AttachmentAny] = true
	}
	return types, nil
}
// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
//
// Matching is by case-insensitive substring. For the Filter fields, every string
// must be found: in the subject, in the name or "<user@host>" form of any From
// address, or of any To/CC/BCC address. For the NotFilter fields, none of the
// strings may be found.
func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	f, nf := q.Filter, q.NotFilter
	if len(f.From) == 0 && len(f.To) == 0 && len(f.Subject) == 0 && len(nf.From) == 0 && len(nf.To) == 0 && len(nf.Subject) == 0 {
		return nil
	}

	// lowered returns a lower-cased copy of l, nil if l is empty.
	lowered := func(l []string) []string {
		var r []string
		for _, s := range l {
			r = append(r, strings.ToLower(s))
		}
		return r
	}

	subjectAll, subjectNone := lowered(f.Subject), lowered(nf.Subject)
	fromAll, fromNone := lowered(f.From), lowered(nf.From)
	toAll, toNone := lowered(f.To), lowered(nf.To)

	// addrHas returns whether needle occurs in the name or address of a.
	addrHas := func(a message.Address, needle string) bool {
		if strings.Contains(strings.ToLower(a.Name), needle) {
			return true
		}
		addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
		return strings.Contains(addr, needle)
	}

	// addrsMatch returns, with all set, whether each of needles occurs in at least
	// one of addrs. Without all, whether any of needles occurs in any of addrs.
	addrsMatch := func(needles []string, addrs []message.Address, all bool) bool {
		for _, needle := range needles {
			hit := slices.ContainsFunc(addrs, func(a message.Address) bool {
				return addrHas(a, needle)
			})
			switch {
			case hit && !all:
				return true
			case !hit && all:
				return false
			}
		}
		return all
	}

	return func(m store.Message) bool {
		if !state.ensurePart(m, false) {
			return false
		}

		// Messages without envelope are matched against an empty envelope.
		var env message.Envelope
		if e := state.part.Envelope; e != nil {
			env = *e
		}

		if len(subjectAll) > 0 || len(subjectNone) > 0 {
			subject := strings.ToLower(env.Subject)
			inSubject := func(s string) bool { return strings.Contains(subject, s) }
			missing := slices.ContainsFunc(subjectAll, func(s string) bool { return !inSubject(s) })
			if missing || slices.ContainsFunc(subjectNone, inSubject) {
				return false
			}
		}

		if len(fromAll) > 0 && !addrsMatch(fromAll, env.From, true) {
			return false
		}
		if len(fromNone) > 0 && addrsMatch(fromNone, env.From, false) {
			return false
		}

		if len(toAll) == 0 && len(toNone) == 0 {
			return true
		}
		rcpts := make([]message.Address, 0, len(env.To)+len(env.CC)+len(env.BCC))
		rcpts = append(rcpts, env.To...)
		rcpts = append(rcpts, env.CC...)
		rcpts = append(rcpts, env.BCC...)
		if len(toAll) > 0 && !addrsMatch(toAll, rcpts, true) {
			return false
		}
		return len(toNone) == 0 || !addrsMatch(toNone, rcpts, false)
	}
}
// headerFilterFn returns a function that filters for the header filters in the
// query. A nil function is returned if there are no header filters.
//
// Each filter is a header name and value pair. A message matches if for every
// pair it has the header, with at least one of its values containing the filter
// value (case-insensitively). An empty filter value only requires the header to
// be present. Errors reading the header are stored in state.err.
func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.Headers) == 0 {
		return nil
	}

	// Lower-case the values to search for once, not for each message.
	wanted := make([]string, len(q.Filter.Headers))
	for i, kv := range q.Filter.Headers {
		wanted[i] = strings.ToLower(kv[1])
	}

	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
			return false
		}
		hdr, err := state.part.Header()
		if err != nil {
			state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
			return false
		}

		for i, kv := range q.Filter.Headers {
			want := wanted[i]
			values := hdr.Values(kv[0])
			if want == "" && len(values) > 0 {
				// Only presence of the header is required.
				continue
			}
			found := slices.ContainsFunc(values, func(e string) bool {
				return strings.Contains(strings.ToLower(e), want)
			})
			if !found {
				return false
			}
		}
		return true
	}
}
// wordsFilterFn returns a function that applies the word filters of the query. A
// nil function is returned when query does not contain a word filter.
//
// The returned function searches the message, which is opened for reading, for
// the words of Filter and NotFilter. Errors while searching are stored in
// state.err, and the message is then reported as not matching.
func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
		return nil
	}

	search := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)

	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
			return false
		}
		matched, err := search.MatchPart(log, state.part, true)
		if err != nil {
			state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
			return false
		}
		return matched
	}
}