reverse_proxy: Health checks: Don't cross the streams

Fixes https://caddy.community/t/v2-health-checks-are-going-to-the-wrong-upstream/7084?u=matt

... I think
This commit is contained in:
Matthew Holt 2020-02-23 14:30:52 -07:00
parent e3591009dc
commit 7cca291d62
2 changed files with 47 additions and 49 deletions

View file

@ -124,26 +124,25 @@ type CircuitBreaker interface {
// h.HealthChecks.Active.stopChan is closed. // h.HealthChecks.Active.stopChan is closed.
func (h *Handler) activeHealthChecker() { func (h *Handler) activeHealthChecker() {
ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval)) ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval))
h.doActiveHealthChecksForAllHosts() h.doActiveHealthCheckForAllHosts()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
h.doActiveHealthChecksForAllHosts() h.doActiveHealthCheckForAllHosts()
case <-h.HealthChecks.Active.stopChan: case <-h.HealthChecks.Active.stopChan:
// TODO: consider using a Context for cancellation instead
ticker.Stop() ticker.Stop()
return return
} }
} }
} }
// doActiveHealthChecksForAllHosts immediately performs // doActiveHealthCheckForAllHosts immediately performs
// health checks for all hosts in the global repository. // health checks for all upstream hosts configured by h.
func (h *Handler) doActiveHealthChecksForAllHosts() { func (h *Handler) doActiveHealthCheckForAllHosts() {
hosts.Range(func(key, value interface{}) bool { for _, upstream := range h.Upstreams {
networkAddr := key.(string) go func(upstream *Upstream) {
host := value.(Host) networkAddr := upstream.Dial
go func(networkAddr string, host Host) {
addr, err := caddy.ParseNetworkAddress(networkAddr) addr, err := caddy.ParseNetworkAddress(networkAddr)
if err != nil { if err != nil {
h.HealthChecks.Active.logger.Error("bad network address", h.HealthChecks.Active.logger.Error("bad network address",
@ -165,18 +164,15 @@ func (h *Handler) doActiveHealthChecksForAllHosts() {
// so use a fake Host value instead; unix sockets are usually local // so use a fake Host value instead; unix sockets are usually local
hostAddr = "localhost" hostAddr = "localhost"
} }
err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: hostAddr}, hostAddr, host) err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: hostAddr}, hostAddr, upstream.Host)
if err != nil { if err != nil {
h.HealthChecks.Active.logger.Error("active health check failed", h.HealthChecks.Active.logger.Error("active health check failed",
zap.String("address", networkAddr), zap.String("address", networkAddr),
zap.Error(err), zap.Error(err),
) )
} }
}(networkAddr, host) }(upstream)
}
// continue to iterate all hosts
return true
})
} }
// doActiveHealthCheck performs a health check to host which // doActiveHealthCheck performs a health check to host which
@ -209,7 +205,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host H
u.Host = net.JoinHostPort(host, portStr) u.Host = net.JoinHostPort(host, portStr)
} }
// attach dialing information to this request // attach dialing information to this request - TODO: use caddy.Context's context
// so it can be canceled on config reload
ctx := context.Background() ctx := context.Background()
ctx = context.WithValue(ctx, caddy.ReplacerCtxKey, caddy.NewReplacer()) ctx = context.WithValue(ctx, caddy.ReplacerCtxKey, caddy.NewReplacer())
ctx = context.WithValue(ctx, caddyhttp.VarsCtxKey, map[string]interface{}{ ctx = context.WithValue(ctx, caddyhttp.VarsCtxKey, map[string]interface{}{

View file

@ -168,38 +168,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
return err return err
} }
// if active health checks are enabled, configure them and start a worker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")
timeout := time.Duration(h.HealthChecks.Active.Timeout)
if timeout == 0 {
timeout = 5 * time.Second
}
h.HealthChecks.Active.stopChan = make(chan struct{})
h.HealthChecks.Active.httpClient = &http.Client{
Timeout: timeout,
Transport: h.Transport,
}
if h.HealthChecks.Active.Interval == 0 {
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
}
if h.HealthChecks.Active.ExpectBody != "" {
var err error
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
if err != nil {
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
}
}
go h.activeHealthChecker()
}
// set up upstreams // set up upstreams
for _, upstream := range h.Upstreams { for _, upstream := range h.Upstreams {
// create or get the host representation for this upstream // create or get the host representation for this upstream
@ -235,6 +203,38 @@ func (h *Handler) Provision(ctx caddy.Context) error {
} }
} }
// if active health checks are enabled, configure them and start a worker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")
timeout := time.Duration(h.HealthChecks.Active.Timeout)
if timeout == 0 {
timeout = 5 * time.Second
}
h.HealthChecks.Active.stopChan = make(chan struct{})
h.HealthChecks.Active.httpClient = &http.Client{
Timeout: timeout,
Transport: h.Transport,
}
if h.HealthChecks.Active.Interval == 0 {
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
}
if h.HealthChecks.Active.ExpectBody != "" {
var err error
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
if err != nil {
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
}
}
go h.activeHealthChecker()
}
return nil return nil
} }
@ -244,6 +244,7 @@ func (h *Handler) Cleanup() error {
if h.HealthChecks != nil && if h.HealthChecks != nil &&
h.HealthChecks.Active != nil && h.HealthChecks.Active != nil &&
h.HealthChecks.Active.stopChan != nil { h.HealthChecks.Active.stopChan != nil {
// TODO: consider using context cancellation, could be much simpler
close(h.HealthChecks.Active.stopChan) close(h.HealthChecks.Active.stopChan)
} }