core: Simplify shared listeners, fix deadline bug

When this listener code was first written, UsagePool didn't exist. We
can simplify much of the wrapped listener logic by utilizing UsagePool.

This also fixes a bug where new servers were able to clear deadlines
set by old servers, even if the old server didn't get booted out of its
Accept() call yet. And with the deadline cleared, they never would.
(Sometimes. Based on reports and difficulty of reproducing the bug,
this behavior was extremely rare.) I don't know why that happened
exactly, maybe some polling mechanism in the kernel and if the timings
worked out just wrong it would expose the bug.

Anyway, now we ensure that only the closer that set the deadline is the
same one that clears it, ensuring that old servers always return out of
Accept(), because the deadline doesn't get cleared until they do.

Of course, all this hinges on the hope that my suspicions in the middle
of the night are correct and that kernels work the way I think they do
in my head.

Also minor enhancement to UsagePool where if a value errors upon
construction (a very real possibility with listeners), it is removed from
the pool. Not 100% sure the sync logic is correct there, or maybe we
don't have to even put it in the pool until after construction, but it's
subtle either way and I think this is safe... right?
This commit is contained in:
Matthew Holt 2022-01-10 23:24:58 -07:00
parent c634bbe9cc
commit 64a3218f5c
2 changed files with 146 additions and 168 deletions

View file

