From e943e0c65d5663b204e9fb00dcb7120b2098abc8 Mon Sep 17 00:00:00 2001 From: Mechiel Lukkien Date: Sun, 23 Jul 2023 15:28:37 +0200 Subject: [PATCH] fix delay with propagating mailbox changes to other imap (idle) connections when broadcasting a change, we would try to send the changes on a channel, non-blocking. if we couldn't send (because there was no pending blocked receive), we would wait until the potential receiver would explicitly request the changes. however, the imap idle handler would not explicitly request the changes, but do a receive on the changes channel. since there was no pending blocked send on the channel, that receive would block. only when another event would come in, would both the pending and the new changes be sent. we now use a channel only for signaling there are pending changes. the channel is buffered, so when broadcasting we can just set the signal by a non-blocking send and continue with the next listener. the receiver will get the buffered signal. it can then get the changes directly, but lock-protected. found when looking at a missing/delayed new message notification in thunderbird when two messages arrive immediately after each other. this doesn't fix that problem though: it seems thunderbird just ignores imap untagged "exists" messages (indicating a new message arrived) during the "uid fetch" command that it issued after notifications from an "idle" command. --- http/import.go | 4 +-- imapserver/server.go | 4 +-- import.go | 4 +-- store/account.go | 12 ++----- store/state.go | 75 +++++++++++++++++++++++++++++--------------- 5 files changed, 57 insertions(+), 42 deletions(-) diff --git a/http/import.go b/http/import.go index 55a0369..1a91b33 100644 --- a/http/import.go +++ b/http/import.go @@ -802,9 +802,7 @@ func importMessages(ctx context.Context, log *mlog.Log, token string, acc *store jf = nil } - comm := store.RegisterComm(acc) - defer comm.Unregister() - comm.Broadcast(changes) + store.BroadcastChanges(acc, changes) acc.Unlock() err = acc.Close() log.Check(err, "closing account after import") diff --git a/imapserver/server.go b/imapserver/server.go index 312fec3..cd8284d 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -2660,8 +2660,8 @@ wait: xcheckf(le.err, "get line") line = le.line break wait - case changes := <-c.comm.Changes: - c.applyChanges(changes, false) + case <-c.comm.Pending: + c.applyChanges(c.comm.Get(), false) c.xflush() case <-mox.Shutdown.Done(): // ../rfc/9051:5375 diff --git a/import.go b/import.go index a6e900f..342b9d3 100644 --- a/import.go +++ b/import.go @@ -390,9 +390,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { ctl.log.Info("delivered messages through import", mlog.Field("count", len(deliveredIDs))) deliveredIDs = nil - comm := store.RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + store.BroadcastChanges(a, changes) }) err = a.Close() diff --git a/store/account.go b/store/account.go index 3012a96..e839c45 100644 --- a/store/account.go +++ b/store/account.go @@ -1039,9 +1039,7 @@ func (a *Account) DeliverMailbox(log *mlog.Log, mailbox string, m *Message, msgF } changes = append(changes, ChangeAddUID{m.MailboxID, m.UID, m.Flags, m.Keywords}) - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return nil } @@ -1104,9 +1102,7 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS return false, err } - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return hasSpace, nil } @@ -1198,9 +1194,7 @@ func (a *Account) RejectsRemove(log *mlog.Log, rejectsMailbox, messageID string) return err } - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return nil } diff --git a/store/state.go b/store/state.go index 8a30c7b..6d0d4aa 100644 --- a/store/state.go +++ b/store/state.go @@ -1,6 +1,7 @@ package store import ( + "sync" "sync/atomic" ) @@ -8,12 +9,13 @@ var ( register = make(chan *Comm) unregister = make(chan *Comm) broadcast = make(chan changeReq) - get = make(chan *Comm) ) type changeReq struct { - comm *Comm + acc *Account + comm *Comm // Can be nil. changes []Change + done chan struct{} } type UID uint32 // IMAP UID. @@ -72,7 +74,7 @@ var switchboardBusy atomic.Bool // Switchboard distributes changes to accounts to interested listeners. See Comm and Change. func Switchboard() chan struct{} { - regs := map[*Account]map[*Comm][]Change{} + regs := map[*Account]map[*Comm]struct{}{} done := make(chan struct{}) if !switchboardBusy.CompareAndSwap(false, true) { @@ -84,32 +86,36 @@ func Switchboard() chan struct{} { select { case c := <-register: if _, ok := regs[c.acc]; !ok { - regs[c.acc] = map[*Comm][]Change{} + regs[c.acc] = map[*Comm]struct{}{} } - regs[c.acc][c] = nil + regs[c.acc][c] = struct{}{} + case c := <-unregister: delete(regs[c.acc], c) if len(regs[c.acc]) == 0 { delete(regs, c.acc) } + case chReq := <-broadcast: - acc := chReq.comm.acc - for c, changes := range regs[acc] { - // Do not send the broadcaster back their own changes. + acc := chReq.acc + for c := range regs[acc] { + // Do not send the broadcaster back their own changes. chReq.comm is nil if not + // originating from a comm, so won't match in that case. if c == chReq.comm { continue } - regs[acc][c] = append(changes, chReq.changes...) + + c.Lock() + c.changes = append(c.changes, chReq.changes...) + c.Unlock() + select { - case c.Changes <- regs[acc][c]: - regs[acc][c] = nil + case c.Pending <- struct{}{}: default: } } - chReq.comm.r <- struct{}{} - case c := <-get: - c.Changes <- regs[c.acc][c] - regs[c.acc][c] = nil + chReq.done <- struct{}{} + case <-done: if !switchboardBusy.CompareAndSwap(true, false) { panic("switchboard already unregistered?") @@ -124,14 +130,20 @@ func Switchboard() chan struct{} { // Comm handles communication with the goroutine that maintains the // account/mailbox/message state. type Comm struct { - Changes chan []Change // Receives block until changes come in, e.g. for IMAP IDLE. - acc *Account - r chan struct{} + Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE. + + acc *Account + + sync.Mutex + changes []Change } // Register starts a Comm for the account. Unregister must be called. func RegisterComm(acc *Account) *Comm { - c := &Comm{make(chan []Change), acc, make(chan struct{})} + c := &Comm{ + Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send. + acc: acc, + } register <- c return c } @@ -146,14 +158,27 @@ func (c *Comm) Broadcast(ch []Change) { if len(ch) == 0 { return } - broadcast <- changeReq{c, ch} - <-c.r + done := make(chan struct{}, 1) + broadcast <- changeReq{c.acc, c, ch, done} + <-done } -// Get retrieves pending changes. If no changes are pending a nil or empty list +// Get retrieves all pending changes. If no changes are pending a nil or empty list // is returned. func (c *Comm) Get() []Change { - get <- c - changes := <-c.Changes - return changes + c.Lock() + defer c.Unlock() + l := c.changes + c.changes = nil + return l +} + +// BroadcastChanges ensures changes are sent to all listeners on the accoount. +func BroadcastChanges(acc *Account, ch []Change) { + if len(ch) == 0 { + return + } + done := make(chan struct{}, 1) + broadcast <- changeReq{acc, nil, ch, done} + <-done }