fix delay with propagating mailbox changes to other imap (idle) connections

when broadcasting a change, we would try to send the changes on a channel,
non-blocking. if we couldn't send (because there was no pending blocked
receive), we would wait until the potential receiver would explicitly request
the changes. however, the imap idle handler would not explicitly request the
changes, but do a receive on the changes channel. since there was no pending
blocked send on the channel, that receive would block. only when another event
would come in, would both the pending and the new changes be sent.

we now use a channel only for signaling there are pending changes. the channel
is buffered, so when broadcasting we can just set the signal by a non-blocking
send and continue with the next listener. the receiver will get the buffered
signal. it can then get the changes directly, but lock-protected.

found when looking at a missing/delayed new message notification in thunderbird
when two messages arrive immediately after each other. this doesn't fix that
problem though: it seems thunderbird just ignores imap untagged "exists"
messages (indicating a new message arrived) during the "uid fetch" command that
it issued after notifications from an "idle" command.
This commit is contained in:
Mechiel Lukkien 2023-07-23 15:28:37 +02:00
parent 3e9b4107fd
commit e943e0c65d
No known key found for this signature in database
5 changed files with 57 additions and 42 deletions

View file

@ -802,9 +802,7 @@ func importMessages(ctx context.Context, log *mlog.Log, token string, acc *store
jf = nil jf = nil
} }
comm := store.RegisterComm(acc) store.BroadcastChanges(acc, changes)
defer comm.Unregister()
comm.Broadcast(changes)
acc.Unlock() acc.Unlock()
err = acc.Close() err = acc.Close()
log.Check(err, "closing account after import") log.Check(err, "closing account after import")

View file

@ -2660,8 +2660,8 @@ wait:
xcheckf(le.err, "get line") xcheckf(le.err, "get line")
line = le.line line = le.line
break wait break wait
case changes := <-c.comm.Changes: case <-c.comm.Pending:
c.applyChanges(changes, false) c.applyChanges(c.comm.Get(), false)
c.xflush() c.xflush()
case <-mox.Shutdown.Done(): case <-mox.Shutdown.Done():
// ../rfc/9051:5375 // ../rfc/9051:5375

View file

@ -390,9 +390,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
ctl.log.Info("delivered messages through import", mlog.Field("count", len(deliveredIDs))) ctl.log.Info("delivered messages through import", mlog.Field("count", len(deliveredIDs)))
deliveredIDs = nil deliveredIDs = nil
comm := store.RegisterComm(a) store.BroadcastChanges(a, changes)
defer comm.Unregister()
comm.Broadcast(changes)
}) })
err = a.Close() err = a.Close()

View file

@ -1039,9 +1039,7 @@ func (a *Account) DeliverMailbox(log *mlog.Log, mailbox string, m *Message, msgF
} }
changes = append(changes, ChangeAddUID{m.MailboxID, m.UID, m.Flags, m.Keywords}) changes = append(changes, ChangeAddUID{m.MailboxID, m.UID, m.Flags, m.Keywords})
comm := RegisterComm(a) BroadcastChanges(a, changes)
defer comm.Unregister()
comm.Broadcast(changes)
return nil return nil
} }
@ -1104,9 +1102,7 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS
return false, err return false, err
} }
comm := RegisterComm(a) BroadcastChanges(a, changes)
defer comm.Unregister()
comm.Broadcast(changes)
return hasSpace, nil return hasSpace, nil
} }
@ -1198,9 +1194,7 @@ func (a *Account) RejectsRemove(log *mlog.Log, rejectsMailbox, messageID string)
return err return err
} }
comm := RegisterComm(a) BroadcastChanges(a, changes)
defer comm.Unregister()
comm.Broadcast(changes)
return nil return nil
} }

View file

