
fix: release pending requests when the connection is closed
nodece committed Sep 11, 2024
1 parent 69749e1 commit e9582e6
Showing 1 changed file with 43 additions and 80 deletions.
pulsar/consumer_partition.go (123 changes: 43 additions & 80 deletions)
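
The heart of the change is the type of the consumer's internal queue: queueCh moves from chan []*message (whole batches) to chan *message (individual messages), so the dispatcher no longer has to park a half-delivered batch in a local slice between loop iterations. A minimal standalone sketch of the before/after queue shape, with the client's real message type reduced to a toy struct (all names here are illustrative, not the client's actual API):

package main

import "fmt"

type message struct{ id int }

func main() {
    // Old shape: the queue carries whole batches, so the receiver must
    // track its position inside a partially delivered batch.
    batchQueue := make(chan []*message, 1)
    batchQueue <- []*message{{id: 1}, {id: 2}, {id: 3}}
    for _, m := range <-batchQueue {
        fmt.Println("old:", m.id)
    }

    // New shape: one message per channel slot; every receive is a
    // complete unit of work, and undelivered messages simply remain
    // in the channel, where shutdown paths can drain them uniformly.
    msgQueue := make(chan *message, 3)
    for i := 1; i <= 3; i++ {
        msgQueue <- &message{id: i}
    }
    close(msgQueue)
    for m := range msgQueue {
        fmt.Println("new:", m.id)
    }
}
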
@@ -153,7 +153,7 @@ type partitionConsumer struct {

    // the size of the queue channel for buffering messages
    maxQueueSize int32
-   queueCh chan []*message
+   queueCh chan *message
    startMessageID atomicMessageID
    lastDequeuedMsg *trackingMessageID

@@ -331,7 +331,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        partitionIdx: int32(options.partitionIdx),
        eventsCh: make(chan interface{}, 10),
        maxQueueSize: int32(options.receiverQueueSize),
-       queueCh: make(chan []*message, options.receiverQueueSize),
+       queueCh: make(chan *message, options.receiverQueueSize),
        startMessageID: atomicMessageID{msgID: options.startMessageID},
        connectedCh: make(chan struct{}),
        messageCh: messageCh,
@@ -1059,37 +1059,33 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
            return fmt.Errorf("discarding message on decryption error :%v", err)
        case crypto.ConsumerCryptoFailureActionConsume:
            pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
-           messages := []*message{
-               {
-                   publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                   eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-                   key: msgMeta.GetPartitionKey(),
-                   producerName: msgMeta.GetProducerName(),
-                   properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
-                   topic: pc.topic,
-                   msgID: newMessageID(
-                       int64(pbMsgID.GetLedgerId()),
-                       int64(pbMsgID.GetEntryId()),
-                       pbMsgID.GetBatchIndex(),
-                       pc.partitionIdx,
-                       pbMsgID.GetBatchSize(),
-                   ),
-                   payLoad: headersAndPayload.ReadableSlice(),
-                   schema: pc.options.schema,
-                   replicationClusters: msgMeta.GetReplicateTo(),
-                   replicatedFrom: msgMeta.GetReplicatedFrom(),
-                   redeliveryCount: response.GetRedeliveryCount(),
-                   encryptionContext: createEncryptionContext(msgMeta),
-                   orderingKey: string(msgMeta.OrderingKey),
-               },
-           }

            if pc.options.autoReceiverQueueSize {
                pc.incomingMessages.Inc()
                pc.markScaleIfNeed()
            }

-           pc.queueCh <- messages
+           pc.queueCh <- &message{
+               publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+               eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+               key: msgMeta.GetPartitionKey(),
+               producerName: msgMeta.GetProducerName(),
+               properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
+               topic: pc.topic,
+               msgID: newMessageID(
+                   int64(pbMsgID.GetLedgerId()),
+                   int64(pbMsgID.GetEntryId()),
+                   pbMsgID.GetBatchIndex(),
+                   pc.partitionIdx,
+                   pbMsgID.GetBatchSize(),
+               ),
+               payLoad: headersAndPayload.ReadableSlice(),
+               schema: pc.options.schema,
+               replicationClusters: msgMeta.GetReplicateTo(),
+               replicatedFrom: msgMeta.GetReplicatedFrom(),
+               redeliveryCount: response.GetRedeliveryCount(),
+               encryptionContext: createEncryptionContext(msgMeta),
+               orderingKey: string(msgMeta.OrderingKey),
+           }
            return nil
        }
    }
@@ -1258,14 +1254,12 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
            }
        }

-       pc.log.Info("R " + msg.msgID.String())
-
        pc.options.interceptors.BeforeConsume(ConsumerMessage{
            Consumer: pc.parentConsumer,
            Message: msg,
        })

-       messages = append(messages, msg)
+       pc.queueCh <- msg
        bytesReceived += msg.size()
    }

@@ -1279,8 +1273,6 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
        pc.availablePermits.add(skippedMessages)
    }

-   // send messages to the dispatcher
-   pc.queueCh <- messages
    return nil
}

@@ -1436,36 +1428,7 @@ func (pc *partitionConsumer) dispatcher() {
    defer func() {
        pc.log.Debug("exiting dispatch loop")
    }()
-   var messages []*message
    for {
-       var queueCh chan []*message
-       var messageCh chan ConsumerMessage
-       var nextMessage ConsumerMessage
-       var nextMessageSize int
-
-       // are there more messages to send?
-       if len(messages) > 0 {
-           nextMessage = ConsumerMessage{
-               Consumer: pc.parentConsumer,
-               Message: messages[0],
-           }
-           nextMessageSize = messages[0].size()
-
-           if pc.dlq.shouldSendToDlq(&nextMessage) {
-               // pass the message to the DLQ router
-               pc.metrics.DlqCounter.Inc()
-               messageCh = pc.dlq.Chan()
-           } else {
-               // pass the message to application channel
-               messageCh = pc.messageCh
-           }
-
-           pc.metrics.PrefetchedMessages.Dec()
-           pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
-       } else {
-           queueCh = pc.queueCh
-       }
-
        select {
        case <-pc.closeCh:
            return
@@ -1476,8 +1439,6 @@
            }
            pc.log.Debug("dispatcher received connection event")

-           messages = nil
-
            // reset available permits
            pc.availablePermits.reset()

@@ -1494,28 +1455,33 @@
                pc.log.WithError(err).Error("unable to send initial permits to broker")
            }

-       case msgs, ok := <-queueCh:
+       case msg, ok := <-pc.queueCh:
            if !ok {
                return
            }
-           // we only read messages here after the consumer has processed all messages
-           // in the previous batch
-           messages = msgs

-       // if the messageCh is nil or the messageCh is full this will not be selected
-       case messageCh <- nextMessage:
-           // allow this message to be garbage collected
-           messages[0] = nil
-           messages = messages[1:]
+           consumerMessage := ConsumerMessage{
+               Consumer: pc.parentConsumer,
+               Message: msg,
+           }
+           if pc.dlq.shouldSendToDlq(&consumerMessage) {
+               // pass the message to the DLQ router
+               pc.metrics.DlqCounter.Inc()
+               pc.dlq.Chan() <- consumerMessage
+           } else {
+               // pass the message to application channel
+               pc.messageCh <- consumerMessage
+           }

            pc.availablePermits.inc()

            if pc.options.autoReceiverQueueSize {
                pc.incomingMessages.Dec()
-               pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
+               pc.client.memLimit.ReleaseMemory(int64(msg.size()))
                pc.expectMoreIncomingMessages()
            }

+           pc.metrics.PrefetchedMessages.Dec()
+           pc.metrics.PrefetchedBytes.Sub(float64(len(msg.payLoad)))
        case clearQueueCb := <-pc.clearQueueCh:
            // drain the message queue on any new connection by sending a
            // special nil message to the channel so we know when to stop dropping messages
@@ -1529,15 +1495,12 @@
                if m == nil {
                    break
                } else if nextMessageInQueue == nil {
-                   nextMessageInQueue = toTrackingMessageID(m[0].msgID)
+                   nextMessageInQueue = toTrackingMessageID(m.msgID)
                }
                if pc.options.autoReceiverQueueSize {
-                   pc.incomingMessages.Sub(int32(len(m)))
+                   pc.incomingMessages.Sub(int32(1))
                }
            }

-           messages = nil
-
            clearQueueCb(nextMessageInQueue)
        }
    }
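
For reference, the deleted dispatcher logic relied on the Go idiom that a send or receive on a nil channel blocks forever, so assigning nil to queueCh or messageCh switched the corresponding select case off while a batch was in flight (the old comment "if the messageCh is nil or the messageCh is full this will not be selected" refers to exactly this). A self-contained sketch of the idiom, using made-up names (in, out, pending are illustrative, not from the client):

package main

import "fmt"

func main() {
    in := make(chan int, 1)
    var out chan int // nil: the send case below can never fire
    var pending int

    in <- 42
    for i := 0; i < 2; i++ {
        select {
        case v := <-in:
            pending = v
            out = make(chan int, 1) // enable the send case
        case out <- pending:
            fmt.Println("sent", pending)
            out = nil // disable the send case until new input arrives
        }
    }
}

The rewrite drops this juggling entirely: each message taken from pc.queueCh is routed inline, within the same case, to either the DLQ router or the application channel.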
