add per-account quota for total message size disk usage

so a single user cannot fill up the disk.
by default, there is (still) no limit. a default can be set in the config file
for all accounts, and a per-account max size can be set that would override any
global setting.

this does not take into account disk usage of the index database. and also not
of any file system overhead.
This commit is contained in:
Mechiel Lukkien 2023-12-20 20:54:12 +01:00
parent e048d0962b
commit d73bda7511
No known key found for this signature in database
28 changed files with 434 additions and 50 deletions

View file

@ -71,6 +71,7 @@ type Static struct {
NoOutgoingDMARCReports bool `sconf:"optional" sconf-doc:"Do not send DMARC reports (aggregate only). By default, aggregate reports on DMARC evaluations are sent to domains if their DMARC policy requests them. Reports are sent at whole hours, with a minimum of 1 hour and maximum of 24 hours, rounded up so a whole number of intervals cover 24 hours, aligned at whole days in UTC. Reports are sent from the postmaster@<mailhostname> address."`
NoOutgoingTLSReports bool `sconf:"optional" sconf-doc:"Do not send TLS reports. By default, reports about failed SMTP STARTTLS connections and related MTA-STS/DANE policies are sent to domains if their TLSRPT DNS record requests them. Reports covering a 24 hour UTC interval are sent daily. Reports are sent from the postmaster address of the configured domain the mailhostname is in. If there is no such domain, or it does not have DKIM configured, no reports are sent."`
OutgoingTLSReportsForAllSuccess bool `sconf:"optional" sconf-doc:"Also send TLS reports if there were no SMTP STARTTLS connection failures. By default, reports are only sent when at least one failure occurred. If a report is sent, it does always include the successful connection counts as well."`
QuotaMessageSize int64 `sconf:"optional" sconf-doc:"Default maximum total message size for accounts, only applicable if greater than zero. Can be overridden per account. Attempting to add new messages beyond the maximum size will result in an error. Useful to prevent a single account from filling storage. The quota only applies to the email message files, not to any file system overhead and also not the message index database file (account for approximately 15% overhead)."`
// All IPs that were explicitly listen on for external SMTP. Only set when there
// are no unspecified external SMTP listeners and there is at most one for IPv4 and
@ -354,6 +355,7 @@ type Account struct {
SubjectPass struct {
Period time.Duration `sconf-doc:"How long unique values are accepted after generating, e.g. 12h."` // todo: have a reasonable default for this?
} `sconf:"optional" sconf-doc:"If configured, messages classified as weakly spam are rejected with instructions to retry delivery, but this time with a signed token added to the subject. During the next delivery attempt, the signed token will bypass the spam filter. Messages with a clear spam signal, such as a known bad reputation, are rejected/delayed without a signed token."`
QuotaMessageSize int64 `sconf:"optional" sconf-doc:"Default maximum total message size for the account, overriding any globally configured maximum size if non-zero. A negative value can be used to have no limit in case there is a limit by default. Attempting to add new messages beyond the maximum size will result in an error. Useful to prevent a single account from filling storage."`
RejectsMailbox string `sconf:"optional" sconf-doc:"Mail that looks like spam will be rejected, but a copy can be stored temporarily in a mailbox, e.g. Rejects. If mail isn't coming in when you expect, you can look there. The mail still isn't accepted, so the remote mail server may retry (hopefully, if legitimate), or give up (hopefully, if indeed a spammer). Messages are automatically removed from this mailbox, so do not set it to a mailbox that has messages you want to keep."`
KeepRejects bool `sconf:"optional" sconf-doc:"Don't automatically delete mail in the RejectsMailbox listed above. This can be useful, e.g. for future spam training."`
AutomaticJunkFlags struct {

View file

@ -588,6 +588,14 @@ describe-static" and "mox config describe-domains":
# (optional)
OutgoingTLSReportsForAllSuccess: false
# Default maximum total message size for accounts, only applicable if greater than
# zero. Can be overridden per account. Attempting to add new messages beyond the
# maximum size will result in an error. Useful to prevent a single account from
# filling storage. The quota only applies to the email message files, not to any
# file system overhead and also not the message index database file (account for
# approximately 15% overhead). (optional)
QuotaMessageSize: 0
# domains.conf
# NOTE: This config file is in 'sconf' format. Indent with tabs. Comments must be
@ -849,6 +857,13 @@ describe-static" and "mox config describe-domains":
# How long unique values are accepted after generating, e.g. 12h.
Period: 0s
# Default maximum total message size for the account, overriding any globally
# configured maximum size if non-zero. A negative value can be used to have no
# limit in case there is a limit by default. Attempting to add new messages beyond
# the maximum size will result in an error. Useful to prevent a single account
# from filling storage. (optional)
QuotaMessageSize: 0
# Mail that looks like spam will be rejected, but a copy can be stored temporarily
# in a mailbox, e.g. Rejects. If mail isn't coming in when you expect, you can
# look there. The mail still isn't accepted, so the remote mail server may retry

21
ctl.go
View file

@ -685,11 +685,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
acc.WithWLock(func() {
var changes []store.Change
err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
return bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
var totalSize int64
err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
mc, err := mb.CalculateCounts(tx)
if err != nil {
return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
}
totalSize += mc.Size
if !mb.HaveCounts || mc != mb.MailboxCounts {
_, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
@ -703,6 +705,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
}
return nil
})
if err != nil {
return err
}
du := store.DiskUsage{ID: 1}
if err := tx.Get(&du); err != nil {
return fmt.Errorf("get disk usage: %v", err)
}
if du.MessageSize != totalSize {
_, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
ctl.xcheck(err, "write")
du.MessageSize = totalSize
if err := tx.Update(&du); err != nil {
return fmt.Errorf("update disk usage: %v", err)
}
}
return nil
})
ctl.xcheck(err, "write transaction for mailbox counts")

9
doc.go
View file

@ -977,12 +977,13 @@ Ensure messages in the database have a pre-parsed MIME form in the database.
# mox recalculatemailboxcounts
Recalculate message counts for all mailboxes in the account.
Recalculate message counts for all mailboxes in the account, and total message size for quota.
When a message is added to/removed from a mailbox, or when message flags change,
the total, unread, unseen and deleted messages are accounted, and the total size
of the mailbox. In case of a bug in this accounting, the numbers could become
incorrect. This command will find, fix and print them.
the total, unread, unseen and deleted messages are accounted, the total size of
the mailbox, and the total message size for the account. In case of a bug in
this accounting, the numbers could become incorrect. This command will find, fix
and print them.
usage: mox recalculatemailboxcounts account

View file

@ -284,7 +284,7 @@ Accounts:
xcheckf(err, "creating temp file for delivery")
_, err = fmt.Fprint(mf, msg)
xcheckf(err, "writing deliver message to file")
err = accTest1.DeliverMessage(c.log, tx, &m, mf, false, true, false)
err = accTest1.DeliverMessage(c.log, tx, &m, mf, false, true, false, true)
mfname := mf.Name()
xcheckf(err, "add message to account test1")
@ -344,7 +344,7 @@ Accounts:
xcheckf(err, "creating temp file for delivery")
_, err = fmt.Fprint(mf0, msg0)
xcheckf(err, "writing deliver message to file")
err = accTest2.DeliverMessage(c.log, tx, &m0, mf0, false, false, false)
err = accTest2.DeliverMessage(c.log, tx, &m0, mf0, false, false, false, true)
xcheckf(err, "add message to account test2")
mf0name := mf0.Name()
@ -375,7 +375,7 @@ Accounts:
xcheckf(err, "creating temp file for delivery")
_, err = fmt.Fprint(mf1, msg1)
xcheckf(err, "writing deliver message to file")
err = accTest2.DeliverMessage(c.log, tx, &m1, mf1, false, false, false)
err = accTest2.DeliverMessage(c.log, tx, &m1, mf1, false, false, false, true)
xcheckf(err, "add message to account test2")
mf1name := mf1.Name()

View file

@ -116,7 +116,7 @@ func (c *Conn) xrespText() RespText {
var knownCodes = stringMap(
// Without parameters.
"ALERT", "PARSE", "READ-ONLY", "READ-WRITE", "TRYCREATE", "UIDNOTSTICKY", "UNAVAILABLE", "AUTHENTICATIONFAILED", "AUTHORIZATIONFAILED", "EXPIRED", "PRIVACYREQUIRED", "CONTACTADMIN", "NOPERM", "INUSE", "EXPUNGEISSUED", "CORRUPTION", "SERVERBUG", "CLIENTBUG", "CANNOT", "LIMIT", "OVERQUOTA", "ALREADYEXISTS", "NONEXISTENT", "NOTSAVED", "HASCHILDREN", "CLOSED", "UNKNOWN-CTE",
"ALERT", "PARSE", "READ-ONLY", "READ-WRITE", "TRYCREATE", "UIDNOTSTICKY", "UNAVAILABLE", "AUTHENTICATIONFAILED", "AUTHORIZATIONFAILED", "EXPIRED", "PRIVACYREQUIRED", "CONTACTADMIN", "NOPERM", "INUSE", "EXPUNGEISSUED", "CORRUPTION", "SERVERBUG", "CLIENTBUG", "CANNOT", "LIMIT", "OVERQUOTA", "ALREADYEXISTS", "NONEXISTENT", "NOTSAVED", "HASCHILDREN", "CLOSED", "UNKNOWN-CTE", "OVERQUOTA",
// With parameters.
"BADCHARSET", "CAPABILITY", "PERMANENTFLAGS", "UIDNEXT", "UIDVALIDITY", "UNSEEN", "APPENDUID", "COPYUID",
"HIGHESTMODSEQ", "MODIFIED",

View file

@ -78,4 +78,15 @@ func TestAppend(t *testing.T) {
},
}
tc2.xuntagged(imapclient.UntaggedFetch{Seq: 2, Attrs: []imapclient.FetchAttr{uid2, xbs}})
tclimit := startArgs(t, false, false, true, true, "limit")
defer tclimit.close()
tclimit.client.Login("limit@mox.example", "testtest")
tclimit.client.Select("inbox")
// First message of 1 byte is within limits.
tclimit.transactf("ok", "append inbox (\\Seen Label1 $label2) \" 1-Jan-2022 10:10:00 +0100\" {1+}\r\nx")
tclimit.xuntagged(imapclient.UntaggedExists(1))
// Second message would take account past limit.
tclimit.transactf("no", "append inbox (\\Seen Label1 $label2) \" 1-Jan-2022 10:10:00 +0100\" {1+}\r\nx")
tclimit.xcode("OVERQUOTA")
}

View file

@ -58,4 +58,15 @@ func TestCopy(t *testing.T) {
imapclient.UntaggedFetch{Seq: 3, Attrs: []imapclient.FetchAttr{imapclient.FetchUID(3), imapclient.FetchFlags(nil)}},
imapclient.UntaggedFetch{Seq: 4, Attrs: []imapclient.FetchAttr{imapclient.FetchUID(4), imapclient.FetchFlags(nil)}},
)
tclimit := startArgs(t, false, false, true, true, "limit")
defer tclimit.close()
tclimit.client.Login("limit@mox.example", "testtest")
tclimit.client.Select("inbox")
// First message of 1 byte is within limits.
tclimit.transactf("ok", "append inbox (\\Seen Label1 $label2) \" 1-Jan-2022 10:10:00 +0100\" {1+}\r\nx")
tclimit.xuntagged(imapclient.UntaggedExists(1))
// Second message would take account past limit.
tclimit.transactf("no", "copy 1:* Trash")
tclimit.xcode("OVERQUOTA")
}

