mirror of
https://github.com/caddyserver/caddy.git
synced 2025-01-06 10:58:49 +03:00
add multi proxy supprot based on urls
This commit is contained in:
parent
2b44a7d052
commit
345ece3850
2 changed files with 203 additions and 81 deletions
|
@ -77,86 +77,102 @@ var tryDuration = 60 * time.Second
|
||||||
|
|
||||||
// ServeHTTP satisfies the httpserver.Handler interface.
|
// ServeHTTP satisfies the httpserver.Handler interface.
|
||||||
func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
|
func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
|
||||||
for _, upstream := range p.Upstreams {
|
// Start by selecting most specific matching upstream config
|
||||||
if !httpserver.Path(r.URL.Path).Matches(upstream.From()) ||
|
upstream := p.match(r)
|
||||||
!upstream.AllowedPath(r.URL.Path) {
|
if upstream == nil {
|
||||||
continue
|
return p.Next.ServeHTTP(w, r)
|
||||||
}
|
|
||||||
|
|
||||||
var replacer httpserver.Replacer
|
|
||||||
start := time.Now()
|
|
||||||
|
|
||||||
outreq := createUpstreamRequest(r)
|
|
||||||
|
|
||||||
// Since Select() should give us "up" hosts, keep retrying
|
|
||||||
// hosts until timeout (or until we get a nil host).
|
|
||||||
for time.Now().Sub(start) < tryDuration {
|
|
||||||
host := upstream.Select()
|
|
||||||
if host == nil {
|
|
||||||
return http.StatusBadGateway, errUnreachable
|
|
||||||
}
|
|
||||||
if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil {
|
|
||||||
rr.Replacer.Set("upstream", host.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
outreq.Host = host.Name
|
|
||||||
if host.UpstreamHeaders != nil {
|
|
||||||
if replacer == nil {
|
|
||||||
rHost := r.Host
|
|
||||||
replacer = httpserver.NewReplacer(r, nil, "")
|
|
||||||
outreq.Host = rHost
|
|
||||||
}
|
|
||||||
if v, ok := host.UpstreamHeaders["Host"]; ok {
|
|
||||||
outreq.Host = replacer.Replace(v[len(v)-1])
|
|
||||||
}
|
|
||||||
// Modify headers for request that will be sent to the upstream host
|
|
||||||
upHeaders := createHeadersByRules(host.UpstreamHeaders, r.Header, replacer)
|
|
||||||
for k, v := range upHeaders {
|
|
||||||
outreq.Header[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var downHeaderUpdateFn respUpdateFn
|
|
||||||
if host.DownstreamHeaders != nil {
|
|
||||||
if replacer == nil {
|
|
||||||
rHost := r.Host
|
|
||||||
replacer = httpserver.NewReplacer(r, nil, "")
|
|
||||||
outreq.Host = rHost
|
|
||||||
}
|
|
||||||
//Creates a function that is used to update headers the response received by the reverse proxy
|
|
||||||
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxy := host.ReverseProxy
|
|
||||||
if baseURL, err := url.Parse(host.Name); err == nil {
|
|
||||||
r.Host = baseURL.Host
|
|
||||||
if proxy == nil {
|
|
||||||
proxy = NewSingleHostReverseProxy(baseURL, host.WithoutPathPrefix)
|
|
||||||
}
|
|
||||||
} else if proxy == nil {
|
|
||||||
return http.StatusInternalServerError, err
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.AddInt64(&host.Conns, 1)
|
|
||||||
backendErr := proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
|
|
||||||
atomic.AddInt64(&host.Conns, -1)
|
|
||||||
if backendErr == nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
timeout := host.FailTimeout
|
|
||||||
if timeout == 0 {
|
|
||||||
timeout = 10 * time.Second
|
|
||||||
}
|
|
||||||
atomic.AddInt32(&host.Fails, 1)
|
|
||||||
go func(host *UpstreamHost, timeout time.Duration) {
|
|
||||||
time.Sleep(timeout)
|
|
||||||
atomic.AddInt32(&host.Fails, -1)
|
|
||||||
}(host, timeout)
|
|
||||||
}
|
|
||||||
return http.StatusBadGateway, errUnreachable
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.Next.ServeHTTP(w, r)
|
var replacer httpserver.Replacer
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
outreq := createUpstreamRequest(r)
|
||||||
|
|
||||||
|
// Since Select() should give us "up" hosts, keep retrying
|
||||||
|
// hosts until timeout (or until we get a nil host).
|
||||||
|
for time.Now().Sub(start) < tryDuration {
|
||||||
|
host := upstream.Select()
|
||||||
|
if host == nil {
|
||||||
|
return http.StatusBadGateway, errUnreachable
|
||||||
|
}
|
||||||
|
if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil {
|
||||||
|
rr.Replacer.Set("upstream", host.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
outreq.Host = host.Name
|
||||||
|
if host.UpstreamHeaders != nil {
|
||||||
|
if replacer == nil {
|
||||||
|
rHost := r.Host
|
||||||
|
replacer = httpserver.NewReplacer(r, nil, "")
|
||||||
|
outreq.Host = rHost
|
||||||
|
}
|
||||||
|
if v, ok := host.UpstreamHeaders["Host"]; ok {
|
||||||
|
outreq.Host = replacer.Replace(v[len(v)-1])
|
||||||
|
}
|
||||||
|
// Modify headers for request that will be sent to the upstream host
|
||||||
|
upHeaders := createHeadersByRules(host.UpstreamHeaders, r.Header, replacer)
|
||||||
|
for k, v := range upHeaders {
|
||||||
|
outreq.Header[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var downHeaderUpdateFn respUpdateFn
|
||||||
|
if host.DownstreamHeaders != nil {
|
||||||
|
if replacer == nil {
|
||||||
|
rHost := r.Host
|
||||||
|
replacer = httpserver.NewReplacer(r, nil, "")
|
||||||
|
outreq.Host = rHost
|
||||||
|
}
|
||||||
|
//Creates a function that is used to update headers the response received by the reverse proxy
|
||||||
|
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy := host.ReverseProxy
|
||||||
|
if baseURL, err := url.Parse(host.Name); err == nil {
|
||||||
|
r.Host = baseURL.Host
|
||||||
|
if proxy == nil {
|
||||||
|
proxy = NewSingleHostReverseProxy(baseURL, host.WithoutPathPrefix)
|
||||||
|
}
|
||||||
|
} else if proxy == nil {
|
||||||
|
return http.StatusInternalServerError, err
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.AddInt64(&host.Conns, 1)
|
||||||
|
backendErr := proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
|
||||||
|
atomic.AddInt64(&host.Conns, -1)
|
||||||
|
if backendErr == nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
timeout := host.FailTimeout
|
||||||
|
if timeout == 0 {
|
||||||
|
timeout = 10 * time.Second
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&host.Fails, 1)
|
||||||
|
go func(host *UpstreamHost, timeout time.Duration) {
|
||||||
|
time.Sleep(timeout)
|
||||||
|
atomic.AddInt32(&host.Fails, -1)
|
||||||
|
}(host, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
return http.StatusBadGateway, errUnreachable
|
||||||
|
}
|
||||||
|
|
||||||
|
// match finds the best match for a proxy config based
|
||||||
|
// on r.
|
||||||
|
func (p Proxy) match(r *http.Request) Upstream {
|
||||||
|
var u Upstream
|
||||||
|
var longestMatch int
|
||||||
|
for _, upstream := range p.Upstreams {
|
||||||
|
basePath := upstream.From()
|
||||||
|
if !httpserver.Path(r.URL.Path).Matches(basePath) || !upstream.AllowedPath(r.URL.Path) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(basePath) > longestMatch {
|
||||||
|
longestMatch = len(basePath)
|
||||||
|
u = upstream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return u
|
||||||
}
|
}
|
||||||
|
|
||||||
// createUpstremRequest shallow-copies r into a new request
|
// createUpstremRequest shallow-copies r into a new request
|
||||||
|
|
|
@ -40,6 +40,7 @@ func TestReverseProxy(t *testing.T) {
|
||||||
|
|
||||||
// set up proxy
|
// set up proxy
|
||||||
p := &Proxy{
|
p := &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{newFakeUpstream(backend.URL, false)},
|
Upstreams: []Upstream{newFakeUpstream(backend.URL, false)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,6 +81,7 @@ func TestReverseProxyInsecureSkipVerify(t *testing.T) {
|
||||||
|
|
||||||
// set up proxy
|
// set up proxy
|
||||||
p := &Proxy{
|
p := &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{newFakeUpstream(backend.URL, true)},
|
Upstreams: []Upstream{newFakeUpstream(backend.URL, true)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,6 +374,7 @@ func TestUpstreamHeadersUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
// set up proxy
|
// set up proxy
|
||||||
p := &Proxy{
|
p := &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{upstream},
|
Upstreams: []Upstream{upstream},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,6 +444,7 @@ func TestDownstreamHeadersUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
// set up proxy
|
// set up proxy
|
||||||
p := &Proxy{
|
p := &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{upstream},
|
Upstreams: []Upstream{upstream},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -485,10 +489,98 @@ func TestDownstreamHeadersUpdate(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
upstreamResp1 = []byte("Hello, /")
|
||||||
|
upstreamResp2 = []byte("Hello, /api/")
|
||||||
|
)
|
||||||
|
|
||||||
|
func newMultiHostTestProxy() *Proxy {
|
||||||
|
// No-op backends.
|
||||||
|
upstreamServer1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, "%s", upstreamResp1)
|
||||||
|
}))
|
||||||
|
upstreamServer2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, "%s", upstreamResp2)
|
||||||
|
}))
|
||||||
|
p := &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
|
Upstreams: []Upstream{
|
||||||
|
// The order is important; the short path should go first to ensure
|
||||||
|
// we choose the most specific route, not the first one.
|
||||||
|
&fakeUpstream{
|
||||||
|
name: upstreamServer1.URL,
|
||||||
|
from: "/",
|
||||||
|
},
|
||||||
|
&fakeUpstream{
|
||||||
|
name: upstreamServer2.URL,
|
||||||
|
from: "/api",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReverseProxyFromClient(t *testing.T) {
|
||||||
|
p := newMultiHostTestProxy()
|
||||||
|
|
||||||
|
// This is a full end-end test, so the proxy handler.
|
||||||
|
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
p.ServeHTTP(w, r)
|
||||||
|
}))
|
||||||
|
defer proxy.Close()
|
||||||
|
|
||||||
|
// Table tests.
|
||||||
|
var multiProxy = []struct {
|
||||||
|
url string
|
||||||
|
body []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"/",
|
||||||
|
upstreamResp1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"/api/",
|
||||||
|
upstreamResp2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"/messages/",
|
||||||
|
upstreamResp1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"/api/messages/?text=cat",
|
||||||
|
upstreamResp2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range multiProxy {
|
||||||
|
// Create client request
|
||||||
|
reqURL := singleJoiningSlash(proxy.URL, tt.url)
|
||||||
|
req, err := http.NewRequest("GET", reqURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create request: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make request: %v", err)
|
||||||
|
}
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(body, tt.body) {
|
||||||
|
t.Errorf("Expected '%s' but got '%s' instead", tt.body, body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newFakeUpstream(name string, insecure bool) *fakeUpstream {
|
func newFakeUpstream(name string, insecure bool) *fakeUpstream {
|
||||||
uri, _ := url.Parse(name)
|
uri, _ := url.Parse(name)
|
||||||
u := &fakeUpstream{
|
u := &fakeUpstream{
|
||||||
name: name,
|
name: name,
|
||||||
|
from: "/",
|
||||||
host: &UpstreamHost{
|
host: &UpstreamHost{
|
||||||
Name: name,
|
Name: name,
|
||||||
ReverseProxy: NewSingleHostReverseProxy(uri, ""),
|
ReverseProxy: NewSingleHostReverseProxy(uri, ""),
|
||||||
|
@ -501,15 +593,27 @@ func newFakeUpstream(name string, insecure bool) *fakeUpstream {
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeUpstream struct {
|
type fakeUpstream struct {
|
||||||
name string
|
name string
|
||||||
host *UpstreamHost
|
host *UpstreamHost
|
||||||
|
from string
|
||||||
|
without string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *fakeUpstream) From() string {
|
func (u *fakeUpstream) From() string {
|
||||||
return "/"
|
return u.from
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *fakeUpstream) Select() *UpstreamHost {
|
func (u *fakeUpstream) Select() *UpstreamHost {
|
||||||
|
if u.host == nil {
|
||||||
|
uri, err := url.Parse(u.name)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to url.Parse %s: %v", u.name, err)
|
||||||
|
}
|
||||||
|
u.host = &UpstreamHost{
|
||||||
|
Name: u.name,
|
||||||
|
ReverseProxy: NewSingleHostReverseProxy(uri, u.without),
|
||||||
|
}
|
||||||
|
}
|
||||||
return u.host
|
return u.host
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -523,12 +627,14 @@ func (u *fakeUpstream) AllowedPath(requestPath string) bool {
|
||||||
// proxy.
|
// proxy.
|
||||||
func newWebSocketTestProxy(backendAddr string) *Proxy {
|
func newWebSocketTestProxy(backendAddr string) *Proxy {
|
||||||
return &Proxy{
|
return &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{&fakeWsUpstream{name: backendAddr, without: ""}},
|
Upstreams: []Upstream{&fakeWsUpstream{name: backendAddr, without: ""}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPrefixedWebSocketTestProxy(backendAddr string, prefix string) *Proxy {
|
func newPrefixedWebSocketTestProxy(backendAddr string, prefix string) *Proxy {
|
||||||
return &Proxy{
|
return &Proxy{
|
||||||
|
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
|
||||||
Upstreams: []Upstream{&fakeWsUpstream{name: backendAddr, without: prefix}},
|
Upstreams: []Upstream{&fakeWsUpstream{name: backendAddr, without: prefix}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue