package store

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"runtime"
	"slices"
	"sort"
	"time"

	"github.com/mjl-/bstore"

	"github.com/mjl-/mox/message"
	"github.com/mjl-/mox/mlog"
	"github.com/mjl-/mox/moxio"
)

// assignThread assigns a new/incoming message to a thread. Message does not yet
// have an ID. If this isn't a response, ThreadID should remain 0 (unless this is
// a message with existing message-id) and the caller must set ThreadID to ID.
// If the account is still busy upgrading messages with threadids in the
// background, parents may have a threadid 0. That results in this message getting
// threadid 0, which will be handled by the background upgrade process assigning a
// threadid when it gets to this message.
//
// Matching is attempted in this order, stopping at the first parent found:
//   - an existing, different message with the same Message-ID;
//   - the message-ids from the References/In-Reply-To headers, last one first;
//   - if there were no referenced message-ids and the subject indicates a
//     response, a recent message with the same base subject.
//
// part may be nil, in which case no headers and no subject are available and
// only the Message-ID match is attempted. Problems parsing headers are logged,
// not returned. An error is returned only for failing database lookups.
func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
	if m.MessageID != "" {
		// Match against existing different message with same Message-ID.
		q := bstore.QueryTx[Message](tx)
		q.FilterNonzero(Message{MessageID: m.MessageID})
		q.FilterEqual("Expunged", false)
		q.FilterNotEqual("ID", m.ID)
		q.FilterNotEqual("ThreadID", int64(0))
		q.SortAsc("ID")
		q.Limit(1)
		em, err := q.Get()
		if err != nil && err != bstore.ErrAbsent {
			return fmt.Errorf("looking up existing message with message-id: %w", err)
		} else if err == nil {
			assignParent(m, em, true)
			return nil
		}
	}

	// Only touch the headers when we actually have a parsed message, consistent with
	// the nil check before using the envelope below.
	var references, inReplyTo []string
	if part != nil {
		h, err := part.Header()
		if err != nil {
			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
		}
		references = h.Values("References")
		inReplyTo = h.Values("In-Reply-To")
	}
	messageIDs, err := message.ReferencedIDs(references, inReplyTo)
	if err != nil {
		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
	}
	// Walk the referenced message-ids from last to first, so the first match is the
	// last listed reference.
	for i := len(messageIDs) - 1; i >= 0; i-- {
		messageID := messageIDs[i]
		if messageID == m.MessageID {
			continue
		}
		tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
		if err != nil {
			return fmt.Errorf("looking up thread message for new message: %w", err)
		} else if tm != nil {
			assignParent(m, *tm, true)
			return nil
		}
		// A referenced message could not be used as parent, so there is a gap between
		// this message and whatever ancestor we may still find.
		m.ThreadMissingLink = true
	}
	if len(messageIDs) > 0 {
		// There were references, but none matched. We don't fall back to subject matching.
		return nil
	}

	var isResp bool
	if part != nil && part.Envelope != nil {
		m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
	}
	if !isResp || m.SubjectBase == "" {
		return nil
	}
	// Any match from here on is by subject only, so the direct parent is unknown.
	m.ThreadMissingLink = true
	tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
	if err != nil {
		return fmt.Errorf("looking up thread message by subject: %w", err)
	} else if tm != nil {
		assignParent(m, *tm, true)
	}
	return nil
}

