2017-05-27 22:30:11 +03:00
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TODO: turn off the serve goroutine when idle, so
// an idle conn only has the readFrames goroutine active. (which could
// also be optimized probably to pin less memory in crypto/tls). This
// would involve tracking when the serve goroutine is active (atomic
// int32 read/CAS probably?) and starting it up when frames arrive,
// and shutting it down when all handlers exit. the occasional PING
// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
// (which is a no-op if already running) and then queue the PING write
// as normal. The serve loop would then exit in most cases (if no
// Handlers running) and not be woken up again until the PING packet
// returns.
// TODO (maybe): add a mechanism for Handlers to going into
// half-closed-local mode (rw.(io.Closer) test?) but not exit their
// handler, and continue to be able to read from the
// Request.Body. This would be a somewhat semantic change from HTTP/1
// (or at least what we expose in net/http), so I'd probably want to
// add it there too. For now, this package says that returning from
// the Handler ServeHTTP function means you're both done reading and
// done writing, without a way to stop just one or the other.
package http2
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"math"
"net"
"net/http"
"net/textproto"
"net/url"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/http2/hpack"
)
const (
prefaceTimeout = 10 * time . Second
firstSettingsTimeout = 2 * time . Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10
defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
)
var (
errClientDisconnected = errors . New ( "client disconnected" )
errClosedBody = errors . New ( "body closed by handler" )
errHandlerComplete = errors . New ( "http2: request body closed due to handler exiting" )
errStreamClosed = errors . New ( "http2: stream closed" )
)
var responseWriterStatePool = sync . Pool {
New : func ( ) interface { } {
rws := & responseWriterState { }
rws . bw = bufio . NewWriterSize ( chunkWriter { rws } , handlerChunkWriteSize )
return rws
} ,
}
// Test hooks.
var (
testHookOnConn func ( )
testHookGetServerConn func ( * serverConn )
testHookOnPanicMu * sync . Mutex // nil except in tests
testHookOnPanic func ( sc * serverConn , panicVal interface { } ) ( rePanic bool )
)
// Server is an HTTP/2 server.
type Server struct {
// MaxHandlers limits the number of http.Handler ServeHTTP goroutines
// which may run at a time over all connections.
// Negative or zero no limit.
// TODO: implement
MaxHandlers int
// MaxConcurrentStreams optionally specifies the number of
// concurrent streams that each client may have open at a
// time. This is unrelated to the number of http.Handler goroutines
// which may be active globally, which is MaxHandlers.
// If zero, MaxConcurrentStreams defaults to at least 100, per
// the HTTP/2 spec's recommendations.
MaxConcurrentStreams uint32
// MaxReadFrameSize optionally specifies the largest frame
// this server is willing to read. A valid value is between
// 16k and 16M, inclusive. If zero or otherwise invalid, a
// default value is used.
MaxReadFrameSize uint32
// PermitProhibitedCipherSuites, if true, permits the use of
// cipher suites prohibited by the HTTP/2 spec.
PermitProhibitedCipherSuites bool
// IdleTimeout specifies how long until idle clients should be
// closed with a GOAWAY frame. PING frames are not considered
// activity for the purposes of IdleTimeout.
IdleTimeout time . Duration
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connections. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
// If the value is outside this range, a default value will be
// used instead.
MaxUploadBufferPerConnection int32
// MaxUploadBufferPerStream is the size of the initial flow control
// window for each stream. The HTTP/2 spec does not allow this to
// be larger than 2^32-1. If the value is zero or larger than the
// maximum, a default value will be used instead.
MaxUploadBufferPerStream int32
// NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen.
NewWriteScheduler func ( ) WriteScheduler
// Internal state. This is a pointer (rather than embedded directly)
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state * serverInternalState
}
func ( s * Server ) initialConnRecvWindowSize ( ) int32 {
if s . MaxUploadBufferPerConnection > initialWindowSize {
return s . MaxUploadBufferPerConnection
}
return 1 << 20
}
func ( s * Server ) initialStreamRecvWindowSize ( ) int32 {
if s . MaxUploadBufferPerStream > 0 {
return s . MaxUploadBufferPerStream
}
return 1 << 20
}
func ( s * Server ) maxReadFrameSize ( ) uint32 {
if v := s . MaxReadFrameSize ; v >= minMaxFrameSize && v <= maxFrameSize {
return v
}
return defaultMaxReadFrameSize
}
func ( s * Server ) maxConcurrentStreams ( ) uint32 {
if v := s . MaxConcurrentStreams ; v > 0 {
return v
}
return defaultMaxStreams
}
type serverInternalState struct {
mu sync . Mutex
activeConns map [ * serverConn ] struct { }
}
func ( s * serverInternalState ) registerConn ( sc * serverConn ) {
if s == nil {
return // if the Server was used without calling ConfigureServer
}
s . mu . Lock ( )
s . activeConns [ sc ] = struct { } { }
s . mu . Unlock ( )
}
func ( s * serverInternalState ) unregisterConn ( sc * serverConn ) {
if s == nil {
return // if the Server was used without calling ConfigureServer
}
s . mu . Lock ( )
delete ( s . activeConns , sc )
s . mu . Unlock ( )
}
func ( s * serverInternalState ) startGracefulShutdown ( ) {
if s == nil {
return // if the Server was used without calling ConfigureServer
}
s . mu . Lock ( )
for sc := range s . activeConns {
sc . startGracefulShutdown ( )
}
s . mu . Unlock ( )
}
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
//
// ConfigureServer must be called before s begins serving.
func ConfigureServer ( s * http . Server , conf * Server ) error {
if s == nil {
panic ( "nil *http.Server" )
}
if conf == nil {
conf = new ( Server )
}
conf . state = & serverInternalState { activeConns : make ( map [ * serverConn ] struct { } ) }
if err := configureServer18 ( s , conf ) ; err != nil {
return err
}
if err := configureServer19 ( s , conf ) ; err != nil {
return err
}
if s . TLSConfig == nil {
s . TLSConfig = new ( tls . Config )
} else if s . TLSConfig . CipherSuites != nil {
// If they already provided a CipherSuite list, return
// an error if it has a bad order or is missing
// ECDHE_RSA_WITH_AES_128_GCM_SHA256.
const requiredCipher = tls . TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
haveRequired := false
sawBad := false
for i , cs := range s . TLSConfig . CipherSuites {
if cs == requiredCipher {
haveRequired = true
}
if isBadCipher ( cs ) {
sawBad = true
} else if sawBad {
return fmt . Errorf ( "http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection." , i , cs )
}
}
if ! haveRequired {
return fmt . Errorf ( "http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" )
}
}
// Note: not setting MinVersion to tls.VersionTLS12,
// as we don't want to interfere with HTTP/1.1 traffic
// on the user's server. We enforce TLS 1.2 later once
// we accept a connection. Ideally this should be done
// during next-proto selection, but using TLS <1.2 with
// HTTP/2 is still the client's bug.
s . TLSConfig . PreferServerCipherSuites = true
haveNPN := false
for _ , p := range s . TLSConfig . NextProtos {
if p == NextProtoTLS {
haveNPN = true
break
}
}
if ! haveNPN {
s . TLSConfig . NextProtos = append ( s . TLSConfig . NextProtos , NextProtoTLS )
}
if s . TLSNextProto == nil {
s . TLSNextProto = map [ string ] func ( * http . Server , * tls . Conn , http . Handler ) { }
}
protoHandler := func ( hs * http . Server , c * tls . Conn , h http . Handler ) {
if testHookOnConn != nil {
testHookOnConn ( )
}
conf . ServeConn ( c , & ServeConnOpts {
Handler : h ,
BaseConfig : hs ,
} )
}
s . TLSNextProto [ NextProtoTLS ] = protoHandler
return nil
}
// ServeConnOpts are options for the Server.ServeConn method.
type ServeConnOpts struct {
// BaseConfig optionally sets the base configuration
// for values. If nil, defaults are used.
BaseConfig * http . Server
// Handler specifies which handler to use for processing
// requests. If nil, BaseConfig.Handler is used. If BaseConfig
// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
Handler http . Handler
}
func ( o * ServeConnOpts ) baseConfig ( ) * http . Server {
if o != nil && o . BaseConfig != nil {
return o . BaseConfig
}
return new ( http . Server )
}
func ( o * ServeConnOpts ) handler ( ) http . Handler {
if o != nil {
if o . Handler != nil {
return o . Handler
}
if o . BaseConfig != nil && o . BaseConfig . Handler != nil {
return o . BaseConfig . Handler
}
}
return http . DefaultServeMux
}
// ServeConn serves HTTP/2 requests on the provided connection and
// blocks until the connection is no longer readable.
//
// ServeConn starts speaking HTTP/2 assuming that c has not had any
// reads or writes. It writes its initial settings frame and expects
// to be able to read the preface and settings frame from the
// client. If c has a ConnectionState method like a *tls.Conn, the
// ConnectionState is used to verify the TLS ciphersuite and to set
// the Request.TLS field in Handlers.
//
// ServeConn does not support h2c by itself. Any h2c support must be
// implemented in terms of providing a suitably-behaving net.Conn.
//
// The opts parameter is optional. If nil, default values are used.
func ( s * Server ) ServeConn ( c net . Conn , opts * ServeConnOpts ) {
baseCtx , cancel := serverConnBaseContext ( c , opts )
defer cancel ( )
sc := & serverConn {
srv : s ,
hs : opts . baseConfig ( ) ,
conn : c ,
baseCtx : baseCtx ,
remoteAddrStr : c . RemoteAddr ( ) . String ( ) ,
bw : newBufferedWriter ( c ) ,
handler : opts . handler ( ) ,
streams : make ( map [ uint32 ] * stream ) ,
readFrameCh : make ( chan readFrameResult ) ,
wantWriteFrameCh : make ( chan FrameWriteRequest , 8 ) ,
serveMsgCh : make ( chan interface { } , 8 ) ,
wroteFrameCh : make ( chan frameWriteResult , 1 ) , // buffered; one send in writeFrameAsync
bodyReadCh : make ( chan bodyReadMsg ) , // buffering doesn't matter either way
doneServing : make ( chan struct { } ) ,
clientMaxStreams : math . MaxUint32 , // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams : s . maxConcurrentStreams ( ) ,
initialStreamSendWindowSize : initialWindowSize ,
maxFrameSize : initialMaxFrameSize ,
headerTableSize : initialHeaderTableSize ,
serveG : newGoroutineLock ( ) ,
pushEnabled : true ,
}
s . state . registerConn ( sc )
defer s . state . unregisterConn ( sc )
// The net/http package sets the write deadline from the
// http.Server.WriteTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already set.
// Write deadlines are set per stream in serverConn.newStream.
// Disarm the net.Conn write deadline here.
if sc . hs . WriteTimeout != 0 {
sc . conn . SetWriteDeadline ( time . Time { } )
}
if s . NewWriteScheduler != nil {
sc . writeSched = s . NewWriteScheduler ( )
} else {
sc . writeSched = NewRandomWriteScheduler ( )
}
// These start at the RFC-specified defaults. If there is a higher
// configured value for inflow, that will be updated when we send a
// WINDOW_UPDATE shortly after sending SETTINGS.
sc . flow . add ( initialWindowSize )
sc . inflow . add ( initialWindowSize )
sc . hpackEncoder = hpack . NewEncoder ( & sc . headerWriteBuf )
fr := NewFramer ( sc . bw , c )
fr . ReadMetaHeaders = hpack . NewDecoder ( initialHeaderTableSize , nil )
fr . MaxHeaderListSize = sc . maxHeaderListSize ( )
fr . SetMaxReadFrameSize ( s . maxReadFrameSize ( ) )
sc . framer = fr
if tc , ok := c . ( connectionStater ) ; ok {
sc . tlsState = new ( tls . ConnectionState )
* sc . tlsState = tc . ConnectionState ( )
// 9.2 Use of TLS Features
// An implementation of HTTP/2 over TLS MUST use TLS
// 1.2 or higher with the restrictions on feature set
// and cipher suite described in this section. Due to
// implementation limitations, it might not be
// possible to fail TLS negotiation. An endpoint MUST
// immediately terminate an HTTP/2 connection that
// does not meet the TLS requirements described in
// this section with a connection error (Section
// 5.4.1) of type INADEQUATE_SECURITY.
if sc . tlsState . Version < tls . VersionTLS12 {
sc . rejectConn ( ErrCodeInadequateSecurity , "TLS version too low" )
return
}
if sc . tlsState . ServerName == "" {
// Client must use SNI, but we don't enforce that anymore,
// since it was causing problems when connecting to bare IP
// addresses during development.
//
// TODO: optionally enforce? Or enforce at the time we receive
// a new request, and verify the the ServerName matches the :authority?
// But that precludes proxy situations, perhaps.
//
// So for now, do nothing here again.
}
if ! s . PermitProhibitedCipherSuites && isBadCipher ( sc . tlsState . CipherSuite ) {
// "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
//
// We choose that. In my opinion, the spec is weak
// here. It also says both parties must support at least
// TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
// excuses here. If we really must, we could allow an
// "AllowInsecureWeakCiphers" option on the server later.
// Let's see how it plays out first.
sc . rejectConn ( ErrCodeInadequateSecurity , fmt . Sprintf ( "Prohibited TLS 1.2 Cipher Suite: %x" , sc . tlsState . CipherSuite ) )
return
}
}
if hook := testHookGetServerConn ; hook != nil {
hook ( sc )
}
sc . serve ( )
}
func ( sc * serverConn ) rejectConn ( err ErrCode , debug string ) {
sc . vlogf ( "http2: server rejecting conn: %v, %s" , err , debug )
// ignoring errors. hanging up anyway.
sc . framer . WriteGoAway ( 0 , err , [ ] byte ( debug ) )
sc . bw . Flush ( )
sc . conn . Close ( )
}
type serverConn struct {
// Immutable:
srv * Server
hs * http . Server
conn net . Conn
bw * bufferedWriter // writing to conn
handler http . Handler
baseCtx contextContext
framer * Framer
doneServing chan struct { } // closed when serverConn.serve ends
readFrameCh chan readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
serveMsgCh chan interface { } // misc messages & code to send to / run on the serve loop
flow flow // conn-wide (not stream-specific) outbound flow control
inflow flow // conn-wide inbound flow control
tlsState * tls . ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched WriteScheduler
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
pushEnabled bool
sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool
unackedSettings int // how many SETTINGS have we sent without ACKs?
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map [ uint32 ] * stream
initialStreamSendWindowSize int32
maxFrameSize int32
headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map [ string ] string // http2-lower-case -> Go-Canonical-Case
writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode
shutdownTimer * time . Timer // nil until used
idleTimer * time . Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes . Buffer
hpackEncoder * hpack . Encoder
// Used by startGracefulShutdown.
shutdownOnce sync . Once
}
func ( sc * serverConn ) maxHeaderListSize ( ) uint32 {
n := sc . hs . MaxHeaderBytes
if n <= 0 {
n = http . DefaultMaxHeaderBytes
}
// http2's count is in a slightly different unit and includes 32 bytes per pair.
// So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
const perFieldOverhead = 32 // per http2 spec
const typicalHeaders = 10 // conservative
return uint32 ( n + typicalHeaders * perFieldOverhead )
}
func ( sc * serverConn ) curOpenStreams ( ) uint32 {
sc . serveG . check ( )
return sc . curClientStreams + sc . curPushedStreams
}
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
// responseWriter's responseWriterState is recycled at the end of a
// handler, this struct intentionally has no pointer to the
// *responseWriter{,State} itself, as the Handler ending nils out the
// responseWriter's state field.
type stream struct {
// immutable:
sc * serverConn
id uint32
body * pipe // non-nil if expecting DATA frames
cw closeWaiter // closed wait stream transitions to closed state
ctx contextContext
cancelCtx func ( )
// owned by serverConn's serve loop:
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
parent * stream // or nil
numTrailerValues int64
weight uint8
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
writeDeadline * time . Timer // nil if unused
trailer http . Header // accumulated trailers
reqTrailer http . Header // handler's Request.Trailer
}
func ( sc * serverConn ) Framer ( ) * Framer { return sc . framer }
func ( sc * serverConn ) CloseConn ( ) error { return sc . conn . Close ( ) }
func ( sc * serverConn ) Flush ( ) error { return sc . bw . Flush ( ) }
func ( sc * serverConn ) HeaderEncoder ( ) ( * hpack . Encoder , * bytes . Buffer ) {
return sc . hpackEncoder , & sc . headerWriteBuf
}
func ( sc * serverConn ) state ( streamID uint32 ) ( streamState , * stream ) {
sc . serveG . check ( )
// http://tools.ietf.org/html/rfc7540#section-5.1
if st , ok := sc . streams [ streamID ] ; ok {
return st . state , st
}
// "The first use of a new stream identifier implicitly closes all
// streams in the "idle" state that might have been initiated by
// that peer with a lower-valued stream identifier. For example, if
// a client sends a HEADERS frame on stream 7 without ever sending a
// frame on stream 5, then stream 5 transitions to the "closed"
// state when the first frame for stream 7 is sent or received."
if streamID % 2 == 1 {
if streamID <= sc . maxClientStreamID {
return stateClosed , nil
}
} else {
if streamID <= sc . maxPushPromiseID {
return stateClosed , nil
}
}
return stateIdle , nil
}
// setConnState calls the net/http ConnState hook for this connection, if configured.
// Note that the net/http package does StateNew and StateClosed for us.
// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
func ( sc * serverConn ) setConnState ( state http . ConnState ) {
if sc . hs . ConnState != nil {
sc . hs . ConnState ( sc . conn , state )
}
}
func ( sc * serverConn ) vlogf ( format string , args ... interface { } ) {
if VerboseLogs {
sc . logf ( format , args ... )
}
}
func ( sc * serverConn ) logf ( format string , args ... interface { } ) {
if lg := sc . hs . ErrorLog ; lg != nil {
lg . Printf ( format , args ... )
} else {
log . Printf ( format , args ... )
}
}
// errno returns v's underlying uintptr, else 0.
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func errno ( v error ) uintptr {
if rv := reflect . ValueOf ( v ) ; rv . Kind ( ) == reflect . Uintptr {
return uintptr ( rv . Uint ( ) )
}
return 0
}
// isClosedConnError reports whether err is an error from use of a closed
// network connection.
func isClosedConnError ( err error ) bool {
if err == nil {
return false
}
// TODO: remove this string search and be more like the Windows
// case below. That might involve modifying the standard library
// to return better error types.
str := err . Error ( )
if strings . Contains ( str , "use of closed network connection" ) {
return true
}
// TODO(bradfitz): x/tools/cmd/bundle doesn't really support
// build tags, so I can't make an http2_windows.go file with
// Windows-specific stuff. Fix that and move this, once we
// have a way to bundle this into std's net/http somehow.
if runtime . GOOS == "windows" {
if oe , ok := err . ( * net . OpError ) ; ok && oe . Op == "read" {
if se , ok := oe . Err . ( * os . SyscallError ) ; ok && se . Syscall == "wsarecv" {
const WSAECONNABORTED = 10053
const WSAECONNRESET = 10054
if n := errno ( se . Err ) ; n == WSAECONNRESET || n == WSAECONNABORTED {
return true
}
}
}
}
return false
}
func ( sc * serverConn ) condlogf ( err error , format string , args ... interface { } ) {
if err == nil {
return
}
if err == io . EOF || err == io . ErrUnexpectedEOF || isClosedConnError ( err ) {
// Boring, expected errors.
sc . vlogf ( format , args ... )
} else {
sc . logf ( format , args ... )
}
}
func ( sc * serverConn ) canonicalHeader ( v string ) string {
sc . serveG . check ( )
cv , ok := commonCanonHeader [ v ]
if ok {
return cv
}
cv , ok = sc . canonHeader [ v ]
if ok {
return cv
}
if sc . canonHeader == nil {
sc . canonHeader = make ( map [ string ] string )
}
cv = http . CanonicalHeaderKey ( v )
sc . canonHeader [ v ] = cv
return cv
}
type readFrameResult struct {
f Frame // valid until readMore is called
err error
// readMore should be called once the consumer no longer needs or
// retains f. After readMore, f is invalid and more frames can be
// read.
readMore func ( )
}
// readFrames is the loop that reads incoming frames.
// It takes care to only read one frame at a time, blocking until the
// consumer is done with the frame.
// It's run on its own goroutine.
func ( sc * serverConn ) readFrames ( ) {
gate := make ( gate )
gateDone := gate . Done
for {
f , err := sc . framer . ReadFrame ( )
select {
case sc . readFrameCh <- readFrameResult { f , err , gateDone } :
case <- sc . doneServing :
return
}
select {
case <- gate :
case <- sc . doneServing :
return
}
if terminalReadFrameError ( err ) {
return
}
}
}
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
type frameWriteResult struct {
wr FrameWriteRequest // what was written (or attempted)
err error // result of the writeFrame call
}
// writeFrameAsync runs in its own goroutine and writes a single frame
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func ( sc * serverConn ) writeFrameAsync ( wr FrameWriteRequest ) {
err := wr . write . writeFrame ( sc )
sc . wroteFrameCh <- frameWriteResult { wr , err }
}
func ( sc * serverConn ) closeAllStreamsOnConnClose ( ) {
sc . serveG . check ( )
for _ , st := range sc . streams {
sc . closeStream ( st , errClientDisconnected )
}
}
func ( sc * serverConn ) stopShutdownTimer ( ) {
sc . serveG . check ( )
if t := sc . shutdownTimer ; t != nil {
t . Stop ( )
}
}
func ( sc * serverConn ) notePanic ( ) {
// Note: this is for serverConn.serve panicking, not http.Handler code.
if testHookOnPanicMu != nil {
testHookOnPanicMu . Lock ( )
defer testHookOnPanicMu . Unlock ( )
}
if testHookOnPanic != nil {
if e := recover ( ) ; e != nil {
if testHookOnPanic ( sc , e ) {
panic ( e )
}
}
}
}
func ( sc * serverConn ) serve ( ) {
sc . serveG . check ( )
defer sc . notePanic ( )
defer sc . conn . Close ( )
defer sc . closeAllStreamsOnConnClose ( )
defer sc . stopShutdownTimer ( )
defer close ( sc . doneServing ) // unblocks handlers trying to send
if VerboseLogs {
sc . vlogf ( "http2: server connection from %v on %p" , sc . conn . RemoteAddr ( ) , sc . hs )
}
sc . writeFrame ( FrameWriteRequest {
write : writeSettings {
{ SettingMaxFrameSize , sc . srv . maxReadFrameSize ( ) } ,
{ SettingMaxConcurrentStreams , sc . advMaxStreams } ,
{ SettingMaxHeaderListSize , sc . maxHeaderListSize ( ) } ,
{ SettingInitialWindowSize , uint32 ( sc . srv . initialStreamRecvWindowSize ( ) ) } ,
} ,
} )
sc . unackedSettings ++
// Each connection starts with intialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
if diff := sc . srv . initialConnRecvWindowSize ( ) - initialWindowSize ; diff > 0 {
sc . sendWindowUpdate ( nil , int ( diff ) )
}
if err := sc . readPreface ( ) ; err != nil {
sc . condlogf ( err , "http2: server: error reading preface from client %v: %v" , sc . conn . RemoteAddr ( ) , err )
return
}
// Now that we've got the preface, get us out of the
// "StateNew" state. We can't go directly to idle, though.
// Active means we read some data and anticipate a request. We'll
// do another Active when we get a HEADERS frame.
sc . setConnState ( http . StateActive )
sc . setConnState ( http . StateIdle )
if sc . srv . IdleTimeout != 0 {
sc . idleTimer = time . AfterFunc ( sc . srv . IdleTimeout , sc . onIdleTimer )
defer sc . idleTimer . Stop ( )
}
go sc . readFrames ( ) // closed by defer sc.conn.Close above
settingsTimer := time . AfterFunc ( firstSettingsTimeout , sc . onSettingsTimer )
defer settingsTimer . Stop ( )
loopNum := 0
for {
loopNum ++
select {
case wr := <- sc . wantWriteFrameCh :
if se , ok := wr . write . ( StreamError ) ; ok {
sc . resetStream ( se )
break
}
sc . writeFrame ( wr )
case res := <- sc . wroteFrameCh :
sc . wroteFrame ( res )
case res := <- sc . readFrameCh :
if ! sc . processFrameFromReader ( res ) {
return
}
res . readMore ( )
if settingsTimer != nil {
settingsTimer . Stop ( )
settingsTimer = nil
}
case m := <- sc . bodyReadCh :
sc . noteBodyRead ( m . st , m . n )
case msg := <- sc . serveMsgCh :
switch v := msg . ( type ) {
case func ( int ) :
v ( loopNum ) // for testing
case * serverMessage :
switch v {
case settingsTimerMsg :
sc . logf ( "timeout waiting for SETTINGS frames from %v" , sc . conn . RemoteAddr ( ) )
return
case idleTimerMsg :
sc . vlogf ( "connection is idle" )
sc . goAway ( ErrCodeNo )
case shutdownTimerMsg :
sc . vlogf ( "GOAWAY close timer fired; closing conn from %v" , sc . conn . RemoteAddr ( ) )
return
case gracefulShutdownMsg :
sc . startGracefulShutdownInternal ( )
default :
panic ( "unknown timer" )
}
case * startPushRequest :
sc . startPush ( v )
default :
panic ( fmt . Sprintf ( "unexpected type %T" , v ) )
}
}
if sc . inGoAway && sc . curOpenStreams ( ) == 0 && ! sc . needToSendGoAway && ! sc . writingFrame {
return
}
}
}
func ( sc * serverConn ) awaitGracefulShutdown ( sharedCh <- chan struct { } , privateCh chan struct { } ) {
select {
case <- sc . doneServing :
case <- sharedCh :
close ( privateCh )
}
}
type serverMessage int
// Message values sent to serveMsgCh.
var (
settingsTimerMsg = new ( serverMessage )
idleTimerMsg = new ( serverMessage )
shutdownTimerMsg = new ( serverMessage )
gracefulShutdownMsg = new ( serverMessage )
)
func ( sc * serverConn ) onSettingsTimer ( ) { sc . sendServeMsg ( settingsTimerMsg ) }
func ( sc * serverConn ) onIdleTimer ( ) { sc . sendServeMsg ( idleTimerMsg ) }
func ( sc * serverConn ) onShutdownTimer ( ) { sc . sendServeMsg ( shutdownTimerMsg ) }
func ( sc * serverConn ) sendServeMsg ( msg interface { } ) {
sc . serveG . checkNotOn ( ) // NOT
select {
case sc . serveMsgCh <- msg :
case <- sc . doneServing :
}
}
// readPreface reads the ClientPreface greeting from the peer
// or returns an error on timeout or an invalid greeting.
func ( sc * serverConn ) readPreface ( ) error {
errc := make ( chan error , 1 )
go func ( ) {
// Read the client preface
buf := make ( [ ] byte , len ( ClientPreface ) )
if _ , err := io . ReadFull ( sc . conn , buf ) ; err != nil {
errc <- err
} else if ! bytes . Equal ( buf , clientPreface ) {
errc <- fmt . Errorf ( "bogus greeting %q" , buf )
} else {
errc <- nil
}
} ( )
timer := time . NewTimer ( prefaceTimeout ) // TODO: configurable on *Server?
defer timer . Stop ( )
select {
case <- timer . C :
return errors . New ( "timeout waiting for client preface" )
case err := <- errc :
if err == nil {
if VerboseLogs {
sc . vlogf ( "http2: server: client %v said hello" , sc . conn . RemoteAddr ( ) )
}
}
return err
}
}
var errChanPool = sync . Pool {
New : func ( ) interface { } { return make ( chan error , 1 ) } ,
}
var writeDataPool = sync . Pool {
New : func ( ) interface { } { return new ( writeData ) } ,
}
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func ( sc * serverConn ) writeDataFromHandler ( stream * stream , data [ ] byte , endStream bool ) error {
ch := errChanPool . Get ( ) . ( chan error )
writeArg := writeDataPool . Get ( ) . ( * writeData )
* writeArg = writeData { stream . id , data , endStream }
err := sc . writeFrameFromHandler ( FrameWriteRequest {
write : writeArg ,
stream : stream ,
done : ch ,
} )
if err != nil {
return err
}
var frameWriteDone bool // the frame write is done (successfully or not)
select {
case err = <- ch :
frameWriteDone = true
case <- sc . doneServing :
return errClientDisconnected
case <- stream . cw :
// If both ch and stream.cw were ready (as might
// happen on the final Write after an http.Handler
// ends), prefer the write result. Otherwise this
// might just be us successfully closing the stream.
// The writeFrameAsync and serve goroutines guarantee
// that the ch send will happen before the stream.cw
// close.
select {
case err = <- ch :
frameWriteDone = true
default :
return errStreamClosed
}
}
errChanPool . Put ( ch )
if frameWriteDone {
writeDataPool . Put ( writeArg )
}
return err
}
// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
// if the connection has gone away.
//
// This must not be run from the serve goroutine itself, else it might
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
func ( sc * serverConn ) writeFrameFromHandler ( wr FrameWriteRequest ) error {
sc . serveG . checkNotOn ( ) // NOT
select {
case sc . wantWriteFrameCh <- wr :
return nil
case <- sc . doneServing :
// Serve loop is gone.
// Client has closed their connection to the server.
return errClientDisconnected
}
}
// writeFrame schedules a frame to write and sends it if there's nothing
// already being written.
//
// There is no pushback here (the serve goroutine never blocks). It's
// the http.Handlers that block, waiting for their previous frames to
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
func ( sc * serverConn ) writeFrame ( wr FrameWriteRequest ) {
sc . serveG . check ( )
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
// We are not allowed to write frames on closed streams. RFC 7540 Section
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
// a closed stream." Our server never sends PRIORITY, so that exception
// does not apply.
//
// The serverConn might close an open stream while the stream's handler
// is still running. For example, the server might close a stream when it
// receives bad data from the client. If this happens, the handler might
// attempt to write a frame after the stream has been closed (since the
// handler hasn't yet been notified of the close). In this case, we simply
// ignore the frame. The handler will notice that the stream is closed when
// it waits for the frame to be written.
//
// As an exception to this rule, we allow sending RST_STREAM after close.
// This allows us to immediately reject new streams without tracking any
// state for those streams (except for the queued RST_STREAM frame). This
// may result in duplicate RST_STREAMs in some cases, but the client should
// ignore those.
if wr . StreamID ( ) != 0 {
_ , isReset := wr . write . ( StreamError )
if state , _ := sc . state ( wr . StreamID ( ) ) ; state == stateClosed && ! isReset {
ignoreWrite = true
}
}
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
switch wr . write . ( type ) {
case * writeResHeaders :
wr . stream . wroteHeaders = true
case write100ContinueHeadersFrame :
if wr . stream . wroteHeaders {
// We do not need to notify wr.done because this frame is
// never written with wr.done != nil.
if wr . done != nil {
panic ( "wr.done != nil for write100ContinueHeadersFrame" )
}
ignoreWrite = true
}
}
if ! ignoreWrite {
sc . writeSched . Push ( wr )
}
sc . scheduleFrameWrite ( )
}
// startFrameWrite starts a goroutine to write wr (in a separate
// goroutine since that might block on the network), and updates the
// serve goroutine's state about the world, updated from info in wr.
func ( sc * serverConn ) startFrameWrite ( wr FrameWriteRequest ) {
sc . serveG . check ( )
if sc . writingFrame {
panic ( "internal error: can only be writing one frame at a time" )
}
st := wr . stream
if st != nil {
switch st . state {
case stateHalfClosedLocal :
switch wr . write . ( type ) {
case StreamError , handlerPanicRST , writeWindowUpdate :
// RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
// in this state. (We never send PRIORITY from the server, so that is not checked.)
default :
panic ( fmt . Sprintf ( "internal error: attempt to send frame on a half-closed-local stream: %v" , wr ) )
}
case stateClosed :
panic ( fmt . Sprintf ( "internal error: attempt to send frame on a closed stream: %v" , wr ) )
}
}
if wpp , ok := wr . write . ( * writePushPromise ) ; ok {
var err error
wpp . promisedID , err = wpp . allocatePromisedID ( )
if err != nil {
sc . writingFrameAsync = false
wr . replyToWriter ( err )
return
}
}
sc . writingFrame = true
sc . needsFrameFlush = true
if wr . write . staysWithinBuffer ( sc . bw . Available ( ) ) {
sc . writingFrameAsync = false
err := wr . write . writeFrame ( sc )
sc . wroteFrame ( frameWriteResult { wr , err } )
} else {
sc . writingFrameAsync = true
go sc . writeFrameAsync ( wr )
}
}
// errHandlerPanicked is the error given to any callers blocked in a read from
// Request.Body when the main goroutine panics. Since most handlers read in the
// the main ServeHTTP goroutine, this will show up rarely.
var errHandlerPanicked = errors . New ( "http2: handler panicked" )
// wroteFrame is called on the serve goroutine with the result of
// whatever happened on writeFrameAsync.
func ( sc * serverConn ) wroteFrame ( res frameWriteResult ) {
sc . serveG . check ( )
if ! sc . writingFrame {
panic ( "internal error: expected to be already writing a frame" )
}
sc . writingFrame = false
sc . writingFrameAsync = false
wr := res . wr
if writeEndsStream ( wr . write ) {
st := wr . stream
if st == nil {
panic ( "internal error: expecting non-nil stream" )
}
switch st . state {
case stateOpen :
// Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
// for closing a ResponseWriter while still
// reading data (see possible TODO at top of
// this file), we go into closed state here
// anyway, after telling the peer we're
// hanging up on them. We'll transition to
// stateClosed after the RST_STREAM frame is
// written.
st . state = stateHalfClosedLocal
// Section 8.1: a server MAY request that the client abort
// transmission of a request without error by sending a
// RST_STREAM with an error code of NO_ERROR after sending
// a complete response.
sc . resetStream ( streamError ( st . id , ErrCodeNo ) )
case stateHalfClosedRemote :
sc . closeStream ( st , errHandlerComplete )
}
} else {
switch v := wr . write . ( type ) {
case StreamError :
// st may be unknown if the RST_STREAM was generated to reject bad input.
if st , ok := sc . streams [ v . StreamID ] ; ok {
sc . closeStream ( st , v )
}
case handlerPanicRST :
sc . closeStream ( wr . stream , errHandlerPanicked )
}
}
// Reply (if requested) to unblock the ServeHTTP goroutine.
wr . replyToWriter ( res . err )
sc . scheduleFrameWrite ( )
}
// scheduleFrameWrite tickles the frame writing scheduler.
//
// If a frame is already being written, nothing happens. This will be called again
// when the frame is done being written.
//
// If a frame isn't being written we need to send one, the best frame
// to send is selected, preferring first things that aren't
// stream-specific (e.g. ACKing settings), and then finding the
// highest priority stream.
//
// If a frame isn't being written and there's nothing else to send, we
// flush the write buffer.
func ( sc * serverConn ) scheduleFrameWrite ( ) {
sc . serveG . check ( )
if sc . writingFrame || sc . inFrameScheduleLoop {
return
}
sc . inFrameScheduleLoop = true
for ! sc . writingFrameAsync {
if sc . needToSendGoAway {
sc . needToSendGoAway = false
sc . startFrameWrite ( FrameWriteRequest {
write : & writeGoAway {
maxStreamID : sc . maxClientStreamID ,
code : sc . goAwayCode ,
} ,
} )
continue
}
if sc . needToSendSettingsAck {
sc . needToSendSettingsAck = false
sc . startFrameWrite ( FrameWriteRequest { write : writeSettingsAck { } } )
continue
}
if ! sc . inGoAway || sc . goAwayCode == ErrCodeNo {
if wr , ok := sc . writeSched . Pop ( ) ; ok {
sc . startFrameWrite ( wr )
continue
}
}
if sc . needsFrameFlush {
sc . startFrameWrite ( FrameWriteRequest { write : flushFrameWriter { } } )
sc . needsFrameFlush = false // after startFrameWrite, since it sets this true
continue
}
break
}
sc . inFrameScheduleLoop = false
}
// startGracefulShutdown gracefully shuts down a connection. This
// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
// shutting down. The connection isn't closed until all current
// streams are done.
//
// startGracefulShutdown returns immediately; it does not wait until
// the connection has shut down.
func ( sc * serverConn ) startGracefulShutdown ( ) {
sc . serveG . checkNotOn ( ) // NOT
sc . shutdownOnce . Do ( func ( ) { sc . sendServeMsg ( gracefulShutdownMsg ) } )
}
func ( sc * serverConn ) startGracefulShutdownInternal ( ) {
sc . goAwayIn ( ErrCodeNo , 0 )
}
func ( sc * serverConn ) goAway ( code ErrCode ) {
sc . serveG . check ( )
var forceCloseIn time . Duration
if code != ErrCodeNo {
forceCloseIn = 250 * time . Millisecond
} else {
// TODO: configurable
forceCloseIn = 1 * time . Second
}
sc . goAwayIn ( code , forceCloseIn )
}
func ( sc * serverConn ) goAwayIn ( code ErrCode , forceCloseIn time . Duration ) {
sc . serveG . check ( )
if sc . inGoAway {
return
}
if forceCloseIn != 0 {
sc . shutDownIn ( forceCloseIn )
}
sc . inGoAway = true
sc . needToSendGoAway = true
sc . goAwayCode = code
sc . scheduleFrameWrite ( )
}
func ( sc * serverConn ) shutDownIn ( d time . Duration ) {
sc . serveG . check ( )
sc . shutdownTimer = time . AfterFunc ( d , sc . onShutdownTimer )
}
func ( sc * serverConn ) resetStream ( se StreamError ) {
sc . serveG . check ( )
sc . writeFrame ( FrameWriteRequest { write : se } )
if st , ok := sc . streams [ se . StreamID ] ; ok {
st . resetQueued = true
}
}
// processFrameFromReader processes the serve loop's read from readFrameCh from the
// frame-reading goroutine.
// processFrameFromReader returns whether the connection should be kept open.
func ( sc * serverConn ) processFrameFromReader ( res readFrameResult ) bool {
sc . serveG . check ( )
err := res . err
if err != nil {
if err == ErrFrameTooLarge {
sc . goAway ( ErrCodeFrameSize )
return true // goAway will close the loop
}
clientGone := err == io . EOF || err == io . ErrUnexpectedEOF || isClosedConnError ( err )
if clientGone {
// TODO: could we also get into this state if
// the peer does a half close
// (e.g. CloseWrite) because they're done
// sending frames but they're still wanting
// our open replies? Investigate.
// TODO: add CloseWrite to crypto/tls.Conn first
// so we have a way to test this? I suppose
// just for testing we could have a non-TLS mode.
return false
}
} else {
f := res . f
if VerboseLogs {
sc . vlogf ( "http2: server read frame %v" , summarizeFrame ( f ) )
}
err = sc . processFrame ( f )
if err == nil {
return true
}
}
switch ev := err . ( type ) {
case StreamError :
sc . resetStream ( ev )
return true
case goAwayFlowError :
sc . goAway ( ErrCodeFlowControl )
return true
case ConnectionError :
sc . logf ( "http2: server connection error from %v: %v" , sc . conn . RemoteAddr ( ) , ev )
sc . goAway ( ErrCode ( ev ) )
return true // goAway will handle shutdown
default :
if res . err != nil {
sc . vlogf ( "http2: server closing client connection; error reading frame from client %s: %v" , sc . conn . RemoteAddr ( ) , err )
} else {
sc . logf ( "http2: server closing client connection: %v" , err )
}
return false
}
}
func ( sc * serverConn ) processFrame ( f Frame ) error {
sc . serveG . check ( )
// First frame received must be SETTINGS.
if ! sc . sawFirstSettings {
if _ , ok := f . ( * SettingsFrame ) ; ! ok {
return ConnectionError ( ErrCodeProtocol )
}
sc . sawFirstSettings = true
}
switch f := f . ( type ) {
case * SettingsFrame :
return sc . processSettings ( f )
case * MetaHeadersFrame :
return sc . processHeaders ( f )
case * WindowUpdateFrame :
return sc . processWindowUpdate ( f )
case * PingFrame :
return sc . processPing ( f )
case * DataFrame :
return sc . processData ( f )
case * RSTStreamFrame :
return sc . processResetStream ( f )
case * PriorityFrame :
return sc . processPriority ( f )
case * GoAwayFrame :
return sc . processGoAway ( f )
case * PushPromiseFrame :
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return ConnectionError ( ErrCodeProtocol )
default :
sc . vlogf ( "http2: server ignoring frame: %v" , f . Header ( ) )
return nil
}
}
func ( sc * serverConn ) processPing ( f * PingFrame ) error {
sc . serveG . check ( )
if f . IsAck ( ) {
// 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil
}
if f . StreamID != 0 {
// "PING frames are not associated with any individual
// stream. If a PING frame is received with a stream
// identifier field value other than 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR."
return ConnectionError ( ErrCodeProtocol )
}
if sc . inGoAway && sc . goAwayCode != ErrCodeNo {
return nil
}
sc . writeFrame ( FrameWriteRequest { write : writePingAck { f } } )
return nil
}
func ( sc * serverConn ) processWindowUpdate ( f * WindowUpdateFrame ) error {
sc . serveG . check ( )
switch {
case f . StreamID != 0 : // stream-level flow control
state , st := sc . state ( f . StreamID )
if state == stateIdle {
// Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return ConnectionError ( ErrCodeProtocol )
}
if st == nil {
// "WINDOW_UPDATE can be sent by a peer that has sent a
// frame bearing the END_STREAM flag. This means that a
// receiver could receive a WINDOW_UPDATE frame on a "half
// closed (remote)" or "closed" stream. A receiver MUST
// NOT treat this as an error, see Section 5.1."
return nil
}
if ! st . flow . add ( int32 ( f . Increment ) ) {
return streamError ( f . StreamID , ErrCodeFlowControl )
}
default : // connection-level flow control
if ! sc . flow . add ( int32 ( f . Increment ) ) {
return goAwayFlowError { }
}
}
sc . scheduleFrameWrite ( )
return nil
}
func ( sc * serverConn ) processResetStream ( f * RSTStreamFrame ) error {
sc . serveG . check ( )
state , st := sc . state ( f . StreamID )
if state == stateIdle {
// 6.4 "RST_STREAM frames MUST NOT be sent for a
// stream in the "idle" state. If a RST_STREAM frame
// identifying an idle stream is received, the
// recipient MUST treat this as a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
return ConnectionError ( ErrCodeProtocol )
}
if st != nil {
st . cancelCtx ( )
sc . closeStream ( st , streamError ( f . StreamID , f . ErrCode ) )
}
return nil
}
func ( sc * serverConn ) closeStream ( st * stream , err error ) {
sc . serveG . check ( )
if st . state == stateIdle || st . state == stateClosed {
panic ( fmt . Sprintf ( "invariant; can't close stream in state %v" , st . state ) )
}
st . state = stateClosed
if st . writeDeadline != nil {
st . writeDeadline . Stop ( )
}
if st . isPushed ( ) {
sc . curPushedStreams --
} else {
sc . curClientStreams --
}
delete ( sc . streams , st . id )
if len ( sc . streams ) == 0 {
sc . setConnState ( http . StateIdle )
if sc . srv . IdleTimeout != 0 {
sc . idleTimer . Reset ( sc . srv . IdleTimeout )
}
if h1ServerKeepAlivesDisabled ( sc . hs ) {
sc . startGracefulShutdownInternal ( )
}
}
if p := st . body ; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
sc . sendWindowUpdate ( nil , p . Len ( ) )
p . CloseWithError ( err )
}
st . cw . Close ( ) // signals Handler's CloseNotifier, unblocks writes, etc
sc . writeSched . CloseStream ( st . id )
}
func ( sc * serverConn ) processSettings ( f * SettingsFrame ) error {
sc . serveG . check ( )
if f . IsAck ( ) {
sc . unackedSettings --
if sc . unackedSettings < 0 {
// Why is the peer ACKing settings we never sent?
// The spec doesn't mention this case, but
// hang up on them anyway.
return ConnectionError ( ErrCodeProtocol )
}
return nil
}
if err := f . ForeachSetting ( sc . processSetting ) ; err != nil {
return err
}
sc . needToSendSettingsAck = true
sc . scheduleFrameWrite ( )
return nil
}
func ( sc * serverConn ) processSetting ( s Setting ) error {
sc . serveG . check ( )
if err := s . Valid ( ) ; err != nil {
return err
}
if VerboseLogs {
sc . vlogf ( "http2: server processing setting %v" , s )
}
switch s . ID {
case SettingHeaderTableSize :
sc . headerTableSize = s . Val
sc . hpackEncoder . SetMaxDynamicTableSize ( s . Val )
case SettingEnablePush :
sc . pushEnabled = s . Val != 0
case SettingMaxConcurrentStreams :
sc . clientMaxStreams = s . Val
case SettingInitialWindowSize :
return sc . processSettingInitialWindowSize ( s . Val )
case SettingMaxFrameSize :
sc . maxFrameSize = int32 ( s . Val ) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize :
sc . peerMaxHeaderListSize = s . Val
default :
// Unknown setting: "An endpoint that receives a SETTINGS
// frame with any unknown or unsupported identifier MUST
// ignore that setting."
if VerboseLogs {
sc . vlogf ( "http2: server ignoring unknown setting %v" , s )
}
}
return nil
}
func ( sc * serverConn ) processSettingInitialWindowSize ( val uint32 ) error {
sc . serveG . check ( )
// Note: val already validated to be within range by
// processSetting's Valid call.
// "A SETTINGS frame can alter the initial flow control window
// size for all current streams. When the value of
// SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
// adjust the size of all stream flow control windows that it
// maintains by the difference between the new value and the
// old value."
old := sc . initialStreamSendWindowSize
sc . initialStreamSendWindowSize = int32 ( val )
growth := int32 ( val ) - old // may be negative
for _ , st := range sc . streams {
if ! st . flow . add ( growth ) {
// 6.9.2 Initial Flow Control Window Size
// "An endpoint MUST treat a change to
// SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
// control window to exceed the maximum size as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR."
return ConnectionError ( ErrCodeFlowControl )
}
}
return nil
}
func ( sc * serverConn ) processData ( f * DataFrame ) error {
sc . serveG . check ( )
if sc . inGoAway && sc . goAwayCode != ErrCodeNo {
return nil
}
data := f . Data ( )
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f . Header ( ) . StreamID
state , st := sc . state ( id )
if id == 0 || state == stateIdle {
// Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return ConnectionError ( ErrCodeProtocol )
}
if st == nil || state != stateOpen || st . gotTrailerHeader || st . resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
// done writing). Try to stop the client from sending
// more DATA.
// But still enforce their connection-level flow control,
// and return any flow control bytes since we're not going
// to consume them.
if sc . inflow . available ( ) < int32 ( f . Length ) {
return streamError ( id , ErrCodeFlowControl )
}
// Deduct the flow control from inflow, since we're
// going to immediately add it back in
// sendWindowUpdate, which also schedules sending the
// frames.
sc . inflow . take ( int32 ( f . Length ) )
sc . sendWindowUpdate ( nil , int ( f . Length ) ) // conn-level
if st != nil && st . resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
return streamError ( id , ErrCodeStreamClosed )
}
if st . body == nil {
panic ( "internal error: should have a body in this state" )
}
// Sender sending more than they'd declared?
if st . declBodyBytes != - 1 && st . bodyBytes + int64 ( len ( data ) ) > st . declBodyBytes {
st . body . CloseWithError ( fmt . Errorf ( "sender tried to send more than declared Content-Length of %d bytes" , st . declBodyBytes ) )
return streamError ( id , ErrCodeStreamClosed )
}
if f . Length > 0 {
// Check whether the client has flow control quota.
if st . inflow . available ( ) < int32 ( f . Length ) {
return streamError ( id , ErrCodeFlowControl )
}
st . inflow . take ( int32 ( f . Length ) )
if len ( data ) > 0 {
wrote , err := st . body . Write ( data )
if err != nil {
return streamError ( id , ErrCodeStreamClosed )
}
if wrote != len ( data ) {
panic ( "internal error: bad Writer" )
}
st . bodyBytes += int64 ( len ( data ) )
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
if pad := int32 ( f . Length ) - int32 ( len ( data ) ) ; pad > 0 {
sc . sendWindowUpdate32 ( nil , pad )
sc . sendWindowUpdate32 ( st , pad )
}
}
if f . StreamEnded ( ) {
st . endStream ( )
}
return nil
}
func ( sc * serverConn ) processGoAway ( f * GoAwayFrame ) error {
sc . serveG . check ( )
if f . ErrCode != ErrCodeNo {
sc . logf ( "http2: received GOAWAY %+v, starting graceful shutdown" , f )
} else {
sc . vlogf ( "http2: received GOAWAY %+v, starting graceful shutdown" , f )
}
sc . startGracefulShutdownInternal ( )
// http://tools.ietf.org/html/rfc7540#section-6.8
// We should not create any new streams, which means we should disable push.
sc . pushEnabled = false
return nil
}
// isPushed reports whether the stream is server-initiated.
func ( st * stream ) isPushed ( ) bool {
return st . id % 2 == 0
}
// endStream closes a Request.Body's pipe. It is called when a DATA
// frame says a request body is over (or after trailers).
func ( st * stream ) endStream ( ) {
sc := st . sc
sc . serveG . check ( )
if st . declBodyBytes != - 1 && st . declBodyBytes != st . bodyBytes {
st . body . CloseWithError ( fmt . Errorf ( "request declared a Content-Length of %d but only wrote %d bytes" ,
st . declBodyBytes , st . bodyBytes ) )
} else {
st . body . closeWithErrorAndCode ( io . EOF , st . copyTrailersToHandlerRequest )
st . body . CloseWithError ( io . EOF )
}
st . state = stateHalfClosedRemote
}
// copyTrailersToHandlerRequest is run in the Handler's goroutine in
// its Request.Body.Read just before it gets io.EOF.
func ( st * stream ) copyTrailersToHandlerRequest ( ) {
for k , vv := range st . trailer {
if _ , ok := st . reqTrailer [ k ] ; ok {
// Only copy it over it was pre-declared.
st . reqTrailer [ k ] = vv
}
}
}
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
// when the stream's WriteTimeout has fired.
func ( st * stream ) onWriteTimeout ( ) {
st . sc . writeFrameFromHandler ( FrameWriteRequest { write : streamError ( st . id , ErrCodeInternal ) } )
}
func ( sc * serverConn ) processHeaders ( f * MetaHeadersFrame ) error {
sc . serveG . check ( )
id := f . StreamID
if sc . inGoAway {
// Ignore.
return nil
}
// http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
if id % 2 != 1 {
return ConnectionError ( ErrCodeProtocol )
}
// A HEADERS frame can be used to create a new stream or
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
if st := sc . streams [ f . StreamID ] ; st != nil {
if st . resetQueued {
// We're sending RST_STREAM to close the stream, so don't bother
// processing this frame.
return nil
}
return st . processTrailerHeaders ( f )
}
// [...] The identifier of a newly established stream MUST be
// numerically greater than all streams that the initiating
// endpoint has opened or reserved. [...] An endpoint that
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc . maxClientStreamID {
return ConnectionError ( ErrCodeProtocol )
}
sc . maxClientStreamID = id
if sc . idleTimer != nil {
sc . idleTimer . Stop ( )
}
// http://tools.ietf.org/html/rfc7540#section-5.1.2
// [...] Endpoints MUST NOT exceed the limit set by their peer. An
// endpoint that receives a HEADERS frame that causes their
// advertised concurrent stream limit to be exceeded MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
// or REFUSED_STREAM.
if sc . curClientStreams + 1 > sc . advMaxStreams {
if sc . unackedSettings == 0 {
// They should know better.
return streamError ( id , ErrCodeProtocol )
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
return streamError ( id , ErrCodeRefusedStream )
}
initialState := stateOpen
if f . StreamEnded ( ) {
initialState = stateHalfClosedRemote
}
st := sc . newStream ( id , 0 , initialState )
if f . HasPriority ( ) {
if err := checkPriority ( f . StreamID , f . Priority ) ; err != nil {
return err
}
sc . writeSched . AdjustStream ( st . id , f . Priority )
}
rw , req , err := sc . newWriterAndRequest ( st , f )
if err != nil {
return err
}
st . reqTrailer = req . Trailer
if st . reqTrailer != nil {
st . trailer = make ( http . Header )
}
st . body = req . Body . ( * requestBody ) . pipe // may be nil
st . declBodyBytes = req . ContentLength
handler := sc . handler . ServeHTTP
if f . Truncated {
// Their header list was too long. Send a 431 error.
handler = handleHeaderListTooLong
} else if err := checkValidHTTP2RequestHeaders ( req . Header ) ; err != nil {
handler = new400Handler ( err )
}
// The net/http package sets the read deadline from the
// http.Server.ReadTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
// set. Disarm it here after the request headers are read,
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
if sc . hs . ReadTimeout != 0 {
sc . conn . SetReadDeadline ( time . Time { } )
}
go sc . runHandler ( rw , req , handler )
return nil
}
func ( st * stream ) processTrailerHeaders ( f * MetaHeadersFrame ) error {
sc := st . sc
sc . serveG . check ( )
if st . gotTrailerHeader {
return ConnectionError ( ErrCodeProtocol )
}
st . gotTrailerHeader = true
if ! f . StreamEnded ( ) {
return streamError ( st . id , ErrCodeProtocol )
}
if len ( f . PseudoFields ( ) ) > 0 {
return streamError ( st . id , ErrCodeProtocol )
}
if st . trailer != nil {
for _ , hf := range f . RegularFields ( ) {
key := sc . canonicalHeader ( hf . Name )
if ! ValidTrailerHeader ( key ) {
// TODO: send more details to the peer somehow. But http2 has
// no way to send debug data at a stream level. Discuss with
// HTTP folk.
return streamError ( st . id , ErrCodeProtocol )
}
st . trailer [ key ] = append ( st . trailer [ key ] , hf . Value )
}
}
st . endStream ( )
return nil
}
func checkPriority ( streamID uint32 , p PriorityParam ) error {
if streamID == p . StreamDep {
// Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
// Section 5.3.3 says that a stream can depend on one of its dependencies,
// so it's only self-dependencies that are forbidden.
return streamError ( streamID , ErrCodeProtocol )
}
return nil
}
func ( sc * serverConn ) processPriority ( f * PriorityFrame ) error {
if sc . inGoAway {
return nil
}
if err := checkPriority ( f . StreamID , f . PriorityParam ) ; err != nil {
return err
}
sc . writeSched . AdjustStream ( f . StreamID , f . PriorityParam )
return nil
}
func ( sc * serverConn ) newStream ( id , pusherID uint32 , state streamState ) * stream {
sc . serveG . check ( )
if id == 0 {
panic ( "internal error: cannot create stream with id 0" )
}
ctx , cancelCtx := contextWithCancel ( sc . baseCtx )
st := & stream {
sc : sc ,
id : id ,
state : state ,
ctx : ctx ,
cancelCtx : cancelCtx ,
}
st . cw . Init ( )
st . flow . conn = & sc . flow // link to conn-level counter
st . flow . add ( sc . initialStreamSendWindowSize )
st . inflow . conn = & sc . inflow // link to conn-level counter
st . inflow . add ( sc . srv . initialStreamRecvWindowSize ( ) )
if sc . hs . WriteTimeout != 0 {
st . writeDeadline = time . AfterFunc ( sc . hs . WriteTimeout , st . onWriteTimeout )
}
sc . streams [ id ] = st
sc . writeSched . OpenStream ( st . id , OpenStreamOptions { PusherID : pusherID } )
if st . isPushed ( ) {
sc . curPushedStreams ++
} else {
sc . curClientStreams ++
}
if sc . curOpenStreams ( ) == 1 {
sc . setConnState ( http . StateActive )
}
return st
}
func ( sc * serverConn ) newWriterAndRequest ( st * stream , f * MetaHeadersFrame ) ( * responseWriter , * http . Request , error ) {
sc . serveG . check ( )
rp := requestParam {
method : f . PseudoValue ( "method" ) ,
scheme : f . PseudoValue ( "scheme" ) ,
authority : f . PseudoValue ( "authority" ) ,
path : f . PseudoValue ( "path" ) ,
}
isConnect := rp . method == "CONNECT"
if isConnect {
if rp . path != "" || rp . scheme != "" || rp . authority == "" {
return nil , nil , streamError ( f . StreamID , ErrCodeProtocol )
}
} else if rp . method == "" || rp . path == "" || ( rp . scheme != "https" && rp . scheme != "http" ) {
// See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
// MUST be treated as a stream error (Section 5.4.2)
// of type PROTOCOL_ERROR."
//
// 8.1.2.3 Request Pseudo-Header Fields
// "All HTTP/2 requests MUST include exactly one valid
// value for the :method, :scheme, and :path
// pseudo-header fields"
return nil , nil , streamError ( f . StreamID , ErrCodeProtocol )
}
bodyOpen := ! f . StreamEnded ( )
if rp . method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
return nil , nil , streamError ( f . StreamID , ErrCodeProtocol )
}
rp . header = make ( http . Header )
for _ , hf := range f . RegularFields ( ) {
rp . header . Add ( sc . canonicalHeader ( hf . Name ) , hf . Value )
}
if rp . authority == "" {
rp . authority = rp . header . Get ( "Host" )
}
rw , req , err := sc . newWriterAndRequestNoBody ( st , rp )
if err != nil {
return nil , nil , err
}
if bodyOpen {
if vv , ok := rp . header [ "Content-Length" ] ; ok {
req . ContentLength , _ = strconv . ParseInt ( vv [ 0 ] , 10 , 64 )
} else {
req . ContentLength = - 1
}
req . Body . ( * requestBody ) . pipe = & pipe {
b : & dataBuffer { expected : req . ContentLength } ,
}
}
return rw , req , nil
}
type requestParam struct {
method string
scheme , authority , path string
header http . Header
}
func ( sc * serverConn ) newWriterAndRequestNoBody ( st * stream , rp requestParam ) ( * responseWriter , * http . Request , error ) {
sc . serveG . check ( )
var tlsState * tls . ConnectionState // nil if not scheme https
if rp . scheme == "https" {
tlsState = sc . tlsState
}
needsContinue := rp . header . Get ( "Expect" ) == "100-continue"
if needsContinue {
rp . header . Del ( "Expect" )
}
// Merge Cookie headers into one "; "-delimited value.
if cookies := rp . header [ "Cookie" ] ; len ( cookies ) > 1 {
rp . header . Set ( "Cookie" , strings . Join ( cookies , "; " ) )
}
// Setup Trailers
var trailer http . Header
for _ , v := range rp . header [ "Trailer" ] {
for _ , key := range strings . Split ( v , "," ) {
key = http . CanonicalHeaderKey ( strings . TrimSpace ( key ) )
switch key {
case "Transfer-Encoding" , "Trailer" , "Content-Length" :
// Bogus. (copy of http1 rules)
// Ignore.
default :
if trailer == nil {
trailer = make ( http . Header )
}
trailer [ key ] = nil
}
}
}
delete ( rp . header , "Trailer" )
var url_ * url . URL
var requestURI string
if rp . method == "CONNECT" {
url_ = & url . URL { Host : rp . authority }
requestURI = rp . authority // mimic HTTP/1 server behavior
} else {
var err error
url_ , err = url . ParseRequestURI ( rp . path )
if err != nil {
return nil , nil , streamError ( st . id , ErrCodeProtocol )
}
requestURI = rp . path
}
body := & requestBody {
conn : sc ,
stream : st ,
needsContinue : needsContinue ,
}
req := & http . Request {
Method : rp . method ,
URL : url_ ,
RemoteAddr : sc . remoteAddrStr ,
Header : rp . header ,
RequestURI : requestURI ,
Proto : "HTTP/2.0" ,
ProtoMajor : 2 ,
ProtoMinor : 0 ,
TLS : tlsState ,
Host : rp . authority ,
Body : body ,
Trailer : trailer ,
}
req = requestWithContext ( req , st . ctx )
rws := responseWriterStatePool . Get ( ) . ( * responseWriterState )
bwSave := rws . bw
* rws = responseWriterState { } // zero all the fields
rws . conn = sc
rws . bw = bwSave
rws . bw . Reset ( chunkWriter { rws } )
rws . stream = st
rws . req = req
rws . body = body
rw := & responseWriter { rws : rws }
return rw , req , nil
}
// Run on its own goroutine.
func ( sc * serverConn ) runHandler ( rw * responseWriter , req * http . Request , handler func ( http . ResponseWriter , * http . Request ) ) {
didPanic := true
defer func ( ) {
rw . rws . stream . cancelCtx ( )
if didPanic {
e := recover ( )
sc . writeFrameFromHandler ( FrameWriteRequest {
write : handlerPanicRST { rw . rws . stream . id } ,
stream : rw . rws . stream ,
} )
// Same as net/http:
if shouldLogPanic ( e ) {
const size = 64 << 10
buf := make ( [ ] byte , size )
buf = buf [ : runtime . Stack ( buf , false ) ]
sc . logf ( "http2: panic serving %v: %v\n%s" , sc . conn . RemoteAddr ( ) , e , buf )
}
return
}
rw . handlerDone ( )
} ( )
handler ( rw , req )
didPanic = false
}
func handleHeaderListTooLong ( w http . ResponseWriter , r * http . Request ) {
// 10.5.1 Limits on Header Block Size:
// .. "A server that receives a larger header block than it is
// willing to handle can send an HTTP 431 (Request Header Fields Too
// Large) status code"
const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+
w . WriteHeader ( statusRequestHeaderFieldsTooLarge )
io . WriteString ( w , "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>" )
}
// called from handler goroutines.
// h may be nil.
func ( sc * serverConn ) writeHeaders ( st * stream , headerData * writeResHeaders ) error {
sc . serveG . checkNotOn ( ) // NOT on
var errc chan error
if headerData . h != nil {
// If there's a header map (which we don't own), so we have to block on
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = errChanPool . Get ( ) . ( chan error )
}
if err := sc . writeFrameFromHandler ( FrameWriteRequest {
write : headerData ,
stream : st ,
done : errc ,
} ) ; err != nil {
return err
}
if errc != nil {
select {
case err := <- errc :
errChanPool . Put ( errc )
return err
case <- sc . doneServing :
return errClientDisconnected
case <- st . cw :
return errStreamClosed
}
}
return nil
}
// called from handler goroutines.
func ( sc * serverConn ) write100ContinueHeaders ( st * stream ) {
sc . writeFrameFromHandler ( FrameWriteRequest {
write : write100ContinueHeadersFrame { st . id } ,
stream : st ,
} )
}
// A bodyReadMsg tells the server loop that the http.Handler read n
// bytes of the DATA from the client on the given stream.
type bodyReadMsg struct {
st * stream
n int
}
// called from handler goroutines.
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
func ( sc * serverConn ) noteBodyReadFromHandler ( st * stream , n int , err error ) {
sc . serveG . checkNotOn ( ) // NOT on
if n > 0 {
select {
case sc . bodyReadCh <- bodyReadMsg { st , n } :
case <- sc . doneServing :
}
}
}
func ( sc * serverConn ) noteBodyRead ( st * stream , n int ) {
sc . serveG . check ( )
sc . sendWindowUpdate ( nil , n ) // conn-level
if st . state != stateHalfClosedRemote && st . state != stateClosed {
// Don't send this WINDOW_UPDATE if the stream is closed
// remotely.
sc . sendWindowUpdate ( st , n )
}
}
// st may be nil for conn-level
func ( sc * serverConn ) sendWindowUpdate ( st * stream , n int ) {
sc . serveG . check ( )
// "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
// A Go Read call on 64-bit machines could in theory read
// a larger Read than this. Very unlikely, but we handle it here
// rather than elsewhere for now.
const maxUint31 = 1 << 31 - 1
for n >= maxUint31 {
sc . sendWindowUpdate32 ( st , maxUint31 )
n -= maxUint31
}
sc . sendWindowUpdate32 ( st , int32 ( n ) )
}
// st may be nil for conn-level
func ( sc * serverConn ) sendWindowUpdate32 ( st * stream , n int32 ) {
sc . serveG . check ( )
if n == 0 {
return
}
if n < 0 {
panic ( "negative update" )
}
var streamID uint32
if st != nil {
streamID = st . id
}
sc . writeFrame ( FrameWriteRequest {
write : writeWindowUpdate { streamID : streamID , n : uint32 ( n ) } ,
stream : st ,
} )
var ok bool
if st == nil {
ok = sc . inflow . add ( n )
} else {
ok = st . inflow . add ( n )
}
if ! ok {
panic ( "internal error; sent too many window updates without decrements?" )
}
}
// requestBody is the Handler's Request.Body type.
// Read and Close may be called concurrently.
type requestBody struct {
stream * stream
conn * serverConn
closed bool // for use by Close only
sawEOF bool // for use by Read only
pipe * pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func ( b * requestBody ) Close ( ) error {
if b . pipe != nil && ! b . closed {
b . pipe . BreakWithError ( errClosedBody )
}
b . closed = true
return nil
}
func ( b * requestBody ) Read ( p [ ] byte ) ( n int , err error ) {
if b . needsContinue {
b . needsContinue = false
b . conn . write100ContinueHeaders ( b . stream )
}
if b . pipe == nil || b . sawEOF {
return 0 , io . EOF
}
n , err = b . pipe . Read ( p )
if err == io . EOF {
b . sawEOF = true
}
if b . conn == nil && inTests {
return
}
b . conn . noteBodyReadFromHandler ( b . stream , n , err )
return
}
// responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a
// request (in handlerDone) and calls on the responseWriter thereafter
// simply crash (caller's mistake), but the much larger responseWriterState
// and buffers are reused between multiple requests.
type responseWriter struct {
rws * responseWriterState
}
// Optional http.ResponseWriter interfaces implemented.
var (
_ http . CloseNotifier = ( * responseWriter ) ( nil )
_ http . Flusher = ( * responseWriter ) ( nil )
_ stringWriter = ( * responseWriter ) ( nil )
)
type responseWriterState struct {
// immutable within a request:
stream * stream
req * http . Request
body * requestBody // to close at end of request, if DATA frames didn't
conn * serverConn
// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
bw * bufio . Writer // writing to a chunkWriter{this *responseWriterState}
// mutated by http.Handler goroutine:
handlerHeader http . Header // nil until called
snapHeader http . Header // snapshot of handlerHeader at WriteHeader time
trailers [ ] string // set in writeChunk
status int // status code passed to WriteHeader
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
2017-07-28 01:11:56 +03:00
dirty bool // a Write failed; don't reuse this responseWriterState
2017-05-27 22:30:11 +03:00
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
closeNotifierMu sync . Mutex // guards closeNotifierCh
closeNotifierCh chan bool // nil until first used
}
type chunkWriter struct { rws * responseWriterState }
func ( cw chunkWriter ) Write ( p [ ] byte ) ( n int , err error ) { return cw . rws . writeChunk ( p ) }
func ( rws * responseWriterState ) hasTrailers ( ) bool { return len ( rws . trailers ) != 0 }
// declareTrailer is called for each Trailer header when the
// response header is written. It notes that a header will need to be
// written in the trailers at the end of the response.
func ( rws * responseWriterState ) declareTrailer ( k string ) {
k = http . CanonicalHeaderKey ( k )
if ! ValidTrailerHeader ( k ) {
// Forbidden by RFC 2616 14.40.
rws . conn . logf ( "ignoring invalid trailer %q" , k )
return
}
if ! strSliceContains ( rws . trailers , k ) {
rws . trailers = append ( rws . trailers , k )
}
}
// writeChunk writes chunks from the bufio.Writer. But because
// bufio.Writer may bypass its chunking, sometimes p may be
// arbitrarily large.
//
// writeChunk is also responsible (on the first chunk) for sending the
// HEADER response.
func ( rws * responseWriterState ) writeChunk ( p [ ] byte ) ( n int , err error ) {
if ! rws . wroteHeader {
rws . writeHeader ( 200 )
}
isHeadResp := rws . req . Method == "HEAD"
if ! rws . sentHeader {
rws . sentHeader = true
var ctype , clen string
if clen = rws . snapHeader . Get ( "Content-Length" ) ; clen != "" {
rws . snapHeader . Del ( "Content-Length" )
clen64 , err := strconv . ParseInt ( clen , 10 , 64 )
if err == nil && clen64 >= 0 {
rws . sentContentLen = clen64
} else {
clen = ""
}
}
if clen == "" && rws . handlerDone && bodyAllowedForStatus ( rws . status ) && ( len ( p ) > 0 || ! isHeadResp ) {
clen = strconv . Itoa ( len ( p ) )
}
_ , hasContentType := rws . snapHeader [ "Content-Type" ]
if ! hasContentType && bodyAllowedForStatus ( rws . status ) {
ctype = http . DetectContentType ( p )
}
var date string
if _ , ok := rws . snapHeader [ "Date" ] ; ! ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = time . Now ( ) . UTC ( ) . Format ( http . TimeFormat )
}
for _ , v := range rws . snapHeader [ "Trailer" ] {
foreachHeaderElement ( v , rws . declareTrailer )
}
endStream := ( rws . handlerDone && ! rws . hasTrailers ( ) && len ( p ) == 0 ) || isHeadResp
err = rws . conn . writeHeaders ( rws . stream , & writeResHeaders {
streamID : rws . stream . id ,
httpResCode : rws . status ,
h : rws . snapHeader ,
endStream : endStream ,
contentType : ctype ,
contentLength : clen ,
date : date ,
} )
if err != nil {
2017-07-28 01:11:56 +03:00
rws . dirty = true
2017-05-27 22:30:11 +03:00
return 0 , err
}
if endStream {
return 0 , nil
}
}
if isHeadResp {
return len ( p ) , nil
}
if len ( p ) == 0 && ! rws . handlerDone {
return 0 , nil
}
if rws . handlerDone {
rws . promoteUndeclaredTrailers ( )
}
endStream := rws . handlerDone && ! rws . hasTrailers ( )
if len ( p ) > 0 || endStream {
// only send a 0 byte DATA frame if we're ending the stream.
if err := rws . conn . writeDataFromHandler ( rws . stream , p , endStream ) ; err != nil {
2017-07-28 01:11:56 +03:00
rws . dirty = true
2017-05-27 22:30:11 +03:00
return 0 , err
}
}
if rws . handlerDone && rws . hasTrailers ( ) {
err = rws . conn . writeHeaders ( rws . stream , & writeResHeaders {
streamID : rws . stream . id ,
h : rws . handlerHeader ,
trailers : rws . trailers ,
endStream : true ,
} )
2017-07-28 01:11:56 +03:00
if err != nil {
rws . dirty = true
}
2017-05-27 22:30:11 +03:00
return len ( p ) , err
}
return len ( p ) , nil
}
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
// that, if present, signals that the map entry is actually for
// the response trailers, and not the response headers. The prefix
// is stripped after the ServeHTTP call finishes and the values are
// sent in the trailers.
//
// This mechanism is intended only for trailers that are not known
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
// https://golang.org/pkg/net/http/#ResponseWriter
// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
// promoteUndeclaredTrailers permits http.Handlers to set trailers
// after the header has already been flushed. Because the Go
// ResponseWriter interface has no way to set Trailers (only the
// Header), and because we didn't want to expand the ResponseWriter
// interface, and because nobody used trailers, and because RFC 2616
// says you SHOULD (but not must) predeclare any trailers in the
// header, the official ResponseWriter rules said trailers in Go must
// be predeclared, and then we reuse the same ResponseWriter.Header()
// map to mean both Headers and Trailers. When it's time to write the
// Trailers, we pick out the fields of Headers that were declared as
// trailers. That worked for a while, until we found the first major
// user of Trailers in the wild: gRPC (using them only over http2),
// and gRPC libraries permit setting trailers mid-stream without
// predeclarnig them. So: change of plans. We still permit the old
// way, but we also permit this hack: if a Header() key begins with
// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
// invalid token byte anyway, there is no ambiguity. (And it's already
// filtered out) It's mildly hacky, but not terrible.
//
// This method runs after the Handler is done and promotes any Header
// fields to be trailers.
func ( rws * responseWriterState ) promoteUndeclaredTrailers ( ) {
for k , vv := range rws . handlerHeader {
if ! strings . HasPrefix ( k , TrailerPrefix ) {
continue
}
trailerKey := strings . TrimPrefix ( k , TrailerPrefix )
rws . declareTrailer ( trailerKey )
rws . handlerHeader [ http . CanonicalHeaderKey ( trailerKey ) ] = vv
}
if len ( rws . trailers ) > 1 {
sorter := sorterPool . Get ( ) . ( * sorter )
sorter . SortStrings ( rws . trailers )
sorterPool . Put ( sorter )
}
}
func ( w * responseWriter ) Flush ( ) {
rws := w . rws
if rws == nil {
panic ( "Header called after Handler finished" )
}
if rws . bw . Buffered ( ) > 0 {
if err := rws . bw . Flush ( ) ; err != nil {
// Ignore the error. The frame writer already knows.
return
}
} else {
// The bufio.Writer won't call chunkWriter.Write
// (writeChunk with zero bytes, so we have to do it
// ourselves to force the HTTP response header and/or
// final DATA frame (with END_STREAM) to be sent.
rws . writeChunk ( nil )
}
}
func ( w * responseWriter ) CloseNotify ( ) <- chan bool {
rws := w . rws
if rws == nil {
panic ( "CloseNotify called after Handler finished" )
}
rws . closeNotifierMu . Lock ( )
ch := rws . closeNotifierCh
if ch == nil {
ch = make ( chan bool , 1 )
rws . closeNotifierCh = ch
cw := rws . stream . cw
go func ( ) {
cw . Wait ( ) // wait for close
ch <- true
} ( )
}
rws . closeNotifierMu . Unlock ( )
return ch
}
func ( w * responseWriter ) Header ( ) http . Header {
rws := w . rws
if rws == nil {
panic ( "Header called after Handler finished" )
}
if rws . handlerHeader == nil {
rws . handlerHeader = make ( http . Header )
}
return rws . handlerHeader
}
func ( w * responseWriter ) WriteHeader ( code int ) {
rws := w . rws
if rws == nil {
panic ( "WriteHeader called after Handler finished" )
}
rws . writeHeader ( code )
}
func ( rws * responseWriterState ) writeHeader ( code int ) {
if ! rws . wroteHeader {
rws . wroteHeader = true
rws . status = code
if len ( rws . handlerHeader ) > 0 {
rws . snapHeader = cloneHeader ( rws . handlerHeader )
}
}
}
func cloneHeader ( h http . Header ) http . Header {
h2 := make ( http . Header , len ( h ) )
for k , vv := range h {
vv2 := make ( [ ] string , len ( vv ) )
copy ( vv2 , vv )
h2 [ k ] = vv2
}
return h2
}
// The Life Of A Write is like this:
//
// * Handler calls w.Write or w.WriteString ->
// * -> rws.bw (*bufio.Writer) ->
2017-07-28 01:11:56 +03:00
// * (Handler might call Flush)
2017-05-27 22:30:11 +03:00
// * -> chunkWriter{rws}
// * -> responseWriterState.writeChunk(p []byte)
// * -> responseWriterState.writeChunk (most of the magic; see comment there)
func ( w * responseWriter ) Write ( p [ ] byte ) ( n int , err error ) {
return w . write ( len ( p ) , p , "" )
}
func ( w * responseWriter ) WriteString ( s string ) ( n int , err error ) {
return w . write ( len ( s ) , nil , s )
}
// either dataB or dataS is non-zero.
func ( w * responseWriter ) write ( lenData int , dataB [ ] byte , dataS string ) ( n int , err error ) {
rws := w . rws
if rws == nil {
panic ( "Write called after Handler finished" )
}
if ! rws . wroteHeader {
w . WriteHeader ( 200 )
}
if ! bodyAllowedForStatus ( rws . status ) {
return 0 , http . ErrBodyNotAllowed
}
rws . wroteBytes += int64 ( len ( dataB ) ) + int64 ( len ( dataS ) ) // only one can be set
if rws . sentContentLen != 0 && rws . wroteBytes > rws . sentContentLen {
// TODO: send a RST_STREAM
return 0 , errors . New ( "http2: handler wrote more than declared Content-Length" )
}
if dataB != nil {
return rws . bw . Write ( dataB )
} else {
return rws . bw . WriteString ( dataS )
}
}
func ( w * responseWriter ) handlerDone ( ) {
rws := w . rws
2017-07-28 01:11:56 +03:00
dirty := rws . dirty
2017-05-27 22:30:11 +03:00
rws . handlerDone = true
w . Flush ( )
w . rws = nil
2017-07-28 01:11:56 +03:00
if ! dirty {
// Only recycle the pool if all prior Write calls to
// the serverConn goroutine completed successfully. If
// they returned earlier due to resets from the peer
// there might still be write goroutines outstanding
// from the serverConn referencing the rws memory. See
// issue 20704.
responseWriterStatePool . Put ( rws )
}
2017-05-27 22:30:11 +03:00
}
// Push errors.
var (
ErrRecursivePush = errors . New ( "http2: recursive push not allowed" )
ErrPushLimitReached = errors . New ( "http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS" )
)
// pushOptions is the internal version of http.PushOptions, which we
// cannot include here because it's only defined in Go 1.8 and later.
type pushOptions struct {
Method string
Header http . Header
}
func ( w * responseWriter ) push ( target string , opts pushOptions ) error {
st := w . rws . stream
sc := st . sc
sc . serveG . checkNotOn ( )
// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
// http://tools.ietf.org/html/rfc7540#section-6.6
if st . isPushed ( ) {
return ErrRecursivePush
}
// Default options.
if opts . Method == "" {
opts . Method = "GET"
}
if opts . Header == nil {
opts . Header = http . Header { }
}
wantScheme := "http"
if w . rws . req . TLS != nil {
wantScheme = "https"
}
// Validate the request.
u , err := url . Parse ( target )
if err != nil {
return err
}
if u . Scheme == "" {
if ! strings . HasPrefix ( target , "/" ) {
return fmt . Errorf ( "target must be an absolute URL or an absolute path: %q" , target )
}
u . Scheme = wantScheme
u . Host = w . rws . req . Host
} else {
if u . Scheme != wantScheme {
return fmt . Errorf ( "cannot push URL with scheme %q from request with scheme %q" , u . Scheme , wantScheme )
}
if u . Host == "" {
return errors . New ( "URL must have a host" )
}
}
for k := range opts . Header {
if strings . HasPrefix ( k , ":" ) {
return fmt . Errorf ( "promised request headers cannot include pseudo header %q" , k )
}
// These headers are meaningful only if the request has a body,
// but PUSH_PROMISE requests cannot have a body.
// http://tools.ietf.org/html/rfc7540#section-8.2
// Also disallow Host, since the promised URL must be absolute.
switch strings . ToLower ( k ) {
case "content-length" , "content-encoding" , "trailer" , "te" , "expect" , "host" :
return fmt . Errorf ( "promised request headers cannot include %q" , k )
}
}
if err := checkValidHTTP2RequestHeaders ( opts . Header ) ; err != nil {
return err
}
// The RFC effectively limits promised requests to GET and HEAD:
// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
// http://tools.ietf.org/html/rfc7540#section-8.2
if opts . Method != "GET" && opts . Method != "HEAD" {
return fmt . Errorf ( "method %q must be GET or HEAD" , opts . Method )
}
msg := & startPushRequest {
parent : st ,
method : opts . Method ,
url : u ,
header : cloneHeader ( opts . Header ) ,
done : errChanPool . Get ( ) . ( chan error ) ,
}
select {
case <- sc . doneServing :
return errClientDisconnected
case <- st . cw :
return errStreamClosed
case sc . serveMsgCh <- msg :
}
select {
case <- sc . doneServing :
return errClientDisconnected
case <- st . cw :
return errStreamClosed
case err := <- msg . done :
errChanPool . Put ( msg . done )
return err
}
}
type startPushRequest struct {
parent * stream
method string
url * url . URL
header http . Header
done chan error
}
func ( sc * serverConn ) startPush ( msg * startPushRequest ) {
sc . serveG . check ( )
// http://tools.ietf.org/html/rfc7540#section-6.6.
// PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
// is in either the "open" or "half-closed (remote)" state.
if msg . parent . state != stateOpen && msg . parent . state != stateHalfClosedRemote {
// responseWriter.Push checks that the stream is peer-initiaed.
msg . done <- errStreamClosed
return
}
// http://tools.ietf.org/html/rfc7540#section-6.6.
if ! sc . pushEnabled {
msg . done <- http . ErrNotSupported
return
}
// PUSH_PROMISE frames must be sent in increasing order by stream ID, so
// we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
// is written. Once the ID is allocated, we start the request handler.
allocatePromisedID := func ( ) ( uint32 , error ) {
sc . serveG . check ( )
// Check this again, just in case. Technically, we might have received
// an updated SETTINGS by the time we got around to writing this frame.
if ! sc . pushEnabled {
return 0 , http . ErrNotSupported
}
// http://tools.ietf.org/html/rfc7540#section-6.5.2.
if sc . curPushedStreams + 1 > sc . clientMaxStreams {
return 0 , ErrPushLimitReached
}
// http://tools.ietf.org/html/rfc7540#section-5.1.1.
// Streams initiated by the server MUST use even-numbered identifiers.
// A server that is unable to establish a new stream identifier can send a GOAWAY
// frame so that the client is forced to open a new connection for new streams.
if sc . maxPushPromiseID + 2 >= 1 << 31 {
sc . startGracefulShutdownInternal ( )
return 0 , ErrPushLimitReached
}
sc . maxPushPromiseID += 2
promisedID := sc . maxPushPromiseID
// http://tools.ietf.org/html/rfc7540#section-8.2.
// Strictly speaking, the new stream should start in "reserved (local)", then
// transition to "half closed (remote)" after sending the initial HEADERS, but
// we start in "half closed (remote)" for simplicity.
// See further comments at the definition of stateHalfClosedRemote.
promised := sc . newStream ( promisedID , msg . parent . id , stateHalfClosedRemote )
rw , req , err := sc . newWriterAndRequestNoBody ( promised , requestParam {
method : msg . method ,
scheme : msg . url . Scheme ,
authority : msg . url . Host ,
path : msg . url . RequestURI ( ) ,
header : cloneHeader ( msg . header ) , // clone since handler runs concurrently with writing the PUSH_PROMISE
} )
if err != nil {
// Should not happen, since we've already validated msg.url.
panic ( fmt . Sprintf ( "newWriterAndRequestNoBody(%+v): %v" , msg . url , err ) )
}
go sc . runHandler ( rw , req , sc . handler . ServeHTTP )
return promisedID , nil
}
sc . writeFrame ( FrameWriteRequest {
write : & writePushPromise {
streamID : msg . parent . id ,
method : msg . method ,
url : msg . url ,
h : msg . header ,
allocatePromisedID : allocatePromisedID ,
} ,
stream : msg . parent ,
done : msg . done ,
} )
}
// foreachHeaderElement splits v according to the "#rule" construction
// in RFC 2616 section 2.1 and calls fn for each non-empty element.
func foreachHeaderElement ( v string , fn func ( string ) ) {
v = textproto . TrimString ( v )
if v == "" {
return
}
if ! strings . Contains ( v , "," ) {
fn ( v )
return
}
for _ , f := range strings . Split ( v , "," ) {
if f = textproto . TrimString ( f ) ; f != "" {
fn ( f )
}
}
}
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var connHeaders = [ ] string {
"Connection" ,
"Keep-Alive" ,
"Proxy-Connection" ,
"Transfer-Encoding" ,
"Upgrade" ,
}
// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
func checkValidHTTP2RequestHeaders ( h http . Header ) error {
for _ , k := range connHeaders {
if _ , ok := h [ k ] ; ok {
return fmt . Errorf ( "request header %q is not valid in HTTP/2" , k )
}
}
te := h [ "Te" ]
if len ( te ) > 0 && ( len ( te ) > 1 || ( te [ 0 ] != "trailers" && te [ 0 ] != "" ) ) {
return errors . New ( ` request header "TE" may only be "trailers" in HTTP/2 ` )
}
return nil
}
func new400Handler ( err error ) http . HandlerFunc {
return func ( w http . ResponseWriter , r * http . Request ) {
http . Error ( w , err . Error ( ) , http . StatusBadRequest )
}
}
// ValidTrailerHeader reports whether name is a valid header field name to appear
// in trailers.
// See: http://tools.ietf.org/html/rfc7230#section-4.1.2
func ValidTrailerHeader ( name string ) bool {
name = http . CanonicalHeaderKey ( name )
if strings . HasPrefix ( name , "If-" ) || badTrailer [ name ] {
return false
}
return true
}
var badTrailer = map [ string ] bool {
"Authorization" : true ,
"Cache-Control" : true ,
"Connection" : true ,
"Content-Encoding" : true ,
"Content-Length" : true ,
"Content-Range" : true ,
"Content-Type" : true ,
"Expect" : true ,
"Host" : true ,
"Keep-Alive" : true ,
"Max-Forwards" : true ,
"Pragma" : true ,
"Proxy-Authenticate" : true ,
"Proxy-Authorization" : true ,
"Proxy-Connection" : true ,
"Range" : true ,
"Realm" : true ,
"Te" : true ,
"Trailer" : true ,
"Transfer-Encoding" : true ,
"Www-Authenticate" : true ,
}
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way.
func h1ServerKeepAlivesDisabled ( hs * http . Server ) bool {
var x interface { } = hs
type I interface {
doKeepAlives ( ) bool
}
if hs , ok := x . ( I ) ; ok {
return ! hs . doKeepAlives ( )
}
return false
}