mox/store/state.go

188 lines
4 KiB
Go
Raw Normal View History

2023-01-30 16:27:06 +03:00
package store
import (
"sync"
2023-01-30 16:27:06 +03:00
"sync/atomic"
)
var (
register = make(chan *Comm)
unregister = make(chan *Comm)
broadcast = make(chan changeReq)
)
type changeReq struct {
acc *Account
comm *Comm // Can be nil.
2023-01-30 16:27:06 +03:00
changes []Change
done chan struct{}
2023-01-30 16:27:06 +03:00
}
type UID uint32 // IMAP UID.
// Change to mailboxes/subscriptions/messages in an account. One of the Change*
// types in this package.
type Change any
// ChangeAddUID is sent for a new message in a mailbox.
type ChangeAddUID struct {
MailboxID int64
UID UID
ModSeq ModSeq
Flags Flags // System flags.
Keywords []string // Other flags.
2023-01-30 16:27:06 +03:00
}
// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
type ChangeRemoveUIDs struct {
MailboxID int64
UIDs []UID
ModSeq ModSeq
2023-01-30 16:27:06 +03:00
}
// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
type ChangeFlags struct {
MailboxID int64
UID UID
ModSeq ModSeq
Mask Flags // Which flags are actually modified.
Flags Flags // New flag values. All are set, not just mask.
Keywords []string // Other flags.
2023-01-30 16:27:06 +03:00
}
// ChangeRemoveMailbox is sent for a removed mailbox.
type ChangeRemoveMailbox struct {
Name string
}
// ChangeAddMailbox is sent for a newly created mailbox.
type ChangeAddMailbox struct {
Name string
Flags []string
}
// ChangeRenameMailbox is sent for a rename mailbox.
type ChangeRenameMailbox struct {
OldName string
NewName string
Flags []string
}
// ChangeAddSubscription is sent for an added subscription to a mailbox.
type ChangeAddSubscription struct {
Name string
}
var switchboardBusy atomic.Bool
// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
func Switchboard() chan struct{} {
regs := map[*Account]map[*Comm]struct{}{}
2023-01-30 16:27:06 +03:00
done := make(chan struct{})
if !switchboardBusy.CompareAndSwap(false, true) {
panic("switchboard already busy")
}
go func() {
for {
select {
case c := <-register:
if _, ok := regs[c.acc]; !ok {
regs[c.acc] = map[*Comm]struct{}{}
2023-01-30 16:27:06 +03:00
}
regs[c.acc][c] = struct{}{}
2023-01-30 16:27:06 +03:00
case c := <-unregister:
delete(regs[c.acc], c)
if len(regs[c.acc]) == 0 {
delete(regs, c.acc)
}
2023-01-30 16:27:06 +03:00
case chReq := <-broadcast:
acc := chReq.acc
for c := range regs[acc] {
// Do not send the broadcaster back their own changes. chReq.comm is nil if not
// originating from a comm, so won't match in that case.
2023-01-30 16:27:06 +03:00
if c == chReq.comm {
continue
}
c.Lock()
c.changes = append(c.changes, chReq.changes...)
c.Unlock()
2023-01-30 16:27:06 +03:00
select {
case c.Pending <- struct{}{}:
2023-01-30 16:27:06 +03:00
default:
}
}
chReq.done <- struct{}{}
2023-01-30 16:27:06 +03:00
case <-done:
if !switchboardBusy.CompareAndSwap(true, false) {
panic("switchboard already unregistered?")
}
return
}
}
}()
return done
}
// Comm handles communication with the goroutine that maintains the
// account/mailbox/message state.
type Comm struct {
Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
acc *Account
sync.Mutex
changes []Change
2023-01-30 16:27:06 +03:00
}
// Register starts a Comm for the account. Unregister must be called.
func RegisterComm(acc *Account) *Comm {
c := &Comm{
Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
acc: acc,
}
2023-01-30 16:27:06 +03:00
register <- c
return c
}
// Unregister stops this Comm.
func (c *Comm) Unregister() {
unregister <- c
}
// Broadcast ensures changes are sent to other Comms.
func (c *Comm) Broadcast(ch []Change) {
if len(ch) == 0 {
return
}
done := make(chan struct{}, 1)
broadcast <- changeReq{c.acc, c, ch, done}
<-done
2023-01-30 16:27:06 +03:00
}
// Get retrieves all pending changes. If no changes are pending a nil or empty list
2023-01-30 16:27:06 +03:00
// is returned.
func (c *Comm) Get() []Change {
c.Lock()
defer c.Unlock()
l := c.changes
c.changes = nil
return l
}
// BroadcastChanges ensures changes are sent to all listeners on the accoount.
func BroadcastChanges(acc *Account, ch []Change) {
if len(ch) == 0 {
return
}
done := make(chan struct{}, 1)
broadcast <- changeReq{acc, nil, ch, done}
<-done
2023-01-30 16:27:06 +03:00
}