From 69749e1c5c19ccca7206073c7b13e13ad2d35e23 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 6 Aug 2024 12:07:41 +0800 Subject: [PATCH] fix: fix seek and reconnect race --- pulsar/consumer_partition.go | 52 +++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 50d1569ed..a00bca05f 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -24,6 +24,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -182,6 +183,8 @@ type partitionConsumer struct { lastMessageInBroker *trackingMessageID redirectedClusterURI string + + seekCh atomic.Pointer[chan struct{}] } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -847,6 +850,8 @@ func (pc *partitionConsumer) Close() { <-req.doneCh } +var errSeekInProgress = errors.New("seek operation is already in progress") + func (pc *partitionConsumer) Seek(msgID MessageID) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to seek by closing or closed consumer") @@ -861,6 +866,9 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error { req := &seekRequest{ doneCh: make(chan struct{}), } + if !pc.seekCh.CompareAndSwap(nil, &req.doneCh) { + return errSeekInProgress + } if cmid, ok := msgID.(*chunkMessageID); ok { req.msgID = cmid.firstChunkID } else { @@ -877,7 +885,6 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error { } func (pc *partitionConsumer) internalSeek(seek *seekRequest) { - defer close(seek.doneCh) seek.err = pc.requestSeek(seek.msgID) } func (pc *partitionConsumer) requestSeek(msgID *messageID) error { @@ -926,6 +933,9 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error { doneCh: make(chan struct{}), publishTime: time, } + if !pc.seekCh.CompareAndSwap(nil, &req.doneCh) { + return errSeekInProgress + } pc.ackGroupingTracker.flushAndClean() pc.eventsCh <- req @@ -935,8 +945,6 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error { } func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) { - defer close(seek.doneCh) - state := pc.getConsumerState() if state == consumerClosing || state == consumerClosed { pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed") @@ -1589,26 +1597,24 @@ type seekByTimeRequest struct { func (pc *partitionConsumer) runEventsLoop() { defer func() { + load := pc.seekCh.Load() + if load != nil { + *load <- struct{}{} + } pc.log.Debug("exiting events loop") }() pc.log.Debug("get into runEventsLoop") - go func() { - for { - select { - case <-pc.closeCh: - pc.log.Info("close consumer, exit reconnect") - return - case connectionClosed := <-pc.connectClosedCh: - pc.log.Debug("runEventsLoop will reconnect") - pc.reconnectToBroker(connectionClosed) - } - } - }() - for { - for i := range pc.eventsCh { - switch v := i.(type) { + select { + case <-pc.closeCh: + pc.log.Info("close consumer, exit reconnect") + return + case connectionClosed := <-pc.connectClosedCh: + pc.log.Debug("runEventsLoop will reconnect") + pc.reconnectToBroker(connectionClosed) + case event := <-pc.eventsCh: + switch v := event.(type) { case *ackRequest: pc.internalAck(v) case *ackWithTxnRequest: @@ -1686,6 +1692,16 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { } func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) { + cleanupSeekChFn := func() { + seekCh := pc.seekCh.Swap(nil) + if seekCh != nil { + *seekCh <- struct{}{} + } + } + defer func() { + cleanupSeekChFn() + }() + var maxRetry int if pc.options.maxReconnectToBroker == nil {