View file

@ -243,6 +243,7 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) {
err := tx.Update(&mb)
xcheckf(err, "updating mailbox counts")
cmd.changes = append(cmd.changes, mb.ChangeCounts())
// No need to update account total message size.
}
})
@ -349,6 +350,7 @@ func (cmd *fetchCmd) process(atts []fetchAtt) {
m.ModSeq = cmd.xmodseq()
err := cmd.tx.Update(m)
xcheckf(err, "marking message as seen")
// No need to update account total message size.
cmd.changes = append(cmd.changes, m.ChangeFlags(origFlags))
}

View file

@ -2750,13 +2750,20 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) {
Size: mw.Size,
}
ok, maxSize, err := c.account.CanAddMessageSize(tx, m.Size)
xcheckf(err, "checking quota")
if !ok {
// ../rfc/9051:5155
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
}
mb.Add(m.MailboxCounts())
// Update mailbox before delivering, which updates uidnext which we mustn't overwrite.
err = tx.Update(&mb)
xcheckf(err, "updating mailbox counts")
err := c.account.DeliverMessage(c.log, tx, &m, msgFile, true, false, false)
err = c.account.DeliverMessage(c.log, tx, &m, msgFile, true, false, false, true)
xcheckf(err, "delivering message")
})
@ -2923,10 +2930,12 @@ func (c *conn) xexpunge(uidSet *numSet, missingMailboxOK bool) (remove []store.M
removeIDs := make([]int64, len(remove))
anyIDs := make([]any, len(remove))
var totalSize int64
for i, m := range remove {
removeIDs[i] = m.ID
anyIDs[i] = m.ID
mb.Sub(m.MailboxCounts())
totalSize += m.Size
// Update "remove", because RetrainMessage below will save the message.
remove[i].Expunged = true
remove[i].ModSeq = modseq
@ -2947,6 +2956,9 @@ func (c *conn) xexpunge(uidSet *numSet, missingMailboxOK bool) (remove []store.M
err = tx.Update(&mb)
xcheckf(err, "updating mailbox counts")
err = c.account.AddMessageSize(c.log, tx, -totalSize)
xcheckf(err, "updating disk usage")
// Mark expunged messages as not needing training, then retrain them, so if they
// were trained, they get untrained.
for i := range remove {
@ -3208,6 +3220,20 @@ func (c *conn) cmdxCopy(isUID bool, tag, cmd string, p *parser) {
xserverErrorf("uid and message mismatch")
}
// See if quota allows copy.
var totalSize int64
for _, m := range xmsgs {
totalSize += m.Size
}
if ok, maxSize, err := c.account.CanAddMessageSize(tx, totalSize); err != nil {
xcheckf(err, "checking quota")
} else if !ok {
// ../rfc/9051:5155
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
}
err = c.account.AddMessageSize(c.log, tx, totalSize)
xcheckf(err, "updating disk usage")
msgs := map[store.UID]store.Message{}
for _, m := range xmsgs {
msgs[m.UID] = m

View file

@ -327,14 +327,14 @@ func xparseNumSet(s string) imapclient.NumSet {
var connCounter int64
func start(t *testing.T) *testconn {
return startArgs(t, true, false, true)
return startArgs(t, true, false, true, true, "mjl")
}
func startNoSwitchboard(t *testing.T) *testconn {
return startArgs(t, false, false, true)
return startArgs(t, false, false, true, false, "mjl")
}
func startArgs(t *testing.T, first, isTLS, allowLoginWithoutTLS bool) *testconn {
func startArgs(t *testing.T, first, isTLS, allowLoginWithoutTLS, setPassword bool, accname string) *testconn {
limitersInit() // Reset rate limiters.
if first {
@ -343,9 +343,9 @@ func startArgs(t *testing.T, first, isTLS, allowLoginWithoutTLS bool) *testconn
mox.Context = ctxbg
mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf")
mox.MustLoadConfig(true, false)
acc, err := store.OpenAccount(pkglog, "mjl")
acc, err := store.OpenAccount(pkglog, accname)
tcheck(t, err, "open account")
if first {
if setPassword {
err = acc.SetPassword(pkglog, "testtest")
tcheck(t, err, "set password")
}

View file

@ -13,11 +13,11 @@ func TestStarttls(t *testing.T) {
tc.client.Login("mjl@mox.example", "testtest")
tc.close()
tc = startArgs(t, true, true, false)
tc = startArgs(t, true, true, false, true, "mjl")
tc.transactf("bad", "starttls") // TLS already active.
tc.close()
tc = startArgs(t, true, false, false)
tc = startArgs(t, true, false, false, true, "mjl")
tc.transactf("no", `login "mjl@mox.example" "testtest"`)
tc.xcode("PRIVACYREQUIRED")
tc.transactf("no", "authenticate PLAIN %s", base64.StdEncoding.EncodeToString([]byte("\u0000mjl@mox.example\u0000testtest")))

View file

@ -282,7 +282,8 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
const sync = false
const notrain = true
const nothreads = true
err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads)
const updateDiskUsage = false
err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
ctl.xcheck(err, "delivering message")
deliveredIDs = append(deliveredIDs, m.ID)
ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
@ -313,9 +314,20 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
conf, _ := a.Conf()
maxSize := a.QuotaMessageSize()
var addSize int64
du := store.DiskUsage{ID: 1}
err = tx.Get(&du)
ctl.xcheck(err, "get disk usage")
process := func(m *store.Message, msgf *os.File, origPath string) {
defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
addSize += m.Size
if maxSize > 0 && du.MessageSize+addSize > maxSize {
ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
}
for _, kw := range m.Keywords {
mailboxKeywords[kw] = true
}
@ -407,6 +419,9 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
ctl.xcheck(err, "updating message counts and keywords in mailbox")
changes = append(changes, mb.ChangeCounts())
err = a.AddMessageSize(ctl.log, tx, addSize)
xcheckf(err, "updating total message size")
err = tx.Commit()
ctl.xcheck(err, "commit")
tx = nil

View file

@ -3197,12 +3197,13 @@ func cmdEnsureParsed(c *cmd) {
func cmdRecalculateMailboxCounts(c *cmd) {
c.params = "account"
c.help = `Recalculate message counts for all mailboxes in the account.
c.help = `Recalculate message counts for all mailboxes in the account, and total message size for quota.
When a message is added to/removed from a mailbox, or when message flags change,
the total, unread, unseen and deleted messages are accounted, and the total size
of the mailbox. In case of a bug in this accounting, the numbers could become
incorrect. This command will find, fix and print them.
the total, unread, unseen and deleted messages are accounted, the total size of
the mailbox, and the total message size for the account. In case of a bug in
this accounting, the numbers could become incorrect. This command will find, fix
and print them.
`
args := c.Parse()
if len(args) != 1 {

View file

@ -976,7 +976,7 @@ func DestinationSave(ctx context.Context, account, destName string, newDest conf
}
// AccountLimitsSave saves new message sending limits for an account.
func AccountLimitsSave(ctx context.Context, account string, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay int) (rerr error) {
func AccountLimitsSave(ctx context.Context, account string, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay int, quotaMessageSize int64) (rerr error) {
log := pkglog.WithContext(ctx)
defer func() {
if rerr != nil {
@ -1002,6 +1002,7 @@ func AccountLimitsSave(ctx context.Context, account string, maxOutgoingMessagesP
}
acc.MaxOutgoingMessagesPerDay = maxOutgoingMessagesPerDay
acc.MaxFirstTimeRecipientsPerDay = maxFirstTimeRecipientsPerDay
acc.QuotaMessageSize = quotaMessageSize
nc.Accounts[account] = acc
if err := writeDynamic(ctx, log, nc); err != nil {

View file

@ -2789,7 +2789,11 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
if err := acc.DeliverMailbox(log, a.mailbox, &m, dataFile); err != nil {
log.Errorx("delivering", err)
metricDelivery.WithLabelValues("delivererror", a.reason).Inc()
if errors.Is(err, store.ErrOverQuota) {
addError(rcptAcc, smtp.C452StorageFull, smtp.SeMailbox2Full2, true, "account storage full")
} else {
addError(rcptAcc, smtp.C451LocalErr, smtp.SeSys3Other0, false, "error processing")
}
return
}
metricDelivery.WithLabelValues("delivered", a.reason).Inc()

View file

@ -1240,6 +1240,37 @@ func TestLimitOutgoing(t *testing.T) {
testSubmit("b@other.example", &smtpclient.Error{Code: smtp.C451LocalErr, Secode: smtp.SePol7DeliveryUnauth1}) // Would be 5th message.
}
// Test account size limit enforcement.
func TestQuota(t *testing.T) {
resolver := dns.MockResolver{
A: map[string][]string{
"other.example.": {"127.0.0.10"}, // For mx check.
},
PTR: map[string][]string{
"127.0.0.10": {"other.example."},
},
}
ts := newTestServer(t, filepath.FromSlash("../testdata/smtpserverquota/mox.conf"), resolver)
defer ts.close()
testDeliver := func(rcptTo string, expErr *smtpclient.Error) {
t.Helper()
ts.run(func(err error, client *smtpclient.Client) {
t.Helper()
mailFrom := "mjl@other.example"
if err == nil {
err = client.Deliver(ctxbg, mailFrom, rcptTo, int64(len(deliverMessage)), strings.NewReader(deliverMessage), false, false, false)
}
var cerr smtpclient.Error
if expErr == nil && err != nil || expErr != nil && (err == nil || !errors.As(err, &cerr) || cerr.Secode != expErr.Secode) {
t.Fatalf("got err %#v, expected %#v", err, expErr)
}
})
}
testDeliver("mjl@mox.example", &smtpclient.Error{Code: smtp.C452StorageFull, Secode: smtp.SeMailbox2Full2})
}
// Test with catchall destination address.
func TestCatchall(t *testing.T) {
resolver := dns.MockResolver{

View file

@ -71,6 +71,7 @@ var (
ErrUnknownMailbox = errors.New("no such mailbox")
ErrUnknownCredentials = errors.New("credentials not found")
ErrAccountUnknown = errors.New("no such account")
ErrOverQuota = errors.New("account over quota")
)
var DefaultInitialMailboxes = config.InitialMailboxes{
@ -684,8 +685,14 @@ type RecipientDomainTLS struct {
RequireTLS bool // Supports RequireTLS SMTP extension.
}
// DiskUsage tracks quota use.
type DiskUsage struct {
ID int64 // Always one record with ID 1.
MessageSize int64 // Sum of all messages, for quota accounting.
}
// Types stored in DB.
var DBTypes = []any{NextUIDValidity{}, Message{}, Recipient{}, Mailbox{}, Subscription{}, Outgoing{}, Password{}, Subjectpass{}, SyncState{}, Upgrade{}, RecipientDomainTLS{}}
var DBTypes = []any{NextUIDValidity{}, Message{}, Recipient{}, Mailbox{}, Subscription{}, Outgoing{}, Password{}, Subjectpass{}, SyncState{}, Upgrade{}, RecipientDomainTLS{}, DiskUsage{}}
// Account holds the information about a user, includings mailboxes, messages, imap subscriptions.
type Account struct {
@ -815,10 +822,10 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re
return acc, nil
}
// Ensure mailbox counts are set.
// Ensure mailbox counts and total message size are set.
var mentioned bool
err = db.Write(context.TODO(), func(tx *bstore.Tx) error {
return bstore.QueryTx[Mailbox](tx).FilterEqual("HaveCounts", false).ForEach(func(mb Mailbox) error {
err := bstore.QueryTx[Mailbox](tx).FilterEqual("HaveCounts", false).ForEach(func(mb Mailbox) error {
if !mentioned {
mentioned = true
log.Info("first calculation of mailbox counts for account", slog.String("account", accountName))
@ -831,6 +838,24 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re
mb.MailboxCounts = mc
return tx.Update(&mb)
})
if err != nil {
return err
}
du := DiskUsage{ID: 1}
err = tx.Get(&du)
if err == nil || !errors.Is(err, bstore.ErrAbsent) {
return err
}
// No DiskUsage record yet, calculate total size and insert.
err = bstore.QueryTx[Mailbox](tx).ForEach(func(mb Mailbox) error {
du.MessageSize += mb.Size
return nil
})
if err != nil {
return err
}
return tx.Insert(&du)
})
if err != nil {
return nil, fmt.Errorf("calculating counts for mailbox: %v", err)
@ -918,6 +943,9 @@ func initAccount(db *bstore.DB) error {
if err := tx.Insert(&Upgrade{ID: 1, Threads: 2}); err != nil {
return err
}
if err := tx.Insert(&DiskUsage{ID: 1}); err != nil {
return err
}
if len(mox.Conf.Static.DefaultMailboxes) > 0 {
// Deprecated in favor of InitialMailboxes.
@ -1024,6 +1052,7 @@ func (a *Account) Close() error {
// - Mismatch between message size and length of MsgPrefix and on-disk file.
// - Missing HaveCounts.
// - Incorrect mailbox counts.
// - Incorrect total message size.
// - Message with UID >= mailbox uid next.
// - Mailbox uidvalidity >= account uid validity.
// - ModSeq > 0, CreateSeq > 0, CreateSeq <= ModSeq.
@ -1114,7 +1143,9 @@ func (a *Account) CheckConsistency() error {
return fmt.Errorf("reading messages: %v", err)
}
var totalSize int64
for _, mb := range mailboxes {
totalSize += mb.Size
if !mb.HaveCounts {
errmsg := fmt.Sprintf("mailbox %q (id %d) does not have counts, should be %#v", mb.Name, mb.ID, counts[mb.ID])
errors = append(errors, errmsg)
@ -1124,6 +1155,15 @@ func (a *Account) CheckConsistency() error {
}
}
du := DiskUsage{ID: 1}
if err := tx.Get(&du); err != nil {
return fmt.Errorf("get diskusage")
}
if du.MessageSize != totalSize {
errmsg := fmt.Sprintf("total message size in database is %d, sum of mailbox message sizes is %d", du.MessageSize, totalSize)
errors = append(errors, errmsg)
}
return nil
})
if err != nil {
@ -1215,6 +1255,10 @@ func (a *Account) WithRLock(fn func()) {
// If sync is true, the message file and its directory are synced. Should be true
// for regular mail delivery, but can be false when importing many messages.
//
// If updateDiskUsage is true, the account total message size (for quota) is
// updated. Callers must check if a message can be added within quota before
// calling DeliverMessage.
//
// If CreateSeq/ModSeq is not set, it is assigned automatically.
//
// Must be called with account rlock or wlock.
@ -1222,7 +1266,7 @@ func (a *Account) WithRLock(fn func()) {
// Caller must broadcast new message.
//
// Caller must update mailbox counts.
func (a *Account) DeliverMessage(log mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, sync, notrain, nothreads bool) error {
func (a *Account) DeliverMessage(log mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, sync, notrain, nothreads, updateDiskUsage bool) error {
if m.Expunged {
return fmt.Errorf("cannot deliver expunged message")
}
@ -1237,6 +1281,17 @@ func (a *Account) DeliverMessage(log mlog.Log, tx *bstore.Tx, m *Message, msgFil
return fmt.Errorf("updating mailbox nextuid: %w", err)
}
if updateDiskUsage {
du := DiskUsage{ID: 1}
if err := tx.Get(&du); err != nil {
return fmt.Errorf("get disk usage: %v", err)
}
du.MessageSize += m.Size
if err := tx.Update(&du); err != nil {
return fmt.Errorf("update disk usage: %v", err)
}
}
conf, _ := a.Conf()
m.JunkFlagsForMailbox(mb, conf)
@ -1673,6 +1728,8 @@ func (a *Account) MessageReader(m Message) *MsgReader {
// DeliverDestination delivers an email to dest, based on the configured rulesets.
//
// Returns ErrOverQuota when account would be over quota after adding message.
//
// Caller must hold account wlock (mailbox may be created).
// Message delivery, possible mailbox creation, and updated mailbox counts are
// broadcasted.
@ -1691,12 +1748,20 @@ func (a *Account) DeliverDestination(log mlog.Log, dest config.Destination, m *M
// DeliverMailbox delivers an email to the specified mailbox.
//
// Returns ErrOverQuota when account would be over quota after adding message.
//
// Caller must hold account wlock (mailbox may be created).
// Message delivery, possible mailbox creation, and updated mailbox counts are
// broadcasted.
func (a *Account) DeliverMailbox(log mlog.Log, mailbox string, m *Message, msgFile *os.File) error {
var changes []Change
err := a.DB.Write(context.TODO(), func(tx *bstore.Tx) error {
if ok, _, err := a.CanAddMessageSize(tx, m.Size); err != nil {
return err
} else if !ok {
return ErrOverQuota
}
mb, chl, err := a.MailboxEnsure(tx, mailbox, true)
if err != nil {
return fmt.Errorf("ensuring mailbox: %w", err)
@ -1711,7 +1776,7 @@ func (a *Account) DeliverMailbox(log mlog.Log, mailbox string, m *Message, msgFi
return fmt.Errorf("updating mailbox for delivery: %w", err)
}
if err := a.DeliverMessage(log, tx, m, msgFile, true, false, false); err != nil {
if err := a.DeliverMessage(log, tx, m, msgFile, true, false, false, true); err != nil {
return err
}
@ -1828,13 +1893,18 @@ func (a *Account) rejectsRemoveMessages(ctx context.Context, log mlog.Log, tx *b
return nil, fmt.Errorf("expunging messages: %w", err)
}
var totalSize int64
for _, m := range expunged {
m.Expunged = false // Was set by update, but would cause wrong count.
mb.MailboxCounts.Sub(m.MailboxCounts())
totalSize += m.Size
}
if err := tx.Update(mb); err != nil {
return nil, fmt.Errorf("updating mailbox counts: %w", err)
}
if err := a.AddMessageSize(log, tx, -totalSize); err != nil {
return nil, fmt.Errorf("updating disk usage: %w", err)
}
// Mark as neutral and train so junk filter gets untrained with these (junk) messages.
for i := range expunged {
@ -1902,6 +1972,54 @@ func (a *Account) RejectsRemove(log mlog.Log, rejectsMailbox, messageID string)
return nil
}
// AddMessageSize adjusts the DiskUsage.MessageSize by size.
func (a *Account) AddMessageSize(log mlog.Log, tx *bstore.Tx, size int64) error {
du := DiskUsage{ID: 1}
if err := tx.Get(&du); err != nil {
return fmt.Errorf("get diskusage: %v", err)
}
du.MessageSize += size
if du.MessageSize < 0 {
log.Error("negative total message size", slog.Int64("delta", size), slog.Int64("newtotalsize", du.MessageSize))
}
if err := tx.Update(&du); err != nil {
return fmt.Errorf("update total message size: %v", err)
}
return nil
}
// QuotaMessageSize returns the effective maximum total message size for an
// account. Returns 0 if there is no maximum.
func (a *Account) QuotaMessageSize() int64 {
conf, _ := a.Conf()
size := conf.QuotaMessageSize
if size <= 0 {
size = mox.Conf.Static.QuotaMessageSize
}
if size < 0 {
size = 0
}
return size
}
// CanAddMessageSize checks if a message of size bytes can be added, depending on
// total message size and configured quota for account.
func (a *Account) CanAddMessageSize(tx *bstore.Tx, size int64) (ok bool, maxSize int64, err error) {
defer func() {
mlog.New("x", nil).Printx("canaddsize", err, slog.Int64("size", size), slog.Bool("ok", ok), slog.Int64("maxsize", maxSize))
}()
maxSize = a.QuotaMessageSize()
if maxSize <= 0 {
return true, 0, nil
}
du := DiskUsage{ID: 1}
if err := tx.Get(&du); err != nil {
return false, maxSize, fmt.Errorf("get diskusage: %v", err)
}
return du.MessageSize+size <= maxSize, maxSize, nil
}
// We keep a cache of recent successful authentications, so we don't have to bcrypt successful calls each time.
var authCache = struct {
sync.Mutex
@ -2421,11 +2539,16 @@ func (a *Account) MailboxDelete(ctx context.Context, log mlog.Log, tx *bstore.Tx
return nil, nil, false, fmt.Errorf("removing messages: %v", err)
}
var totalSize int64
for _, m := range remove {
if !m.Expunged {
removeMessageIDs = append(removeMessageIDs, m.ID)
totalSize += m.Size
}
}
if err := a.AddMessageSize(log, tx, -totalSize); err != nil {
return nil, nil, false, fmt.Errorf("updating disk usage: %v", err)
}
// Mark messages as not needing training. Then retrain them, so they are untrained if they were.
n := 0

View file

@ -81,7 +81,7 @@ func TestMailbox(t *testing.T) {
tcheck(t, err, "sent mailbox")
msent.MailboxID = mbsent.ID
msent.MailboxOrigID = mbsent.ID
err = acc.DeliverMessage(pkglog, tx, &msent, msgFile, true, false, false)
err = acc.DeliverMessage(pkglog, tx, &msent, msgFile, true, false, false, true)
tcheck(t, err, "deliver message")
if !msent.ThreadMuted || !msent.ThreadCollapsed {
t.Fatalf("thread muted & collapsed should have been copied from parent (duplicate message-id) m")
@ -97,7 +97,7 @@ func TestMailbox(t *testing.T) {
tcheck(t, err, "insert rejects mailbox")
mreject.MailboxID = mbrejects.ID
mreject.MailboxOrigID = mbrejects.ID
err = acc.DeliverMessage(pkglog, tx, &mreject, msgFile, true, false, false)
err = acc.DeliverMessage(pkglog, tx, &mreject, msgFile, true, false, false, true)
tcheck(t, err, "deliver message")
err = tx.Get(&mbrejects)

View file

@ -14,3 +14,8 @@ Accounts:
MaxPower: 0.1
TopWords: 10
IgnoreWords: 0.1
limit:
Domain: mox.example
Destinations:
limit@mox.example: nil
QuotaMessageSize: 1

7
testdata/smtpserverquota/domains.conf vendored Normal file
View file

@ -0,0 +1,7 @@
Domains:
mox.example: nil
Accounts:
mjl:
Domain: mox.example
Destinations:
mjl@mox.example: nil

10
testdata/smtpserverquota/mox.conf vendored Normal file
View file

@ -0,0 +1,10 @@
DataDir: data
User: 1000
LogLevel: trace
Hostname: mox.example
Postmaster:
Account: mjl
Mailbox: postmaster
Listeners:
local: nil
QuotaMessageSize: 1

View file

@ -265,6 +265,7 @@ possibly making them potentially no longer readable by the previous version.
checkf(err, dbpath, "reading mailboxes to check uidnext consistency")
mbCounts := map[int64]store.MailboxCounts{}
var totalSize int64
err = bstore.QueryDB[store.Message](ctxbg, db).ForEach(func(m store.Message) error {
mb := mailboxes[m.MailboxID]
if m.UID >= mb.UIDNext {
@ -282,6 +283,7 @@ possibly making them potentially no longer readable by the previous version.
if m.Expunged {
return nil
}
totalSize += m.Size
mp := store.MessagePath(m.ID)
seen[mp] = struct{}{}
@ -317,12 +319,28 @@ possibly making them potentially no longer readable by the previous version.
})
checkf(err, dbpath, "reading messages in account database to check files")
haveCounts := true
for _, mb := range mailboxes {
// We only check if database doesn't have zero values, i.e. not yet set.
if !mb.HaveCounts {
haveCounts = false
}
if mb.HaveCounts && mb.MailboxCounts != mbCounts[mb.ID] {
checkf(errors.New(`wrong mailbox counts, see "mox recalculatemailboxcounts"`), dbpath, "mailbox %q (id %d) has wrong counts %s, should be %s", mb.Name, mb.ID, mb.MailboxCounts, mbCounts[mb.ID])
}
}
if haveCounts {
du := store.DiskUsage{ID: 1}
err := db.Get(ctxbg, &du)
if err == nil {
if du.MessageSize != totalSize {
checkf(errors.New(`wrong total message size, see mox recalculatemailboxcounts"`), dbpath, "account has wrong total message size %d, should be %d", du.MessageSize, totalSize)
}
} else if err != nil && !errors.Is(err, bstore.ErrAbsent) {
checkf(err, dbpath, "get disk usage")
}
}
}
// Walk through all files in the msg directory. Warn about files that weren't in

View file

@ -371,6 +371,12 @@ func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.
mailboxes := map[string]store.Mailbox{}
messages := map[string]int{}
maxSize := acc.QuotaMessageSize()
du := store.DiskUsage{ID: 1}
err = tx.Get(&du)
ximportcheckf(err, "get disk usage")
var addSize int64
// For maildirs, we are likely to get a possible dovecot-keywords file after having
// imported the messages. Once we see the keywords, we use them. But before that
// time we remember which messages miss a keywords. Once the keywords become
@ -490,6 +496,11 @@ func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.
m.MailboxID = mb.ID
m.MailboxOrigID = mb.ID
addSize += m.Size
if maxSize > 0 && du.MessageSize+addSize > maxSize {
ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
}
if modseq == 0 {
var err error
modseq, err = acc.NextModSeq(tx)
@ -543,7 +554,8 @@ func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.
const sync = false
const notrain = true
const nothreads = true
if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads); err != nil {
const updateDiskUsage = false
if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
problemf("delivering message %s: %s (continuing)", pos, err)
return
}
@ -838,6 +850,9 @@ func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.
}
}
err = acc.AddMessageSize(log, tx, addSize)
ximportcheckf(err, "updating disk usage after import")
err = tx.Commit()
tx = nil
ximportcheckf(err, "commit")

View file

@ -1852,8 +1852,8 @@ func (Admin) SetPassword(ctx context.Context, accountName, password string) {
}
// SetAccountLimits set new limits on outgoing messages for an account.
func (Admin) SetAccountLimits(ctx context.Context, accountName string, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay int) {
err := mox.AccountLimitsSave(ctx, accountName, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay)
func (Admin) SetAccountLimits(ctx context.Context, accountName string, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay int, maxMsgSize int64) {
err := mox.AccountLimitsSave(ctx, accountName, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay, maxMsgSize)
xcheckf(ctx, err, "saving account limits")
}

View file

@ -491,9 +491,50 @@ const account = async (name) => {
const config = await api.Account(name)
let form, fieldset, email
let formSendlimits, fieldsetSendlimits, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay
let formLimits, fieldsetLimits, maxOutgoingMessagesPerDay, maxFirstTimeRecipientsPerDay, quotaMessageSize
let formPassword, fieldsetPassword, password, passwordHint
const xparseSize = (s) => {
const origs = s
s = s.toLowerCase()
let mult = 1
if (s.endsWith('k')) {
mult = 1024
} else if (s.endsWith('m')) {
mult = 1024*1024
} else if (s.endsWith('g')) {
mult = 1024*1024*1024
} else if (s.endsWith('t')) {
mult = 1024*1024*1024*1024
}
if (mult !== 1) {
s = s.substring(0, s.length-1)
}
let v = parseInt(s)
console.log('x', s, v, mult, formatQuotaSize(v*mult))
if (isNaN(v) || origs !== formatQuotaSize(v*mult)) {
throw new Error('invalid number')
}
return v*mult
}
const formatQuotaSize = (v) => {
if (v === 0) {
return '0'
}
const m = 1024*1024
const g = m*1024
const t = g*1024
if (Math.floor(v/t)*t === v) {
return ''+(v/t)+'t'
} else if (Math.floor(v/g)*g === v) {
return ''+(v/g)+'g'
} else if (Math.floor(v/m)*m === v) {
return ''+(v/m)+'m'
}
return ''+v
}
const page = document.getElementById('page')
dom._kids(page,
crumbs(
@ -595,38 +636,42 @@ const account = async (name) => {
),
),
dom.br(),
dom.h2('Send limits'),
formSendlimits=dom.form(
fieldsetSendlimits=dom.fieldset(
dom.h2('Limits'),
formLimits=dom.form(
fieldsetLimits=dom.fieldset(
dom.label(
style({display: 'inline-block'}),
style({display: 'block', marginBottom: '.5ex'}),
dom.span('Maximum outgoing messages per day', attr({title: 'Maximum number of outgoing messages for this account in a 24 hour window. This limits the damage to recipients and the reputation of this mail server in case of account compromise. Default 1000. MaxOutgoingMessagesPerDay in configuration file.'})),
dom.br(),
maxOutgoingMessagesPerDay=dom.input(attr({type: 'number', required: '', value: config.MaxOutgoingMessagesPerDay || 1000})),
),
' ',
dom.label(
style({display: 'inline-block'}),
style({display: 'block', marginBottom: '.5ex'}),
dom.span('Maximum first-time recipients per day', attr({title: 'Maximum number of first-time recipients in outgoing messages for this account in a 24 hour window. This limits the damage to recipients and the reputation of this mail server in case of account compromise. Default 200. MaxFirstTimeRecipientsPerDay in configuration file.'})),
dom.br(),
maxFirstTimeRecipientsPerDay=dom.input(attr({type: 'number', required: '', value: config.MaxFirstTimeRecipientsPerDay || 200})),
),
' ',
dom.label(
style({display: 'block', marginBottom: '.5ex'}),
dom.span('Disk usage quota: Maximum total message size ', attr({title: 'Default maximum total message size for the account, overriding any globally configured maximum size if non-zero. A negative value can be used to have no limit in case there is a limit by default. Attempting to add new messages beyond the maximum size will result in an error. Useful to prevent a single account from filling storage.'})),
dom.br(),
quotaMessageSize=dom.input(attr({value: formatQuotaSize(config.QuotaMessageSize)})),
),
dom.button('Save'),
),
async function submit(e) {
e.stopPropagation()
e.preventDefault()
fieldsetSendlimits.disabled = true
fieldsetLimits.disabled = true
try {
await api.SetAccountLimits(name, parseInt(maxOutgoingMessagesPerDay.value) || 0, parseInt(maxFirstTimeRecipientsPerDay.value) || 0)
window.alert('Send limits saved.')
await api.SetAccountLimits(name, parseInt(maxOutgoingMessagesPerDay.value) || 0, parseInt(maxFirstTimeRecipientsPerDay.value) || 0, xparseSize(quotaMessageSize.value))
window.alert('Limits saved.')
} catch (err) {
console.log({err})
window.alert('Error: ' + err.message)
return
} finally {
fieldsetSendlimits.disabled = false
fieldsetLimits.disabled = false
}
},
),

View file

@ -559,6 +559,12 @@
"Typewords": [
"int32"
]
},
{
"Name": "maxMsgSize",
"Typewords": [
"int64"
]
}
],
"Returns": []

View file

@ -650,12 +650,18 @@ func (w Webmail) MessageSubmit(ctx context.Context, m SubmitMessage) {
MsgPrefix: []byte(msgPrefix),
}
if ok, maxSize, err := acc.CanAddMessageSize(tx, sentm.Size); err != nil {
xcheckf(ctx, err, "checking quota")
} else if !ok {
xcheckuserf(ctx, fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
}
// Update mailbox before delivery, which changes uidnext.
sentmb.Add(sentm.MailboxCounts())
err = tx.Update(&sentmb)
xcheckf(ctx, err, "updating sent mailbox for counts")
err = acc.DeliverMessage(log, tx, &sentm, dataFile, true, false, false)
err = acc.DeliverMessage(log, tx, &sentm, dataFile, true, false, false, true)
if err != nil {
metricSubmission.WithLabelValues("storesenterror").Inc()
metricked = true
@ -825,8 +831,10 @@ func (Webmail) MessageDelete(ctx context.Context, messageIDs []int64) {
var mb store.Mailbox
remove := make([]store.Message, 0, len(messageIDs))
var totalSize int64
for _, mid := range messageIDs {
m := xmessageID(ctx, tx, mid)
totalSize += m.Size
if m.MailboxID != mb.ID {
if mb.ID != 0 {
@ -867,6 +875,9 @@ func (Webmail) MessageDelete(ctx context.Context, messageIDs []int64) {
changes = append(changes, mb.ChangeCounts())
}
err = acc.AddMessageSize(log, tx, -totalSize)
xcheckf(ctx, err, "updating disk usage")
// Mark removed messages as not needing training, then retrain them, so if they
// were trained, they get untrained.
for i := range remove {
@ -1171,10 +1182,12 @@ func (Webmail) MailboxEmpty(ctx context.Context, mailboxID int64) {
xcheckf(ctx, err, "removing message recipients")
// Adjust mailbox counts, gather UIDs for broadcasted change, prepare for untraining.
var totalSize int64
uids := make([]store.UID, len(expunged))
for i, m := range expunged {
m.Expunged = false // Gather returns updated values.
mb.Sub(m.MailboxCounts())
totalSize += m.Size
uids[i] = m.UID
expunged[i].Junk = false
@ -1184,6 +1197,9 @@ func (Webmail) MailboxEmpty(ctx context.Context, mailboxID int64) {
err = tx.Update(&mb)
xcheckf(ctx, err, "updating mailbox for counts")
err = acc.AddMessageSize(log, tx, -totalSize)
xcheckf(ctx, err, "updating disk usage")
err = acc.RetrainMessages(ctx, log, tx, expunged, true)
xcheckf(ctx, err, "retraining expunged messages")