mirror of
https://github.com/mjl-/mox.git
synced 2024-12-27 08:53:48 +03:00
5336032088
so users can easily take their email out of somewhere else, and import it into mox. this goes a little way to give feedback as the import progresses: upload progress is shown (surprisingly, browsers aren't doing this...), imported mailboxes/messages are counted (batched) and import issues/warnings are displayed, all sent over an SSE connection. an import token is stored in sessionstorage. if you reload the page (e.g. after a connection error), the browser will reconnect to the running import and show its progress again. and you can just abort the import before it is finished and committed, and nothing will have changed. this also imports flags/keywords from mbox files.
535 lines
16 KiB
Go
535 lines
16 KiB
Go
package queue
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/dns"
|
|
"github.com/mjl-/mox/mox-"
|
|
"github.com/mjl-/mox/smtp"
|
|
"github.com/mjl-/mox/store"
|
|
)
|
|
|
|
// tcheck aborts the test with msg followed by the error text when err is
// non-nil. A nil err is a no-op.
func tcheck(t *testing.T, err error, msg string) {
	if err == nil {
		return
	}
	// Mark as helper so the failure is reported at the caller's line.
	t.Helper()
	t.Fatalf("%s: %s", msg, err)
}
|
|
|
|
func setup(t *testing.T) (*store.Account, func()) {
|
|
// Prepare config so email can be delivered to mjl@mox.example.
|
|
os.RemoveAll("../testdata/queue/data")
|
|
mox.Context = context.Background()
|
|
mox.ConfigStaticPath = "../testdata/queue/mox.conf"
|
|
mox.MustLoadConfig()
|
|
acc, err := store.OpenAccount("mjl")
|
|
tcheck(t, err, "open account")
|
|
err = acc.SetPassword("testtest")
|
|
tcheck(t, err, "set password")
|
|
switchDone := store.Switchboard()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(context.Background())
|
|
return acc, func() {
|
|
acc.Close()
|
|
mox.ShutdownCancel()
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(context.Background())
|
|
Shutdown()
|
|
close(switchDone)
|
|
}
|
|
}
|
|
|
|
// testmsg is the message the tests in this file add to the queue: three
// header lines, an empty line and a one-line body. The raw literal uses bare
// newlines; they are all converted to CRLF.
var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
To: <mjl@mox.example>
Subject: test