// assignParent assigns threading fields to m that make it a child of parent message pm.
// updateSeen indicates if m.Seen should be set if pm is thread-muted.
//
// m gets the ThreadID, ThreadMuted and ThreadCollapsed values of pm, and a
// ThreadParentIDs list that starts with pm.ID. If m and pm have the same
// non-empty MessageID, m.ThreadMissingLink is set.
//
// Panics if pm has no ThreadID yet, or if m and pm have the same ID.
func assignParent(m *Message, pm Message, updateSeen bool) {
	if pm.ThreadID == 0 {
		panic(fmt.Sprintf("assigning message id %d/%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
	}
	if m.ID == pm.ID {
		panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
	}
	m.ThreadID = pm.ThreadID
	// Make sure we don't add cycles: if m is already listed as an ancestor of pm,
	// taking over pm's ancestors would put m in its own parent list, so we only
	// record pm itself. pm.ID differs from m.ID at this point, checked above.
	if slices.Contains(pm.ThreadParentIDs, m.ID) {
		m.ThreadParentIDs = []int64{pm.ID}
	} else {
		m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
	}
	if m.MessageID != "" && m.MessageID == pm.MessageID {
		m.ThreadMissingLink = true
	}
	m.ThreadMuted = pm.ThreadMuted
	m.ThreadCollapsed = pm.ThreadCollapsed
	if updateSeen && m.ThreadMuted {
		m.Seen = true
	}
}

// ResetThreading recalculates the MessageID and SubjectBase fields for all
// non-expunged messages in the account, based on the envelope stored in the
// ParsedBuf of each message. If clearIDs is true, the ThreadID, ThreadParentIDs
// and ThreadMissingLink fields are cleared too. Changes are made in transactions
// of batchSize changes. The total number of updated messages is returned, also
// when returning an error (counting the completed batches only).
//
// A message whose ParsedBuf cannot be parsed keeps its MessageID and SubjectBase,
// the problem is logged.
//
// ModSeq is not changed. Callers should bump the uid validity of the mailboxes
// to propagate the changes to IMAP clients.
func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize int, clearIDs bool) (int, error) {
	// todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?

	// Worker goroutine function: derives the message-id and base subject from the
	// parsed envelope. It does not touch the database.
	parseEnvelopes := func(in, out chan moxio.Work[Message, Message]) {
		for work := range in {
			msg := work.In

			// We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
			// struct so we don't generate so much garbage for the garbage collector to sift
			// through.
			var parsed struct {
				Envelope *message.Envelope
			}
			err := json.Unmarshal(msg.ParsedBuf, &parsed)
			if err != nil {
				log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, slog.Int64("msgid", msg.ID))
			} else {
				msg.MessageID = ""
				if env := parsed.Envelope; env != nil {
					if env.MessageID != "" {
						canonical, _, err := message.MessageIDCanonical(env.MessageID)
						if err != nil {
							log.Debugx("parsing message-id, skipping", err, slog.Int64("msgid", msg.ID), slog.String("messageid", env.MessageID))
						}
						msg.MessageID = canonical
					}
					msg.SubjectBase, _ = message.ThreadSubject(env.Subject, false)
				}
			}

			work.Out = msg
			out <- work
		}
	}

	total := 0
	var resumeAfter int64 // ID of the last message handed to the work queue.
	for {
		updated := 0

		err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
			// Called with the result of a worker: stores the updated message.
			saveMessage := func(_, msg Message) error {
				if clearIDs {
					msg.ThreadID = 0
					msg.ThreadParentIDs = nil
					msg.ThreadMissingLink = false
				}
				return tx.Update(&msg)
			}

			// JSON parsing is relatively heavy, we benefit from multiple goroutines.
			nworkers := runtime.GOMAXPROCS(0)
			queue := moxio.NewWorkQueue[Message, Message](nworkers, 2*nworkers, parseEnvelopes, saveMessage)

			query := bstore.QueryTx[Message](tx)
			query.FilterEqual("Expunged", false)
			query.FilterGreater("ID", resumeAfter)
			query.SortAsc("ID")
			err := query.ForEach(func(msg Message) error {
				// We process in batches so we don't block other operations for a long time.
				if updated >= batchSize {
					return bstore.StopForEach
				}

				// The next batch continues after this message.
				resumeAfter = msg.ID
				updated++
				return queue.Add(msg)
			})
			if err == nil {
				err = queue.Finish()
			}
			queue.Stop()
			return err
		})
		if err != nil {
			return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
		}
		if updated == 0 {
			// Nothing left to process.
			return total, nil
		}
		total += updated
	}
}

