diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index c1f38f898..8a6c002a3 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -1070,6 +1070,13 @@ func (c *connection) Close() { close(c.closeCh) + c.pendingLock.Lock() + for id, req := range c.pendingReqs { + delete(c.pendingReqs, id) + req.callback(nil, errors.New("connection closed")) + } + c.pendingLock.Unlock() + listeners := make(map[uint64]ConnectionListener) c.listenersLock.Lock() for id, listener := range c.listeners {