mirror of
https://github.com/mjl-/mox.git
synced 2024-12-27 08:53:48 +03:00
3620d6f05e
increase() and rate() don't seem to assume a previous value of 0 when a vector gets a first value for a label. you would think that an increase() on a first-value mox_panic_total{"..."}=1 would return 1, and similar for rate(), but that doesn't appear to be the behaviour. so we just explicitly initialize the count to 0 for each possible label value. mox has more vector metrics, but panics feels like the most important, and it's too much code to initialize them all, for all combinations of label values. there is probably a better way that fixes this for all cases...
170 lines
4.1 KiB
Go
170 lines
4.1 KiB
Go
package webmail
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
mathrand "math/rand"
|
|
"net/http"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mjl-/mox/metrics"
|
|
"github.com/mjl-/mox/mlog"
|
|
)
|
|
|
|
type eventWriter struct {
|
|
out writeFlusher
|
|
waitMin, waitMax time.Duration
|
|
|
|
// If connection is closed, the goroutine doing delayed writes must abort.
|
|
sync.Mutex
|
|
closed bool
|
|
|
|
wrote bool // To be reset by user, set on write.
|
|
events chan struct {
|
|
name string // E.g. "start" for EventStart.
|
|
v any // Written as JSON.
|
|
when time.Time // For delaying.
|
|
} // Will only be set when waitMin or waitMax is > 0. Closed on connection shutdown.
|
|
errors chan error // If we have an events channel, we read errors and abort for them.
|
|
}
|
|
|
|
func newEventWriter(out writeFlusher, waitMin, waitMax time.Duration) *eventWriter {
|
|
return &eventWriter{out: out, waitMin: waitMin, waitMax: waitMax}
|
|
}
|
|
|
|
// close shuts down the events channel, causing the goroutine (if created) to
|
|
// stop.
|
|
func (ew *eventWriter) close() {
|
|
if ew.events != nil {
|
|
close(ew.events)
|
|
}
|
|
ew.Lock()
|
|
defer ew.Unlock()
|
|
ew.closed = true
|
|
}
|
|
|
|
// Write an event to the connection, e.g. "start" with value v, written as
|
|
// JSON. This directly writes the event, no more delay.
|
|
func (ew *eventWriter) write(name string, v any) error {
|
|
bw := bufio.NewWriter(ew.out)
|
|
if _, err := fmt.Fprintf(bw, "event: %s\ndata: ", name); err != nil {
|
|
return err
|
|
} else if err := json.NewEncoder(bw).Encode(v); err != nil {
|
|
return err
|
|
} else if _, err := fmt.Fprint(bw, "\n"); err != nil {
|
|
return err
|
|
} else if err := bw.Flush(); err != nil {
|
|
return err
|
|
}
|
|
return ew.out.Flush()
|
|
}
|
|
|
|
// For random wait between min and max delay.
|
|
var waitGen = mathrand.New(mathrand.NewSource(time.Now().UnixNano()))
|
|
|
|
// Schedule an event for writing to the connection. If events get a delay, this
|
|
// function still returns immediately.
|
|
func (ew *eventWriter) xsendEvent(ctx context.Context, log *mlog.Log, name string, v any) {
|
|
if (ew.waitMin > 0 || ew.waitMax > 0) && ew.events == nil {
|
|
// First write on a connection with delay.
|
|
ew.events = make(chan struct {
|
|
name string
|
|
v any
|
|
when time.Time
|
|
}, 100)
|
|
ew.errors = make(chan error)
|
|
go func() {
|
|
defer func() {
|
|
x := recover() // Should not happen, but don't take program down if it does.
|
|
if x != nil {
|
|
log.WithContext(ctx).Error("writeEvent panic", mlog.Field("err", x))
|
|
debug.PrintStack()
|
|
metrics.PanicInc(metrics.Webmailsendevent)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
ev, ok := <-ew.events
|
|
if !ok {
|
|
return
|
|
}
|
|
d := time.Until(ev.when)
|
|
if d > 0 {
|
|
time.Sleep(d)
|
|
}
|
|
ew.Lock()
|
|
if ew.closed {
|
|
ew.Unlock()
|
|
return
|
|
}
|
|
err := ew.write(ev.name, ev.v)
|
|
ew.Unlock()
|
|
if err != nil {
|
|
ew.errors <- err
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
// Check for previous write error before continuing.
|
|
if ew.errors != nil {
|
|
select {
|
|
case err := <-ew.errors:
|
|
panic(ioErr{err})
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
// If we have an events channel, we have a goroutine that write the events, delayed.
|
|
if ew.events != nil {
|
|
wait := ew.waitMin + time.Duration(waitGen.Intn(1000))*(ew.waitMax-ew.waitMin)/1000
|
|
when := time.Now().Add(wait)
|
|
ew.events <- struct {
|
|
name string
|
|
v any
|
|
when time.Time
|
|
}{name, v, when}
|
|
} else {
|
|
err := ew.write(name, v)
|
|
if err != nil {
|
|
panic(ioErr{err})
|
|
}
|
|
}
|
|
ew.wrote = true
|
|
}
|
|
|
|
// writeFlusher is a writer and flusher. We need to flush after writing an
|
|
// Event. Both to flush pending gzip data to the http response, and the http
|
|
// response to the client.
|
|
type writeFlusher interface {
|
|
io.Writer
|
|
Flush() error
|
|
}
|
|
|
|
// nopFlusher is a standin for writeFlusher if gzip is not used.
|
|
type nopFlusher struct {
|
|
io.Writer
|
|
}
|
|
|
|
func (f nopFlusher) Flush() error {
|
|
return nil
|
|
}
|
|
|
|
// httpFlusher wraps Flush for a writeFlusher with a call to an http.Flusher.
|
|
type httpFlusher struct {
|
|
writeFlusher
|
|
f http.Flusher
|
|
}
|
|
|
|
// Flush flushes the underlying writeFlusher, and calls Flush on the http.Flusher
|
|
// (which doesn't return an error).
|
|
func (f httpFlusher) Flush() error {
|
|
err := f.writeFlusher.Flush()
|
|
f.f.Flush()
|
|
return err
|
|
}
|