// AssignThreads assigns thread-related fields to messages with ID >=
// startMessageID. Changes are committed each batchSize changes if txOpt is nil
// (i.e. during automatic account upgrade, we don't want to block database access
// for a long time). If txOpt is not nil, all changes are made in that
// transaction, which is neither committed nor rolled back here.
//
// When resetting thread assignments, the caller must first clear the existing
// thread fields. Messages that already have a ThreadID are skipped.
//
// Messages are processed in order of ID, so when added to the account, not
// necessarily by received/date. Most threaded messages can immediately be matched
// to their parent message. If not, we keep track of the missing message-id and
// resolve as soon as we encounter it. At the end, we resolve all remaining
// messages, they start with a cycle.
//
// Does not set Seen flag for muted threads.
//
// Progress is written to progressWriter after each batch in which the number of
// processed messages crossed a multiple of 100k, followed by closing lines once
// all messages have been processed.
//
// An error is returned if a database operation, reading a message header or
// writing progress fails, or if non-expunged messages without ThreadID remain at
// the end.
func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
	// We use a more basic version of the thread-matching algorithm described in:
	// ../rfc/5256:443
	// The algorithm assumes you'll select messages, then group into threads. We normally do
	// thread-calculation when messages are delivered. Here, we assign threads as soon
	// as we can, but will queue messages that reference known ancestors and resolve as
	// soon as we process them. We can handle large number of messages, but not very
	// quickly because we make lots of database queries.

	type childMsg struct {
		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
		ThreadMissingLink bool
	}
	// Messages that have a References/In-Reply-To that we want to set as parent, but
	// where the parent doesn't have a ThreadID yet are added to pending. The key is
	// the normalized MessageID of the parent, and the value is a list of messages that
	// can get resolved once the parent gets its ThreadID. The kids will get the same
	// ThreadIDs, and they themselves may be parents to kids, and so on.
	// For duplicate messages (messages with identical Message-ID), the second
	// Message-ID to be added to pending is added under its own message-id, so it gets
	// its original as parent.
	pending := map[string][]childMsg{}

	// Current tx. If not equal to txOpt, we clean it up before we leave.
	var tx *bstore.Tx
	defer func() {
		if tx != nil && tx != txOpt {
			err := tx.Rollback()
			log.Check(err, "rolling back transaction")
		}
	}()

	// Set thread-related fields for a single message. Caller must save the message,
	// only if not an error and not added to the pending list.
	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
		if m.MessageID != "" {
			// Attempt to match against existing different message with same Message-ID that
			// already has a threadid.
			// If there are multiple messages for a message-id a future call to assign may use
			// its threadid, or it may end up in pending and we resolve it when we need to.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: m.MessageID})
			q.FilterEqual("Expunged", false)
			q.FilterLess("ID", m.ID)
			q.SortAsc("ID")
			q.Limit(1)
			em, err := q.Get()
			if err != nil && err != bstore.ErrAbsent {
				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
			} else if err == nil {
				if em.ThreadID == 0 {
					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
					return true, nil
				} else {
					assignParent(m, em, false)
					return false, nil
				}
			}
		}

		refids, err := message.ReferencedIDs(references, inReplyTo)
		if err != nil {
			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
		}

		// Try the referenced message-ids from last to first.
		for i := len(refids) - 1; i >= 0; i-- {
			messageID := refids[i]
			if messageID == m.MessageID {
				continue
			}
			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
			if err != nil {
				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
			} else if tm != nil {
				assignParent(m, *tm, false)
				return false, nil
			} else if exists {
				// Parent exists but has no threadid yet, wait for it. A link is missing if we
				// had to skip over later references to get here.
				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
				return true, nil
			}
		}

		var subjectBase string
		var isResp bool
		if subject != "" {
			subjectBase, isResp = message.ThreadSubject(subject, false)
		}
		if len(refids) > 0 || !isResp || subjectBase == "" {
			// Message starts its own thread.
			m.ThreadID = m.ID
			m.ThreadMissingLink = len(refids) > 0
			return false, nil
		}

		// No references to use. If this is a reply/forward (based on subject), we'll match
		// against base subject, at most 4 weeks back so we don't match against ancient
		// messages and 1 day ahead so we can match against delayed deliveries.
		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
		if err != nil {
			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
		} else if tm != nil {
			m.ThreadID = tm.ThreadID
			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
			m.ThreadMissingLink = true
			m.ThreadMuted = tm.ThreadMuted
			m.ThreadCollapsed = tm.ThreadCollapsed
		} else {
			m.ThreadID = m.ID
		}
		return false, nil
	}

	npendingResolved := 0

	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
	var resolvePending func(tm Message, cyclic bool) error
	resolvePending = func(tm Message, cyclic bool) error {
		if tm.MessageID == "" {
			return nil
		}
		l := pending[tm.MessageID]
		delete(pending, tm.MessageID)
		for _, mi := range l {
			m := Message{ID: mi.ID}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			if m.ThreadID != 0 {
				// ThreadID already set because this is a cyclic message. If we would assign a
				// parent again, we would create a cycle.
				if m.MessageID != tm.MessageID && !cyclic {
					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
				}
				continue
			}
			assignParent(&m, tm, false)
			m.ThreadMissingLink = mi.ThreadMissingLink
			if err := tx.Update(&m); err != nil {
				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			if err := resolvePending(m, cyclic); err != nil {
				return err
			}
			npendingResolved++
		}
		return nil
	}

	// Output of the worker goroutines.
	type threadPrep struct {
		references []string
		inReplyTo  []string
		subject    string
	}

	// Single allocation.
	threadingFields := [][]byte{
		[]byte("references"),
		[]byte("in-reply-to"),
		[]byte("subject"),
	}

	// Worker goroutine function. We start with a reasonably large buffer for reading
	// the header into. And we have scratch space to copy the needed headers into. That
	// means we normally won't allocate any more buffers.
	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
		headerbuf := make([]byte, 8*1024)
		scratch := make([]byte, 4*1024)
		for {
			w, ok := <-in
			if !ok {
				return
			}

			m := w.In
			var partialPart struct {
				HeaderOffset int64
				BodyOffset   int64
			}
			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
				w.Err = fmt.Errorf("unmarshal part: %v", err)
			} else {
				size := partialPart.BodyOffset - partialPart.HeaderOffset
				if int(size) > len(headerbuf) {
					headerbuf = make([]byte, size)
				}
				if size > 0 {
					buf := headerbuf[:int(size)]
					err := func() error {
						mr := a.MessageReader(m)
						defer mr.Close()

						// ReadAt returns whole buffer or error. Single read should be fast.
						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
						if err != nil || n != len(buf) {
							return fmt.Errorf("read header: %v", err)
						}
						return nil
					}()
					if err != nil {
						w.Err = err
					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
						w.Err = err
					} else {
						w.Out.references = h["References"]
						w.Out.inReplyTo = h["In-Reply-To"]
						l := h["Subject"]
						if len(l) > 0 {
							w.Out.subject = l[0]
						}
					}
				}
			}

			out <- w
		}
	}

	// Number of processed messages between progress reports.
	const progressInterval = 100000

	// Assign threads to messages, possibly in batches.
	nassigned := 0
	for {
		n := 0
		tx = txOpt
		if tx == nil {
			var err error
			tx, err = a.DB.Begin(ctx, true)
			if err != nil {
				return fmt.Errorf("begin transaction: %w", err)
			}
		}

		processMessage := func(m Message, prep threadPrep) error {
			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
			if err != nil {
				return fmt.Errorf("for msgid %d: %w", m.ID, err)
			} else if pend {
				return nil
			}
			if m.ThreadID == 0 {
				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
			}
			// Fields have been set, store in database and resolve messages waiting for this MessageID.
			if slices.Contains(m.ThreadParentIDs, m.ID) {
				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
			}
			if err := tx.Update(&m); err != nil {
				return err
			}
			if err := resolvePending(m, false); err != nil {
				return fmt.Errorf("resolving pending message-id: %v", err)
			}
			return nil
		}

		// Use multiple worker goroutines to read and parse headers from on-disk messages.
		procs := runtime.GOMAXPROCS(0)
		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)

		// We assign threads in order by ID, so messages delivered in between our
		// transaction will get assigned threads too: they'll have the highest id's.
		q := bstore.QueryTx[Message](tx)
		q.FilterGreaterEqual("ID", startMessageID)
		q.FilterEqual("Expunged", false)
		q.SortAsc("ID")
		err := q.ForEach(func(m Message) error {
			// Batch number of changes, so we give other users of account a chance to run.
			if txOpt == nil && n >= batchSize {
				return bstore.StopForEach
			}
			// Starting point for next batch.
			startMessageID = m.ID + 1
			// Don't process again. Can happen when earlier upgrade was aborted.
			if m.ThreadID != 0 {
				return nil
			}

			n++
			return wq.Add(m)
		})
		if err == nil {
			err = wq.Finish()
		}
		wq.Stop()

		if err == nil && txOpt == nil {
			err = tx.Commit()
			tx = nil
		}
		if err != nil {
			return fmt.Errorf("assigning threads: %w", err)
		}
		if n == 0 {
			break
		}

		// Report progress whenever this batch moved the running total across a multiple
		// of progressInterval. Comparing against an exact multiple would skip reports
		// when the batches don't add up to exactly such a multiple.
		prevAssigned := nassigned
		nassigned += n
		if nassigned/progressInterval > prevAssigned/progressInterval {
			log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
			if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
				return fmt.Errorf("writing progress: %v", err)
			}
		}
	}
	if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))

	if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	// Remaining messages in pending have cycles and possibly tails. The cycle is at
	// the head of the thread. Once we resolve that, the rest of the thread can be
	// resolved too. Ignoring self-references (duplicate messages), there can only be
	// one cycle, and it is at the head. So we look for cycles, ignoring
	// self-references, and resolve a message as soon as we see the cycle.

	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
	pendlist := []string{}
	for pmsgid, l := range pending {
		pendlist = append(pendlist, pmsgid)
		for _, k := range l {
			if k.MessageID == pmsgid {
				// No self-references for duplicate messages.
				continue
			}
			if _, ok := parent[k.MessageID]; !ok {
				parent[k.MessageID] = pmsgid
			}
			// else, this message should be resolved by following pending.
		}
	}
	// Map iteration order is random, sort for a deterministic order of resolving.
	sort.Strings(pendlist)

	tx = txOpt
	if tx == nil {
		var err error
		tx, err = a.DB.Begin(ctx, true)
		if err != nil {
			return fmt.Errorf("begin transaction: %w", err)
		}
	}

	// We walk through all messages of pendlist, but some will already have been
	// resolved by the time we get to them.
	done := map[string]bool{}
	for _, msgid := range pendlist {
		if done[msgid] {
			continue
		}

		// We walk up to parent, until we see a message-id we've already seen, a cycle.
		seen := map[string]bool{}
		for {
			pmsgid, ok := parent[msgid]
			if !ok {
				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
			}
			if !seen[pmsgid] {
				seen[pmsgid] = true
				msgid = pmsgid
				continue
			}

			// Cycle detected. Make this message-id the thread root.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: msgid})
			q.FilterEqual("ThreadID", int64(0))
			q.FilterEqual("Expunged", false)
			q.SortAsc("ID")
			l, err := q.List()
			if err == nil && len(l) == 0 {
				err = errors.New("no messages")
			}
			if err != nil {
				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
			}
			// The message with the lowest ID becomes the root, any duplicates (same
			// message-id) become its children.
			for i, m := range l {
				m.ThreadID = l[0].ID
				m.ThreadMissingLink = true
				if i == 0 {
					m.ThreadParentIDs = nil
					l[0] = m // For resolvePending below.
				} else {
					assignParent(&m, l[0], false)
				}
				if slices.Contains(m.ThreadParentIDs, m.ID) {
					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
				}
				if err := tx.Update(&m); err != nil {
					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
				}
			}

			// Mark all children as done so we don't process these messages again.
			walk := map[string]struct{}{msgid: {}}
			for len(walk) > 0 {
				for msgid := range walk {
					delete(walk, msgid)
					if done[msgid] {
						continue
					}
					done[msgid] = true
					for _, mi := range pending[msgid] {
						if !done[mi.MessageID] {
							walk[mi.MessageID] = struct{}{}
						}
					}
				}
			}

			// Resolve all messages in this thread.
			if err := resolvePending(l[0], true); err != nil {
				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
			}

			break
		}
	}

	// Check that there are no more messages without threadid.
	q := bstore.QueryTx[Message](tx)
	q.FilterEqual("ThreadID", int64(0))
	q.FilterEqual("Expunged", false)
	l, err := q.List()
	if err == nil && len(l) > 0 {
		err = errors.New("found messages without threadid")
	}
	if err != nil {
		return fmt.Errorf("listing messages without threadid: %v", err)
	}

	if txOpt == nil {
		err := tx.Commit()
		tx = nil
		if err != nil {
			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
		}
	}
	return nil
}