test email
`, "\n", "\r\n")
|
|
|
|
func prepareFile(t *testing.T) *os.File {
|
|
t.Helper()
|
|
msgFile, err := store.CreateMessageTemp("queue")
|
|
tcheck(t, err, "create temp message for delivery to queue")
|
|
_, err = msgFile.Write([]byte(testmsg))
|
|
tcheck(t, err, "write message file")
|
|
return msgFile
|
|
}
|
|
|
|
func TestQueue(t *testing.T) {
|
|
acc, cleanup := setup(t)
|
|
defer cleanup()
|
|
err := Init()
|
|
tcheck(t, err, "queue init")
|
|
|
|
msgs, err := List()
|
|
tcheck(t, err, "listing messages in queue")
|
|
if len(msgs) != 0 {
|
|
t.Fatalf("got %d messages in queue, expected 0", len(msgs))
|
|
}
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
mf2 := prepareFile(t)
|
|
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, mf2, nil, false)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
os.Remove(mf2.Name())
|
|
|
|
msgs, err = List()
|
|
tcheck(t, err, "listing queue")
|
|
if len(msgs) != 2 {
|
|
t.Fatalf("got msgs %v, expected 1", msgs)
|
|
}
|
|
msg := msgs[0]
|
|
if msg.Attempts != 0 {
|
|
t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
|
|
}
|
|
n, err := Drop(msgs[1].ID, "", "")
|
|
tcheck(t, err, "drop")
|
|
if n != 1 {
|
|
t.Fatalf("dropped %d, expected 1", n)
|
|
}
|
|
|
|
next := nextWork(nil)
|
|
if next > 0 {
|
|
t.Fatalf("nextWork in %s, should be now", next)
|
|
}
|
|
busy := map[string]struct{}{"mox.example": {}}
|
|
if x := nextWork(busy); x != 24*time.Hour {
|
|
t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
|
|
}
|
|
if nn := launchWork(nil, busy); nn != 0 {
|
|
t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
|
|
}
|
|
|
|
// Override dial function. We'll make connecting fail for now.
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
dialed := make(chan struct{}, 1)
|
|
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
dialed <- struct{}{}
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
|
|
launchWork(resolver, map[string]struct{}{})
|
|
|
|
// Wait until we see the dial and the failed attempt.
|
|
timer := time.NewTimer(time.Second)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-dialed:
|
|
i := 0
|
|
for {
|
|
m, err := bstore.QueryDB[Msg](queueDB).Get()
|
|
tcheck(t, err, "get")
|
|
if m.Attempts == 1 {
|
|
break
|
|
}
|
|
i++
|
|
if i == 10 {
|
|
t.Fatalf("message in queue not updated")
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
case <-timer.C:
|
|
t.Fatalf("no dial within 1s")
|
|
}
|
|
<-deliveryResult // Deliver sends here.
|
|
|
|
_, err = OpenMessage(msg.ID + 1)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
|
|
}
|
|
reader, err := OpenMessage(msg.ID)
|
|
tcheck(t, err, "open message")
|
|
defer reader.Close()
|
|
msgbuf, err := io.ReadAll(reader)
|
|
tcheck(t, err, "read message")
|
|
if string(msgbuf) != testmsg {
|
|
t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
|
|
}
|
|
|
|
n, err = Kick(msg.ID+1, "", "")
|
|
tcheck(t, err, "kick")
|
|
if n != 0 {
|
|
t.Fatalf("kick %d, expected 0", n)
|
|
}
|
|
n, err = Kick(msg.ID, "", "")
|
|
tcheck(t, err, "kick")
|
|
if n != 1 {
|
|
t.Fatalf("kicked %d, expected 1", n)
|
|
}
|
|
|
|
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
|
|
// client-side to the invocation dial, for the attempted delivery from the queue.
|
|
// The delivery should succeed.
|
|
server, client := net.Pipe()
|
|
defer server.Close()
|
|
defer client.Close()
|
|
|
|
smtpdone := make(chan struct{})
|
|
go func() {
|
|
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to cyclic dependencies.
|
|
fmt.Fprintf(server, "220 mox.example\r\n")
|
|
br := bufio.NewReader(server)
|
|
br.ReadString('\n') // Should be EHLO.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be MAIL FROM.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be RCPT TO.
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be DATA.
|
|
fmt.Fprintf(server, "354 continue\r\n")
|
|
reader := smtp.NewDataReader(br)
|
|
io.Copy(io.Discard, reader)
|
|
fmt.Fprintf(server, "250 ok\r\n")
|
|
br.ReadString('\n') // Should be QUIT.
|
|
fmt.Fprintf(server, "221 ok\r\n")
|
|
|
|
smtpdone <- struct{}{}
|
|
}()
|
|
|
|
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
dialed <- struct{}{}
|
|
return client, nil
|
|
}
|
|
launchWork(resolver, map[string]struct{}{})
|
|
|
|
timer.Reset(time.Second)
|
|
select {
|
|
case <-dialed:
|
|
select {
|
|
case <-smtpdone:
|
|
i := 0
|
|
for {
|
|
xmsgs, err := List()
|
|
tcheck(t, err, "list queue")
|
|
if len(xmsgs) == 0 {
|
|
break
|
|
}
|
|
i++
|
|
if i == 10 {
|
|
t.Fatalf("%d messages in queue, expected 0", len(xmsgs))
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
case <-timer.C:
|
|
t.Fatalf("no deliver within 1s")
|
|
}
|
|
case <-timer.C:
|
|
t.Fatalf("no dial within 1s")
|
|
}
|
|
<-deliveryResult // Deliver sends here.
|
|
|
|
// Add another message that we'll fail to deliver entirely.
|
|
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
|
|
msgs, err = List()
|
|
tcheck(t, err, "list queue")
|
|
if len(msgs) != 1 {
|
|
t.Fatalf("queue has %d messages, expected 1", len(msgs))
|
|
}
|
|
msg = msgs[0]
|
|
|
|
prepServer := func(code string) (net.Conn, func()) {
|
|
server, client := net.Pipe()
|
|
go func() {
|
|
fmt.Fprintf(server, "%s mox.example\r\n", code)
|
|
server.Close()
|
|
}()
|
|
return client, func() {
|
|
server.Close()
|
|
client.Close()
|
|
}
|
|
}
|
|
|
|
conn2, cleanup2 := prepServer("220")
|
|
conn3, cleanup3 := prepServer("451")
|
|
defer func() {
|
|
cleanup2()
|
|
cleanup3()
|
|
}()
|
|
|
|
seq := 0
|
|
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
seq++
|
|
switch seq {
|
|
default:
|
|
return nil, fmt.Errorf("connect error from test")
|
|
case 2:
|
|
return conn2, nil
|
|
case 3:
|
|
return conn3, nil
|
|
}
|
|
}
|
|
|
|
comm := store.RegisterComm(acc)
|
|
defer comm.Unregister()
|
|
|
|
for i := 1; i < 8; i++ {
|
|
go func() { <-deliveryResult }() // Deliver sends here.
|
|
deliver(resolver, msg)
|
|
err = queueDB.Get(&msg)
|
|
tcheck(t, err, "get msg")
|
|
if msg.Attempts != i {
|
|
t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
|
|
}
|
|
if msg.Attempts == 5 {
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trigger final failure.
|
|
go func() { <-deliveryResult }() // Deliver sends here.
|
|
deliver(resolver, msg)
|
|
err = queueDB.Get(&msg)
|
|
if err != bstore.ErrAbsent {
|
|
t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
|
|
}
|
|
|
|
timer.Reset(time.Second)
|
|
changes := make(chan struct{}, 1)
|
|
go func() {
|
|
comm.Get()
|
|
changes <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-changes:
|
|
case <-timer.C:
|
|
t.Fatalf("no dsn in 1s")
|
|
}
|
|
}
|
|
|
|
// test Start and that it attempts to deliver.
|
|
func TestQueueStart(t *testing.T) {
|
|
// Override dial function. We'll make connecting fail and check the attempt.
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
|
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
|
}
|
|
dialed := make(chan struct{}, 1)
|
|
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
dialed <- struct{}{}
|
|
return nil, fmt.Errorf("failure from test")
|
|
}
|
|
|
|
_, cleanup := setup(t)
|
|
defer cleanup()
|
|
done := make(chan struct{}, 1)
|
|
defer func() {
|
|
mox.ShutdownCancel()
|
|
<-done
|
|
mox.Shutdown, mox.ShutdownCancel = context.WithCancel(context.Background())
|
|
}()
|
|
err := Start(resolver, done)
|
|
tcheck(t, err, "queue start")
|
|
|
|
checkDialed := func(need bool) {
|
|
t.Helper()
|
|
d := time.Second / 10
|
|
if need {
|
|
d = time.Second
|
|
}
|
|
timer := time.NewTimer(d)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-dialed:
|
|
if !need {
|
|
t.Fatalf("unexpected dial attempt")
|
|
}
|
|
case <-timer.C:
|
|
if need {
|
|
t.Fatalf("expected to see a dial attempt")
|
|
}
|
|
}
|
|
}
|
|
|
|
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
|
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
|
tcheck(t, err, "add message to queue for delivery")
|
|
checkDialed(true)
|
|
|
|
// Don't change message nextattempt time, but kick queue. Message should not be delivered.
|
|
queuekick()
|
|
checkDialed(false)
|
|
|
|
// Kick for real, should see another attempt.
|
|
n, err := Kick(0, "mox.example", "")
|
|
tcheck(t, err, "kick queue")
|
|
if n != 1 {
|
|
t.Fatalf("kick changed %d messages, expected 1", n)
|
|
}
|
|
checkDialed(true)
|
|
time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done...
|
|
}
|
|
|
|
func TestWriteFile(t *testing.T) {
|
|
name := "../testdata/queue.test"
|
|
os.Remove(name)
|
|
defer os.Remove(name)
|
|
err := writeFile(name, strings.NewReader("test"))
|
|
if err != nil {
|
|
t.Fatalf("writeFile, unexpected error %v", err)
|
|
}
|
|
buf, err := os.ReadFile(name)
|
|
if err != nil || string(buf) != "test" {
|
|
t.Fatalf("writeFile, read file, got err %v, data %q", err, buf)
|
|
}
|
|
}
|
|
|
|
func TestGatherHosts(t *testing.T) {
|
|
mox.Context = context.Background()
|
|
|
|
// Test basic MX lookup case, but also following CNAME, detecting CNAME loops and
|
|
// having a CNAME limit, connecting directly to a host, and domain that does not
|
|
// exist or has temporary error.
|
|
|
|
resolver := dns.MockResolver{
|
|
MX: map[string][]*net.MX{
|
|
"basic.example.": {{Host: "mail.basic.example.", Pref: 10}},
|
|
"multimx.example.": {{Host: "mail1.multimx.example.", Pref: 10}, {Host: "mail2.multimx.example.", Pref: 10}},
|
|
"nullmx.example.": {{Host: ".", Pref: 10}},
|
|
"temperror-mx.example.": {{Host: "absent.example.", Pref: 10}},
|
|
},
|
|
A: map[string][]string{
|
|
"mail.basic.example": {"10.0.0.1"},
|
|
"justhost.example.": {"10.0.0.1"}, // No MX record for domain, only an A record.
|
|
"temperror-a.example.": {"10.0.0.1"},
|
|
},
|
|
AAAA: map[string][]string{
|
|
"justhost6.example.": {"2001:db8::1"}, // No MX record for domain, only an AAAA record.
|
|
},
|
|
CNAME: map[string]string{
|
|
"cname.example.": "basic.example.",
|
|
"cnameloop.example.": "cnameloop2.example.",
|
|
"cnameloop2.example.": "cnameloop.example.",
|
|
"danglingcname.example.": "absent.example.", // Points to missing name.
|
|
"temperror-cname.example.": "absent.example.",
|
|
},
|
|
Fail: map[dns.Mockreq]struct{}{
|
|
{Type: "mx", Name: "temperror-mx.example."}: {},
|
|
{Type: "host", Name: "temperror-a.example."}: {},
|
|
{Type: "cname", Name: "temperror-cname.example."}: {},
|
|
},
|
|
}
|
|
for i := 0; i <= 16; i++ {
|
|
s := fmt.Sprintf("cnamelimit%d.example.", i)
|
|
next := fmt.Sprintf("cnamelimit%d.example.", i+1)
|
|
resolver.CNAME[s] = next
|
|
}
|
|
|
|
test := func(ipd dns.IPDomain, expHosts []dns.IPDomain, expDomain dns.Domain, expPerm bool, expErr error) {
|
|
t.Helper()
|
|
|
|
m := Msg{RecipientDomain: ipd}
|
|
hosts, ed, perm, err := gatherHosts(resolver, m, 1, xlog)
|
|
if (err == nil) != (expErr == nil) || err != nil && !errors.Is(err, expErr) {
|
|
// todo: could also check the individual errors? code currently does not have structured errors.
|
|
t.Fatalf("gather hosts: %v", err)
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !reflect.DeepEqual(hosts, expHosts) || ed != expDomain || perm != expPerm {
|
|
t.Fatalf("got hosts %#v, effectiveDomain %#v, permanent %#v, expected %#v %#v %#v", hosts, ed, perm, expHosts, expDomain, expPerm)
|
|
}
|
|
}
|
|
|
|
domain := func(s string) dns.Domain {
|
|
d, err := dns.ParseDomain(s)
|
|
if err != nil {
|
|
t.Fatalf("parse domain: %v", err)
|
|
}
|
|
return d
|
|
}
|
|
ipdomain := func(s string) dns.IPDomain {
|
|
ip := net.ParseIP(s)
|
|
if ip != nil {
|
|
return dns.IPDomain{IP: ip}
|
|
}
|
|
d, err := dns.ParseDomain(s)
|
|
if err != nil {
|
|
t.Fatalf("parse domain %q: %v", s, err)
|
|
}
|
|
return dns.IPDomain{Domain: d}
|
|
}
|
|
|
|
ipdomains := func(s ...string) (l []dns.IPDomain) {
|
|
for _, e := range s {
|
|
l = append(l, ipdomain(e))
|
|
}
|
|
return
|
|
}
|
|
|
|
var zerodom dns.Domain
|
|
|
|
test(ipdomain("10.0.0.1"), ipdomains("10.0.0.1"), zerodom, false, nil)
|
|
test(ipdomain("basic.example"), ipdomains("mail.basic.example"), domain("basic.example"), false, nil) // Basic with simple MX.
|
|
test(ipdomain("multimx.example"), ipdomains("mail1.multimx.example", "mail2.multimx.example"), domain("multimx.example"), false, nil) // Basic with simple MX.
|
|
test(ipdomain("justhost.example"), ipdomains("justhost.example"), domain("justhost.example"), false, nil) // Only an A record.
|
|
test(ipdomain("justhost6.example"), ipdomains("justhost6.example"), domain("justhost6.example"), false, nil) // Only an AAAA record.
|
|
test(ipdomain("cname.example"), ipdomains("mail.basic.example"), domain("basic.example"), false, nil) // Follow CNAME.
|
|
test(ipdomain("cnamelimit1.example"), nil, zerodom, true, errCNAMELimit)
|
|
test(ipdomain("cnameloop.example"), nil, zerodom, true, errCNAMELoop)
|
|
test(ipdomain("absent.example"), nil, zerodom, true, errNoRecord)
|
|
test(ipdomain("danglingcname.example"), nil, zerodom, true, errNoRecord)
|
|
test(ipdomain("nullmx.example"), nil, zerodom, true, errNoMail)
|
|
test(ipdomain("temperror-mx.example"), nil, zerodom, false, errDNS)
|
|
test(ipdomain("temperror-cname.example"), nil, zerodom, false, errDNS)
|
|
test(ipdomain("temperror-a.example"), nil, zerodom, false, errDNS)
|
|
}
|
|
|
|
func TestDialHost(t *testing.T) {
|
|
// We mostly want to test that dialing a second time switches to the other address family.
|
|
|
|
resolver := dns.MockResolver{
|
|
A: map[string][]string{
|
|
"dualstack.example.": {"10.0.0.1"},
|
|
},
|
|
AAAA: map[string][]string{
|
|
"dualstack.example.": {"2001:db8::1"},
|
|
},
|
|
}
|
|
|
|
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
|
return nil, nil // No error, nil connection isn't used.
|
|
}
|
|
|
|
ipdomain := func(s string) dns.IPDomain {
|
|
return dns.IPDomain{Domain: dns.Domain{ASCII: s}}
|
|
}
|
|
|
|
m := Msg{DialedIPs: map[string][]net.IP{}}
|
|
_, ip, dualstack, err := dialHost(context.Background(), xlog, resolver, ipdomain("dualstack.example"), &m)
|
|
if err != nil || ip.String() != "10.0.0.1" || !dualstack {
|
|
t.Fatalf("expected err nil, address 10.0.0.1, dualstack true, got %v %v %v", err, ip, dualstack)
|
|
}
|
|
_, ip, dualstack, err = dialHost(context.Background(), xlog, resolver, ipdomain("dualstack.example"), &m)
|
|
if err != nil || ip.String() != "2001:db8::1" || !dualstack {
|
|
t.Fatalf("expected err nil, address 2001:db8::1, dualstack true, got %v %v %v", err, ip, dualstack)
|
|
}
|
|
}
|