From b7ec84b80a1f17d53822325dca8abb11dcbb46e7 Mon Sep 17 00:00:00 2001 From: Mechiel Lukkien Date: Sun, 28 Apr 2024 22:48:51 +0200 Subject: [PATCH] queue: when shutting down, wait for pending deliveries before signaling that shutdown is complete also fixes flaky test, which is how i found it --- queue/hook.go | 4 ++++ queue/queue.go | 4 ++++ queue/queue_test.go | 5 +---- serve.go | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/queue/hook.go b/queue/hook.go index 4a028c0..bc740eb 100644 --- a/queue/hook.go +++ b/queue/hook.go @@ -1006,6 +1006,10 @@ func startHookQueue(done chan struct{}) { for { select { case <-mox.Shutdown.Done(): + for len(busyHookURLs) > 0 { + url := <-hookDeliveryResults + delete(busyHookURLs, url) + } done <- struct{}{} return case <-hookqueue: diff --git a/queue/queue.go b/queue/queue.go index ec1d8b8..3cbbb56 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -1154,6 +1154,10 @@ func startQueue(resolver dns.Resolver, done chan struct{}) { for { select { case <-mox.Shutdown.Done(): + for len(busyDomains) > 0 { + domain := <-deliveryResults + delete(busyDomains, domain) + } done <- struct{}{} return case <-msgqueue: diff --git a/queue/queue_test.go b/queue/queue_test.go index adbb27e..e2f1a63 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -1183,7 +1183,7 @@ func TestQueueStart(t *testing.T) { _, cleanup := setup(t) defer cleanup() - done := make(chan struct{}, 4) + done := make(chan struct{}) defer func() { mox.ShutdownCancel() // Wait for message and hooks deliverers and cleaners. @@ -1269,7 +1269,6 @@ func TestQueueStart(t *testing.T) { t.Fatalf("kick changed %d messages, expected 1", n) } checkDialed(true) - time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done... // Submit another, should be delivered immediately without HoldRule. path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}} @@ -1280,8 +1279,6 @@ func TestQueueStart(t *testing.T) { err = Add(ctxbg, pkglog, "mjl", mf, qm) tcheck(t, err, "add message to queue for delivery") checkDialed(true) // Immediate. - - time.Sleep(100 * time.Millisecond) // Racy... give time to finish. } func TestListFilterSort(t *testing.T) { diff --git a/serve.go b/serve.go index 2c3a33b..ac7ea8c 100644 --- a/serve.go +++ b/serve.go @@ -78,7 +78,7 @@ func start(mtastsdbRefresher, sendDMARCReports, sendTLSReports, skipForkExec boo return fmt.Errorf("tlsrpt init: %s", err) } - done := make(chan struct{}, 4) // Goroutines for messages and webhooks, and cleaners. + done := make(chan struct{}) // Goroutines for messages and webhooks, and cleaners. if err := queue.Start(dns.StrictResolver{Pkg: "queue"}, done); err != nil { return fmt.Errorf("queue start: %s", err) }