// lookupThreadMessage looks for a parent candidate: a non-expunged message other
// than mID that has the given messageID and a matching subjectBase.
//
// The candidate with the lowest ID that already has a thread id is returned. If
// there is no such message, a nil message and nil error is returned. The bool
// return value indicates if a message with the message-id (and matching
// subjectBase) exists at all, with or without thread id.
func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string) (*Message, bool, error) {
	query := bstore.QueryTx[Message](tx)
	query.FilterNonzero(Message{MessageID: messageID})
	query.FilterEqual("SubjectBase", subjectBase)
	query.FilterEqual("Expunged", false)
	query.FilterNotEqual("ID", mID)
	query.SortAsc("ID")

	candidates, err := query.List()
	if err != nil {
		return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
	}

	// Candidates are ordered by ID, take the first that has been assigned a thread.
	for i := range candidates {
		if candidates[i].ThreadID == 0 {
			continue
		}
		found := candidates[i]
		return &found, true, nil
	}
	return nil, len(candidates) > 0, nil
}

// lookupThreadMessageSubject looks up a parent/ancestor message for the thread of
// m based on a matching base subject. Only non-expunged messages other than m,
// with a thread id and the same MailboxOrigID as m are considered, received
// between 4 weeks before and 1 day after m. The most recently received match
// wins.
//
// If no message (with a threadid) is found a nil message and nil error is returned.
func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
	const (
		lookBack  = 4 * 7 * 24 * time.Hour
		lookAhead = 1 * 24 * time.Hour
	)
	earliest := m.Received.Add(-lookBack)
	latest := m.Received.Add(lookAhead)

	query := bstore.QueryTx[Message](tx)
	query.FilterGreater("Received", earliest)
	query.FilterLess("Received", latest)
	query.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
	query.FilterEqual("Expunged", false)
	query.FilterNotEqual("ID", m.ID)
	query.FilterNotEqual("ThreadID", int64(0))
	query.SortDesc("Received")
	query.Limit(1)

	match, err := query.Get()
	switch {
	case err == bstore.ErrAbsent:
		// No candidate is not an error.
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &match, nil
}

