mox/webmail/view.go
Mechiel Lukkien 849b4ec9e9
add webmail
it was far down on the roadmap, but implemented earlier, because it's
interesting, and to help prepare for a jmap implementation. for jmap we need to
implement more client-like functionality than with just imap. internal data
structures need to change. jmap has lots of other requirements, so it's already
a big project. by implementing a webmail now, some of the required data
structure changes become clear and can be made now, so the later jmap
implementation can do things similarly to the webmail code. the webmail
frontend and webmail are written together, making their interface/api much
smaller and simpler than jmap.

one of the internal changes is that we now keep track of per-mailbox
total/unread/unseen/deleted message counts and mailbox sizes.  keeping this
data consistent after any change to the stored messages (through the code base)
is tricky, so mox now has a consistency check that verifies the counts are
correct, which runs only during tests, each time an internal account reference
is closed. we have a few more internal "changes" that are propagated for the
webmail frontend (that imap doesn't have a way to propagate on a connection),
like changes to the special-use flags on mailboxes, and used keywords in a
mailbox. more changes that will be required have revealed themselves while
implementing the webmail, and will be implemented next.

the webmail user interface is modeled after the mail clients i use or have
used: thunderbird, macos mail, mutt; and webmails i normally only use for
testing: gmail, proton, yahoo, outlook. a somewhat technical user is assumed,
but still the goal is to make this webmail client easy to use for everyone. the
user interface looks like most other mail clients: a list of mailboxes, a
search bar, a message list view, and message details. there is a top/bottom and
a left/right layout for the list/message view, default is automatic based on
screen size. the panes can be resized by the user. buttons for actions are just
text, not icons. clicking a button briefly shows the shortcut for the action in
the bottom right, helping with learning to operate quickly. any text that is
underdotted has a title attribute that causes more information to be displayed,
e.g. what a button does or a field is about. to highlight potential phishing
attempts, any text (anywhere in the webclient) that switches unicode "blocks"
(a rough approximation to (language) scripts) within a word is underlined
orange. multiple messages can be selected with familiar ui interaction:
clicking while holding control and/or shift keys.  keyboard navigation works
with arrows/page up/down and home/end keys, and also with a few basic vi-like
keys for list/message navigation. we prefer showing the text version of a
message instead of the html (with inlined images only) version. html messages are shown
in an iframe served from an endpoint with CSP headers to prevent dangerous
resources (scripts, external images) from being loaded. the html is also
sanitized, with javascript removed. a user can choose to load external
resources (e.g. images for tracking purposes).

the frontend is just (strict) typescript, no external frameworks. all
incoming/outgoing data is typechecked, both the api request parameters and
response types, and the data coming in over SSE. the types and checking code
are generated with sherpats, which uses the api definitions generated by
sherpadoc based on the Go code. so types from the backend are automatically
propagated to the frontend.  since there is no framework to automatically
propagate properties and rerender components, changes coming in over the SSE
connection are propagated explicitly with regular function calls.  the ui is
separated into "views", each with a "root" dom element that is added to the
visible document. these views have additional functions for getting changes
propagated, often resulting in the view updating its (internal) ui state (dom).
we keep the frontend compilation simple, it's just a few typescript files that
get compiled (combined and types stripped) into a single js file, no additional
runtime code needed or complicated build processes used. the webmail is served
from a compressed, cacheable html file that includes style and the
javascript, currently just over 225kb uncompressed, under 60kb compressed (not
minified, including comments). we include the generated js files in the
repository, to keep Go's easily buildable self-contained binaries.

authentication is basic http, as with the account and admin pages. most data
comes in over one long-term SSE connection to the backend. api requests signal
which mailbox/search/messages are requested over the SSE connection. fetching
individual messages, and making changes, are done through api calls. the
operations are similar to imap, so some code has been moved from package
imapserver to package store. the future jmap implementation will benefit from
these changes too. more functionality will probably be moved to the store
package in the future.

the quickstart enables webmail on the internal listener by default (for new
installs). users can enable it on the public listener if they want to. mox
localserve enables it too. to enable webmail on existing installs, add settings
like the following to the listeners in mox.conf, similar to AccountHTTP(S):

	WebmailHTTP:
		Enabled: true
	WebmailHTTPS:
		Enabled: true

special thanks to liesbeth, gerben, andrii for early user feedback.

there is plenty still to do, see the list at the top of webmail/webmail.ts.
feedback welcome as always.
2023-08-07 21:57:03 +02:00


package webmail
// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
import (
"compress/gzip"
"context"
cryptrand "crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"path/filepath"
"reflect"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/exp/slices"
"github.com/mjl-/bstore"
"github.com/mjl-/sherpa"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/store"
)
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
ID int64
SSEID int64 // SSE connection.
// To indicate a request is a continuation (more results) of the previous view.
// Echoed in events, client checks if it is getting results for the latest request.
ViewID int64
// If set, this request and its view are canceled. A new view must be started.
Cancel bool
Query Query
Page Page
}
// Query is a request for messages that match filters, in a given order.
type Query struct {
OrderAsc bool // Order by received ascending or descending.
Filter Filter
NotFilter NotFilter
}
// AttachmentType is for filtering by attachment type.
type AttachmentType string
const (
AttachmentIndifferent AttachmentType = ""
AttachmentNone AttachmentType = "none"
AttachmentAny AttachmentType = "any"
AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
AttachmentPDF AttachmentType = "pdf"
AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
AttachmentDocument AttachmentType = "document" // odt, docx, ...
AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
// Filter selects the messages to return. Fields that are set must all match,
// for slices each element must match ("and").
type Filter struct {
// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
MailboxID int64
// If true, submailboxes are also included in the search.
MailboxChildrenIncluded bool
// In case client doesn't know mailboxes and their IDs yet. Only used during sse
// connection setup, where it is turned into a MailboxID. Filtering only looks at
// MailboxID.
MailboxName string
Words []string // Case insensitive substring match for each string.
From []string
To []string // Including Cc and Bcc.
Oldest *time.Time
Newest *time.Time
Subject []string
Attachments AttachmentType
Labels []string
Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
SizeMin int64
SizeMax int64
}
// NotFilter matches messages that don't match these fields.
type NotFilter struct {
Words []string
From []string
To []string
Subject []string
Attachments AttachmentType
Labels []string
}
// Page holds pagination parameters for a request.
type Page struct {
// Start returning messages after this ID, if > 0. For pagination, fetching the
// next set of messages.
AnchorMessageID int64
// Number of messages to return, must be >= 1, we never return more than 10000 for
// one request.
Count int
// If > 0, return messages until DestMessageID is found. More than Count messages
// can be returned. For long-running searches, it may take a while before this
// message is found.
DestMessageID int64
}
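// Example (illustrative, not taken from actual client code) of a Request as
// encoded in the "request" query string parameter when starting an SSE
// connection, asking for the first 50 messages of mailbox 1, newest first:
//
//	{"ID":1,"ViewID":1,"Query":{"OrderAsc":false,"Filter":{"MailboxID":1}},"Page":{"Count":50}}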
// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
Name string // Free-form name for display in mail applications.
User string // Localpart, encoded.
Domain dns.Domain
}
// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
Date time.Time
Subject string
From []MessageAddress
Sender []MessageAddress
ReplyTo []MessageAddress
To []MessageAddress
CC []MessageAddress
BCC []MessageAddress
InReplyTo string
MessageID string
}
// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
Message store.Message // Without ParsedBuf and MsgPrefix, for size.
Envelope MessageEnvelope
Attachments []Attachment
IsSigned bool
IsEncrypted bool
FirstLine string // Of message body, for showing as preview.
}
// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
ID int64
Part message.Part
Headers map[string][]string
// Text parts, can be empty.
Texts []string
// Whether there is an HTML part. The webclient renders HTML message parts through
// an iframe and a separate request with strict CSP headers to prevent script
// execution and loading of external resources, which isn't possible when loading
// in iframe with inline HTML because not all browsers support the iframe csp
// attribute.
HasHTML bool
ListReplyAddress *MessageAddress // From List-Post.
// Information used by MessageItem, not exported in this type.
envelope MessageEnvelope
attachments []Attachment
isSigned bool
isEncrypted bool
firstLine string
}
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
SSEID int64
LoginAddress MessageAddress
Addresses []MessageAddress
DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
MailboxName string
Mailboxes []store.Mailbox
}
// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
LocalpartCatchallSeparator string // Can be empty.
LocalpartCaseSensitive bool
}
// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
ViewID int64
RequestID int64
MessageItems []MessageItem // If empty, this was the last message for the request.
ParsedMessage *ParsedMessage // If set, will match the target page.DestMessageID from the request.
// If set, there are no more messages in this view at this moment. Messages can be
// added, typically via Change messages, e.g. for new deliveries.
ViewEnd bool
}
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
ViewID int64
RequestID int64
Err string // To be displayed in client.
err error // Original error, for checking against context.Canceled.
}
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
ViewID int64
RequestID int64
}
// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
ViewID int64
Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}
// ChangeMsgAdd adds a new message to the view.
type ChangeMsgAdd struct {
store.ChangeAddUID
MessageItem MessageItem
}
// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
store.ChangeRemoveUIDs
}
// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
store.ChangeFlags
}
// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
store.ChangeRemoveMailbox
}
// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
Mailbox store.Mailbox
}
// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
store.ChangeRenameMailbox
}
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
store.ChangeMailboxCounts
}
// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
store.ChangeMailboxSpecialUse
}
// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
store.ChangeMailboxKeywords
}
// View holds the information about the returned data for a query. It is used to
// determine whether changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
type view struct {
Request Request
// Last message we sent to the client. We use it to decide if a newly delivered
// message is within the view and the client should get a notification.
LastMessage store.Message
// If set, the last message in the query view has been sent. There is no need to do
// another query, it will not return more data. Used to decide if an event for a
// new message should be sent.
End bool
// Whether messages must or must not match mailboxIDs.
matchMailboxIDs bool
// Mailboxes to match, can be multiple, for matching children. If empty, there is
// no filter on mailboxes.
mailboxIDs map[int64]bool
}
// sses tracks all sse connections, and access to them.
var sses = struct {
sync.Mutex
gen int64
m map[int64]sse
}{m: map[int64]sse{}}
// sse represents an sse connection.
type sse struct {
ID int64 // Also returned in EventStart and used in Request to identify the request.
AccountName string // Used to check the authenticated user has access to the SSE connection.
Request chan Request // Goroutine will receive requests from here, coming from API calls.
}
// called by the goroutine when the connection is closed or breaks.
func (sse sse) unregister() {
sses.Lock()
defer sses.Unlock()
delete(sses.m, sse.ID)
// Drain any pending requests, preventing blocked goroutines from API calls.
for {
select {
case <-sse.Request:
default:
return
}
}
}
func sseRegister(accountName string) sse {
sses.Lock()
defer sses.Unlock()
sses.gen++
v := sse{sses.gen, accountName, make(chan Request, 1)}
sses.m[v.ID] = v
return v
}
// sseGet returns a reference to an existing connection if it exists and the
// user has access.
func sseGet(id int64, accountName string) (sse, bool) {
sses.Lock()
defer sses.Unlock()
s := sses.m[id]
if s.AccountName != accountName {
return sse{}, false
}
return s, true
}
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
token string // Uniquely generated.
accName string
address string // Address used to authenticate in call that created the token.
validUntil time.Time
}
// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods.
type ssetokens struct {
sync.Mutex
accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
tokens map[string]ssetoken // Token to details, for finding account for a token.
}
var sseTokens = ssetokens{
accountTokens: map[string][]ssetoken{},
tokens: map[string]ssetoken{},
}
// xgenerate creates and saves a new token. It ensures no more than 10 tokens
// per account exist, removing old ones if needed.
func (x *ssetokens) xgenerate(ctx context.Context, accName, address string) string {
buf := make([]byte, 16)
_, err := cryptrand.Read(buf)
xcheckf(ctx, err, "generating token")
st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, time.Now().Add(time.Minute)}
x.Lock()
defer x.Unlock()
n := len(x.accountTokens[accName])
if n >= 10 {
for _, ost := range x.accountTokens[accName][:n-9] {
delete(x.tokens, ost.token)
}
copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
x.accountTokens[accName] = x.accountTokens[accName][:9]
}
x.accountTokens[accName] = append(x.accountTokens[accName], st)
x.tokens[st.token] = st
return st.token
}
// check verifies a token, and consumes it if valid.
func (x *ssetokens) check(token string) (string, string, bool, error) {
x.Lock()
defer x.Unlock()
st, ok := x.tokens[token]
if !ok {
return "", "", false, nil
}
delete(x.tokens, token)
if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
return "", "", false, errors.New("internal error, could not find token in account")
} else {
copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
if len(x.accountTokens[st.accName]) == 0 {
delete(x.accountTokens, st.accName)
}
}
if time.Now().After(st.validUntil) {
return "", "", false, nil
}
return st.accName, st.address, true, nil
}
// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
type ioErr struct {
err error
}
// serveEvents serves an SSE connection. Authentication is done through a query
// string parameter "token", a one-time-use token returned by the Token API call.
func serveEvents(ctx context.Context, log *mlog.Log, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Error("internal error: ResponseWriter not a http.Flusher")
http.Error(w, "500 - internal error - cannot sync to http connection", 500)
return
}
q := r.URL.Query()
token := q.Get("token")
if token == "" {
http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
return
}
accName, address, ok, err := sseTokens.check(token)
if err != nil {
http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
return
}
if !ok {
http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
return
}
// We can simulate a slow SSE connection. It seems firefox doesn't slow down
// incoming responses with its slow-network simulation.
var waitMin, waitMax time.Duration
waitMinMsec := q.Get("waitMinMsec")
waitMaxMsec := q.Get("waitMaxMsec")
if waitMinMsec != "" && waitMaxMsec != "" {
if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
return
} else {
waitMin = time.Duration(v) * time.Millisecond
}
if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
return
} else {
waitMax = time.Duration(v) * time.Millisecond
}
}
// Parse the request with initial mailbox/search criteria.
var req Request
dec := json.NewDecoder(strings.NewReader(q.Get("request")))
dec.DisallowUnknownFields()
if err := dec.Decode(&req); err != nil {
http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
return
} else if req.Page.Count <= 0 {
http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
return
}
var writer *eventWriter
metricSSEConnections.Inc()
defer metricSSEConnections.Dec()
// Below here, error handling is done through xcheckf, which panics with
// *sherpa.Error, after which we send an error event to the client. We can also get
// an *ioErr when the connection is broken.
defer func() {
x := recover()
if x == nil {
return
}
if err, ok := x.(*sherpa.Error); ok {
writer.xsendEvent(ctx, log, "fatalErr", err.Message)
} else if _, ok := x.(ioErr); ok {
return
} else {
log.WithContext(ctx).Error("serveEvents panic", mlog.Field("err", x))
debug.PrintStack()
metrics.PanicInc("webmail")
panic(x)
}
}()
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
// We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
// keys), so should be quite compressible.
var out writeFlusher
gz := acceptsGzip(r)
if gz {
h.Set("Content-Encoding", "gzip")
out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
} else {
out = nopFlusher{w}
}
out = httpFlusher{out, flusher}
// We'll be writing outgoing SSE events through writer.
writer = newEventWriter(out, waitMin, waitMax)
defer writer.close()
// Fetch initial data.
acc, err := store.OpenAccount(accName)
xcheckf(ctx, err, "open account")
defer func() {
err := acc.Close()
log.Check(err, "closing account")
}()
comm := store.RegisterComm(acc)
defer comm.Unregister()
// List addresses that the client can use to send email from.
accConf, _ := acc.Conf()
loginAddr, err := smtp.ParseAddress(address)
xcheckf(ctx, err, "parsing login address")
_, _, dest, err := mox.FindAccount(loginAddr.Localpart, loginAddr.Domain, false)
xcheckf(ctx, err, "looking up destination for login address")
loginName := accConf.FullName
if dest.FullName != "" {
loginName = dest.FullName
}
loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
var addresses []MessageAddress
for a, dest := range accConf.Destinations {
name := dest.FullName
if name == "" {
name = accConf.FullName
}
var ma MessageAddress
if strings.HasPrefix(a, "@") {
dom, err := dns.ParseDomain(a[1:])
xcheckf(ctx, err, "parsing destination address for account")
ma = MessageAddress{Domain: dom}
} else {
addr, err := smtp.ParseAddress(a)
xcheckf(ctx, err, "parsing destination address for account")
ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
}
addresses = append(addresses, ma)
}
// We implicitly start a query. We use the reqctx for the transaction, because the
// transaction is passed to the query, which can be canceled.
reqctx, reqctxcancel := context.WithCancel(ctx)
defer func() {
// We also cancel in cancelDrain later on, but there is a brief window where the
// context wouldn't be canceled.
if reqctxcancel != nil {
reqctxcancel()
reqctxcancel = nil
}
}()
// qtx is kept around during connection initialization, until we pass it off to the
// goroutine that starts querying for messages.
var qtx *bstore.Tx
defer func() {
if qtx != nil {
err := qtx.Rollback()
log.Check(err, "rolling back")
}
}()
var mbl []store.Mailbox
// We only take the rlock when getting the tx.
acc.WithRLock(func() {
// Now a read-only transaction we'll use during the query.
qtx, err = acc.DB.Begin(reqctx, false)
xcheckf(ctx, err, "begin transaction")
mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
xcheckf(ctx, err, "list mailboxes")
})
// Find the designated mailbox if a mailbox name is set, or there are no filters at all.
var zerofilter Filter
var zeronotfilter NotFilter
var mailbox store.Mailbox
var mailboxPrefixes []string
var matchMailboxes bool
mailboxIDs := map[int64]bool{}
mailboxName := req.Query.Filter.MailboxName
if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
if mailboxName == "" {
mailboxName = "Inbox"
}
var inbox store.Mailbox
for _, e := range mbl {
if e.Name == mailboxName {
mailbox = e
}
if e.Name == "Inbox" {
inbox = e
}
}
if mailbox.ID == 0 {
mailbox = inbox
}
if mailbox.ID == 0 {
xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
}
req.Query.Filter.MailboxID = mailbox.ID
req.Query.Filter.MailboxName = ""
mailboxPrefixes = []string{mailbox.Name + "/"}
matchMailboxes = true
mailboxIDs[mailbox.ID] = true
} else {
matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
}
if req.Query.Filter.MailboxChildrenIncluded {
xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
}
// todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
sse := sseRegister(acc.Name)
defer sse.unregister()
// Per-domain localpart config so webclient can decide if an address belongs to the account.
domainAddressConfigs := map[string]DomainAddressConfig{}
for _, a := range addresses {
dom, _ := mox.Conf.Domain(a.Domain)
domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
}
// Write first event, allowing client to fill its UI with mailboxes.
start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl}
writer.xsendEvent(ctx, log, "start", start)
// The goroutine doing the querying will send messages on these channels, which
// result in an event being written on the SSE connection.
viewMsgsc := make(chan EventViewMsgs)
viewErrc := make(chan EventViewErr)
viewResetc := make(chan EventViewReset)
donec := make(chan int64) // When request is done.
// Start a view, it determines if we send a change to the client. And start an
// implicit query for messages, we'll send the messages to the client which can
// fill its ui with messages.
v := view{req, store.Message{}, false, matchMailboxes, mailboxIDs}
go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
qtx = nil // viewRequestTx closes qtx
// When canceling a query, we must drain its messages until it says it is done.
// Otherwise the sending goroutine would hang indefinitely on a channel send.
cancelDrain := func() {
if reqctxcancel != nil {
// Cancel the goroutine doing the querying.
reqctxcancel()
reqctx = nil
reqctxcancel = nil
} else {
return
}
// Drain events until done.
for {
select {
case <-viewMsgsc:
case <-viewErrc:
case <-viewResetc:
case <-donec:
return
}
}
}
// If we stop and a query is in progress, we must drain the channel it will send on.
defer cancelDrain()
// Changes broadcasted by other connections on this account. If applicable for the
// connection/view, we send events.
xprocessChanges := func(changes []store.Change) {
taggedChanges := [][2]any{}
// We get a transaction first time we need it.
var xtx *bstore.Tx
defer func() {
if xtx != nil {
err := xtx.Rollback()
log.Check(err, "rolling back transaction")
}
}()
ensureTx := func() error {
if xtx != nil {
return nil
}
acc.RLock()
defer acc.RUnlock()
var err error
xtx, err = acc.DB.Begin(ctx, false)
return err
}
// This getmsg will now only be called with mailboxID+UID, not with messageID set.
// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
if err := ensureTx(); err != nil {
return store.Message{}, fmt.Errorf("transaction: %v", err)
}
return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
}
// Return uids that are within the range of the view: because the end has been
// reached, or because the UID is not after the last message.
xchangedUIDs := func(mailboxID int64, uids []store.UID) (changedUIDs []store.UID) {
uidsAny := make([]any, len(uids))
for i, uid := range uids {
uidsAny[i] = uid
}
err := ensureTx()
xcheckf(ctx, err, "transaction")
q := bstore.QueryTx[store.Message](xtx)
q.FilterNonzero(store.Message{MailboxID: mailboxID})
q.FilterEqual("UID", uidsAny...)
err = q.ForEach(func(m store.Message) error {
if v.inRange(m) {
changedUIDs = append(changedUIDs, m.UID)
}
return nil
})
xcheckf(ctx, err, "fetching messages for change")
return changedUIDs
}
// Forward changes that are relevant to the current view.
for _, change := range changes {
switch c := change.(type) {
case store.ChangeAddUID:
if ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg); err != nil {
xcheckf(ctx, err, "matching new message against view")
} else if !ok {
continue
}
m, err := getmsg(0, c.MailboxID, c.UID)
xcheckf(ctx, err, "get message")
state := msgState{acc: acc}
mi, err := messageItem(log, m, &state)
state.clear()
xcheckf(ctx, err, "make messageitem")
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mi}})
// If message extends the view, store it as such.
if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessage.Received) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessage.Received) {
v.LastMessage = m
}
case store.ChangeRemoveUIDs:
// We do a quick filter over changes, not sending UID updates for unselected
// mailboxes or when the message is outside the range of the view. But we still may
// send messages that don't apply to the filter. If client doesn't recognize the
// messages, that's fine.
if !v.matchesMailbox(c.MailboxID) {
continue
}
changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs)
if len(changedUIDs) == 0 {
continue
}
ch := ChangeMsgRemove{c}
ch.UIDs = changedUIDs
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
case store.ChangeFlags:
// As with ChangeRemoveUIDs above, we send more changes than strictly needed.
if !v.matchesMailbox(c.MailboxID) {
continue
}
changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID})
if len(changedUIDs) == 0 {
continue
}
ch := ChangeMsgFlags{c}
ch.UID = changedUIDs[0]
taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
case store.ChangeRemoveMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
case store.ChangeAddMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
case store.ChangeRenameMailbox:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
case store.ChangeMailboxCounts:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
case store.ChangeMailboxSpecialUse:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
case store.ChangeMailboxKeywords:
taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
case store.ChangeAddSubscription:
// Webmail does not care about subscriptions.
default:
panic(fmt.Sprintf("missing case for change %T", c))
}
}
if len(taggedChanges) > 0 {
viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
}
}
timer := time.NewTimer(5 * time.Minute) // For keepalives.
defer timer.Stop()
for {
if writer.wrote {
timer.Reset(5 * time.Minute)
writer.wrote = false
}
pending := comm.Pending
if reqctx != nil {
pending = nil
}
select {
case <-mox.Shutdown.Done():
writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()
}
return
case <-timer.C:
_, err := fmt.Fprintf(out, ": keepalive\n\n")
if err != nil {
log.Errorx("write keepalive", err)
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()
}
return
}
out.Flush()
writer.wrote = true
case vm := <-viewMsgsc:
if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
}
if vm.ViewEnd {
v.End = true
}
if len(vm.MessageItems) > 0 {
v.LastMessage = vm.MessageItems[len(vm.MessageItems)-1].Message
}
writer.xsendEvent(ctx, log, "viewMsgs", vm)
case ve := <-viewErrc:
if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
}
if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()
}
return
}
writer.xsendEvent(ctx, log, "viewErr", ve)
case vr := <-viewResetc:
if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
}
writer.xsendEvent(ctx, log, "viewReset", vr)
case id := <-donec:
if id != v.Request.ID {
panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
}
if reqctxcancel != nil {
reqctxcancel()
}
reqctx = nil
reqctxcancel = nil
case req := <-sse.Request:
if reqctx != nil {
cancelDrain()
}
if req.Cancel {
v = view{req, store.Message{}, false, false, nil}
continue
}
reqctx, reqctxcancel = context.WithCancel(ctx)
stop := func() (stop bool) {
// rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
var rtx *bstore.Tx
var err error
defer func() {
if rtx != nil {
err = rtx.Rollback()
log.Check(err, "rolling back transaction")
}
}()
acc.WithRLock(func() {
rtx, err = acc.DB.Begin(reqctx, false)
})
if err != nil {
reqctxcancel()
reqctx = nil
reqctxcancel = nil
if errors.Is(err, context.Canceled) {
return true
}
err := fmt.Errorf("begin transaction: %v", err)
viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
writer.xsendEvent(ctx, log, "viewErr", viewErr)
return false
}
// Reset view state for new query.
if req.ViewID != v.Request.ViewID {
matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
if req.Query.Filter.MailboxChildrenIncluded {
xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
}
v = view{req, store.Message{}, false, matchMailboxes, mailboxIDs}
} else {
v.Request = req
}
go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
rtx = nil
return false
}()
if stop {
return
}
case <-pending:
xprocessChanges(comm.Get())
case <-ctx.Done():
// Work around go vet, it doesn't see defer cancelDrain.
if reqctxcancel != nil {
reqctxcancel()
}
return
}
}
}
// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
// mailboxIDs must or must not match. mailboxPrefixes is for use with
// xgatherMailboxIDs to gather children of the mailboxIDs.
func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
matchMailboxes = true
mailboxIDs = map[int64]bool{}
if f.MailboxID == -1 {
matchMailboxes = false
// Add the trash, junk and account rejects mailboxes.
err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
mailboxIDs[mb.ID] = true
}
return nil
})
xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
} else if f.MailboxID > 0 {
mb := store.Mailbox{ID: f.MailboxID}
err := tx.Get(&mb)
xcheckf(ctx, err, "get mailbox")
mailboxIDs[f.MailboxID] = true
mailboxPrefixes = []string{mb.Name + "/"}
}
return
}
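// Illustration (not part of the original code): with f.MailboxID == -1, the
// returned matchMailboxes is false and mailboxIDs holds the trash, junk and
// rejects mailboxes, so the view matches all messages outside those
// mailboxes. With f.MailboxID == 3, matchMailboxes is true and only mailbox 3
// matches (plus its children, once xgatherMailboxIDs adds them).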
// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
// Gather more mailboxes to filter on, based on mailboxPrefixes.
if len(mailboxPrefixes) == 0 {
return
}
err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
for _, p := range mailboxPrefixes {
if strings.HasPrefix(mb.Name, p) {
mailboxIDs[mb.ID] = true
break
}
}
return nil
})
xcheckf(ctx, err, "gathering mailboxes")
}
// matchesMailbox returns whether a mailbox matches the view.
func (v view) matchesMailbox(mailboxID int64) bool {
return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
}
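// Illustration (not part of the original code) of matchesMailbox, with
// hypothetical field values:
//
//	v := view{matchMailboxIDs: true, mailboxIDs: map[int64]bool{3: true}}
//	v.matchesMailbox(3) // true, mailbox 3 is selected
//	v.matchesMailbox(4) // false
//
// With matchMailboxIDs false, the set is an exclusion list; with an empty
// mailboxIDs, every mailbox matches.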
// inRange returns whether m is within the range of the view, i.e. whether a
// change for this message should be sent to the client so it can update its
// state.
func (v view) inRange(m store.Message) bool {
return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessage.Received) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessage.Received)
}
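// Illustration (not part of the original code): with a descending sort
// (OrderAsc false) and v.LastMessage received at time t, a message received
// at or after t is in range; once v.End is set, every message is in range.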
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log *mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
var m store.Message
ensureMessage := func() bool {
if m.ID == 0 && rerr == nil {
m, rerr = getmsg(messageID, mailboxID, uid)
}
return rerr == nil
}
q := v.Request.Query
// Warning: Filters must be kept in sync between queryMessages and view.matches.
// Check filters.
if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
return false, rerr
}
// note: anchorMessageID is not relevant for matching.
flagfilter := q.flagFilterFn()
if flagfilter != nil && !flagfilter(flags, keywords) {
return false, rerr
}
if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
return false, rerr
}
if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
return false, rerr
}
if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
return false, rerr
}
if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
return false, rerr
}
state := msgState{acc: acc}
defer func() {
if rerr == nil && state.err != nil {
rerr = state.err
}
state.clear()
}()
attachmentFilter := q.attachmentFilterFn(log, acc, &state)
if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
return false, rerr
}
envFilter := q.envFilterFn(log, &state)
if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
return false, rerr
}
headerFilter := q.headerFilterFn(log, &state)
if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
return false, rerr
}
wordsFilter := q.wordsFilterFn(log, &state)
if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
return false, rerr
}
// Now check that the message is within the range already sent, or that the
// end of the view ("last") was already sent.
if !checkRange || v.End || ensureMessage() && v.inRange(m) {
return true, rerr
}
return false, rerr
}
type msgResp struct {
err error // If set, an error happened and fields below are not set.
reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
mi MessageItem // If none of the cases above apply, the message that was found matching the query.
pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log *mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
defer func() {
err := tx.Rollback()
log.Check(err, "rolling back query transaction")
donec <- v.Request.ID
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
log.WithContext(ctx).Error("viewRequestTx panic", mlog.Field("err", x))
debug.PrintStack()
metrics.PanicInc("webmail-request")
}
}()
var msgitems []MessageItem // Gathering for 300ms, then flushing.
var parsedMessage *ParsedMessage
var viewEnd bool
var immediate bool // No waiting, flush immediately.
t := time.NewTimer(300 * time.Millisecond)
defer t.Stop()
sendViewMsgs := func(force bool) {
if len(msgitems) == 0 && !force {
return
}
immediate = false
msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
msgitems = nil
parsedMessage = nil
t.Reset(300 * time.Millisecond)
}
// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
mrc := make(chan msgResp, 1)
go queryMessages(ctx, log, acc, tx, v, mrc)
for {
select {
case mr, ok := <-mrc:
if !ok {
sendViewMsgs(false)
// Empty message list signals this query is done.
msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
return
}
if mr.err != nil {
sendViewMsgs(false)
errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
return
}
if mr.reset {
resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
continue
}
if mr.viewEnd {
viewEnd = true
sendViewMsgs(true)
return
}
msgitems = append(msgitems, mr.mi)
if mr.pm != nil {
parsedMessage = mr.pm
}
if immediate {
sendViewMsgs(true)
}
case <-t.C:
if len(msgitems) == 0 {
// Nothing to send yet. We'll send immediately when the next message comes in.
immediate = true
} else {
sendViewMsgs(false)
}
}
}
}
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends its results on mrc, with several types of responses: an error,
// whether the view is reset due to a missing AnchorMessageID, whether the end
// of the view was reached, and/or a message.
func queryMessages(ctx context.Context, log *mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
defer func() {
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
log.WithContext(ctx).Error("queryMessages panic", mlog.Field("err", x))
debug.PrintStack()
mrc <- msgResp{err: fmt.Errorf("query failed")}
metrics.PanicInc("webmail-query")
}
close(mrc)
}()
query := v.Request.Query
page := v.Request.Page
// Warning: Filters must be kept in sync between queryMessages and view.matches.
checkMessage := func(id int64) (valid bool, rerr error) {
m := store.Message{ID: id}
err := tx.Get(&m)
if err == bstore.ErrAbsent || err == nil && m.Expunged {
return false, nil
} else if err != nil {
return false, err
} else {
return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
return m, nil
})
}
}
// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
if page.AnchorMessageID > 0 {
// Check if message exists and (still) matches the filter.
// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
if valid, err := checkMessage(page.AnchorMessageID); err != nil {
mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
return
} else if !valid {
mrc <- msgResp{reset: true}
page.AnchorMessageID = 0
}
}
// Check if page.DestMessageID exists and matches the filter. If not, we will
// ignore it instead of continuing to send messages until the end of the view.
if page.DestMessageID > 0 {
if valid, err := checkMessage(page.DestMessageID); err != nil {
mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
return
} else if !valid {
page.DestMessageID = 0
}
}
// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
q := bstore.QueryTx[store.Message](tx)
q.FilterEqual("Expunged", false)
if len(v.mailboxIDs) > 0 {
if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
// Should result in fast indexed query.
for mbID := range v.mailboxIDs {
q.FilterNonzero(store.Message{MailboxID: mbID})
}
} else {
idsAny := make([]any, 0, len(v.mailboxIDs))
for mbID := range v.mailboxIDs {
idsAny = append(idsAny, mbID)
}
if v.matchMailboxIDs {
q.FilterEqual("MailboxID", idsAny...)
} else {
q.FilterNotEqual("MailboxID", idsAny...)
}
}
}
// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
if page.AnchorMessageID > 0 {
var seen = false
q.FilterFn(func(m store.Message) bool {
if seen {
return true
}
seen = m.ID == page.AnchorMessageID
return false
})
}
// We may be adding filters to the query below. The FilterFn signature does
// not allow reporting errors, or anything else, just a bool. So when making
// the filter functions, we give them a place to store parsed message state,
// and an error. We check the error during and after query execution.
state := msgState{acc: acc}
defer state.clear()
flagfilter := query.flagFilterFn()
if flagfilter != nil {
q.FilterFn(func(m store.Message) bool {
return flagfilter(m.Flags, m.Keywords)
})
}
if query.Filter.Oldest != nil {
q.FilterGreaterEqual("Received", *query.Filter.Oldest)
}
if query.Filter.Newest != nil {
q.FilterLessEqual("Received", *query.Filter.Newest)
}
if query.Filter.SizeMin > 0 {
q.FilterGreaterEqual("Size", query.Filter.SizeMin)
}
if query.Filter.SizeMax > 0 {
q.FilterLessEqual("Size", query.Filter.SizeMax)
}
attachmentFilter := query.attachmentFilterFn(log, acc, &state)
if attachmentFilter != nil {
q.FilterFn(attachmentFilter)
}
envFilter := query.envFilterFn(log, &state)
if envFilter != nil {
q.FilterFn(envFilter)
}
headerFilter := query.headerFilterFn(log, &state)
if headerFilter != nil {
q.FilterFn(headerFilter)
}
wordsFilter := query.wordsFilterFn(log, &state)
if wordsFilter != nil {
q.FilterFn(wordsFilter)
}
if query.OrderAsc {
q.SortAsc("Received")
} else {
q.SortDesc("Received")
}
found := page.DestMessageID <= 0
end := true
have := 0
err := q.ForEach(func(m store.Message) error {
// Check for an error in one of the filters, propagate it.
if state.err != nil {
return state.err
}
if have >= page.Count && found || have > 10000 {
end = false
return bstore.StopForEach
}
var pm *ParsedMessage
if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
found = true
xpm, err := parsedMessage(log, m, &state, true, false)
if err != nil {
return fmt.Errorf("parsing message %d: %v", m.ID, err)
}
pm = &xpm
}
mi, err := messageItem(log, m, &state)
if err != nil {
return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
}
mrc <- msgResp{mi: mi, pm: pm}
have++
return nil
})
// Check for an error in one of the filters again. The check inside ForEach
// would not trigger if the last message caused the error.
if err == nil && state.err != nil {
err = state.err
}
if err != nil {
mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
return
}
if end {
mrc <- msgResp{viewEnd: true}
}
}
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message. It is updated while filtering
// with more information, or reset for the next message.
type msgState struct {
acc *store.Account // Never changes during lifetime.
err error // Once set, doesn't get cleared.
m store.Message
part *message.Part // Will be without Reader when msgr is nil.
msgr *store.MsgReader
}
func (ms *msgState) clear() {
if ms.msgr != nil {
ms.msgr.Close()
ms.msgr = nil
}
*ms = msgState{acc: ms.acc, err: ms.err}
}
func (ms *msgState) ensureMsg(m store.Message) {
if m.ID != ms.m.ID {
ms.clear()
}
ms.m = m
}
func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
ms.ensureMsg(m)
if ms.err == nil {
if ms.part == nil {
if m.ParsedBuf == nil {
ms.err = fmt.Errorf("message %d not parsed", m.ID)
return false
}
var p message.Part
if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
return false
}
ms.part = &p
}
if withMsgReader && ms.msgr == nil {
ms.msgr = ms.acc.MessageReader(m)
ms.part.SetReaderAt(ms.msgr)
}
}
return ms.part != nil
}
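// Illustration (not part of the original code) of the intended msgState
// lifecycle while filtering, matching how it is used elsewhere in this file:
//
//	state := msgState{acc: acc}
//	defer state.clear()
//	if state.ensurePart(m, false) {
//		// state.part is parsed, without a reader; pass true as second
//		// parameter to also attach a MsgReader for the message body.
//	}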
// flagFilterFn returns a function that applies the flag/keyword/"label"-related
// filters for a query. A nil function is returned if there are no flags to filter
// on.
func (q Query) flagFilterFn() func(store.Flags, []string) bool {
labels := map[string]bool{}
for _, k := range q.Filter.Labels {
labels[k] = true
}
for _, k := range q.NotFilter.Labels {
labels[k] = false
}
if len(labels) == 0 {
return nil
}
var mask, flags store.Flags
systemflags := map[string][]*bool{
`\answered`: {&mask.Answered, &flags.Answered},
`\flagged`: {&mask.Flagged, &flags.Flagged},
`\deleted`: {&mask.Deleted, &flags.Deleted},
`\seen`: {&mask.Seen, &flags.Seen},
`\draft`: {&mask.Draft, &flags.Draft},
`$junk`: {&mask.Junk, &flags.Junk},
`$notjunk`: {&mask.Notjunk, &flags.Notjunk},
`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
`$phishing`: {&mask.Phishing, &flags.Phishing},
`$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
}
keywords := map[string]bool{}
for k, v := range labels {
k = strings.ToLower(k)
if mf, ok := systemflags[k]; ok {
*mf[0] = true
*mf[1] = v
} else {
keywords[k] = v
}
}
return func(msgFlags store.Flags, msgKeywords []string) bool {
var f store.Flags
if f.Set(mask, msgFlags) != flags {
return false
}
for k, v := range keywords {
if slices.Contains(msgKeywords, k) != v {
return false
}
}
return true
}
}
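// Illustration (not part of the original code): a query with Filter.Labels
// containing `\seen` and NotFilter.Labels containing `$junk` yields a filter
// that matches messages that are seen and not marked junk:
//
//	f := q.flagFilterFn()
//	f(store.Flags{Seen: true}, nil)             // true
//	f(store.Flags{Seen: true, Junk: true}, nil) // false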
// attachmentFilterFn returns a function that filters for the attachment-related
// filter from the query. A nil function is returned if there are no
// attachment filters.
func (q Query) attachmentFilterFn(log *mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
return nil
}
return func(m store.Message) bool {
if !state.ensurePart(m, false) {
return false
}
types, err := attachmentTypes(log, m, state)
if err != nil {
state.err = err
return false
}
return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
}
}
var attachmentMimetypes = map[string]AttachmentType{
"application/pdf": AttachmentPDF,
"application/zip": AttachmentArchive,
"application/x-rar-compressed": AttachmentArchive,
"application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
"application/vnd.ms-excel": AttachmentSpreadsheet,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
"application/vnd.oasis.opendocument.text": AttachmentDocument,
"application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
"application/vnd.ms-powerpoint": AttachmentPresentation,
"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}
var attachmentExtensions = map[string]AttachmentType{
".pdf": AttachmentPDF,
".zip": AttachmentArchive,
".tar": AttachmentArchive,
".tgz": AttachmentArchive,
".tar.gz": AttachmentArchive,
".tbz2": AttachmentArchive,
".tar.bz2": AttachmentArchive,
".tar.lz": AttachmentArchive,
".tlz": AttachmentArchive,
".tar.xz": AttachmentArchive,
".txz": AttachmentArchive,
".tar.zst": AttachmentArchive,
".tar.lz4": AttachmentArchive,
".7z": AttachmentArchive,
".rar": AttachmentArchive,
".ods": AttachmentSpreadsheet,
".xls": AttachmentSpreadsheet,
".xlsx": AttachmentSpreadsheet,
".odt": AttachmentDocument,
".doc": AttachmentDocument,
".docx": AttachmentDocument,
".odp": AttachmentPresentation,
".ppt": AttachmentPresentation,
".pptx": AttachmentPresentation,
}
func attachmentTypes(log *mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
types := map[AttachmentType]bool{}
pm, err := parsedMessage(log, m, state, false, false)
if err != nil {
return nil, fmt.Errorf("parsing message for attachments: %w", err)
}
for _, a := range pm.attachments {
if a.Part.MediaType == "IMAGE" {
types[AttachmentImage] = true
continue
}
mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
if t, ok := attachmentMimetypes[mt]; ok {
types[t] = true
} else if ext := filepath.Ext(a.Part.ContentTypeParams["name"]); ext != "" {
if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
types[t] = true
} else {
continue
}
}
}
if len(types) == 0 {
types[AttachmentNone] = true
} else {
types[AttachmentAny] = true
}
return types, nil
}
// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
func (q Query) envFilterFn(log *mlog.Log, state *msgState) func(m store.Message) bool {
if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
return nil
}
lower := func(l []string) []string {
if len(l) == 0 {
return nil
}
r := make([]string, len(l))
for i, s := range l {
r[i] = strings.ToLower(s)
}
return r
}
filterSubject := lower(q.Filter.Subject)
notFilterSubject := lower(q.NotFilter.Subject)
filterFrom := lower(q.Filter.From)
notFilterFrom := lower(q.NotFilter.From)
filterTo := lower(q.Filter.To)
notFilterTo := lower(q.NotFilter.To)
return func(m store.Message) bool {
if !state.ensurePart(m, false) {
return false
}
var env message.Envelope
if state.part.Envelope != nil {
env = *state.part.Envelope
}
if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
subject := strings.ToLower(env.Subject)
for _, s := range filterSubject {
if !strings.Contains(subject, s) {
return false
}
}
for _, s := range notFilterSubject {
if strings.Contains(subject, s) {
return false
}
}
}
contains := func(textLower []string, l []message.Address, all bool) bool {
next:
for _, s := range textLower {
for _, a := range l {
name := strings.ToLower(a.Name)
addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
if strings.Contains(name, s) || strings.Contains(addr, s) {
if !all {
return true
}
continue next
}
}
if all {
return false
}
}
return all
}
if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
return false
}
if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
return false
}
if len(filterTo) > 0 || len(notFilterTo) > 0 {
to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
if len(filterTo) > 0 && !contains(filterTo, to, true) {
return false
}
if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
return false
}
}
return true
}
}
// headerFilterFn returns a function that filters for the header filters in the
// query. A nil function is returned if there are no header filters.
func (q Query) headerFilterFn(log *mlog.Log, state *msgState) func(m store.Message) bool {
if len(q.Filter.Headers) == 0 {
return nil
}
lowerValues := make([]string, len(q.Filter.Headers))
for i, t := range q.Filter.Headers {
lowerValues[i] = strings.ToLower(t[1])
}
return func(m store.Message) bool {
if !state.ensurePart(m, true) {
return false
}
hdr, err := state.part.Header()
if err != nil {
state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
return false
}
next:
for i, t := range q.Filter.Headers {
k := t[0]
v := lowerValues[i]
l := hdr.Values(k)
if v == "" && len(l) > 0 {
continue
}
for _, e := range l {
if strings.Contains(strings.ToLower(e), v) {
continue next
}
}
return false
}
return true
}
}
// wordsFilterFn returns a function that applies the word filters of the query.
// A nil function is returned when the query does not contain a word filter.
func (q Query) wordsFilterFn(log *mlog.Log, state *msgState) func(m store.Message) bool {
if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
return nil
}
ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
return func(m store.Message) bool {
if !state.ensurePart(m, true) {
return false
}
if ok, err := ws.MatchPart(log, state.part, true); err != nil {
state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
return false
} else {
return ok
}
}
}