@ -1,6 +1,7 @@
package store package store
import ( import (
"sync"
"sync/atomic" "sync/atomic"
) )
@ -8,12 +9,13 @@ var (
register = make(chan *Comm) register = make(chan *Comm)
unregister = make(chan *Comm) unregister = make(chan *Comm)
broadcast = make(chan changeReq) broadcast = make(chan changeReq)
get = make(chan *Comm)
) )
type changeReq struct { type changeReq struct {
comm *Comm acc *Account
comm *Comm // Can be nil.
changes []Change changes []Change
done chan struct{}
} }
type UID uint32 // IMAP UID. type UID uint32 // IMAP UID.
@ -72,7 +74,7 @@ var switchboardBusy atomic.Bool
// Switchboard distributes changes to accounts to interested listeners. See Comm and Change. // Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
func Switchboard() chan struct{} { func Switchboard() chan struct{} {
regs := map[*Account]map[*Comm][]Change{} regs := map[*Account]map[*Comm]struct{}{}
done := make(chan struct{}) done := make(chan struct{})
if !switchboardBusy.CompareAndSwap(false, true) { if !switchboardBusy.CompareAndSwap(false, true) {
@ -84,32 +86,36 @@ func Switchboard() chan struct{} {
select { select {
case c := <-register: case c := <-register:
if _, ok := regs[c.acc]; !ok { if _, ok := regs[c.acc]; !ok {
regs[c.acc] = map[*Comm][]Change{} regs[c.acc] = map[*Comm]struct{}{}
} }
regs[c.acc][c] = nil regs[c.acc][c] = struct{}{}
case c := <-unregister: case c := <-unregister:
delete(regs[c.acc], c) delete(regs[c.acc], c)
if len(regs[c.acc]) == 0 { if len(regs[c.acc]) == 0 {
delete(regs, c.acc) delete(regs, c.acc)
} }
case chReq := <-broadcast: case chReq := <-broadcast:
acc := chReq.comm.acc acc := chReq.acc
for c, changes := range regs[acc] { for c := range regs[acc] {
// Do not send the broadcaster back their own changes. // Do not send the broadcaster back their own changes. chReq.comm is nil if not
// originating from a comm, so won't match in that case.
if c == chReq.comm { if c == chReq.comm {
continue continue
} }
regs[acc][c] = append(changes, chReq.changes...)
c.Lock()
c.changes = append(c.changes, chReq.changes...)
c.Unlock()
select { select {
case c.Changes <- regs[acc][c]: case c.Pending <- struct{}{}:
regs[acc][c] = nil
default: default:
} }
} }
chReq.comm.r <- struct{}{} chReq.done <- struct{}{}
case c := <-get:
c.Changes <- regs[c.acc][c]
regs[c.acc][c] = nil
case <-done: case <-done:
if !switchboardBusy.CompareAndSwap(true, false) { if !switchboardBusy.CompareAndSwap(true, false) {
panic("switchboard already unregistered?") panic("switchboard already unregistered?")
@ -124,14 +130,20 @@ func Switchboard() chan struct{} {
// Comm handles communication with the goroutine that maintains the // Comm handles communication with the goroutine that maintains the
// account/mailbox/message state. // account/mailbox/message state.
type Comm struct { type Comm struct {
Changes chan []Change // Receives block until changes come in, e.g. for IMAP IDLE. Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
acc *Account
r chan struct{} acc *Account
sync.Mutex
changes []Change
} }
// Register starts a Comm for the account. Unregister must be called. // Register starts a Comm for the account. Unregister must be called.
func RegisterComm(acc *Account) *Comm { func RegisterComm(acc *Account) *Comm {
c := &Comm{make(chan []Change), acc, make(chan struct{})} c := &Comm{
Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
acc: acc,
}
register <- c register <- c
return c return c
} }
@ -146,14 +158,27 @@ func (c *Comm) Broadcast(ch []Change) {
if len(ch) == 0 { if len(ch) == 0 {
return return
} }
broadcast <- changeReq{c, ch} done := make(chan struct{}, 1)
<-c.r broadcast <- changeReq{c.acc, c, ch, done}
<-done
} }
// Get retrieves pending changes. If no changes are pending a nil or empty list // Get retrieves all pending changes. If no changes are pending a nil or empty list
// is returned. // is returned.
func (c *Comm) Get() []Change { func (c *Comm) Get() []Change {
get <- c c.Lock()
changes := <-c.Changes defer c.Unlock()
return changes l := c.changes
c.changes = nil
return l
}
// BroadcastChanges ensures changes are sent to all listeners on the accoount.
func BroadcastChanges(acc *Account, ch []Change) {
if len(ch) == 0 {
return
}
done := make(chan struct{}, 1)
broadcast <- changeReq{acc, nil, ch, done}
<-done
} }