Add fix and tests for FastCGI persistent connections (#1134)

* keep fastcgi connection open

* poor mans serialisation to make up for the lack of demuxing

* pointing includes to echse's repo

* Revert "pointing includes to echse's repo"

This reverts commit 281daad8d4.

* switch for persistent fcgi connections on/off added

* fixing ineffectual assignments

* camel case instead of _

* only activate persistent sockets on windows (and some naming conventions/cleanup)

* gitfm import sorting

* Revert "fixing ineffectual assignments"

This reverts commit 79760344e7.

# Conflicts:
#	caddyhttp/staticfiles/fileserver.go

* added another mutex and deleting map entries. thx to mholts QA comments!

* thinking about it, this RW lock was not a good idea here

* thread safety

* I keep learning about mutexs in go

* some cosmetics

* adding persistant fastcgi connections switch to directive

* Support for configurable connection pool.

* ensure positive integer pool size config

* abisofts pool fix + nicer logging for the fastcgi_test

* abisoft wants to have dialer comparison in _test instead of next to struct

* Do not put dead connections back into pool

* Fix fastcgi header error

* Do not put dead connections back into pool

* some code style improvements from the discussion in https://github.com/mholt/caddy/pull/1134

* abisofts naming convention
This commit is contained in:
Sebastian Schmittner 2016-09-28 02:12:22 +02:00 committed by Matt Holt
parent 871d11af00
commit 8cb4e90852
5 changed files with 184 additions and 8 deletions

View file

@ -27,7 +27,6 @@ type persistentDialer struct {
func (p *persistentDialer) Dial() (*FCGIClient, error) {
p.Lock()
// connection is available, return first one.
if len(p.pool) > 0 {
client := p.pool[0]

View file

@ -90,8 +90,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength)
}
defer fcgiBackend.Close()
if err != nil && err != io.EOF {
return http.StatusBadGateway, err
}
@ -105,6 +103,8 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
return http.StatusBadGateway, err
}
defer rule.dialer.Close(fcgiBackend)
// Log any stderr output from upstream
if fcgiBackend.stderr.Len() != 0 {
// Remove trailing newline, error logger already does this.
@ -269,6 +269,7 @@ func (h Handler) buildEnv(r *http.Request, rule Rule, fpath string) (map[string]
}
// Rule represents a FastCGI handling rule.
// It is parsed from the fastcgi directive in the Caddyfile, see setup.go.
type Rule struct {
// The base path to match. Required.
Path string

View file

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"sync"
"testing"
)
@ -51,6 +52,101 @@ func TestServeHTTP(t *testing.T) {
}
}
// connectionCounter in fact is a listener with an added counter to keep track
// of the number of accepted connections.
type connectionCounter struct {
net.Listener
sync.Mutex
counter int
}
func (l *connectionCounter) Accept() (net.Conn, error) {
l.Lock()
l.counter++
l.Unlock()
return l.Listener.Accept()
}
// TestPersistent ensures that persistent
// as well as the non-persistent fastCGI servers
// send the answers corresnponding to the correct request.
// It also checks the number of tcp connections used.
func TestPersistent(t *testing.T) {
numberOfRequests := 32
for _, poolsize := range []int{0, 1, 5, numberOfRequests} {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Unable to create listener for test: %v", err)
}
listener := &connectionCounter{l, *new(sync.Mutex), 0}
// this fcgi server replies with the request URL
go fcgi.Serve(listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := "This answers a request to " + r.URL.Path
bodyLenStr := strconv.Itoa(len(body))
w.Header().Set("Content-Length", bodyLenStr)
w.Write([]byte(body))
}))
network, address := parseAddress(listener.Addr().String())
handler := Handler{
Next: nil,
Rules: []Rule{{Path: "/", Address: listener.Addr().String(), dialer: &persistentDialer{size: poolsize, network: network, address: address}}},
}
var semaphore sync.WaitGroup
serialMutex := new(sync.Mutex)
serialCounter := 0
parallelCounter := 0
// make some serial followed by some
// parallel requests to challenge the handler
for _, serialize := range []bool{true, false, false, false} {
if serialize {
serialCounter++
} else {
parallelCounter++
}
semaphore.Add(numberOfRequests)
for i := 0; i < numberOfRequests; i++ {
go func(i int, serialize bool) {
defer semaphore.Done()
if serialize {
serialMutex.Lock()
defer serialMutex.Unlock()
}
r, err := http.NewRequest("GET", "/"+strconv.Itoa(i), nil)
if err != nil {
t.Errorf("Unable to create request: %v", err)
}
w := httptest.NewRecorder()
status, err := handler.ServeHTTP(w, r)
if status != 0 {
t.Errorf("Handler(pool: %v) return status %v", poolsize, status)
}
if err != nil {
t.Errorf("Handler(pool: %v) Error: %v", poolsize, err)
}
want := "This answers a request to /" + strconv.Itoa(i)
if got := w.Body.String(); got != want {
t.Errorf("Expected response from handler(pool: %v) to be '%s', got: '%s'", poolsize, want, got)
}
}(i, serialize)
} //next request
semaphore.Wait()
} // next set of requests (serial/parallel)
listener.Close()
t.Logf("The pool: %v test used %v tcp connections to answer %v * %v serial and %v * %v parallel requests.", poolsize, listener.counter, serialCounter, numberOfRequests, parallelCounter, numberOfRequests)
} // next handler (persistent/non-persistent)
}
func TestRuleParseAddress(t *testing.T) {
getClientTestTable := []struct {
rule *Rule

View file

@ -138,7 +138,7 @@ func (rec *record) read(r io.Reader) (buf []byte, err error) {
return
}
if rec.h.Version != 1 {
err = errors.New("fcgi: invalid header version")
err = errInvalidHeaderVersion
return
}
if rec.h.Type == EndRequest {
@ -358,7 +358,9 @@ func (w *streamReader) Read(p []byte) (n int, err error) {
rec := &record{}
var buf []byte
buf, err = rec.read(w.c.rwc)
if err != nil {
if err == errInvalidHeaderVersion {
continue
} else if err != nil {
return
}
// standard error output
@ -561,3 +563,5 @@ func (c *FCGIClient) PostFile(p map[string]string, data url.Values, file map[str
// Checks whether chunked is part of the encodings stack
func chunked(te []string) bool { return len(te) > 0 && te[0] == "chunked" }
var errInvalidHeaderVersion = errors.New("fcgi: invalid header version")

View file

@ -2,6 +2,7 @@ package fastcgi
import (
"fmt"
"reflect"
"testing"
"github.com/mholt/caddy"
@ -35,7 +36,34 @@ func TestSetup(t *testing.T) {
}
func (p *persistentDialer) Equals(q *persistentDialer) bool {
if p.size != q.size {
return false
}
if p.network != q.network {
return false
}
if p.address != q.address {
return false
}
if len(p.pool) != len(q.pool) {
return false
}
for i, client := range p.pool {
if client != q.pool[i] {
return false
}
}
// ignore mutex state
return true
}
func TestFastcgiParse(t *testing.T) {
defaultAddress := "127.0.0.1:9001"
network, address := parseAddress(defaultAddress)
t.Logf("Address '%v' was parsed to network '%v' and address '%v'", defaultAddress, network, address)
tests := []struct {
inputFastcgiConfig string
shouldErr bool
@ -48,19 +76,21 @@ func TestFastcgiParse(t *testing.T) {
Address: "127.0.0.1:9000",
Ext: ".php",
SplitPath: ".php",
dialer: basicDialer{network: "tcp", address: "127.0.0.1:9000"},
IndexFiles: []string{"index.php"},
}}},
{`fastcgi / 127.0.0.1:9001 {
{`fastcgi / ` + defaultAddress + ` {
split .html
}`,
false, []Rule{{
Path: "/",
Address: "127.0.0.1:9001",
Address: defaultAddress,
Ext: "",
SplitPath: ".html",
dialer: basicDialer{network: network, address: address},
IndexFiles: []string{},
}}},
{`fastcgi / 127.0.0.1:9001 {
{`fastcgi / ` + defaultAddress + ` {
split .html
except /admin /user
}`,
@ -69,9 +99,32 @@ func TestFastcgiParse(t *testing.T) {
Address: "127.0.0.1:9001",
Ext: "",
SplitPath: ".html",
dialer: basicDialer{network: network, address: address},
IndexFiles: []string{},
IgnoredSubPaths: []string{"/admin", "/user"},
}}},
{`fastcgi / ` + defaultAddress + ` {
pool 0
}`,
false, []Rule{{
Path: "/",
Address: defaultAddress,
Ext: "",
SplitPath: "",
dialer: &persistentDialer{size: 0, network: network, address: address},
IndexFiles: []string{},
}}},
{`fastcgi / ` + defaultAddress + ` {
pool 5
}`,
false, []Rule{{
Path: "/",
Address: defaultAddress,
Ext: "",
SplitPath: "",
dialer: &persistentDialer{size: 5, network: network, address: address},
IndexFiles: []string{},
}}},
}
for i, test := range tests {
actualFastcgiConfigs, err := fastcgiParse(caddy.NewTestController("http", test.inputFastcgiConfig))
@ -107,6 +160,29 @@ func TestFastcgiParse(t *testing.T) {
i, j, test.expectedFastcgiConfig[j].SplitPath, actualFastcgiConfig.SplitPath)
}
if reflect.TypeOf(actualFastcgiConfig.dialer) != reflect.TypeOf(test.expectedFastcgiConfig[j].dialer) {
t.Errorf("Test %d expected %dth FastCGI dialer to be of type %T, but got %T",
i, j, test.expectedFastcgiConfig[j].dialer, actualFastcgiConfig.dialer)
} else {
equal := true
switch actual := actualFastcgiConfig.dialer.(type) {
case basicDialer:
equal = actualFastcgiConfig.dialer == test.expectedFastcgiConfig[j].dialer
case *persistentDialer:
if expected, ok := test.expectedFastcgiConfig[j].dialer.(*persistentDialer); ok {
equal = actual.Equals(expected)
} else {
equal = false
}
default:
t.Errorf("Unkonw dialer type %T", actualFastcgiConfig.dialer)
}
if !equal {
t.Errorf("Test %d expected %dth FastCGI dialer to be %v, but got %v",
i, j, test.expectedFastcgiConfig[j].dialer, actualFastcgiConfig.dialer)
}
}
if fmt.Sprint(actualFastcgiConfig.IndexFiles) != fmt.Sprint(test.expectedFastcgiConfig[j].IndexFiles) {
t.Errorf("Test %d expected %dth FastCGI IndexFiles to be %s , but got %s",
i, j, test.expectedFastcgiConfig[j].IndexFiles, actualFastcgiConfig.IndexFiles)