From cf69cd7b2755b9893ff774e9d3fc2e323fa1f7c4 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Sat, 15 Apr 2023 10:51:36 -0400 Subject: [PATCH] Add `success_duration` --- modules/caddyhttp/reverseproxy/admin.go | 2 + modules/caddyhttp/reverseproxy/caddyfile.go | 17 ++++++ .../caddyhttp/reverseproxy/healthchecks.go | 57 ++++++++++++++++++- modules/caddyhttp/reverseproxy/hosts.go | 16 ++++++ .../caddyhttp/reverseproxy/reverseproxy.go | 3 + 5 files changed, 93 insertions(+), 2 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/admin.go b/modules/caddyhttp/reverseproxy/admin.go index f64d1ecf..5bdd410e 100644 --- a/modules/caddyhttp/reverseproxy/admin.go +++ b/modules/caddyhttp/reverseproxy/admin.go @@ -37,6 +37,7 @@ type upstreamStatus struct { Address string `json:"address"` NumRequests int `json:"num_requests"` Fails int `json:"fails"` + Successes int `json:"successes"` } // CaddyModule returns the Caddy module information. @@ -99,6 +100,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er Address: address, NumRequests: upstream.NumRequests(), Fails: upstream.Fails(), + Successes: upstream.Successes(), }) return true }) diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index fc8eed60..4f27ca1e 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -77,6 +77,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // # passive health checking // fail_duration // max_fails +// success_duration // unhealthy_status // unhealthy_latency // unhealthy_request_count @@ -422,6 +423,22 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } h.HealthChecks.Passive.MaxFails = maxFails + case "success_duration": + if !d.NextArg() { + return d.ArgErr() + } + if h.HealthChecks == nil { + h.HealthChecks = new(HealthChecks) + } + if h.HealthChecks.Passive == nil { + h.HealthChecks.Passive = new(PassiveHealthChecks) + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad duration value '%s': %v", d.Val(), err) + } + h.HealthChecks.Passive.SuccessDuration = caddy.Duration(dur) + case "fail_duration": if !d.NextArg() { return d.ArgErr() diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index cfc7bdff..1362afa8 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -110,8 +110,8 @@ type ActiveHealthChecks struct { // health checks (that is, health checks which occur during // the normal flow of request proxying). type PassiveHealthChecks struct { - // How long to remember a failed request to a backend. A duration > 0 - // enables passive health checking. Default is 0. + // How long to remember a failed request to a backend. + // A duration > 0 enables passive health checking. Default is 0. FailDuration caddy.Duration `json:"fail_duration,omitempty"` // The number of failed requests within the FailDuration window to @@ -119,6 +119,9 @@ type PassiveHealthChecks struct { // that FailDuration be > 0. MaxFails int `json:"max_fails,omitempty"` + // How long to remember a successful request to a backend. Default is 0. + SuccessDuration caddy.Duration `json:"success_duration,omitempty"` + // Limits the number of simultaneous requests to a backend by // marking the backend as "down" if it has this many concurrent // requests or more. @@ -362,6 +365,56 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre return nil } +// countSuccess is used with passive health checks. It +// remembers 1 success for upstream for the configured +// duration. If passive health checks are disabled or +// success expiry is 0, this is a no-op. +func (h *Handler) countSuccess(upstream *Upstream) { + // only count successes if passive health checking is enabled + // and if successes are configured have a non-zero expiry + if h.HealthChecks == nil || h.HealthChecks.Passive == nil { + return + } + successDuration := time.Duration(h.HealthChecks.Passive.SuccessDuration) + if successDuration == 0 { + return + } + + // count success immediately + err := upstream.Host.countSuccess(1) + if err != nil { + h.HealthChecks.Passive.logger.Error("could not count success", + zap.String("host", upstream.Dial), + zap.Error(err)) + return + } + + // forget it later + go func(host *Host, successDuration time.Duration) { + defer func() { + if err := recover(); err != nil { + h.HealthChecks.Passive.logger.Error("passive health check success forgetter panicked", + zap.Any("error", err), + zap.ByteString("stack", debug.Stack())) + } + }() + timer := time.NewTimer(successDuration) + select { + case <-h.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + } + err := host.countSuccess(-1) + if err != nil { + h.HealthChecks.Passive.logger.Error("could not forget success", + zap.String("host", upstream.Dial), + zap.Error(err)) + } + }(upstream.Host, successDuration) +} + // countFailure is used with passive health checks. It // remembers 1 failure for upstream for the configured // duration. If passive health checks are disabled or diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index 298d4f32..6815ea62 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -136,6 +136,7 @@ func (u *Upstream) fillHost() { // Its fields are accessed atomically and Host values must not be copied. type Host struct { numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) + successes int64 fails int64 } @@ -144,6 +145,11 @@ func (h *Host) NumRequests() int { return int(atomic.LoadInt64(&h.numRequests)) } +// Successes returns the number of recent successes with the upstream. +func (h *Host) Successes() int { + return int(atomic.LoadInt64(&h.successes)) +} + // Fails returns the number of recent failures with the upstream. func (h *Host) Fails() int { return int(atomic.LoadInt64(&h.fails)) @@ -159,6 +165,16 @@ func (h *Host) countRequest(delta int) error { return nil } +// countSuccess mutates the recent successes count by +// delta. It returns an error if the adjustment fails. +func (h *Host) countSuccess(delta int) error { + result := atomic.AddInt64(&h.successes, int64(delta)) + if result < 0 { + return fmt.Errorf("count below 0: %d", result) + } + return nil +} + // countFail mutates the recent failures count by // delta. It returns an error if the adjustment fails. func (h *Host) countFail(delta int) error { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 367b8a27..ad0bcfcf 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -562,6 +562,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port) repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests()) repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests) + repl.Set("http.reverse_proxy.upstream.successes", upstream.Host.Successes()) repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails()) // mutate request headers according to this upstream; @@ -580,6 +581,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h if proxyErr == nil || errors.Is(proxyErr, context.Canceled) { // context.Canceled happens when the downstream client // cancels the request, which is not our failure + h.countSuccess(upstream) return true, nil } @@ -588,6 +590,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h // occur after the roundtrip if, for example, a response handler // after the roundtrip returns an error) if succ, ok := proxyErr.(roundtripSucceeded); ok { + h.countSuccess(upstream) return true, succ.error }