mirror of
https://github.com/caddyserver/caddy.git
synced 2025-03-30 09:09:03 +03:00
reverseproxy: Minor fixes and cleanup
Now use context cancellation to stop active health checker, which is simpler than and just as effective as using a separate stop channel.
This commit is contained in:
parent
65a09524c3
commit
e2f913bb7f
3 changed files with 50 additions and 48 deletions
|
@ -78,7 +78,6 @@ type ActiveHealthChecks struct {
|
||||||
// body of a healthy backend.
|
// body of a healthy backend.
|
||||||
ExpectBody string `json:"expect_body,omitempty"`
|
ExpectBody string `json:"expect_body,omitempty"`
|
||||||
|
|
||||||
stopChan chan struct{}
|
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
bodyRegexp *regexp.Regexp
|
bodyRegexp *regexp.Regexp
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
@ -137,8 +136,7 @@ func (h *Handler) activeHealthChecker() {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
h.doActiveHealthCheckForAllHosts()
|
h.doActiveHealthCheckForAllHosts()
|
||||||
case <-h.HealthChecks.Active.stopChan:
|
case <-h.ctx.Done():
|
||||||
// TODO: consider using a Context for cancellation instead
|
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -341,8 +339,8 @@ func (h *Handler) countFailure(upstream *Upstream) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.HealthChecks.Passive.logger.Error("could not count failure",
|
h.HealthChecks.Passive.logger.Error("could not count failure",
|
||||||
zap.String("host", upstream.Dial),
|
zap.String("host", upstream.Dial),
|
||||||
zap.Error(err),
|
zap.Error(err))
|
||||||
)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// forget it later
|
// forget it later
|
||||||
|
@ -357,8 +355,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.HealthChecks.Passive.logger.Error("could not forget failure",
|
h.HealthChecks.Passive.logger.Error("could not forget failure",
|
||||||
zap.String("host", upstream.Dial),
|
zap.String("host", upstream.Dial),
|
||||||
zap.Error(err),
|
zap.Error(err))
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}(upstream.Host, failDuration)
|
}(upstream.Host, failDuration)
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,7 +177,7 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) {
|
||||||
// of the state of a remote host. It implements the
|
// of the state of a remote host. It implements the
|
||||||
// Host interface.
|
// Host interface.
|
||||||
type upstreamHost struct {
|
type upstreamHost struct {
|
||||||
numRequests int64 // must be first field to be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
|
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
|
||||||
fails int64
|
fails int64
|
||||||
unhealthy int32
|
unhealthy int32
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,7 @@ type Handler struct {
|
||||||
Transport http.RoundTripper `json:"-"`
|
Transport http.RoundTripper `json:"-"`
|
||||||
CB CircuitBreaker `json:"-"`
|
CB CircuitBreaker `json:"-"`
|
||||||
|
|
||||||
|
ctx caddy.Context
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,6 +126,7 @@ func (Handler) CaddyModule() caddy.ModuleInfo {
|
||||||
|
|
||||||
// Provision ensures that h is set up properly before use.
|
// Provision ensures that h is set up properly before use.
|
||||||
func (h *Handler) Provision(ctx caddy.Context) error {
|
func (h *Handler) Provision(ctx caddy.Context) error {
|
||||||
|
h.ctx = ctx
|
||||||
h.logger = ctx.Logger(h)
|
h.logger = ctx.Logger(h)
|
||||||
|
|
||||||
// start by loading modules
|
// start by loading modules
|
||||||
|
@ -235,36 +237,43 @@ func (h *Handler) Provision(ctx caddy.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if active health checks are enabled, configure them and start a worker
|
if h.HealthChecks != nil {
|
||||||
if h.HealthChecks != nil &&
|
// set defaults on passive health checks, if necessary
|
||||||
h.HealthChecks.Active != nil &&
|
if h.HealthChecks.Passive != nil {
|
||||||
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
|
if h.HealthChecks.Passive.FailDuration > 0 && h.HealthChecks.Passive.MaxFails == 0 {
|
||||||
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")
|
h.HealthChecks.Passive.MaxFails = 1
|
||||||
|
|
||||||
timeout := time.Duration(h.HealthChecks.Active.Timeout)
|
|
||||||
if timeout == 0 {
|
|
||||||
timeout = 5 * time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
h.HealthChecks.Active.stopChan = make(chan struct{})
|
|
||||||
h.HealthChecks.Active.httpClient = &http.Client{
|
|
||||||
Timeout: timeout,
|
|
||||||
Transport: h.Transport,
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.HealthChecks.Active.Interval == 0 {
|
|
||||||
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.HealthChecks.Active.ExpectBody != "" {
|
|
||||||
var err error
|
|
||||||
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go h.activeHealthChecker()
|
// if active health checks are enabled, configure them and start a worker
|
||||||
|
if h.HealthChecks.Active != nil &&
|
||||||
|
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
|
||||||
|
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")
|
||||||
|
|
||||||
|
timeout := time.Duration(h.HealthChecks.Active.Timeout)
|
||||||
|
if timeout == 0 {
|
||||||
|
timeout = 5 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
h.HealthChecks.Active.httpClient = &http.Client{
|
||||||
|
Timeout: timeout,
|
||||||
|
Transport: h.Transport,
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.HealthChecks.Active.Interval == 0 {
|
||||||
|
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.HealthChecks.Active.ExpectBody != "" {
|
||||||
|
var err error
|
||||||
|
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go h.activeHealthChecker()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set up any response routes
|
// set up any response routes
|
||||||
|
@ -280,14 +289,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
|
||||||
|
|
||||||
// Cleanup cleans up the resources made by h during provisioning.
|
// Cleanup cleans up the resources made by h during provisioning.
|
||||||
func (h *Handler) Cleanup() error {
|
func (h *Handler) Cleanup() error {
|
||||||
// stop the active health checker
|
|
||||||
if h.HealthChecks != nil &&
|
|
||||||
h.HealthChecks.Active != nil &&
|
|
||||||
h.HealthChecks.Active.stopChan != nil {
|
|
||||||
// TODO: consider using context cancellation, could be much simpler
|
|
||||||
close(h.HealthChecks.Active.stopChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Close keepalive connections on reload? https://github.com/caddyserver/caddy/pull/2507/files#diff-70219fd88fe3f36834f474ce6537ed26R762
|
// TODO: Close keepalive connections on reload? https://github.com/caddyserver/caddy/pull/2507/files#diff-70219fd88fe3f36834f474ce6537ed26R762
|
||||||
|
|
||||||
// remove hosts from our config from the pool
|
// remove hosts from our config from the pool
|
||||||
|
@ -351,7 +352,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
|
||||||
if proxyErr == nil {
|
if proxyErr == nil {
|
||||||
proxyErr = fmt.Errorf("no upstreams available")
|
proxyErr = fmt.Errorf("no upstreams available")
|
||||||
}
|
}
|
||||||
if !h.LoadBalancing.tryAgain(start, proxyErr, r) {
|
if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -410,7 +411,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
|
||||||
h.countFailure(upstream)
|
h.countFailure(upstream)
|
||||||
|
|
||||||
// if we've tried long enough, break
|
// if we've tried long enough, break
|
||||||
if !h.LoadBalancing.tryAgain(start, proxyErr, r) {
|
if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -661,7 +662,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di Dia
|
||||||
// long enough before the next retry (i.e. no more sleeping is
|
// long enough before the next retry (i.e. no more sleeping is
|
||||||
// needed). If false is returned, the handler should stop trying to
|
// needed). If false is returned, the handler should stop trying to
|
||||||
// proxy the request.
|
// proxy the request.
|
||||||
func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Request) bool {
|
func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, proxyErr error, req *http.Request) bool {
|
||||||
// if we've tried long enough, break
|
// if we've tried long enough, break
|
||||||
if time.Since(start) >= time.Duration(lb.TryDuration) {
|
if time.Since(start) >= time.Duration(lb.TryDuration) {
|
||||||
return false
|
return false
|
||||||
|
@ -687,8 +688,12 @@ func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Requ
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise, wait and try the next available host
|
// otherwise, wait and try the next available host
|
||||||
time.Sleep(time.Duration(lb.TryInterval))
|
select {
|
||||||
return true
|
case <-time.After(time.Duration(lb.TryInterval)):
|
||||||
|
return true
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// directRequest modifies only req.URL so that it points to the upstream
|
// directRequest modifies only req.URL so that it points to the upstream
|
||||||
|
|
Loading…
Reference in a new issue