diff --git a/http/import.go b/http/import.go index 55a0369..1a91b33 100644 --- a/http/import.go +++ b/http/import.go @@ -802,9 +802,7 @@ func importMessages(ctx context.Context, log *mlog.Log, token string, acc *store jf = nil } - comm := store.RegisterComm(acc) - defer comm.Unregister() - comm.Broadcast(changes) + store.BroadcastChanges(acc, changes) acc.Unlock() err = acc.Close() log.Check(err, "closing account after import") diff --git a/imapserver/server.go b/imapserver/server.go index 312fec3..cd8284d 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -2660,8 +2660,8 @@ wait: xcheckf(le.err, "get line") line = le.line break wait - case changes := <-c.comm.Changes: - c.applyChanges(changes, false) + case <-c.comm.Pending: + c.applyChanges(c.comm.Get(), false) c.xflush() case <-mox.Shutdown.Done(): // ../rfc/9051:5375 diff --git a/import.go b/import.go index a6e900f..342b9d3 100644 --- a/import.go +++ b/import.go @@ -390,9 +390,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { ctl.log.Info("delivered messages through import", mlog.Field("count", len(deliveredIDs))) deliveredIDs = nil - comm := store.RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + store.BroadcastChanges(a, changes) }) err = a.Close() diff --git a/store/account.go b/store/account.go index 3012a96..e839c45 100644 --- a/store/account.go +++ b/store/account.go @@ -1039,9 +1039,7 @@ func (a *Account) DeliverMailbox(log *mlog.Log, mailbox string, m *Message, msgF } changes = append(changes, ChangeAddUID{m.MailboxID, m.UID, m.Flags, m.Keywords}) - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return nil } @@ -1104,9 +1102,7 @@ func (a *Account) TidyRejectsMailbox(log *mlog.Log, rejectsMailbox string) (hasS return false, err } - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return hasSpace, nil } @@ -1198,9 +1194,7 @@ func (a *Account) RejectsRemove(log *mlog.Log, rejectsMailbox, messageID string) return err } - comm := RegisterComm(a) - defer comm.Unregister() - comm.Broadcast(changes) + BroadcastChanges(a, changes) return nil } diff --git a/store/state.go b/store/state.go index 8a30c7b..6d0d4aa 100644 --- a/store/state.go +++ b/store/state.go @@ -1,6 +1,7 @@ package store import ( + "sync" "sync/atomic" ) @@ -8,12 +9,13 @@ var ( register = make(chan *Comm) unregister = make(chan *Comm) broadcast = make(chan changeReq) - get = make(chan *Comm) ) type changeReq struct { - comm *Comm + acc *Account + comm *Comm // Can be nil. changes []Change + done chan struct{} } type UID uint32 // IMAP UID. @@ -72,7 +74,7 @@ var switchboardBusy atomic.Bool // Switchboard distributes changes to accounts to interested listeners. See Comm and Change. func Switchboard() chan struct{} { - regs := map[*Account]map[*Comm][]Change{} + regs := map[*Account]map[*Comm]struct{}{} done := make(chan struct{}) if !switchboardBusy.CompareAndSwap(false, true) { @@ -84,32 +86,36 @@ func Switchboard() chan struct{} { select { case c := <-register: if _, ok := regs[c.acc]; !ok { - regs[c.acc] = map[*Comm][]Change{} + regs[c.acc] = map[*Comm]struct{}{} } - regs[c.acc][c] = nil + regs[c.acc][c] = struct{}{} + case c := <-unregister: delete(regs[c.acc], c) if len(regs[c.acc]) == 0 { delete(regs, c.acc) } + case chReq := <-broadcast: - acc := chReq.comm.acc - for c, changes := range regs[acc] { - // Do not send the broadcaster back their own changes. + acc := chReq.acc + for c := range regs[acc] { + // Do not send the broadcaster back their own changes. chReq.comm is nil if not + // originating from a comm, so won't match in that case. if c == chReq.comm { continue } - regs[acc][c] = append(changes, chReq.changes...) + + c.Lock() + c.changes = append(c.changes, chReq.changes...) + c.Unlock() + select { - case c.Changes <- regs[acc][c]: - regs[acc][c] = nil + case c.Pending <- struct{}{}: default: } } - chReq.comm.r <- struct{}{} - case c := <-get: - c.Changes <- regs[c.acc][c] - regs[c.acc][c] = nil + chReq.done <- struct{}{} + case <-done: if !switchboardBusy.CompareAndSwap(true, false) { panic("switchboard already unregistered?") @@ -124,14 +130,20 @@ func Switchboard() chan struct{} { // Comm handles communication with the goroutine that maintains the // account/mailbox/message state. type Comm struct { - Changes chan []Change // Receives block until changes come in, e.g. for IMAP IDLE. - acc *Account - r chan struct{} + Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE. + + acc *Account + + sync.Mutex + changes []Change } // Register starts a Comm for the account. Unregister must be called. func RegisterComm(acc *Account) *Comm { - c := &Comm{make(chan []Change), acc, make(chan struct{})} + c := &Comm{ + Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send. + acc: acc, + } register <- c return c } @@ -146,14 +158,27 @@ func (c *Comm) Broadcast(ch []Change) { if len(ch) == 0 { return } - broadcast <- changeReq{c, ch} - <-c.r + done := make(chan struct{}, 1) + broadcast <- changeReq{c.acc, c, ch, done} + <-done } -// Get retrieves pending changes. If no changes are pending a nil or empty list +// Get retrieves all pending changes. If no changes are pending a nil or empty list // is returned. func (c *Comm) Get() []Change { - get <- c - changes := <-c.Changes - return changes + c.Lock() + defer c.Unlock() + l := c.changes + c.changes = nil + return l +} + +// BroadcastChanges ensures changes are sent to all listeners on the accoount. +func BroadcastChanges(acc *Account, ch []Change) { + if len(ch) == 0 { + return + } + done := make(chan struct{}, 1) + broadcast <- changeReq{acc, nil, ch, done} + <-done }