From 17f9f974f89002116b5528e1f2a3b0844bf5a3c5 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Tue, 13 Sep 2022 23:57:17 -0600 Subject: [PATCH] core: Wrap listeners with type that can pipe (Does not apply to PacketConn or quic.EarlyListener types.) --- listen.go | 4 ++-- listen_linux.go | 5 ++-- listeners.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/listen.go b/listen.go index 2c4a0b28..4672442e 100644 --- a/listen.go +++ b/listen.go @@ -15,7 +15,7 @@ import ( func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Listener, error) { // check to see if plugin provides listener if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { - return ln, err + return acceptPipe(ln), err } lnKey := listenerKey(network, addr) @@ -29,7 +29,7 @@ func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Lis } return nil, err } - return &sharedListener{Listener: ln, key: lnKey}, nil + return &sharedListener{Listener: acceptPipe(ln), key: lnKey}, nil }) if err != nil { return nil, err diff --git a/listen_linux.go b/listen_linux.go index b1220ce4..6d29563e 100644 --- a/listen_linux.go +++ b/listen_linux.go @@ -14,11 +14,12 @@ import ( func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) { // check to see if plugin provides listener if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { - return ln, err + return pipeable(ln), err } config := &net.ListenConfig{Control: reusePort, KeepAlive: keepalivePeriod} - return config.Listen(context.Background(), network, addr) + ln, err := config.Listen(context.Background(), network, addr) + return pipeable(ln), err } func reusePort(network, address string, conn syscall.RawConn) error { diff --git a/listeners.go b/listeners.go index 6a23c61a..08835b24 100644 --- a/listeners.go +++ b/listeners.go @@ -45,6 +45,68 @@ func Listen(network, addr string) (net.Listener, error) { return ListenTimeout(network, addr, 0) } +// pipeableListener wraps an underlying listener so +// that connections can be given to a server that is +// calling Accept(). +type pipeableListener struct { + net.Listener + bridge chan connAccept + done chan struct{} + closed *int32 // accessed atomically +} + +func (pln pipeableListener) Accept() (net.Conn, error) { + accept := <-pln.bridge + return accept.conn, accept.err +} + +func (pln pipeableListener) Close() error { + if atomic.CompareAndSwapInt32(pln.closed, 0, 1) { + close(pln.done) + } + return pln.Listener.Close() +} + +// pump pipes real connections from the underlying listener's +// Accept() up to the callers of our own Accept(). +func (pln pipeableListener) pump() { + for { + select { + case <-pln.done: + return + default: + pln.Pipe(pln.Listener.Accept()) + } + } +} + +// Pipe gives a connection (or an error) to an active Accept() call +// on this listener. +func (pln pipeableListener) Pipe(conn net.Conn, err error) { + pln.bridge <- connAccept{conn, err} +} + +// pipeable wraps listener so that it can be given connections +// for its caller/server to Accept() and use. +func pipeable(listener net.Listener) net.Listener { + if listener == nil { + return listener // don't start a goroutine + } + pln := pipeableListener{ + Listener: listener, + bridge: make(chan connAccept), + done: make(chan struct{}), + closed: new(int32), + } + go pln.pump() + return pln +} + +type connAccept struct { + conn net.Conn + err error +} + // getListenerFromPlugin returns a listener on the given network and address // if a plugin has registered the network name. It may return (nil, nil) if // no plugin can provide a listener.