reverse_proxy: Ability to mutate headers; set upstream placeholders

This commit is contained in:
Matthew Holt 2019-09-14 13:25:26 -06:00
parent 2fd22139c6
commit e73b117332
No known key found for this signature in database
GPG key ID: 2A349DD577D586A5
3 changed files with 74 additions and 12 deletions

View file

@ -111,10 +111,10 @@ func (h *Handler) doActiveHealthChecksForAllHosts() {
if network == "unix" || network == "unixgram" || network == "unixpacket" { if network == "unix" || network == "unixgram" || network == "unixpacket" {
// this will be used as the Host portion of a http.Request URL, and // this will be used as the Host portion of a http.Request URL, and
// paths to socket files would produce an error when creating URL, // paths to socket files would produce an error when creating URL,
// so use a fake Host value instead // so use a fake Host value instead; unix sockets are usually local
hostAddr = network hostAddr = "localhost"
} }
err = h.doActiveHealthCheck(DialInfo{network, addrs[0]}, hostAddr, host) err = h.doActiveHealthCheck(NewDialInfo(network, addrs[0]), hostAddr, host)
if err != nil { if err != nil {
log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err) log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", networkAddr, err)
} }

View file

@ -16,6 +16,8 @@ package reverseproxy
import ( import (
"fmt" "fmt"
"net"
"strings"
"sync/atomic" "sync/atomic"
"github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2"
@ -173,6 +175,27 @@ type DialInfo struct {
// The address to dial. Follows the same // The address to dial. Follows the same
// semantics and rules as net.Dial. // semantics and rules as net.Dial.
Address string Address string
// Host and Port are components of Address,
// pre-split for convenience.
Host, Port string
}
// NewDialInfo creates and populates a DialInfo
// for the given network and address. It splits
// the address into host and port values if the
// network type supports them, or uses the whole
// address as the port if splitting fails.
func NewDialInfo(network, address string) DialInfo {
var addrHost, addrPort string
if !strings.Contains(network, "unix") {
var err error
addrHost, addrPort, err = net.SplitHostPort(address)
if err != nil {
addrHost = address // assume there was no port
}
}
return DialInfo{network, address, addrHost, addrPort}
} }
// String returns the Caddy network address form // String returns the Caddy network address form

View file

@ -21,11 +21,13 @@ import (
"net" "net"
"net/http" "net/http"
"regexp" "regexp"
"strconv"
"strings" "strings"
"time" "time"
"github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddyhttp/headers"
"golang.org/x/net/http/httpguts" "golang.org/x/net/http/httpguts"
) )
@ -35,12 +37,13 @@ func init() {
// Handler implements a highly configurable and production-ready reverse proxy. // Handler implements a highly configurable and production-ready reverse proxy.
type Handler struct { type Handler struct {
TransportRaw json.RawMessage `json:"transport,omitempty"` TransportRaw json.RawMessage `json:"transport,omitempty"`
CBRaw json.RawMessage `json:"circuit_breaker,omitempty"` CBRaw json.RawMessage `json:"circuit_breaker,omitempty"`
LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"` LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"`
HealthChecks *HealthChecks `json:"health_checks,omitempty"` HealthChecks *HealthChecks `json:"health_checks,omitempty"`
Upstreams UpstreamPool `json:"upstreams,omitempty"` Upstreams UpstreamPool `json:"upstreams,omitempty"`
FlushInterval caddy.Duration `json:"flush_interval,omitempty"` FlushInterval caddy.Duration `json:"flush_interval,omitempty"`
Headers *headers.Handler `json:"headers,omitempty"`
Transport http.RoundTripper `json:"-"` Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"` CB CircuitBreaker `json:"-"`
@ -178,7 +181,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// make a new upstream based on the original // make a new upstream based on the original
// that has a singular dial address // that has a singular dial address
upstreamCopy := *upstream upstreamCopy := *upstream
upstreamCopy.dialInfo = DialInfo{network, addr} upstreamCopy.dialInfo = NewDialInfo(network, addr)
upstreamCopy.Dial = upstreamCopy.dialInfo.String() upstreamCopy.Dial = upstreamCopy.dialInfo.String()
upstreamCopy.cb = h.CB upstreamCopy.cb = h.CB
@ -187,7 +190,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// TODO: make hosts modular, so that their state can be distributed in enterprise for example // TODO: make hosts modular, so that their state can be distributed in enterprise for example
// TODO: If distributed, the pool should be stored in storage... // TODO: If distributed, the pool should be stored in storage...
var host Host = new(upstreamHost) var host Host = new(upstreamHost)
activeHost, loaded := hosts.LoadOrStore(upstreamCopy.Dial, host) activeHost, loaded := hosts.LoadOrStore(upstreamCopy.dialInfo.String(), host)
if loaded { if loaded {
host = activeHost.(Host) host = activeHost.(Host)
} }
@ -243,6 +246,8 @@ func (h *Handler) Cleanup() error {
} }
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error { func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error {
repl := r.Context().Value(caddy.ReplacerCtxKey).(caddy.Replacer)
// prepare the request for proxying; this is needed only once // prepare the request for proxying; this is needed only once
err := h.prepareRequest(r) err := h.prepareRequest(r)
if err != nil { if err != nil {
@ -250,6 +255,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
fmt.Errorf("preparing request for upstream round-trip: %v", err)) fmt.Errorf("preparing request for upstream round-trip: %v", err))
} }
// we will need the original headers and Host
// value if header operations are configured
reqHeader := r.Header
reqHost := r.Host
start := time.Now() start := time.Now()
var proxyErr error var proxyErr error
@ -258,7 +268,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, r) upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, r)
if upstream == nil { if upstream == nil {
if proxyErr == nil { if proxyErr == nil {
proxyErr = fmt.Errorf("no available upstreams") proxyErr = fmt.Errorf("no upstreams available")
} }
if !h.tryAgain(start, proxyErr) { if !h.tryAgain(start, proxyErr) {
break break
@ -272,6 +282,26 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo) ctx := context.WithValue(r.Context(), DialInfoCtxKey, upstream.dialInfo)
r = r.WithContext(ctx) r = r.WithContext(ctx)
// set placeholders with information about this upstream
repl.Set("http.handlers.reverse_proxy.upstream.address", upstream.dialInfo.String())
repl.Set("http.handlers.reverse_proxy.upstream.hostport", upstream.dialInfo.Address)
repl.Set("http.handlers.reverse_proxy.upstream.host", upstream.dialInfo.Host)
repl.Set("http.handlers.reverse_proxy.upstream.port", upstream.dialInfo.Port)
repl.Set("http.handlers.reverse_proxy.upstream.requests", strconv.Itoa(upstream.Host.NumRequests()))
repl.Set("http.handlers.reverse_proxy.upstream.max_requests", strconv.Itoa(upstream.MaxRequests))
repl.Set("http.handlers.reverse_proxy.upstream.fails", strconv.Itoa(upstream.Host.Fails()))
// mutate request headers according to this upstream;
// because we're in a retry loop, we have to copy
// headers (and the r.Host value) from the original
// so that each retry is identical to the first
if h.Headers != nil && h.Headers.Request != nil {
r.Header = make(http.Header)
copyHeader(r.Header, reqHeader)
r.Host = reqHost
h.Headers.Request.ApplyToRequest(r)
}
// proxy the request to that upstream // proxy the request to that upstream
proxyErr = h.reverseProxy(w, r, upstream) proxyErr = h.reverseProxy(w, r, upstream)
if proxyErr == nil || proxyErr == context.Canceled { if proxyErr == nil || proxyErr == context.Canceled {
@ -428,6 +458,15 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", ")) rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
} }
// apply any response header operations
if h.Headers != nil && h.Headers.Response != nil {
if h.Headers.Response.Require == nil ||
h.Headers.Response.Require.Match(res.StatusCode, rw.Header()) {
repl := req.Context().Value(caddy.ReplacerCtxKey).(caddy.Replacer)
h.Headers.Response.ApplyTo(rw.Header(), repl)
}
}
rw.WriteHeader(res.StatusCode) rw.WriteHeader(res.StatusCode)
err = h.copyResponse(rw, res.Body, h.flushInterval(req, res)) err = h.copyResponse(rw, res.Body, h.flushInterval(req, res))