mox/queue/hook.go

1243 lines
37 KiB
Go
Raw Permalink Normal View History

add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
package queue
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/textproto"
"runtime/debug"
"slices"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/dsn"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxvar"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/store"
"github.com/mjl-/mox/webhook"
"github.com/mjl-/mox/webops"
)
// Prometheus metrics about HTTP webhook calls, created with promauto.
var (
	// metricHookRequest is a histogram of the duration of HTTP webhook calls, in
	// seconds, with buckets ranging from 0.01 to 30.
	metricHookRequest = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name: "mox_webhook_request_duration_seconds",
			Help: "HTTP webhook call duration.",
			Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 20, 30},
		},
	)
	// metricHookResult counts the results of HTTP webhook calls, split out by the
	// "code" label.
	metricHookResult = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "mox_webhook_results_total",
			Help: "HTTP webhook call results.",
		},
		[]string{"code"}, // Known http status codes (e.g. "404"), or "<major>xx" for unknown http status codes, or "error".
	)
)
// Hook is a webhook call about a delivery of an outgoing message, or about an
// incoming message (see IsIncoming). We'll try delivering with backoff until we
// succeed or fail.
//
// Hook is stored in the database: the struct tags are bstore directives. When a
// hook is done, Retired turns it into a HookRetired.
type Hook struct {
	ID int64 // Also used as ID of the HookRetired created by Retired.
	QueueMsgID int64 `bstore:"index"` // Original queue Msg/MsgRetired ID. Zero for hooks for incoming messages.
	FromID string // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID string // Of outgoing or incoming messages. Includes <>.
	Subject string // Subject of original outgoing message, or of incoming message.
	Extra map[string]string // From submitted message.
	Account string `bstore:"nonzero"`
	URL string `bstore:"nonzero"` // Taken from config when webhook is scheduled.
	Authorization string // Optional value for authorization header to include in HTTP request. Not copied into HookRetired, which only records whether it was set.
	IsIncoming bool // If set, the hook is logged and filtered with event "incoming" instead of OutgoingEvent.
	OutgoingEvent string // Empty string if not outgoing.
	Payload string // JSON data to be submitted.
	Submitted time.Time `bstore:"default now,index"`
	Attempts int
	NextAttempt time.Time `bstore:"nonzero,index"` // Index for fast scheduling.
	Results []HookResult // Results of attempts. LastResult returns the final element.
}
// HookResult is the result of a single attempt to deliver a webhook. Results are
// kept in Hook.Results and HookRetired.Results.
type HookResult struct {
	Start time.Time
	Duration time.Duration
	URL string
	Success bool
	Code int // eg 200, 404, 500. 2xx implies success.
	Error string
	Response string // Max 512 bytes of HTTP response body.
}
// attrs returns the attributes for logging queueing or starting delivery of a
// hook. The "event" attribute is "incoming" for hooks about incoming messages,
// and the outgoing event otherwise.
func (h Hook) attrs() []slog.Attr {
	event := "incoming"
	if !h.IsIncoming {
		event = h.OutgoingEvent
	}

	l := make([]slog.Attr, 0, 9)
	// Identification of the hook and the message it is about.
	l = append(l,
		slog.Int64("webhookid", h.ID),
		slog.Int("attempts", h.Attempts),
		slog.Int64("msgid", h.QueueMsgID),
		slog.String("account", h.Account),
		slog.String("url", h.URL),
	)
	// Message details, event and scheduling.
	l = append(l,
		slog.String("fromid", h.FromID),
		slog.String("messageid", h.MessageID),
		slog.String("event", event),
		slog.Time("nextattempt", h.NextAttempt),
	)
	return l
}
// LastResult returns the result of the most recent attempt, or the zero
// HookResult if there are no results yet.
func (h Hook) LastResult() HookResult {
	if n := len(h.Results); n > 0 {
		return h.Results[n-1]
	}
	return HookResult{}
}
// Retired returns a HookRetired for a Hook, for insertion into the database.
//
// All fields shared with Hook are copied, except Authorization: only whether it
// was set is recorded, not its value. The success, lastActivity and keepUntil
// parameters are stored as-is. SupersededByID is left at zero.
func (h Hook) Retired(success bool, lastActivity, keepUntil time.Time) HookRetired {
	var hr HookRetired

	// Identification of the hook and the message it is about.
	hr.ID = h.ID
	hr.QueueMsgID = h.QueueMsgID
	hr.FromID = h.FromID
	hr.MessageID = h.MessageID
	hr.Subject = h.Subject
	hr.Extra = h.Extra

	// The request. Don't keep the authorization value around.
	hr.Account = h.Account
	hr.URL = h.URL
	hr.Authorization = h.Authorization != ""
	hr.IsIncoming = h.IsIncoming
	hr.OutgoingEvent = h.OutgoingEvent
	hr.Payload = h.Payload

	// Delivery history.
	hr.Submitted = h.Submitted
	hr.Attempts = h.Attempts
	hr.Results = h.Results

	// Outcome and retention, as specified by the caller.
	hr.Success = success
	hr.LastActivity = lastActivity
	hr.KeepUntil = keepUntil

	return hr
}
// HookRetired is a Hook that was delivered/failed/canceled and kept according
// to the configuration.
//
// Values are created with Hook.Retired. Records with a KeepUntil in the past are
// deleted by cleanupHookRetiredSingle.
type HookRetired struct {
	ID int64 // Same as original Hook.ID.
	QueueMsgID int64 // Original queue Msg or MsgRetired ID. Zero for hooks for incoming messages.
	FromID string // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID string // Of outgoing or incoming messages. Includes <>.
	Subject string // Subject of original outgoing message, or of incoming message.
	Extra map[string]string // From submitted message.
	Account string `bstore:"nonzero,index Account+LastActivity"`
	URL string `bstore:"nonzero"` // Taken from config at start of each attempt.
	Authorization bool // Whether request had authorization without keeping it around.
	IsIncoming bool
	OutgoingEvent string
	Payload string // JSON data submitted.
	Submitted time.Time
	SupersededByID int64 // If not 0, a Hook.ID that superseded this one. Not set by Hook.Retired. NOTE(review): this comment used to say "and Done will be true", but this struct has no Done field.
	Attempts int
	Results []HookResult // Results of attempts. LastResult returns the final element.
	Success bool // As passed to Hook.Retired.
	LastActivity time.Time `bstore:"index"` // As passed to Hook.Retired.
	KeepUntil time.Time `bstore:"index"` // As passed to Hook.Retired. Once this is in the past, the record is removed during cleanup.
}
// LastResult returns the result of the most recent attempt, or the zero
// HookResult if no results were recorded.
func (h HookRetired) LastResult() HookResult {
	if n := len(h.Results); n > 0 {
		return h.Results[n-1]
	}
	return HookResult{}
}
// cleanupHookRetired periodically removes retired webhooks that no longer need
// to be kept. The first cleanup runs 4 seconds after starting, each next one an
// hour after the previous cleanup finished.
//
// When mox.Shutdown is done, a value is sent on done and the function returns.
// A panic is recovered, logged and counted in the metrics. After a panic the
// function returns without sending on done.
func cleanupHookRetired(done chan struct{}) {
	log := mlog.New("queue", nil)

	defer func() {
		if x := recover(); x != nil {
			log.Error("unhandled panic while cleaning up retired webhooks", slog.Any("x", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Queue)
		}
	}()

	timer := time.NewTimer(4 * time.Second)
	for {
		select {
		case <-timer.C:
			cleanupHookRetiredSingle(log)
			timer.Reset(time.Hour)

		case <-mox.Shutdown.Done():
			done <- struct{}{}
			return
		}
	}
}
// cleanupHookRetiredSingle does one cleanup run: it deletes all retired webhooks
// with a KeepUntil before the current time. An error is logged, not returned.
// The number of removed records is logged at debug level if any were removed.
func cleanupHookRetiredSingle(log mlog.Log) {
	q := bstore.QueryDB[HookRetired](mox.Shutdown, DB)
	q.FilterLess("KeepUntil", time.Now())
	n, err := q.Delete()
	log.Check(err, "removing old retired webhooks")
	if n <= 0 {
		return
	}
	log.Debug("cleaned up retired webhooks", slog.Int("count", n))
}
func hookRetiredKeep(account string) time.Duration {
keep := 24 * 7 * time.Hour
accConf, ok := mox.Conf.Account(account)
if ok {
keep = accConf.KeepRetiredWebhookPeriod
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
}
return keep
}
// HookFilter filters hooks to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookFilter struct {
	Max int // If not 0, the maximum number of hooks to match.
	IDs []int64 // If not empty, only hooks with these IDs match.
	Account string // If not empty, only hooks for this account match.
	Submitted string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration. The duration is added to the current time, so a negative duration refers to the past.
	NextAttempt string // ">$duration" or "<$duration", also with "now" for duration.
	Event string // Including "incoming".
}
// apply adds a filter to q for each non-empty/non-zero field of f, and sets Max
// as limit if it is not 0. An error is returned if Submitted or NextAttempt is
// set but cannot be parsed.
func (f HookFilter) apply(q *bstore.Query[Hook]) error {
	if len(f.IDs) > 0 {
		q.FilterIDs(f.IDs)
	}

	// timeFilter restricts field to values before ("<" prefix) or after (">"
	// prefix) a point in time: the current time for "now", otherwise the current
	// time with the parsed duration added.
	timeFilter := func(field, spec string) error {
		before := strings.HasPrefix(spec, "<")
		if !before && !strings.HasPrefix(spec, ">") {
			return fmt.Errorf(`must start with "<" for less or ">" for greater than a duration ago`)
		}

		var t time.Time
		switch rel := strings.TrimSpace(spec[1:]); rel {
		case "now":
			t = time.Now()
		default:
			d, err := time.ParseDuration(rel)
			if err != nil {
				return fmt.Errorf("parsing duration %q: %v", spec, err)
			}
			t = time.Now().Add(d)
		}

		if before {
			q.FilterLess(field, t)
			return nil
		}
		q.FilterGreater(field, t)
		return nil
	}

	if f.Submitted != "" {
		err := timeFilter("Submitted", f.Submitted)
		if err != nil {
			return fmt.Errorf("applying filter for submitted: %v", err)
		}
	}
	if f.NextAttempt != "" {
		err := timeFilter("NextAttempt", f.NextAttempt)
		if err != nil {
			return fmt.Errorf("applying filter for next attempt: %v", err)
		}
	}

	if f.Account != "" {
		q.FilterNonzero(Hook{Account: f.Account})
	}

	// "incoming" is not stored as an outgoing event but as a separate flag.
	switch f.Event {
	case "":
	case "incoming":
		q.FilterNonzero(Hook{IsIncoming: true})
	default:
		q.FilterNonzero(Hook{OutgoingEvent: f.Event})
	}

	if f.Max != 0 {
		q.Limit(f.Max)
	}
	return nil
}
// HookSort specifies the order in which hooks are returned, and optionally the
// position after which to continue, for pagination.
type HookSort struct {
	Field string // "Submitted" or "NextAttempt"/"".
	LastID int64 // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last any // Value of Field for last object, as a string with an RFC 3339 time. Must be set iff LastID is set.
	Asc bool // Ascending, or descending.
}
func (s HookSort) apply(q *bstore.Query[Hook]) error {
switch s.Field {
case "", "NextAttempt":
s.Field = "NextAttempt"
case "Submitted":
s.Field = "Submitted"
default:
return fmt.Errorf("unknown sort order field %q", s.Field)
}
if s.LastID > 0 {
ls, ok := s.Last.(string)
if !ok {
return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
}
last, err := time.Parse(time.RFC3339Nano, ls)
if err != nil {
last, err = time.Parse(time.RFC3339, ls)
}
if err != nil {
return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
}
q.FilterNotEqual("ID", s.LastID)
var fieldEqual func(h Hook) bool
if s.Field == "NextAttempt" {
fieldEqual = func(h Hook) bool { return h.NextAttempt.Equal(last) }
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
} else {
fieldEqual = func(h Hook) bool { return h.Submitted.Equal(last) }
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
}
if s.Asc {
q.FilterGreaterEqual(s.Field, last)
q.FilterFn(func(h Hook) bool {
return !fieldEqual(h) || h.ID > s.LastID
})
} else {
q.FilterLessEqual(s.Field, last)
q.FilterFn(func(h Hook) bool {
return !fieldEqual(h) || h.ID < s.LastID
})
}
}
if s.Asc {
q.SortAsc(s.Field, "ID")
} else {
q.SortDesc(s.Field, "ID")
}
return nil
}
// HookQueueSize returns the number of webhooks in the queue, by counting all
// Hook records in the database.
func HookQueueSize(ctx context.Context) (int, error) {
	q := bstore.QueryDB[Hook](ctx, DB)
	return q.Count()
}
// HookList returns webhooks according to filter and sort. The filter is applied
// first, then the sort. If either cannot be applied, its error is returned
// without executing the query.
func HookList(ctx context.Context, filter HookFilter, sort HookSort) ([]Hook, error) {
	q := bstore.QueryDB[Hook](ctx, DB)

	err := filter.apply(q)
	if err == nil {
		err = sort.apply(q)
	}
	if err != nil {
		return nil, err
	}
	return q.List()
}
// HookRetiredFilter filters retired hooks to list or operate on. Used by admin
// web interface and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookRetiredFilter struct {
	Max int // If not 0, the maximum number of retired hooks to match.
	IDs []int64 // If not empty, only retired hooks with these IDs match.
	Account string // If not empty, only retired hooks for this account match.
	Submitted string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration. The duration is added to the current time, so a negative duration refers to the past.
	LastActivity string // ">$duration" or "<$duration", also with "now" for duration.
	Event string // Including "incoming".
}
// apply adds a filter to q for each non-empty/non-zero field of f, and sets Max
// as limit if it is not 0. An error is returned if Submitted or LastActivity is
// set but cannot be parsed.
func (f HookRetiredFilter) apply(q *bstore.Query[HookRetired]) error {
	if len(f.IDs) > 0 {
		q.FilterIDs(f.IDs)
	}

	// timeFilter restricts field to values before ("<" prefix) or after (">"
	// prefix) a point in time: the current time for "now", otherwise the current
	// time with the parsed duration added.
	timeFilter := func(field, spec string) error {
		before := strings.HasPrefix(spec, "<")
		if !before && !strings.HasPrefix(spec, ">") {
			return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
		}

		var t time.Time
		switch rel := strings.TrimSpace(spec[1:]); rel {
		case "now":
			t = time.Now()
		default:
			d, err := time.ParseDuration(rel)
			if err != nil {
				return fmt.Errorf("parsing duration %q: %v", spec, err)
			}
			t = time.Now().Add(d)
		}

		if before {
			q.FilterLess(field, t)
			return nil
		}
		q.FilterGreater(field, t)
		return nil
	}

	if f.Submitted != "" {
		err := timeFilter("Submitted", f.Submitted)
		if err != nil {
			return fmt.Errorf("applying filter for submitted: %v", err)
		}
	}
	if f.LastActivity != "" {
		err := timeFilter("LastActivity", f.LastActivity)
		if err != nil {
			return fmt.Errorf("applying filter for last activity: %v", err)
		}
	}

	if f.Account != "" {
		q.FilterNonzero(HookRetired{Account: f.Account})
	}

	// "incoming" is not stored as an outgoing event but as a separate flag.
	switch f.Event {
	case "":
	case "incoming":
		q.FilterNonzero(HookRetired{IsIncoming: true})
	default:
		q.FilterNonzero(HookRetired{OutgoingEvent: f.Event})
	}

	if f.Max != 0 {
		q.Limit(f.Max)
	}
	return nil
}
// HookRetiredSort specifies the order in which retired hooks are returned, and
// optionally the position after which to continue, for pagination.
type HookRetiredSort struct {
	Field string // "Submitted" or "LastActivity"/"".
	LastID int64 // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last any // Value of Field for last object, as a string with an RFC 3339 time. Must be set iff LastID is set.
	Asc bool // Ascending, or descending.
}
func (s HookRetiredSort) apply(q *bstore.Query[HookRetired]) error {
switch s.Field {
case "", "LastActivity":
s.Field = "LastActivity"
case "Submitted":
s.Field = "Submitted"
default:
return fmt.Errorf("unknown sort order field %q", s.Field)
}
if s.LastID > 0 {
ls, ok := s.Last.(string)
if !ok {
return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
}
last, err := time.Parse(time.RFC3339Nano, ls)
if err != nil {
last, err = time.Parse(time.RFC3339, ls)
}
if err != nil {
return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
}
q.FilterNotEqual("ID", s.LastID)
var fieldEqual func(hr HookRetired) bool
if s.Field == "LastActivity" {
fieldEqual = func(hr HookRetired) bool { return hr.LastActivity.Equal(last) }
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
} else {
fieldEqual = func(hr HookRetired) bool { return hr.Submitted.Equal(last) }
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
}
if s.Asc {
q.FilterGreaterEqual(s.Field, last)
q.FilterFn(func(hr HookRetired) bool {
return !fieldEqual(hr) || hr.ID > s.LastID
})
} else {
q.FilterLessEqual(s.Field, last)
q.FilterFn(func(hr HookRetired) bool {
return !fieldEqual(hr) || hr.ID < s.LastID
})
}
}
if s.Asc {
q.SortAsc(s.Field, "ID")
} else {
q.SortDesc(s.Field, "ID")
}
return nil
}
// HookRetiredList returns the retired webhooks selected by filter, ordered (and
// positioned) as specified by sort. An error from applying the filter or the
// sort is returned as is.
func HookRetiredList(ctx context.Context, filter HookRetiredFilter, sort HookRetiredSort) ([]HookRetired, error) {
	query := bstore.QueryDB[HookRetired](ctx, DB)

	// The filter goes first; the sort is only applied if the filter was valid.
	err := filter.apply(query)
	if err == nil {
		err = sort.apply(query)
	}
	if err != nil {
		return nil, err
	}
	return query.List()
}
// HookNextAttemptAdd moves NextAttempt by duration d for every queued hook that
// matches filter, all in a single write transaction, and then kicks the hook
// queue. It returns the number of hooks that were updated.
func HookNextAttemptAdd(ctx context.Context, filter HookFilter, d time.Duration) (affected int, err error) {
	shift := func(tx *bstore.Tx) error {
		query := bstore.QueryTx[Hook](tx)
		if err := filter.apply(query); err != nil {
			return err
		}
		matched, err := query.List()
		if err != nil {
			return fmt.Errorf("listing matching hooks: %v", err)
		}
		for i := range matched {
			hook := &matched[i]
			hook.NextAttempt = hook.NextAttempt.Add(d)
			if err := tx.Update(hook); err != nil {
				return err
			}
		}
		affected = len(matched)
		return nil
	}

	if err = DB.Write(ctx, shift); err != nil {
		return 0, err
	}
	// Let the queue recompute when its next work is due.
	hookqueueKick()
	return affected, nil
}
// HookNextAttemptSet sets NextAttempt to the absolute time t for every queued
// hook that matches filter, and then kicks the hook queue. It returns the number
// of hooks that were updated.
func HookNextAttemptSet(ctx context.Context, filter HookFilter, t time.Time) (affected int, err error) {
	query := bstore.QueryDB[Hook](ctx, DB)
	if err = filter.apply(query); err != nil {
		return 0, err
	}

	affected, err = query.UpdateNonzero(Hook{NextAttempt: t})
	if err != nil {
		return 0, fmt.Errorf("selecting and updating hooks in queue: %v", err)
	}

	hookqueueKick()
	return affected, nil
}
// HookCancel removes all hooks matching filter from the queue, so no further
// delivery attempts are made for them. For accounts that keep retired hooks, a
// retired hook is stored with a final "canceled by admin" result. Each canceled
// hook is logged, and the hook queue is kicked. It returns the number of hooks
// removed from the queue.
func HookCancel(ctx context.Context, log mlog.Log, filter HookFilter) (affected int, err error) {
	var canceled []Hook

	err = DB.Write(ctx, func(tx *bstore.Tx) error {
		query := bstore.QueryTx[Hook](tx)
		if err := filter.apply(query); err != nil {
			return err
		}
		// Collect the hooks while they are being deleted.
		query.Gather(&canceled)
		n, err := query.Delete()
		if err != nil {
			return fmt.Errorf("selecting and deleting hooks from queue: %v", err)
		}
		if len(canceled) == 0 {
			return nil
		}

		now := time.Now()
		for _, h := range canceled {
			keep := hookRetiredKeep(h.Account)
			if keep <= 0 {
				// Account does not keep retired hooks.
				continue
			}
			hr := h.Retired(false, now, now.Add(keep))
			hr.Results = append(hr.Results, HookResult{Start: now, Error: "canceled by admin"})
			if err := tx.Insert(&hr); err != nil {
				return fmt.Errorf("inserting retired hook: %v", err)
			}
		}
		affected = n
		return nil
	})
	if err != nil {
		return 0, err
	}

	for _, h := range canceled {
		log.Info("canceled hook", h.attrs()...)
	}
	hookqueueKick()
	return affected, nil
}
// hookCompose builds, but does not store, a Hook for an outgoing delivery event
// about queue message m, to be posted to url with the authz authorization value.
//
// The JSON payload carries the error of the most recent delivery result of m (if
// any), the SMTP code, and an enhanced status code of the form
// "<code/100>.<secodeOpt>" when secodeOpt is not empty. The hook is due for
// delivery immediately. An error is returned if the payload cannot be marshaled.
func hookCompose(m Msg, url, authz string, event webhook.OutgoingEvent, suppressing bool, code int, secodeOpt string) (Hook, error) {
	now := time.Now()

	// Error message of the latest delivery attempt, if there was an attempt.
	lastError := ""
	if n := len(m.Results); n > 0 {
		lastError = m.Results[n-1].Error
	}

	enhanced := ""
	if secodeOpt != "" {
		enhanced = fmt.Sprintf("%d.%s", code/100, secodeOpt)
	}

	out := webhook.Outgoing{
		Event:            event,
		Suppressing:      suppressing,
		QueueMsgID:       m.ID,
		FromID:           m.FromID,
		MessageID:        m.MessageID,
		Subject:          m.Subject,
		WebhookQueued:    now,
		Error:            lastError,
		SMTPCode:         code,
		SMTPEnhancedCode: enhanced,
		Extra:            m.Extra,
	}
	// Always marshal Extra as a JSON object, never as null.
	if out.Extra == nil {
		out.Extra = map[string]string{}
	}

	payload, err := json.Marshal(out)
	if err != nil {
		return Hook{}, fmt.Errorf("marshal webhook payload: %v", err)
	}

	return Hook{
		QueueMsgID:    m.ID,
		FromID:        m.FromID,
		MessageID:     m.MessageID,
		Subject:       m.Subject,
		Extra:         m.Extra,
		Account:       m.SenderAccount,
		URL:           url,
		Authorization: authz,
		IsIncoming:    false,
		OutgoingEvent: string(event),
		Payload:       string(payload),
		Submitted:     now,
		NextAttempt:   now,
	}, nil
}
// Incoming processes a message delivered over SMTP for webhooks. If the message is
// a DSN, a webhook for outgoing deliveries may be scheduled (if configured).
// Otherwise, a webhook for incoming deliveries may be scheduled.
//
// The message is treated as feedback about an earlier outgoing delivery when the
// recipient localpart contains a fromid (the part after the domain's localpart
// catchall separator) for which a retired message exists. In that case the
// retired message gets an additional result, and the webhook is an outgoing
// one, also when the message is not a recognized DSN (the event is then
// "unrecognized"). For a DSN with action "failed" and an SMTP code, the
// suppression list is processed.
//
// In all other cases an incoming webhook is composed from the message structure,
// envelope, headers and readable text/html parts.
//
// Nothing is queued, and nil is returned, when the account has no webhook
// configured for the kind of event, or is not subscribed to the outgoing event.
// Otherwise the hook is inserted in the queue and the hook queue is kicked.
func Incoming(ctx context.Context, log mlog.Log, acc *store.Account, messageID string, m store.Message, part message.Part, mailboxName string) error {
	now := time.Now()
	// Webhook payload. Set to a webhook.Outgoing when the message is about an
	// earlier delivery, otherwise set to a webhook.Incoming further below.
	var data any

	log = log.With(
		slog.Int64("msgid", m.ID),
		slog.String("messageid", messageID),
		slog.String("mailbox", mailboxName),
	)

	// todo future: if there is no fromid in our rcpt address, but this is a 3-part dsn with headers that includes message-id, try matching based on that.
	// todo future: once we implement the SMTP DSN extension, use ENVID when sending (if destination implements it), and start looking for Original-Envelope-ID in the DSN.

	// If this is a DSN for a message we sent, don't deliver a hook for incoming
	// message, but an outgoing status webhook.
	var fromID string
	dom, err := dns.ParseDomain(m.RcptToDomain)
	if err != nil {
		log.Debugx("parsing recipient domain in incoming message", err)
	} else {
		domconf, _ := mox.Conf.Domain(dom)
		if domconf.LocalpartCatchallSeparator != "" {
			// The fromid is everything after the first separator in the localpart.
			t := strings.SplitN(string(m.RcptToLocalpart), domconf.LocalpartCatchallSeparator, 2)
			if len(t) == 2 {
				fromID = t[1]
			}
		}
	}
	var outgoingEvent webhook.OutgoingEvent
	var queueMsgID int64
	var subject string
	if fromID != "" {
		err := DB.Write(ctx, func(tx *bstore.Tx) (rerr error) {
			mr, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Get()
			if err == bstore.ErrAbsent {
				// Not about a message we know. data stays nil, so this is handled
				// as a regular incoming message below.
				log.Debug("no original message found for fromid", slog.String("fromid", fromID))
				return nil
			} else if err != nil {
				return fmt.Errorf("looking up original message for fromid: %v", err)
			}
			queueMsgID = mr.ID
			subject = mr.Subject

			log = log.With(slog.String("fromid", fromID))
			log.Debug("processing incoming message about previous delivery for webhooks")

			// We'll record this message in the results.
			mr.LastActivity = now
			mr.Results = append(mr.Results, MsgResult{Start: now, Error: "incoming message"})
			result := &mr.Results[len(mr.Results)-1] // Updated below.

			outgoingEvent = webhook.EventUnrecognized
			var suppressedMsgIDs []int64
			var isDSN bool
			var code int
			var secode string
			// On every successful return from this transaction function, compose
			// the outgoing webhook payload from the state gathered so far and store
			// the updated retired message.
			defer func() {
				if rerr == nil {
					var ecode string
					if secode != "" {
						ecode = fmt.Sprintf("%d.%s", code/100, secode)
					}
					data = webhook.Outgoing{
						Event:            outgoingEvent,
						DSN:              isDSN,
						Suppressing:      len(suppressedMsgIDs) > 0,
						QueueMsgID:       mr.ID,
						FromID:           fromID,
						MessageID:        mr.MessageID,
						Subject:          mr.Subject,
						WebhookQueued:    now,
						SMTPCode:         code,
						SMTPEnhancedCode: ecode,
						Extra:            mr.Extra,
					}

					if err := tx.Update(&mr); err != nil {
						rerr = fmt.Errorf("updating retired message after processing: %v", err)
						return
					}
				}
			}()

			// A DSN is a multipart/report of which the second part is a
			// message/delivery-status or message/global-delivery-status.
			if !(part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT" && len(part.Parts) >= 2 && part.Parts[1].MediaType == "MESSAGE" && (part.Parts[1].MediaSubType == "DELIVERY-STATUS" || part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS")) {
				// Some kind of delivery-related event, but we don't recognize it.
				result.Error = "incoming message not a dsn"
				return nil
			}
			isDSN = true
			dsnutf8 := part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS"
			dsnmsg, err := dsn.Decode(part.Parts[1].ReaderUTF8OrBinary(), dsnutf8)
			if err != nil {
				log.Infox("parsing dsn message for webhook", err)
				result.Error = fmt.Sprintf("parsing incoming dsn: %v", err)
				return nil
			} else if len(dsnmsg.Recipients) != 1 {
				log.Info("dsn message for webhook does not have exactly one dsn recipient", slog.Int("nrecipients", len(dsnmsg.Recipients)))
				result.Error = fmt.Sprintf("incoming dsn has %d recipients, expecting 1", len(dsnmsg.Recipients))
				return nil
			}

			dsnrcpt := dsnmsg.Recipients[0]

			// Prefer the codes from the SMTP diagnostic code. Fall back to the
			// status field, which only tells us the class (4xx or 5xx).
			if dsnrcpt.DiagnosticCodeSMTP != "" {
				code, secode = parseSMTPCodes(dsnrcpt.DiagnosticCodeSMTP)
			}
			if code == 0 && dsnrcpt.Status != "" {
				if strings.HasPrefix(dsnrcpt.Status, "4.") {
					code = 400
					secode = dsnrcpt.Status[2:]
				} else if strings.HasPrefix(dsnrcpt.Status, "5.") {
					code = 500
					secode = dsnrcpt.Status[2:]
				}
			}
			result.Code = code
			result.Secode = secode
			log.Debug("incoming dsn message", slog.String("action", string(dsnrcpt.Action)), slog.Int("dsncode", code), slog.String("dsnsecode", secode))

			switch s := dsnrcpt.Action; s {
			case dsn.Failed:
				outgoingEvent = webhook.EventFailed

				if code != 0 {
					sc := suppressionCheck{
						MsgID:     mr.ID,
						Account:   acc.Name,
						Recipient: mr.Recipient(),
						Code:      code,
						Secode:    secode,
						Source:    "DSN",
					}
					suppressedMsgIDs, err = suppressionProcess(log, tx, sc)
					if err != nil {
						return fmt.Errorf("processing dsn for suppression list: %v", err)
					}
				} else {
					log.Debug("no code/secode in dsn for failed delivery", slog.Int64("msgid", mr.ID))
				}

			case dsn.Delayed, dsn.Delivered, dsn.Relayed, dsn.Expanded:
				// The DSN action is used directly as the outgoing event. Only a
				// delay does not count as success.
				outgoingEvent = webhook.OutgoingEvent(string(s))
				result.Success = s != dsn.Delayed

			default:
				log.Info("unrecognized dsn action", slog.String("action", string(dsnrcpt.Action)))
			}

			return nil
		})
		if err != nil {
			return fmt.Errorf("processing message based on fromid: %v", err)
		}
	}

	accConf, _ := acc.Conf()

	var hookURL, authz string
	var isIncoming bool
	if data == nil {
		// Not related to an earlier outgoing delivery: regular incoming message.
		if accConf.IncomingWebhook == nil {
			return nil
		}
		hookURL = accConf.IncomingWebhook.URL
		authz = accConf.IncomingWebhook.Authorization

		log.Debug("composing webhook for incoming message")

		isIncoming = true
		var rcptTo string
		if m.RcptToDomain != "" {
			rcptTo = m.RcptToLocalpart.String() + "@" + m.RcptToDomain
		}
		in := webhook.Incoming{
			Structure: webhook.PartStructure(&part),
			Meta: webhook.IncomingMeta{
				MsgID:               m.ID,
				MailFrom:            m.MailFrom,
				MailFromValidated:   m.MailFromValidated,
				MsgFromValidated:    m.MsgFromValidated,
				RcptTo:              rcptTo,
				DKIMVerifiedDomains: m.DKIMDomains,
				RemoteIP:            m.RemoteIP,
				Received:            m.Received,
				MailboxName:         mailboxName,
			},
		}
		// Use an empty instead of nil slice, so it marshals as a JSON array.
		if in.Meta.DKIMVerifiedDomains == nil {
			in.Meta.DKIMVerifiedDomains = []string{}
		}
		if env := part.Envelope; env != nil {
			subject = env.Subject
			in.From = addresses(env.From)
			in.To = addresses(env.To)
			in.CC = addresses(env.CC)
			in.BCC = addresses(env.BCC)
			in.ReplyTo = addresses(env.ReplyTo)
			in.Subject = env.Subject
			in.MessageID = env.MessageID
			in.InReplyTo = env.InReplyTo
			if !env.Date.IsZero() {
				in.Date = &env.Date
			}
		}
		// todo: ideally, we would have this information available in parsed Part, not require parsing headers here.
		h, err := part.Header()
		if err != nil {
			log.Debugx("parsing headers of incoming message", err, slog.Int64("msgid", m.ID))
		} else {
			refs, err := message.ReferencedIDs(h.Values("References"), nil)
			if err != nil {
				log.Debugx("parsing references header", err, slog.Int64("msgid", m.ID))
			}
			// References are included with angle brackets, and always as a
			// non-nil slice.
			for i, r := range refs {
				refs[i] = "<" + r + ">"
			}
			if refs == nil {
				refs = []string{}
			}
			in.References = refs

			// Check if message is automated. Empty SMTP MAIL FROM indicates this was some kind
			// of service message. Several headers indicate out-of-office replies, messages
			// from mailing or marketing lists. And the content-type can indicate a report
			// (e.g. DSN/MDN).
			in.Meta.Automated = m.MailFrom == "" || isAutomated(h) || part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT"
		}

		// Failure to find readable parts is only logged; the webhook is still
		// composed with whatever text and html was returned.
		text, html, _, err := webops.ReadableParts(part, 1*1024*1024)
		if err != nil {
			log.Debugx("looking for text and html content in message", err)
		}
		in.Text = strings.ReplaceAll(text, "\r\n", "\n")
		in.HTML = strings.ReplaceAll(html, "\r\n", "\n")

		data = in
	} else if accConf.OutgoingWebhook == nil {
		return nil
	} else if len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(outgoingEvent)) {
		// No configured events means the account receives all events.
		hookURL = accConf.OutgoingWebhook.URL
		authz = accConf.OutgoingWebhook.Authorization
	} else {
		log.Debug("not sending webhook, account not subscribed for event", slog.String("event", string(outgoingEvent)))
		return nil
	}

	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal webhook payload: %v", err)
	}

	h := Hook{
		QueueMsgID:    queueMsgID,
		FromID:        fromID,
		MessageID:     messageID,
		Subject:       subject,
		Account:       acc.Name,
		URL:           hookURL,
		Authorization: authz,
		IsIncoming:    isIncoming,
		OutgoingEvent: string(outgoingEvent),
		Payload:       string(payload),
		Submitted:     now,
		NextAttempt:   now,
	}
	err = DB.Write(ctx, func(tx *bstore.Tx) error {
		if err := hookInsert(tx, &h, now, accConf.KeepRetiredWebhookPeriod); err != nil {
			return fmt.Errorf("queueing webhook for incoming message: %v", err)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("inserting webhook in database: %v", err)
	}
	log.Debug("queued webhook for incoming message", h.attrs()...)
	hookqueueKick()
	return nil
}
// isAutomated reports whether the message headers h indicate an automated
// message: a non-empty List-Id, List-Unsubscribe, List-Unsubscribe-Post or
// Precedence header, or an Auto-Submitted header with a value other than "no"
// (compared case-insensitively, ignoring surrounding whitespace).
func isAutomated(h textproto.MIMEHeader) bool {
	// Any value for one of these headers marks the message as automated.
	for _, name := range [...]string{"List-Id", "List-Unsubscribe", "List-Unsubscribe-Post", "Precedence"} {
		if h.Get(name) != "" {
			return true
		}
	}

	auto := strings.TrimSpace(h.Get("Auto-Submitted"))
	return auto != "" && !strings.EqualFold(auto, "no")
}
// parseSMTPCodes parses the SMTP reply code and, if present, the enhanced status
// code from the start of an SMTP response line such as "550 5.1.1 user unknown".
//
// The returned secode is the enhanced status code without its leading class,
// e.g. "1.1" for "5.1.1". It is empty when the second word does not start with
// "4." or "5.".
//
// If line does not start with a 3-digit code below 600 followed by a space,
// code 0 and an empty secode are returned.
func parseSMTPCodes(line string) (code int, secode string) {
	t := strings.SplitN(line, " ", 3)
	if len(t) <= 1 || len(t[0]) != 3 {
		return 0, ""
	}
	v, err := strconv.ParseUint(t[0], 10, 64)
	// The range check must be on the parsed value: the named result "code" has
	// not been assigned yet and is always zero here.
	if err != nil || v >= 600 {
		return 0, ""
	}
	// len(t) >= 2 is guaranteed by the check above.
	if strings.HasPrefix(t[1], "4.") || strings.HasPrefix(t[1], "5.") {
		secode = t[1][2:]
	}
	return int(v), secode
}
// hookInsert inserts hook h into the database. If h.QueueMsgID is set, any other
// hook still in the queue for the same QueueMsgID is superseded: it is removed
// from the queue and, if accountKeepPeriod > 0, stored as a retired hook that
// refers to h through SupersededByID and is kept until now plus
// accountKeepPeriod.
func hookInsert(tx *bstore.Tx, h *Hook, now time.Time, accountKeepPeriod time.Duration) error {
	// Insert first, so h.ID is assigned and can be referenced below.
	if err := tx.Insert(h); err != nil {
		return fmt.Errorf("insert webhook: %v", err)
	}
	if h.QueueMsgID == 0 {
		return nil
	}

	// Find existing queued hook for the same message id from the queue. Can be at
	// most one.
	oh, err := bstore.QueryTx[Hook](tx).FilterNonzero(Hook{QueueMsgID: h.QueueMsgID}).FilterNotEqual("ID", h.ID).Get()
	if err == bstore.ErrAbsent {
		return nil
	} else if err != nil {
		// Include the underlying error, it was previously dropped from the message.
		return fmt.Errorf("get existing webhook before inserting new hook for same queuemsgid %d: %v", h.QueueMsgID, err)
	}

	// Retire this queued hook.
	// This hook may be in the process of being delivered. When delivered, we'll try to
	// move it from Hook to HookRetired. But that will fail since Hook is already
	// retired. We detect that situation and update the retired hook with the new
	// (final) result.
	if accountKeepPeriod > 0 {
		hr := oh.Retired(false, now, now.Add(accountKeepPeriod))
		hr.SupersededByID = h.ID
		if err := tx.Insert(&hr); err != nil {
			return fmt.Errorf("inserting superseded webhook as retired hook: %v", err)
		}
	}
	if err := tx.Delete(&oh); err != nil {
		return fmt.Errorf("deleting superseded webhook: %v", err)
	}
	return nil
}
// addresses converts parsed message addresses to the name/address pairs used in
// webhook payloads. The address is "user@host"; if that parses as an SMTP
// address, its packed form is used instead. The result is never nil.
func addresses(al []message.Address) []webhook.NameAddress {
	out := make([]webhook.NameAddress, 0, len(al))
	for _, a := range al {
		address := a.User + "@" + a.Host
		if parsed, err := smtp.ParseAddress(address); err == nil {
			address = parsed.Pack(true)
		}
		out = append(out, webhook.NameAddress{Name: a.Name, Address: address})
	}
	return out
}
var (
	// hookqueue signals the hook queue goroutine (startHookQueue) to look for
	// work. With a buffer of 1, a pending signal is kept while the goroutine is
	// busy; hookqueueKick drops additional signals.
	hookqueue = make(chan struct{}, 1)
	// hookDeliveryResults receives the URL of a hook each time a delivery
	// attempt (hookDeliver) finishes, so startHookQueue can mark the URL as no
	// longer busy.
	hookDeliveryResults = make(chan string, 1)
)
// hookqueueKick signals the hook queue that there may be new work. It never
// blocks: if a signal is already pending, this one is dropped.
func hookqueueKick() {
	select {
	case hookqueue <- struct{}{}:
	default:
	}
}
func startHookQueue(done chan struct{}) {
log := mlog.New("queue", nil)
busyHookURLs := map[string]struct{}{}
timer := time.NewTimer(0)
for {
select {
case <-mox.Shutdown.Done():
for len(busyHookURLs) > 0 {
url := <-hookDeliveryResults
delete(busyHookURLs, url)
}
add a webapi and webhooks for a simple http/json-based api for applications to compose/send messages, receive delivery feedback, and maintain suppression lists. this is an alternative to applications using a library to compose messages, submitting those messages using smtp, and monitoring a mailbox with imap for DSNs, which can be processed into the equivalent of suppression lists. but you need to know about all these standards/protocols and find libraries. by using the webapi & webhooks, you just need a http & json library. unfortunately, there is no standard for these kinds of api, so mox has made up yet another one... matching incoming DSNs about deliveries to original outgoing messages requires keeping history of "retired" messages (delivered from the queue, either successfully or failed). this can be enabled per account. history is also useful for debugging deliveries. we now also keep history of each delivery attempt, accessible while still in the queue, and kept when a message is retired. the queue webadmin pages now also have pagination, to show potentially large history. a queue of webhook calls is now managed too. failures are retried similar to message deliveries. webhooks can also be saved to the retired list after completing. also configurable per account. messages can be sent with a "unique smtp mail from" address. this can only be used if the domain is configured with a localpart catchall separator such as "+". when enabled, a queued message gets assigned a random "fromid", which is added after the separator when sending. when DSNs are returned, they can be related to previously sent messages based on this fromid. in the future, we can implement matching on the "envid" used in the smtp dsn extension, or on the "message-id" of the message. using a fromid can be triggered by authenticating with a login email address that is configured as enabling fromid. suppression lists are automatically managed per account. 
if a delivery attempt results in certain smtp errors, the destination address is added to the suppression list. future messages queued for that recipient will immediately fail without a delivery attempt. suppression lists protect your mail server reputation. submitted messages can carry "extra" data through the queue and webhooks for outgoing deliveries. through webapi as a json object, through smtp submission as message headers of the form "x-mox-extra-<key>: value". to make it easy to test webapi/webhooks locally, the "localserve" mode actually puts messages in the queue. when it's time to deliver, it still won't do a full delivery attempt, but just delivers to the sender account. unless the recipient address has a special form, simulating a failure to deliver. admins now have more control over the queue. "hold rules" can be added to mark newly queued messages as "on hold", pausing delivery. rules can be about certain sender or recipient domains/addresses, or apply to all messages pausing the entire queue. also useful for (local) testing. new config options have been introduced. they are editable through the admin and/or account web interfaces. the webapi http endpoints are enabled for newly generated configs with the quickstart, and in localserve. existing configurations must explicitly enable the webapi in mox.conf. gopherwatch.org was created to dogfood this code. it initially used just the compose/smtpclient/imapclient mox packages to send messages and process delivery feedback. it will get a config option to use the mox webapi/webhooks instead. the gopherwatch code to use webapi/webhook is smaller and simpler, and developing that shaped development of the mox webapi/webhooks. for issue #31 by cuu508
2024-04-15 22:49:02 +03:00
done <- struct{}{}
return
case <-hookqueue:
case <-timer.C:
case url := <-hookDeliveryResults:
delete(busyHookURLs, url)
}
if len(busyHookURLs) >= maxConcurrentHookDeliveries {
continue
}
hookLaunchWork(log, busyHookURLs)
timer.Reset(hookNextWork(mox.Shutdown, log, busyHookURLs))
}
}
// hookNextWork returns the duration until the earliest NextAttempt of the queued
// hooks, not considering hooks for URLs in busyURLs. If there are no such hooks,
// 24 hours is returned. If the query fails, the error is logged and 1 minute is
// returned.
func hookNextWork(ctx context.Context, log mlog.Log, busyURLs map[string]struct{}) time.Duration {
	query := bstore.QueryDB[Hook](ctx, DB)
	if len(busyURLs) > 0 {
		exclude := make([]any, 0, len(busyURLs))
		for u := range busyURLs {
			exclude = append(exclude, u)
		}
		query.FilterNotEqual("URL", exclude...)
	}
	query.SortAsc("NextAttempt")
	query.Limit(1)

	next, err := query.Get()
	switch {
	case err == bstore.ErrAbsent:
		return 24 * time.Hour
	case err != nil:
		log.Errorx("finding time for next webhook delivery attempt", err)
		return 1 * time.Minute
	}
	return time.Until(next.NextAttempt)
}
// hookLaunchWork starts a delivery goroutine for hooks that are due, at most
// one per URL and none for URLs in busyURLs. The URLs of launched hooks are added
// to busyURLs. It returns the number of deliveries launched, or -1 if querying
// the queue failed (after logging the error and sleeping for a second).
func hookLaunchWork(log mlog.Log, busyURLs map[string]struct{}) int {
	query := bstore.QueryDB[Hook](mox.Shutdown, DB)
	query.FilterLessEqual("NextAttempt", time.Now())
	query.SortAsc("NextAttempt")
	query.Limit(maxConcurrentHookDeliveries)
	if len(busyURLs) > 0 {
		exclude := make([]any, 0, len(busyURLs))
		for u := range busyURLs {
			exclude = append(exclude, u)
		}
		query.FilterNotEqual("URL", exclude...)
	}

	// Select the first due hook for each URL that is not busy.
	var launch []Hook
	picked := map[string]bool{}
	err := query.ForEach(func(h Hook) error {
		if _, busy := busyURLs[h.URL]; busy || picked[h.URL] {
			return nil
		}
		picked[h.URL] = true
		launch = append(launch, h)
		return nil
	})
	if err != nil {
		log.Errorx("querying for work in webhook queue", err)
		mox.Sleep(mox.Shutdown, 1*time.Second)
		return -1
	}

	for _, h := range launch {
		busyURLs[h.URL] = struct{}{}
		go hookDeliver(log, h)
	}
	return len(launch)
}
// hookIntervals holds the backoff durations between webhook delivery attempts,
// indexed by the number of attempts made so far.
var hookIntervals = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	4 * time.Minute,
	15 * time.Minute / 2,
	15 * time.Minute,
	30 * time.Minute,
	1 * time.Hour,
	2 * time.Hour,
	4 * time.Hour,
	8 * time.Hour,
	16 * time.Hour,
}
// hookDeliver makes a single delivery attempt for hook h and stores the outcome.
//
// Before making the HTTP call, the attempt is recorded in the database: Attempts
// is incremented, NextAttempt is set using a backoff interval, and a result
// marked as "delivering" is added. After the call, the result is updated. If the
// call failed and attempts remain, the hook stays in the queue for a retry.
// Otherwise (success, or no attempts remaining) the hook is removed from the
// queue and, if the account keeps retired hooks, stored as a retired hook.
//
// If the hook is no longer in the queue when storing the outcome, the result is
// added to the retired hook with the same ID instead.
//
// When done, also after a panic, the URL of the hook is sent on
// hookDeliveryResults.
func hookDeliver(log mlog.Log, h Hook) {
	ctx := mox.Shutdown

	qlog := log.WithCid(mox.Cid())
	qlog.Debug("attempting to deliver webhook", h.attrs()...)
	qlog = qlog.With(slog.Int64("webhookid", h.ID))

	defer func() {
		// Always report back, so the URL is no longer considered busy.
		hookDeliveryResults <- h.URL

		x := recover()
		if x != nil {
			qlog.Error("webhook deliver panic", slog.Any("panic", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Queue)
		}
	}()

	// todo: should we get a new webhook url from the config before attempting? would intervene with our "urls busy" approach. may not be worth it.

	// Set Attempts & NextAttempt early. In case of failures while processing, at least
	// we won't try again immediately. We do backoff at intervals:
	var backoff time.Duration
	if h.Attempts < len(hookIntervals) {
		backoff = hookIntervals[h.Attempts]
	} else {
		// Beyond the configured intervals: twice the last interval.
		backoff = hookIntervals[len(hookIntervals)-1] * time.Duration(2)
	}
	// Adjust the backoff by a random amount.
	// NOTE(review): jitter is defined elsewhere; assuming Intn(200) returns a
	// value in [0, 200), the adjustment is at most about 1% in either direction.
	backoff += time.Duration(jitter.Intn(200)-100) * backoff / 10000
	h.Attempts++
	now := time.Now()
	h.NextAttempt = now.Add(backoff)
	// Placeholder result for this attempt, filled in after the HTTP call.
	h.Results = append(h.Results, HookResult{Start: now, URL: h.URL, Error: resultErrorDelivering})
	result := &h.Results[len(h.Results)-1]
	if err := DB.Update(mox.Shutdown, &h); err != nil {
		qlog.Errorx("storing webhook delivery attempt", err)
		return
	}

	// The HTTP call may take at most 60 seconds.
	hctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()
	t0 := time.Now()
	code, response, err := HookPost(hctx, qlog, h.ID, h.Attempts, h.URL, h.Authorization, h.Payload)
	result.Duration = time.Since(t0)
	result.Success = err == nil
	result.Code = code
	result.Error = ""
	result.Response = response
	if err != nil {
		result.Error = err.Error()
	}
	if err != nil && h.Attempts <= len(hookIntervals) {
		// We'll try again later, so only update existing record.
		qlog.Debugx("webhook delivery failed, will try again later", err)
		// A background context is used for storing the result, not the shutdown
		// context used for the delivery.
		xerr := DB.Write(context.Background(), func(tx *bstore.Tx) error {
			if err := tx.Update(&h); err == bstore.ErrAbsent {
				return updateRetiredHook(tx, h, result)
			} else if err != nil {
				return fmt.Errorf("update webhook after retryable failure: %v", err)
			}
			return nil
		})
		qlog.Check(xerr, "updating failed webhook delivery attempt in database", slog.String("deliveryerr", err.Error()))
		return
	}

	// Either delivered successfully, or failed without attempts remaining.
	qlog.Debugx("webhook delivery completed", err, slog.Bool("success", result.Success))

	// Move Hook to HookRetired.
	err = DB.Write(context.Background(), func(tx *bstore.Tx) error {
		if err := tx.Delete(&h); err == bstore.ErrAbsent {
			return updateRetiredHook(tx, h, result)
		} else if err != nil {
			return fmt.Errorf("removing webhook from database: %v", err)
		}
		keep := hookRetiredKeep(h.Account)
		if keep > 0 {
			hr := h.Retired(result.Success, t0, t0.Add(keep))
			if err := tx.Insert(&hr); err != nil {
				return fmt.Errorf("inserting retired webhook in database: %v", err)
			}
		}
		return nil
	})
	qlog.Check(err, "moving delivered webhook from to retired hooks")
}
// updateRetiredHook stores a delivery result for hook h that is no longer in the
// queue, e.g. because it was superseded and retired while it was being
// delivered. The result, with "(superseded)" appended to its error, is added to
// the retired hook with the same ID. An error is returned if there is no such
// retired hook, or if updating it fails.
func updateRetiredHook(tx *bstore.Tx, h Hook, result *HookResult) error {
	retired := HookRetired{ID: h.ID}
	err := tx.Get(&retired)
	if err != nil {
		return fmt.Errorf("result for webhook that was no longer in webhook queue or retired webhooks: %v", err)
	}

	result.Error += "(superseded)"
	retired.Results = append(retired.Results, *result)

	err = tx.Update(&retired)
	if err != nil {
		return fmt.Errorf("updating retired webhook after webhook was superseded during delivery: %v", err)
	}
	return nil
}
// hookClient is the HTTP client used for making webhook calls.
var hookClient = &http.Client{Transport: hookTransport()}

// hookTransport returns a clone of http.DefaultTransport with tighter limits on
// idle connections.
func hookTransport() *http.Transport {
	tr := http.DefaultTransport.(*http.Transport).Clone()

	// Keep few idle connections, and only briefly: the queue is probably idle most
	// of the time. During busy periods, the few connections can still be reused
	// for many events.
	tr.MaxIdleConnsPerHost = 2
	tr.IdleConnTimeout = 5 * time.Second
	return tr
}
// HookPost makes a single webhook call: an HTTP POST of the JSON payload to url.
// The hook ID and attempt number are sent in the X-Mox-Webhook-ID and
// X-Mox-Webhook-Attempt headers, and authz, if not empty, in the Authorization
// header.
//
// It returns the HTTP status code and up to 512 bytes of the response body. Any
// status other than 200 results in an error, with code and response still
// returned. If the request could not be made or the HTTP transaction failed,
// code is 0 and response is empty.
func HookPost(ctx context.Context, log mlog.Log, hookID int64, attempt int, url, authz string, payload string) (code int, response string, err error) {
	req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(payload))
	if err != nil {
		return 0, "", fmt.Errorf("new request: %v", err)
	}

	header := req.Header
	header.Set("User-Agent", fmt.Sprintf("mox/%s (webhook)", moxvar.Version))
	header.Set("Content-Type", "application/json; charset=utf-8")
	header.Set("X-Mox-Webhook-ID", fmt.Sprintf("%d", hookID))
	header.Set("X-Mox-Webhook-Attempt", fmt.Sprintf("%d", attempt))
	if authz != "" {
		header.Set("Authorization", authz)
	}

	start := time.Now()
	resp, err := hookClient.Do(req)
	metricHookRequest.Observe(float64(time.Since(start)) / float64(time.Second))
	if err != nil {
		metricHookResult.WithLabelValues("error").Inc()
		log.Debugx("webhook http transaction", err)
		return 0, "", fmt.Errorf("http transact: %v", err)
	}
	defer resp.Body.Close()

	// Use full http status code for known codes, and a generic "<major>xx" for others.
	var label string
	if http.StatusText(resp.StatusCode) != "" {
		label = fmt.Sprintf("%d", resp.StatusCode)
	} else {
		label = fmt.Sprintf("%dxx", resp.StatusCode/100)
	}
	metricHookResult.WithLabelValues(label).Inc()
	log.Debug("webhook http post result", slog.Int("statuscode", resp.StatusCode), slog.Duration("duration", time.Since(start)))

	// Only the start of the response is kept; a read error is ignored.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
	if resp.StatusCode == http.StatusOK {
		return resp.StatusCode, string(body), nil
	}
	return resp.StatusCode, string(body), fmt.Errorf("http status %q, expected 200 ok", resp.Status)
}