queue: when shutting down, wait for pending deliveries before signaling that shutdown is complete

also fixes flaky test, which is how i found it
This commit is contained in:
Mechiel Lukkien 2024-04-28 22:48:51 +02:00
parent ff6cca1bf9
commit b7ec84b80a
No known key found for this signature in database
4 changed files with 10 additions and 5 deletions

View file

@ -1006,6 +1006,10 @@ func startHookQueue(done chan struct{}) {
for { for {
select { select {
case <-mox.Shutdown.Done(): case <-mox.Shutdown.Done():
for len(busyHookURLs) > 0 {
url := <-hookDeliveryResults
delete(busyHookURLs, url)
}
done <- struct{}{} done <- struct{}{}
return return
case <-hookqueue: case <-hookqueue:

View file

@ -1154,6 +1154,10 @@ func startQueue(resolver dns.Resolver, done chan struct{}) {
for { for {
select { select {
case <-mox.Shutdown.Done(): case <-mox.Shutdown.Done():
for len(busyDomains) > 0 {
domain := <-deliveryResults
delete(busyDomains, domain)
}
done <- struct{}{} done <- struct{}{}
return return
case <-msgqueue: case <-msgqueue:

View file

@ -1183,7 +1183,7 @@ func TestQueueStart(t *testing.T) {
_, cleanup := setup(t) _, cleanup := setup(t)
defer cleanup() defer cleanup()
done := make(chan struct{}, 4) done := make(chan struct{})
defer func() { defer func() {
mox.ShutdownCancel() mox.ShutdownCancel()
// Wait for message and hooks deliverers and cleaners. // Wait for message and hooks deliverers and cleaners.
@ -1269,7 +1269,6 @@ func TestQueueStart(t *testing.T) {
t.Fatalf("kick changed %d messages, expected 1", n) t.Fatalf("kick changed %d messages, expected 1", n)
} }
checkDialed(true) checkDialed(true)
time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done...
// Submit another, should be delivered immediately without HoldRule. // Submit another, should be delivered immediately without HoldRule.
path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}} path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
@ -1280,8 +1279,6 @@ func TestQueueStart(t *testing.T) {
err = Add(ctxbg, pkglog, "mjl", mf, qm) err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery") tcheck(t, err, "add message to queue for delivery")
checkDialed(true) // Immediate. checkDialed(true) // Immediate.
time.Sleep(100 * time.Millisecond) // Racy... give time to finish.
} }
func TestListFilterSort(t *testing.T) { func TestListFilterSort(t *testing.T) {

View file

@ -78,7 +78,7 @@ func start(mtastsdbRefresher, sendDMARCReports, sendTLSReports, skipForkExec boo
return fmt.Errorf("tlsrpt init: %s", err) return fmt.Errorf("tlsrpt init: %s", err)
} }
done := make(chan struct{}, 4) // Goroutines for messages and webhooks, and cleaners. done := make(chan struct{}) // Goroutines for messages and webhooks, and cleaners.
if err := queue.Start(dns.StrictResolver{Pkg: "queue"}, done); err != nil { if err := queue.Start(dns.StrictResolver{Pkg: "queue"}, done); err != nil {
return fmt.Errorf("queue start: %s", err) return fmt.Errorf("queue start: %s", err)
} }