core: Wrap listeners with type that can pipe

(Does not apply to PacketConn or quic.EarlyListener types.)
This commit is contained in:
Matthew Holt 2022-09-13 23:57:17 -06:00
parent 754fe4f7b4
commit 17f9f974f8
No known key found for this signature in database
GPG key ID: 2A349DD577D586A5
3 changed files with 67 additions and 4 deletions

View file

@ -15,7 +15,7 @@ import (
func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Listener, error) { func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Listener, error) {
// check to see if plugin provides listener // check to see if plugin provides listener
if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil {
return ln, err return acceptPipe(ln), err
} }
lnKey := listenerKey(network, addr) lnKey := listenerKey(network, addr)
@ -29,7 +29,7 @@ func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Lis
} }
return nil, err return nil, err
} }
return &sharedListener{Listener: ln, key: lnKey}, nil return &sharedListener{Listener: acceptPipe(ln), key: lnKey}, nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -14,11 +14,12 @@ import (
func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) { func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) {
// check to see if plugin provides listener // check to see if plugin provides listener
if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil {
return ln, err return pipeable(ln), err
} }
config := &net.ListenConfig{Control: reusePort, KeepAlive: keepalivePeriod} config := &net.ListenConfig{Control: reusePort, KeepAlive: keepalivePeriod}
return config.Listen(context.Background(), network, addr) ln, err := config.Listen(context.Background(), network, addr)
return pipeable(ln), err
} }
func reusePort(network, address string, conn syscall.RawConn) error { func reusePort(network, address string, conn syscall.RawConn) error {

View file

@ -45,6 +45,68 @@ func Listen(network, addr string) (net.Listener, error) {
return ListenTimeout(network, addr, 0) return ListenTimeout(network, addr, 0)
} }
// pipeableListener wraps an underlying listener so
// that connections can be given to a server that is
// calling Accept().
type pipeableListener struct {
net.Listener
bridge chan connAccept
done chan struct{}
closed *int32 // accessed atomically
}
func (pln pipeableListener) Accept() (net.Conn, error) {
accept := <-pln.bridge
return accept.conn, accept.err
}
func (pln pipeableListener) Close() error {
if atomic.CompareAndSwapInt32(pln.closed, 0, 1) {
close(pln.done)
}
return pln.Listener.Close()
}
// pump pipes real connections from the underlying listener's
// Accept() up to the callers of our own Accept().
func (pln pipeableListener) pump() {
for {
select {
case <-pln.done:
return
default:
pln.Pipe(pln.Listener.Accept())
}
}
}
// Pipe gives a connection (or an error) to an active Accept() call
// on this listener.
func (pln pipeableListener) Pipe(conn net.Conn, err error) {
pln.bridge <- connAccept{conn, err}
}
// pipeable wraps listener so that it can be given connections
// for its caller/server to Accept() and use.
func pipeable(listener net.Listener) net.Listener {
if listener == nil {
return listener // don't start a goroutine
}
pln := pipeableListener{
Listener: listener,
bridge: make(chan connAccept),
done: make(chan struct{}),
closed: new(int32),
}
go pln.pump()
return pln
}
type connAccept struct {
conn net.Conn
err error
}
// getListenerFromPlugin returns a listener on the given network and address // getListenerFromPlugin returns a listener on the given network and address
// if a plugin has registered the network name. It may return (nil, nil) if // if a plugin has registered the network name. It may return (nil, nil) if
// no plugin can provide a listener. // no plugin can provide a listener.