// upgradeThreads upgrades the messages of account acc for threading, in two
// steps. The step reached is kept in up.Threads and saved to the database after
// each step, so a later call continues where an earlier one left off:
//
//   - 0: nothing done yet, step 1 (message-id and base subject) must run.
//   - 1: step 1 done, step 2 (assigning thread ids) must run.
//   - any other value: nothing to do.
//
// If saving the upgrade progress fails, up.Threads is set back to the value it
// had before the step.
func upgradeThreads(ctx context.Context, log mlog.Log, acc *Account, up *Upgrade) error {
	const batchSize = 10000

	log = log.With(slog.String("account", acc.Name))

	if up.Threads == 0 {
		// Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
		// message and the base subject for thread matching. This allows efficient thread
		// lookup in the second step.

		log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
		start := time.Now()

		total, err := acc.ResetThreading(ctx, log, batchSize, true)
		if err != nil {
			return fmt.Errorf("resetting message threading fields: %v", err)
		}

		up.Threads = 1
		err = acc.DB.Update(ctx, up)
		if err != nil {
			up.Threads = 0
			return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
		}
		log.Info("upgrading account for threading, step 1/2: completed", slog.Duration("duration", time.Since(start)), slog.Int("messages", total))
	}

	// Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
	// use threadid's before, so there is nothing to be out of date.

	if up.Threads != 1 {
		return nil
	}

	// Step 2 of the upgrade is going through all messages and assigning threadid's.
	// Lookup of messageid and base subject is now fast through indexed database
	// access.

	log.Info("upgrading account for threading, step 2/2: matching messages to threads")
	start := time.Now()

	err := acc.AssignThreads(ctx, log, nil, 1, batchSize, io.Discard)
	if err != nil {
		return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
	}

	up.Threads = 2
	err = acc.DB.Update(ctx, up)
	if err != nil {
		up.Threads = 1
		return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
	}
	log.Info("upgrading account for threading, step 2/2: completed", slog.Duration("duration", time.Since(start)))

	return nil
}