mirror of
https://github.com/mjl-/mox.git
synced 2024-12-26 16:33:47 +03:00
for localserve, don't special-case smtp submit
the recent webmail addition added localserve local delivery in queue.Add, so we just that for smtpserver too. and don't drop incoming smtp deliver messages, but deliver as normal.
This commit is contained in:
parent
ce91b7d23e
commit
a30d8c1378
1 changed files with 49 additions and 109 deletions
|
@ -1752,102 +1752,43 @@ func (c *conn) submit(ctx context.Context, recvHdrFor func(string) string, msgWr
|
|||
}
|
||||
msgPrefix = append(msgPrefix, []byte(authResults.Header())...)
|
||||
|
||||
if Localserve {
|
||||
var timeout bool
|
||||
c.account.WithWLock(func() {
|
||||
for i, rcptAcc := range c.recipients {
|
||||
var code int
|
||||
code, timeout = localserveNeedsError(rcptAcc.rcptTo.Localpart)
|
||||
if timeout {
|
||||
// Get out of wlock, and sleep there.
|
||||
return
|
||||
} else if code != 0 {
|
||||
c.log.Info("failure due to special localpart", mlog.Field("code", code))
|
||||
xsmtpServerErrorf(codes{code, smtp.SeOther00}, "failure with code %d due to special localpart", code)
|
||||
}
|
||||
|
||||
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
|
||||
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
|
||||
if !msgWriter.HaveHeaders {
|
||||
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
|
||||
}
|
||||
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
|
||||
|
||||
ipmasked1, ipmasked2, ipmasked3 := ipmasked(c.remoteIP)
|
||||
m := store.Message{
|
||||
Received: time.Now(),
|
||||
RemoteIP: c.remoteIP.String(),
|
||||
RemoteIPMasked1: ipmasked1,
|
||||
RemoteIPMasked2: ipmasked2,
|
||||
RemoteIPMasked3: ipmasked3,
|
||||
EHLODomain: c.hello.Domain.Name(),
|
||||
MailFrom: c.mailFrom.String(),
|
||||
MailFromLocalpart: c.mailFrom.Localpart,
|
||||
MailFromDomain: c.mailFrom.IPDomain.Domain.Name(),
|
||||
RcptToLocalpart: rcptAcc.rcptTo.Localpart,
|
||||
RcptToDomain: rcptAcc.rcptTo.IPDomain.Domain.Name(),
|
||||
MsgFromLocalpart: msgFrom.Localpart,
|
||||
MsgFromDomain: msgFrom.Domain.Name(),
|
||||
MsgFromOrgDomain: publicsuffix.Lookup(ctx, msgFrom.Domain).Name(),
|
||||
EHLOValidated: true,
|
||||
MailFromValidated: true,
|
||||
MsgFromValidated: true,
|
||||
EHLOValidation: store.ValidationPass,
|
||||
MailFromValidation: store.ValidationRelaxed,
|
||||
MsgFromValidation: store.ValidationRelaxed,
|
||||
DKIMDomains: nil,
|
||||
Size: msgSize,
|
||||
MsgPrefix: xmsgPrefix,
|
||||
}
|
||||
|
||||
if err := c.account.Deliver(c.log, rcptAcc.destination, &m, dataFile, i == len(c.recipients)-1); err != nil {
|
||||
// Aborting the transaction is not great. But continuing and generating DSNs will
|
||||
// probably result in errors as well...
|
||||
metricSubmission.WithLabelValues("localserveerror").Inc()
|
||||
c.log.Errorx("delivering message", err)
|
||||
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
|
||||
}
|
||||
metricSubmission.WithLabelValues("ok").Inc()
|
||||
c.log.Info("submitted message delivered", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
|
||||
|
||||
err := c.account.DB.Insert(ctx, &store.Outgoing{Recipient: rcptAcc.rcptTo.XString(true)})
|
||||
xcheckf(err, "adding outgoing message")
|
||||
// We always deliver through the queue. It would be more efficient to deliver
|
||||
// directly, but we don't want to circumvent all the anti-spam measures. Accounts
|
||||
// on a single mox instance should be allowed to block each other.
|
||||
for i, rcptAcc := range c.recipients {
|
||||
if Localserve {
|
||||
code, timeout := localserveNeedsError(rcptAcc.rcptTo.Localpart)
|
||||
if timeout {
|
||||
c.log.Info("timing out submission due to special localpart")
|
||||
mox.Sleep(mox.Context, time.Hour)
|
||||
xsmtpServerErrorf(codes{smtp.C451LocalErr, smtp.SeSys3Other0}, "timing out submission due to special localpart")
|
||||
} else if code != 0 {
|
||||
c.log.Info("failure due to special localpart", mlog.Field("code", code))
|
||||
xsmtpServerErrorf(codes{code, smtp.SeOther00}, "failure with code %d due to special localpart", code)
|
||||
}
|
||||
})
|
||||
|
||||
if timeout {
|
||||
c.log.Info("timing out submission due to special localpart")
|
||||
mox.Sleep(mox.Context, time.Hour)
|
||||
xsmtpServerErrorf(codes{smtp.C451LocalErr, smtp.SeSys3Other0}, "timing out submission due to special localpart")
|
||||
}
|
||||
|
||||
} else {
|
||||
// We always deliver through the queue. It would be more efficient to deliver
|
||||
// directly, but we don't want to circumvent all the anti-spam measures. Accounts
|
||||
// on a single mox instance should be allowed to block each other.
|
||||
|
||||
for i, rcptAcc := range c.recipients {
|
||||
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
|
||||
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
|
||||
if !msgWriter.HaveHeaders {
|
||||
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
|
||||
}
|
||||
|
||||
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
|
||||
if _, err := queue.Add(ctx, c.log, c.account.Name, *c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, messageID, xmsgPrefix, dataFile, nil, i == len(c.recipients)-1); err != nil {
|
||||
// Aborting the transaction is not great. But continuing and generating DSNs will
|
||||
// probably result in errors as well...
|
||||
metricSubmission.WithLabelValues("queueerror").Inc()
|
||||
c.log.Errorx("queuing message", err)
|
||||
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
|
||||
}
|
||||
metricSubmission.WithLabelValues("ok").Inc()
|
||||
c.log.Info("message queued for delivery", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
|
||||
|
||||
err := c.account.DB.Insert(ctx, &store.Outgoing{Recipient: rcptAcc.rcptTo.XString(true)})
|
||||
xcheckf(err, "adding outgoing message")
|
||||
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
|
||||
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
|
||||
if !msgWriter.HaveHeaders {
|
||||
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
|
||||
}
|
||||
|
||||
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
|
||||
if _, err := queue.Add(ctx, c.log, c.account.Name, *c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, messageID, xmsgPrefix, dataFile, nil, i == len(c.recipients)-1); err != nil {
|
||||
// Aborting the transaction is not great. But continuing and generating DSNs will
|
||||
// probably result in errors as well...
|
||||
metricSubmission.WithLabelValues("queueerror").Inc()
|
||||
c.log.Errorx("queuing message", err)
|
||||
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
|
||||
}
|
||||
metricSubmission.WithLabelValues("ok").Inc()
|
||||
c.log.Info("message queued for delivery", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
|
||||
|
||||
err := c.account.DB.Insert(ctx, &store.Outgoing{Recipient: rcptAcc.rcptTo.XString(true)})
|
||||
xcheckf(err, "adding outgoing message")
|
||||
}
|
||||
|
||||
err = dataFile.Close()
|
||||
c.log.Check(err, "closing file after submission")
|
||||
*pdataFile = nil
|
||||
|
@ -2472,25 +2413,24 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
|
|||
metricDelivery.WithLabelValues("delivererror", "localserve").Inc()
|
||||
addError(rcptAcc, code, smtp.SeOther00, false, fmt.Sprintf("failure with code %d due to special localpart", code))
|
||||
}
|
||||
} else {
|
||||
acc.WithWLock(func() {
|
||||
if err := acc.DeliverMailbox(log, a.mailbox, m, dataFile, false); err != nil {
|
||||
log.Errorx("delivering", err)
|
||||
metricDelivery.WithLabelValues("delivererror", a.reason).Inc()
|
||||
addError(rcptAcc, smtp.C451LocalErr, smtp.SeSys3Other0, false, "error processing")
|
||||
return
|
||||
}
|
||||
metricDelivery.WithLabelValues("delivered", a.reason).Inc()
|
||||
log.Info("incoming message delivered", mlog.Field("reason", a.reason), mlog.Field("msgfrom", msgFrom))
|
||||
|
||||
conf, _ := acc.Conf()
|
||||
if conf.RejectsMailbox != "" && messageID != "" {
|
||||
if err := acc.RejectsRemove(log, conf.RejectsMailbox, messageID); err != nil {
|
||||
log.Errorx("removing message from rejects mailbox", err, mlog.Field("messageid", messageID))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
acc.WithWLock(func() {
|
||||
if err := acc.DeliverMailbox(log, a.mailbox, m, dataFile, false); err != nil {
|
||||
log.Errorx("delivering", err)
|
||||
metricDelivery.WithLabelValues("delivererror", a.reason).Inc()
|
||||
addError(rcptAcc, smtp.C451LocalErr, smtp.SeSys3Other0, false, "error processing")
|
||||
return
|
||||
}
|
||||
metricDelivery.WithLabelValues("delivered", a.reason).Inc()
|
||||
log.Info("incoming message delivered", mlog.Field("reason", a.reason), mlog.Field("msgfrom", msgFrom))
|
||||
|
||||
conf, _ := acc.Conf()
|
||||
if conf.RejectsMailbox != "" && messageID != "" {
|
||||
if err := acc.RejectsRemove(log, conf.RejectsMailbox, messageID); err != nil {
|
||||
log.Errorx("removing message from rejects mailbox", err, mlog.Field("messageid", messageID))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
err = acc.Close()
|
||||
log.Check(err, "closing account after delivering")
|
||||
|
|
Loading…
Reference in a new issue