diff --git a/ctl_test.go b/ctl_test.go index d9e42cc..7713c7e 100644 --- a/ctl_test.go +++ b/ctl_test.go @@ -48,6 +48,7 @@ func TestCtl(t *testing.T) { err := queue.Init() tcheck(t, err, "queue init") + defer queue.Shutdown() testctl := func(fn func(clientctl *ctl)) { t.Helper() @@ -426,10 +427,13 @@ func TestCtl(t *testing.T) { // "backup", backup account. err = dmarcdb.Init() tcheck(t, err, "dmarcdb init") + defer dmarcdb.Close() err = mtastsdb.Init(false) tcheck(t, err, "mtastsdb init") + defer mtastsdb.Close() err = tlsrptdb.Init() tcheck(t, err, "tlsrptdb init") + defer tlsrptdb.Close() testctl(func(ctl *ctl) { os.RemoveAll("testdata/ctl/data/tmp/backup-data") err := os.WriteFile("testdata/ctl/data/receivedid.key", make([]byte, 16), 0600) diff --git a/develop.txt b/develop.txt index f13da0f..b6bc935 100644 --- a/develop.txt +++ b/develop.txt @@ -308,7 +308,7 @@ done - Update features & roadmap in README.md and website. - Write release notes, copy from previous. - Build and run tests with previous major Go release. -- Run tests, including with race detector, also with TZ= for UTC-behaviour. +- Run tests, including with race detector, also with TZ= for UTC-behaviour, and with -count 2. - Run integration and upgrade tests. - Run fuzzing tests for a while. - Deploy to test environment. Test the update instructions. diff --git a/mox-/lifecycle_test.go b/mox-/lifecycle_test.go index 9a8056f..fe6a187 100644 --- a/mox-/lifecycle_test.go +++ b/mox-/lifecycle_test.go @@ -6,24 +6,17 @@ import ( "net" "os" "testing" - - "github.com/prometheus/client_golang/prometheus" ) func TestLifecycle(t *testing.T) { Shutdown, ShutdownCancel = context.WithCancel(context.Background()) - c := &connections{ - conns: map[net.Conn]connKind{}, - gauges: map[connKind]prometheus.GaugeFunc{}, - active: map[connKind]int64{}, - } nc0, nc1 := net.Pipe() defer nc0.Close() defer nc1.Close() - c.Register(nc0, "proto", "listener") - c.Shutdown() + Connections.Register(nc0, "proto", "listener") + Connections.Shutdown() - done := c.Done() + done := Connections.Done() select { case <-done: t.Fatalf("already done, but still a connection open") @@ -37,7 +30,7 @@ func TestLifecycle(t *testing.T) { if !errors.Is(err, os.ErrDeadlineExceeded) { t.Fatalf("got %v, expected os.ErrDeadlineExceeded", err) } - c.Unregister(nc0) + Connections.Unregister(nc0) select { case <-done: default: diff --git a/tlsrptsend/send_test.go b/tlsrptsend/send_test.go index bccc08f..3cbb36c 100644 --- a/tlsrptsend/send_test.go +++ b/tlsrptsend/send_test.go @@ -46,6 +46,7 @@ func TestSendReports(t *testing.T) { err := tlsrptdb.Init() tcheckf(t, err, "init database") + defer tlsrptdb.Close() db := tlsrptdb.ResultDB diff --git a/webaccount/account_test.go b/webaccount/account_test.go index 8a947c2..f2b2e23 100644 --- a/webaccount/account_test.go +++ b/webaccount/account_test.go @@ -230,6 +230,7 @@ func TestAccount(t *testing.T) { err = queue.Init() // For DB. tcheck(t, err, "queue init") + defer queue.Shutdown() account, _, _, _ := api.Account(ctx) @@ -250,6 +251,9 @@ func TestAccount(t *testing.T) { api.AccountSaveFullName(ctx, account.FullName) go ImportManage() + defer func() { + importers.Stop <- struct{}{} + }() // Import mbox/maildir tgz/zip. testImport := func(filename string, expect int) { diff --git a/webaccount/import.go b/webaccount/import.go index be51a39..d523a08 100644 --- a/webaccount/import.go +++ b/webaccount/import.go @@ -57,11 +57,13 @@ var importers = struct { Unregister chan *importListener Events chan importEvent Abort chan importAbortRequest + Stop chan struct{} }{ make(chan *importListener, 1), make(chan *importListener, 1), make(chan importEvent), make(chan importAbortRequest), + make(chan struct{}), } // ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections. @@ -89,38 +91,38 @@ func ImportManage() { select { case l := <-importers.Register: // If we have state, send it so the client is up to date. - if s, ok := imports[l.Token]; ok { - l.Register <- true - s.Listeners[l] = struct{}{} + s, ok := imports[l.Token] + l.Register <- ok + if !ok { + break + } + s.Listeners[l] = struct{}{} - sendEvent := func(kind string, v any) { - buf, err := json.Marshal(v) - if err != nil { - log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v)) - return - } - ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf) + sendEvent := func(kind string, v any) { + buf, err := json.Marshal(v) + if err != nil { + log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v)) + return + } + ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf) - select { - case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}: - default: - log.Debug("dropped initial import event to slow consumer") - } + select { + case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}: + default: + log.Debug("dropped initial import event to slow consumer") } + } - for m, c := range s.MailboxCounts { - sendEvent("count", importCount{m, c}) - } - for _, p := range s.Problems { - sendEvent("problem", importProblem{p}) - } - if s.Done != nil { - sendEvent("done", importDone{}) - } else if s.Aborted != nil { - sendEvent("aborted", importAborted{}) - } - } else { - l.Register <- false + for m, c := range s.MailboxCounts { + sendEvent("count", importCount{m, c}) + } + for _, p := range s.Problems { + sendEvent("problem", importProblem{p}) + } + if s.Done != nil { + sendEvent("done", importDone{}) + } else if s.Aborted != nil { + sendEvent("aborted", importAborted{}) } case l := <-importers.Unregister: @@ -172,6 +174,9 @@ func ImportManage() { } s.Cancel() a.Response <- nil + + case <-importers.Stop: + return } // Cleanup old state. diff --git a/webadmin/admin_test.go b/webadmin/admin_test.go index aaa5f90..8de371b 100644 --- a/webadmin/admin_test.go +++ b/webadmin/admin_test.go @@ -220,6 +220,7 @@ func TestAdmin(t *testing.T) { mox.MustLoadConfig(true, false) err := queue.Init() tcheck(t, err, "queue init") + defer queue.Shutdown() api := Admin{} diff --git a/webapisrv/server_test.go b/webapisrv/server_test.go index 1120caa..faaff75 100644 --- a/webapisrv/server_test.go +++ b/webapisrv/server_test.go @@ -65,6 +65,7 @@ func TestServer(t *testing.T) { defer store.Switchboard()() err := queue.Init() tcheckf(t, err, "queue init") + defer queue.Shutdown() log := mlog.New("webapisrv", nil) acc, err := store.OpenAccount(log, "mjl") diff --git a/webmail/api_test.go b/webmail/api_test.go index 5658878..ab46197 100644 --- a/webmail/api_test.go +++ b/webmail/api_test.go @@ -314,6 +314,7 @@ func TestAPI(t *testing.T) { queue.Localserve = true // Deliver directly to us instead attempting actual delivery. err = queue.Init() tcheck(t, err, "queue init") + defer queue.Shutdown() api.MessageSubmit(ctx, SubmitMessage{ From: "mjl@mox.example", To: []string{"mjl+to@mox.example", "mjl to2 "},