@ -16,7 +16,6 @@ package caddy
import ( import (
"fmt" "fmt"
"log"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -26,124 +25,90 @@ import (
"time" "time"
) )
// Listen returns a listener suitable for use in a Caddy module. // Listen is like net.Listen, except Caddy's listeners can overlap
// Always be sure to close listeners when you are done with them. // each other: multiple listeners may be created on the same socket
// at the same time. This is useful because during config changes,
// the new config is started while the old config is still running.
// When Caddy listeners are closed, the closing logic is virtualized
// so the underlying socket isn't actually closed until all uses of
// the socket have been finished. Always be sure to close listeners
// when you are done with them, just like normal listeners.
func Listen(network, addr string) (net.Listener, error) { func Listen(network, addr string) (net.Listener, error) {
lnKey := network + "/" + addr lnKey := network + "/" + addr
listenersMu.Lock() sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
defer listenersMu.Unlock() ln, err := net.Listen(network, addr)
if err != nil {
// if listener already exists, increment usage counter, then return listener return nil, err
if lnGlobal, ok := listeners[lnKey]; ok { }
atomic.AddInt32(&lnGlobal.usage, 1) return &sharedListener{Listener: ln, key: lnKey}, nil
return &fakeCloseListener{ })
usage: &lnGlobal.usage,
deadline: &lnGlobal.deadline,
deadlineMu: &lnGlobal.deadlineMu,
key: lnKey,
Listener: lnGlobal.ln,
}, nil
}
// or, create new one and save it
ln, err := net.Listen(network, addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// make sure to start its usage counter at 1 return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener)}, nil
lnGlobal := &globalListener{usage: 1, ln: ln}
listeners[lnKey] = lnGlobal
return &fakeCloseListener{
usage: &lnGlobal.usage,
deadline: &lnGlobal.deadline,
deadlineMu: &lnGlobal.deadlineMu,
key: lnKey,
Listener: ln,
}, nil
} }
// ListenPacket returns a net.PacketConn suitable for use in a Caddy module. // ListenPacket returns a net.PacketConn suitable for use in a Caddy module.
// It is like Listen except for PacketConns.
// Always be sure to close the PacketConn when you are done. // Always be sure to close the PacketConn when you are done.
func ListenPacket(network, addr string) (net.PacketConn, error) { func ListenPacket(network, addr string) (net.PacketConn, error) {
lnKey := network + "/" + addr lnKey := network + "/" + addr
listenersMu.Lock() sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
defer listenersMu.Unlock() pc, err := net.ListenPacket(network, addr)
if err != nil {
// if listener already exists, increment usage counter, then return listener return nil, err
if lnGlobal, ok := listeners[lnKey]; ok { }
atomic.AddInt32(&lnGlobal.usage, 1) return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil
log.Printf("[DEBUG] %s: Usage counter should not go above 2 or maybe 3, is now: %d", lnKey, atomic.LoadInt32(&lnGlobal.usage)) // TODO: remove })
return &fakeClosePacketConn{usage: &lnGlobal.usage, key: lnKey, PacketConn: lnGlobal.pc}, nil
}
// or, create new one and save it
pc, err := net.ListenPacket(network, addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// make sure to start its usage counter at 1 return &fakeClosePacketConn{sharedPacketConn: sharedPc.(*sharedPacketConn)}, nil
lnGlobal := &globalListener{usage: 1, pc: pc}
listeners[lnKey] = lnGlobal
return &fakeClosePacketConn{usage: &lnGlobal.usage, key: lnKey, PacketConn: pc}, nil
} }
// fakeCloseListener's Close() method is a no-op. This allows // fakeCloseListener is a private wrapper over a listener that
// stopping servers that are using the listener without giving // is shared. The state of fakeCloseListener is not shared.
// up the socket; thus, servers become hot-swappable while the // This allows one user of a socket to "close" the listener
// listener remains running. Listeners should be re-wrapped in // while in reality the socket stays open for other users of
// a new fakeCloseListener each time the listener is reused. // the listener. In this way, servers become hot-swappable
// Other than the 'closed' field (which pertains to this value // while the listener remains running. Listeners should be
// only), the other fields in this struct should be pointers to // re-wrapped in a new fakeCloseListener each time the listener
// the associated globalListener's struct fields (except 'key' // is reused. This type is atomic and values must not be copied.
// which is there for read-only purposes, so it can be a copy).
type fakeCloseListener struct { type fakeCloseListener struct {
closed int32 // accessed atomically; belongs to this struct only closed int32 // accessed atomically; belongs to this struct only
usage *int32 // accessed atomically; global *sharedListener // embedded, so we also become a net.Listener
deadline *bool // protected by deadlineMu; global
deadlineMu *sync.Mutex // global
key string // global, but read-only, so can be copy
net.Listener // global
} }
// Accept accepts connections until Close() is called.
func (fcl *fakeCloseListener) Accept() (net.Conn, error) { func (fcl *fakeCloseListener) Accept() (net.Conn, error) {
// if the listener is already "closed", return error // if the listener is already "closed", return error
if atomic.LoadInt32(&fcl.closed) == 1 { if atomic.LoadInt32(&fcl.closed) == 1 {
return nil, fcl.fakeClosedErr() return nil, fcl.fakeClosedErr()
} }
// wrap underlying accept // call underlying accept
conn, err := fcl.Listener.Accept() conn, err := fcl.sharedListener.Accept()
if err == nil { if err == nil {
return conn, nil return conn, nil
} }
// accept returned with error // since Accept() returned an error, it may be because our reference to
// TODO: This may be better as a condition variable so the deadline is cleared only once? // the listener (this fakeCloseListener) may have been closed, i.e. the
fcl.deadlineMu.Lock() // server is shutting down; in that case, we need to clear the deadline
if *fcl.deadline { // that we set when Close() was called, and return a non-temporary and
switch ln := fcl.Listener.(type) { // non-timeout error value to the caller, masking the "true" error, so
case *net.TCPListener: // that server loops / goroutines won't retry, linger, and leak
_ = ln.SetDeadline(time.Time{})
case *net.UnixListener:
_ = ln.SetDeadline(time.Time{})
}
*fcl.deadline = false
}
fcl.deadlineMu.Unlock()
if atomic.LoadInt32(&fcl.closed) == 1 { if atomic.LoadInt32(&fcl.closed) == 1 {
// if we canceled the Accept() by setting a deadline // we dereference the sharedListener explicitly even though it's embedded
// on the listener, we need to make sure any callers of // so that it's clear in the code that side-effects are shared with other
// Accept() think the listener was actually closed; // users of this listener, not just our own reference to it; we also don't
// if we return the timeout error instead, callers might // do anything with the error because all we could do is log it, but we
// simply retry, leaking goroutines for longer // expliclty assign it to nothing so we don't forget it's there if needed
_ = fcl.sharedListener.clearDeadline()
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return nil, fcl.fakeClosedErr() return nil, fcl.fakeClosedErr()
} }
@ -152,82 +117,54 @@ func (fcl *fakeCloseListener) Accept() (net.Conn, error) {
return nil, err return nil, err
} }
// Close stops accepting new connections without // Close stops accepting new connections without closing the
// closing the underlying listener, unless no one // underlying listener. The underlying listener is only closed
// else is using it. // if the caller is the last known user of the socket.
func (fcl *fakeCloseListener) Close() error { func (fcl *fakeCloseListener) Close() error {
if atomic.CompareAndSwapInt32(&fcl.closed, 0, 1) { if atomic.CompareAndSwapInt32(&fcl.closed, 0, 1) {
// unfortunately, there is no way to cancel any // There are two ways I know of to get an Accept()
// currently-blocking calls to Accept() that are // function to return to the server loop that called
// awaiting connections since we're not actually // it: close the listener, or set a deadline in the
// closing the listener; so we cheat by setting // past. Obviously, we can't close the socket yet
// a deadline in the past, which forces it to // since others may be using it (hence this whole
// time out; note that this only works for // file). But we can set the deadline in the past,
// certain types of listeners... // and this is kind of cheating, but it works, and
fcl.deadlineMu.Lock() // it apparently even works on Windows.
if !*fcl.deadline { _ = fcl.sharedListener.setDeadline()
switch ln := fcl.Listener.(type) { listenerPool.Delete(fcl.sharedListener.key)
case *net.TCPListener:
_ = ln.SetDeadline(time.Now().Add(-1 * time.Minute))
case *net.UnixListener:
_ = ln.SetDeadline(time.Now().Add(-1 * time.Minute))
}
*fcl.deadline = true
}
fcl.deadlineMu.Unlock()
// since we're no longer using this listener,
// decrement the usage counter and, if no one
// else is using it, close underlying listener
if atomic.AddInt32(fcl.usage, -1) == 0 {
listenersMu.Lock()
delete(listeners, fcl.key)
listenersMu.Unlock()
err := fcl.Listener.Close()
if err != nil {
return err
}
}
} }
return nil return nil
} }
// fakeClosedErr returns an error value that is not temporary
// nor a timeout, suitable for making the caller think the
// listener is actually closed
func (fcl *fakeCloseListener) fakeClosedErr() error { func (fcl *fakeCloseListener) fakeClosedErr() error {
return &net.OpError{ return &net.OpError{
Op: "accept", Op: "accept",
Net: fcl.Listener.Addr().Network(), Net: fcl.Addr().Network(),
Addr: fcl.Listener.Addr(), Addr: fcl.Addr(),
Err: errFakeClosed, Err: errFakeClosed,
} }
} }
// ErrFakeClosed is the underlying error value returned by
// fakeCloseListener.Accept() after Close() has been called,
// indicating that it is pretending to be closed so that the
// server using it can terminate, while the underlying
// socket is actually left open.
var errFakeClosed = fmt.Errorf("listener 'closed' 😉")
// fakeClosePacketConn is like fakeCloseListener, but for PacketConns.
type fakeClosePacketConn struct { type fakeClosePacketConn struct {
closed int32 // accessed atomically closed int32 // accessed atomically; belongs to this struct only
usage *int32 // accessed atomically *sharedPacketConn // embedded, so we also become a net.PacketConn
key string
net.PacketConn
} }
func (fcpc *fakeClosePacketConn) Close() error { func (fcpc *fakeClosePacketConn) Close() error {
log.Println("[DEBUG] Fake-closing underlying packet conn") // TODO: remove this
if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) { if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
// since we're no longer using this listener, listenerPool.Delete(fcpc.sharedPacketConn.key)
// decrement the usage counter and, if no one
// else is using it, close underlying listener
if atomic.AddInt32(fcpc.usage, -1) == 0 {
listenersMu.Lock()
delete(listeners, fcpc.key)
listenersMu.Unlock()
err := fcpc.PacketConn.Close()
if err != nil {
return err
}
}
} }
return nil return nil
} }
@ -249,28 +186,64 @@ func (fcpc fakeClosePacketConn) SyscallConn() (syscall.RawConn, error) {
return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn) return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn)
} }
// ErrFakeClosed is the underlying error value returned by // sharedListener is a wrapper over an underlying listener. The listener
// fakeCloseListener.Accept() after Close() has been called, // and the other fields on the struct are shared state that is synchronized,
// indicating that it is pretending to be closed so that the // so sharedListener structs must never be copied (always use a pointer).
// server using it can terminate, while the underlying type sharedListener struct {
// socket is actually left open. net.Listener
var errFakeClosed = fmt.Errorf("listener 'closed' 😉") key string // uniquely identifies this listener
deadline bool // whether a deadline is currently set
// globalListener keeps global state for a listener
// that may be shared by multiple servers. In other
// words, values in this struct exist only once and
// all other uses of these values point to the ones
// in this struct. In particular, the usage count
// (how many callers are using the listener), the
// actual listener, and synchronization of the
// listener's deadline changes are singular, global
// values that must not be copied.
type globalListener struct {
usage int32 // accessed atomically
deadline bool
deadlineMu sync.Mutex deadlineMu sync.Mutex
ln net.Listener }
pc net.PacketConn
func (sl *sharedListener) clearDeadline() error {
var err error
sl.deadlineMu.Lock()
if sl.deadline {
switch ln := sl.Listener.(type) {
case *net.TCPListener:
err = ln.SetDeadline(time.Time{})
case *net.UnixListener:
err = ln.SetDeadline(time.Time{})
}
sl.deadline = false
}
sl.deadlineMu.Unlock()
return err
}
func (sl *sharedListener) setDeadline() error {
timeInPast := time.Now().Add(-1 * time.Minute)
var err error
sl.deadlineMu.Lock()
if !sl.deadline {
switch ln := sl.Listener.(type) {
case *net.TCPListener:
err = ln.SetDeadline(timeInPast)
case *net.UnixListener:
err = ln.SetDeadline(timeInPast)
}
sl.deadline = true
}
sl.deadlineMu.Unlock()
return err
}
// Destruct is called by the UsagePool when the listener is
// finally not being used anymore. It closes the socket.
func (sl *sharedListener) Destruct() error {
return sl.Listener.Close()
}
// sharedPacketConn is like sharedListener, but for net.PacketConns.
type sharedPacketConn struct {
net.PacketConn
key string
}
// Destruct closes the underlying socket.
func (spc *sharedPacketConn) Destruct() error {
return spc.PacketConn.Close()
} }
// NetworkAddress contains the individual components // NetworkAddress contains the individual components
@ -445,10 +418,8 @@ type ListenerWrapper interface {
WrapListener(net.Listener) net.Listener WrapListener(net.Listener) net.Listener
} }
var ( // listenerPool stores and allows reuse of active listeners.
listeners = make(map[string]*globalListener) var listenerPool = NewUsagePool()
listenersMu sync.Mutex
)
const maxPortSpan = 65535 const maxPortSpan = 65535

View file

@ -94,8 +94,15 @@ func (up *UsagePool) LoadOrNew(key interface{}, construct Constructor) (value in
if err == nil { if err == nil {
upv.value = value upv.value = value
} else { } else {
// TODO: remove error'ed entries from map
upv.err = err upv.err = err
up.Lock()
// this *should* be safe, I think, because we have a
// write lock on upv, but we might also need to ensure
// that upv.err is nil before doing this, since we
// released the write lock on up during construct...
// but then again it's also after midnight...
delete(up.pool, key)
up.Unlock()
} }
upv.Unlock() upv.Unlock()
} }