reverseproxy: Multiple dynamic upstreams

This allows users to, for example, get upstreams from multiple SRV
endpoints in order (such as primary and secondary clusters).

Also, gofmt went to town on the comments, sigh
This commit is contained in:
Matthew Holt 2022-08-25 21:42:48 -06:00
parent 2cc5d38229
commit 5fb5b81439
No known key found for this signature in database
GPG key ID: 2A349DD577D586A5
2 changed files with 218 additions and 120 deletions

View file

@ -52,73 +52,73 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
// reverse_proxy [<matcher>] [<upstreams...>] {
// # backends
// to <upstreams...>
// dynamic <name> [...]
// reverse_proxy [<matcher>] [<upstreams...>] {
// # backends
// to <upstreams...>
// dynamic <name> [...]
//
// # load balancing
// lb_policy <name> [<options...>]
// lb_retries <retries>
// lb_try_duration <duration>
// lb_try_interval <interval>
// lb_retry_match <request-matcher>
// # load balancing
// lb_policy <name> [<options...>]
// lb_retries <retries>
// lb_try_duration <duration>
// lb_try_interval <interval>
// lb_retry_match <request-matcher>
//
// # active health checking
// health_uri <uri>
// health_port <port>
// health_interval <interval>
// health_timeout <duration>
// health_status <status>
// health_body <regexp>
// health_headers {
// <field> [<values...>]
// }
// # active health checking
// health_uri <uri>
// health_port <port>
// health_interval <interval>
// health_timeout <duration>
// health_status <status>
// health_body <regexp>
// health_headers {
// <field> [<values...>]
// }
//
// # passive health checking
// fail_duration <duration>
// max_fails <num>
// unhealthy_status <status>
// unhealthy_latency <duration>
// unhealthy_request_count <num>
// # passive health checking
// fail_duration <duration>
// max_fails <num>
// unhealthy_status <status>
// unhealthy_latency <duration>
// unhealthy_request_count <num>
//
// # streaming
// flush_interval <duration>
// buffer_requests
// buffer_responses
// max_buffer_size <size>
// # streaming
// flush_interval <duration>
// buffer_requests
// buffer_responses
// max_buffer_size <size>
//
// # request manipulation
// trusted_proxies [private_ranges] <ranges...>
// header_up [+|-]<field> [<value|regexp> [<replacement>]]
// header_down [+|-]<field> [<value|regexp> [<replacement>]]
// method <method>
// rewrite <to>
// # request manipulation
// trusted_proxies [private_ranges] <ranges...>
// header_up [+|-]<field> [<value|regexp> [<replacement>]]
// header_down [+|-]<field> [<value|regexp> [<replacement>]]
// method <method>
// rewrite <to>
//
// # round trip
// transport <name> {
// ...
// }
// # round trip
// transport <name> {
// ...
// }
//
// # optionally intercept responses from upstream
// @name {
// status <code...>
// header <field> [<value>]
// }
// replace_status [<matcher>] <status_code>
// handle_response [<matcher>] {
// <directives...>
// # optionally intercept responses from upstream
// @name {
// status <code...>
// header <field> [<value>]
// }
// replace_status [<matcher>] <status_code>
// handle_response [<matcher>] {
// <directives...>
//
// # special directives only available in handle_response
// copy_response [<matcher>] [<status>] {
// status <status>
// }
// copy_response_headers [<matcher>] {
// include <fields...>
// exclude <fields...>
// }
// }
// }
// # special directives only available in handle_response
// copy_response [<matcher>] [<status>] {
// status <status>
// }
// copy_response_headers [<matcher>] {
// include <fields...>
// exclude <fields...>
// }
// }
// }
//
// Proxy upstream addresses should be network dial addresses such
// as `host:port`, or a URL such as `scheme://host:port`. Scheme
@ -824,33 +824,32 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error
// UnmarshalCaddyfile deserializes Caddyfile tokens into h.
//
// transport http {
// read_buffer <size>
// write_buffer <size>
// max_response_header <size>
// dial_timeout <duration>
// dial_fallback_delay <duration>
// response_header_timeout <duration>
// expect_continue_timeout <duration>
// resolvers <resolvers...>
// tls
// tls_client_auth <automate_name> | <cert_file> <key_file>
// tls_insecure_skip_verify
// tls_timeout <duration>
// tls_trusted_ca_certs <cert_files...>
// tls_server_name <sni>
// tls_renegotiation <level>
// tls_except_ports <ports...>
// keepalive [off|<duration>]
// keepalive_interval <interval>
// keepalive_idle_conns <max_count>
// keepalive_idle_conns_per_host <count>
// versions <versions...>
// compression off
// max_conns_per_host <count>
// max_idle_conns_per_host <count>
// }
//
// transport http {
// read_buffer <size>
// write_buffer <size>
// max_response_header <size>
// dial_timeout <duration>
// dial_fallback_delay <duration>
// response_header_timeout <duration>
// expect_continue_timeout <duration>
// resolvers <resolvers...>
// tls
// tls_client_auth <automate_name> | <cert_file> <key_file>
// tls_insecure_skip_verify
// tls_timeout <duration>
// tls_trusted_ca_certs <cert_files...>
// tls_server_name <sni>
// tls_renegotiation <level>
// tls_except_ports <ports...>
// keepalive [off|<duration>]
// keepalive_interval <interval>
// keepalive_idle_conns <max_count>
// keepalive_idle_conns_per_host <count>
// versions <versions...>
// compression off
// max_conns_per_host <count>
// max_idle_conns_per_host <count>
// }
func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
for d.NextBlock(0) {
@ -1138,10 +1137,9 @@ func parseCopyResponseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHan
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
// copy_response [<matcher>] [<status>] {
// status <status>
// }
//
// copy_response [<matcher>] [<status>] {
// status <status>
// }
func (h *CopyResponseHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@ -1178,11 +1176,10 @@ func parseCopyResponseHeadersCaddyfile(h httpcaddyfile.Helper) (caddyhttp.Middle
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
//
// copy_response_headers [<matcher>] {
// include <fields...>
// exclude <fields...>
// }
//
// copy_response_headers [<matcher>] {
// include <fields...>
// exclude <fields...>
// }
func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@ -1208,16 +1205,15 @@ func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser)
// UnmarshalCaddyfile deserializes Caddyfile tokens into u.
//
// dynamic srv [<name>] {
// service <service>
// proto <proto>
// name <name>
// refresh <interval>
// resolvers <resolvers...>
// dial_timeout <timeout>
// dial_fallback_delay <timeout>
// }
//
// dynamic srv [<name>] {
// service <service>
// proto <proto>
// name <name>
// refresh <interval>
// resolvers <resolvers...>
// dial_timeout <timeout>
// dial_fallback_delay <timeout>
// }
func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@ -1307,15 +1303,14 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
// UnmarshalCaddyfile deserializes Caddyfile tokens into u.
//
// dynamic a [<name> <port>] {
// name <name>
// port <port>
// refresh <interval>
// resolvers <resolvers...>
// dial_timeout <timeout>
// dial_fallback_delay <timeout>
// }
//
// dynamic a [<name> <port>] {
// name <name>
// port <port>
// refresh <interval>
// resolvers <resolvers...>
// dial_timeout <timeout>
// dial_fallback_delay <timeout>
// }
func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.Next() {
args := d.RemainingArgs()
@ -1324,7 +1319,9 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
if len(args) > 0 {
u.Name = args[0]
u.Port = args[1]
if len(args) == 2 {
u.Port = args[1]
}
}
for d.NextBlock(0) {
@ -1395,6 +1392,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return nil
}
// UnmarshalCaddyfile deserializes Caddyfile tokens into u. Syntax:
//
//	dynamic multi {
//		<source> [...]
//	}
//
// Each line inside the block names an upstream source module; the module
// is unmarshaled from the remaining tokens and appended to u.SourcesRaw
// in the order it appears.
func (u *MultiUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
	for d.Next() {
		// No inline arguments are accepted; sources go in the block.
		if d.NextArg() {
			return d.ArgErr()
		}

		depth := d.Nesting()
		for d.NextBlock(depth) {
			name := d.Val()
			id := "http.reverse_proxy.upstreams." + name

			mod, err := caddyfile.UnmarshalModule(d, id)
			if err != nil {
				return err
			}

			src, isSource := mod.(UpstreamSource)
			if !isSource {
				return d.Errf("module %s (%T) is not an UpstreamSource", id, mod)
			}

			raw := caddyconfig.JSONModuleObject(src, "source", name, nil)
			u.SourcesRaw = append(u.SourcesRaw, raw)
		}
	}
	return nil
}
// matcherPrefix is the "@" character.
// NOTE(review): presumably the leading character of a named matcher
// token such as `@name` in the reverse_proxy syntax — confirm at use sites.
const matcherPrefix = "@"
// Interface guards
@ -1403,4 +1429,5 @@ var (
_ caddyfile.Unmarshaler = (*HTTPTransport)(nil)
_ caddyfile.Unmarshaler = (*SRVUpstreams)(nil)
_ caddyfile.Unmarshaler = (*AUpstreams)(nil)
_ caddyfile.Unmarshaler = (*MultiUpstreams)(nil)
)

View file

@ -2,6 +2,7 @@ package reverseproxy
import (
"context"
"encoding/json"
"fmt"
weakrand "math/rand"
"net"
@ -18,6 +19,7 @@ import (
// init registers the dynamic upstream source modules with Caddy.
func init() {
	for _, mod := range []caddy.Module{
		SRVUpstreams{},
		AUpstreams{},
		MultiUpstreams{},
	} {
		caddy.RegisterModule(mod)
	}
}
// SRVUpstreams provides upstreams from SRV lookups.
@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool {
return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
}
// Package-level state for SRV lookups.
// NOTE(review): srvsMu is presumably the lock guarding srvs — confirm
// against the lookup code, which is not visible here.
var (
// srvs stores srvLookup values keyed by string.
srvs = make(map[string]srvLookup)
srvsMu sync.RWMutex
)
// AUpstreams provides upstreams from A/AAAA lookups.
// Results are cached and refreshed at the configured
// refresh interval.
@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool {
return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh)
}
// MultiUpstreams is a single dynamic upstream source that
// aggregates the results of multiple dynamic upstream sources.
// All configured sources will be queried in order, with their
// results appended to the end of the list. Errors returned
// from individual sources will be logged and the next source
// will continue to be invoked.
//
// This module makes it easy to implement redundant cluster
// failovers, especially in conjunction with the `first` load
// balancing policy: if the first source returns an error or
// no upstreams, the second source's upstreams will be used
// naturally.
type MultiUpstreams struct {
// The list of upstream source modules to get upstreams from.
// They will be queried in order, with their results appended
// in the order they are returned.
SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
// sources holds the modules loaded from SourcesRaw; it is filled
// in by Provision and iterated by GetUpstreams.
sources []UpstreamSource
// logger is set in Provision and used by GetUpstreams to report
// sources that return an error or no upstreams.
logger *zap.Logger
}
// CaddyModule describes this module to Caddy: its ID and a
// constructor that yields a fresh, empty *MultiUpstreams.
func (MultiUpstreams) CaddyModule() caddy.ModuleInfo {
	newModule := func() caddy.Module {
		return new(MultiUpstreams)
	}
	return caddy.ModuleInfo{
		ID:  "http.reverse_proxy.upstreams.multi",
		New: newModule,
	}
}
// Provision sets up the logger and, when SourcesRaw is set, loads the
// configured upstream source modules into mu.sources in the order
// LoadModule returns them.
//
// It returns an error if the modules cannot be loaded or if a loaded
// module does not implement UpstreamSource.
func (mu *MultiUpstreams) Provision(ctx caddy.Context) error {
	mu.logger = ctx.Logger(mu)

	if mu.SourcesRaw == nil {
		return nil
	}

	mod, err := ctx.LoadModule(mu, "SourcesRaw")
	if err != nil {
		// %w keeps the underlying error reachable via errors.Is/As.
		return fmt.Errorf("loading upstream source modules: %w", err)
	}

	for i, val := range mod.([]any) {
		// Report a misconfigured module as an error instead of panicking
		// on a failed type assertion.
		src, ok := val.(UpstreamSource)
		if !ok {
			return fmt.Errorf("upstream source %d (%T) is not an UpstreamSource", i, val)
		}
		mu.sources = append(mu.sources, src)
	}
	return nil
}
// GetUpstreams queries every configured source in order and returns the
// concatenation of their results. A source that returns an error or an
// empty list is logged and skipped, so later sources can still contribute.
//
// If the request's context is done before a source is queried, the
// upstreams gathered so far are returned together with the context's
// error (context.Canceled or context.DeadlineExceeded).
func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
	var upstreams []*Upstream

	ctx := r.Context()
	for i, src := range mu.sources {
		// Stop early once the request is finished, and report the real
		// cause instead of always claiming cancellation.
		if err := ctx.Err(); err != nil {
			return upstreams, err
		}

		up, err := src.GetUpstreams(r)
		switch {
		case err != nil:
			mu.logger.Error("upstream source returned error",
				zap.Int("source_idx", i),
				zap.Error(err))
		case len(up) == 0:
			mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i))
		default:
			upstreams = append(upstreams, up...)
		}
	}

	return upstreams, nil
}
// UpstreamResolver holds the set of addresses of DNS resolvers of
// upstream addresses
type UpstreamResolver struct {
@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error {
}
// Package-level state for SRV and A/AAAA lookups.
// NOTE(review): each mutex is presumably the lock guarding the map
// declared just above it — confirm against the lookup code, which is
// not visible here.
var (
// srvs stores srvLookup values keyed by string.
srvs = make(map[string]srvLookup)
srvsMu sync.RWMutex
// aAaaa stores aLookup values keyed by string.
aAaaa = make(map[string]aLookup)
aAaaaMu sync.RWMutex
)