Skip to content

Commit

Permalink
fix: fix seek and reconnect race
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Sep 11, 2024
1 parent a9abb68 commit 69749e1
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -182,6 +183,8 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID

redirectedClusterURI string

seekCh atomic.Pointer[chan struct{}]
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -847,6 +850,8 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}

// errSeekInProgress is the sentinel error returned by Seek and SeekByTime
// when pc.seekCh.CompareAndSwap(nil, ...) fails, i.e. pc.seekCh already holds
// a channel registered by an earlier seek that has not been cleared yet.
var errSeekInProgress = errors.New("seek operation is already in progress")

func (pc *partitionConsumer) Seek(msgID MessageID) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to seek by closing or closed consumer")
Expand All @@ -861,6 +866,9 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
}
if !pc.seekCh.CompareAndSwap(nil, &req.doneCh) {
return errSeekInProgress
}
if cmid, ok := msgID.(*chunkMessageID); ok {
req.msgID = cmid.firstChunkID
} else {
Expand All @@ -877,7 +885,6 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {
}

// internalSeek handles a seekRequest: it calls pc.requestSeek with the
// request's message ID and stores the returned error in seek.err for the
// caller to read.
//
// NOTE(review): this listing comes from a diff view whose +/- markers were
// lost. The hunk header (7 lines before, 6 after) suggests the
// `defer close(seek.doneCh)` line below is the one this commit removes, with
// completion then signalled through pc.seekCh instead — confirm against the
// actual file before relying on it.
func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
// As rendered here: close doneCh on return so a waiter on it is released.
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID)
}
func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
Expand Down Expand Up @@ -926,6 +933,9 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
doneCh: make(chan struct{}),
publishTime: time,
}
if !pc.seekCh.CompareAndSwap(nil, &req.doneCh) {
return errSeekInProgress
}
pc.ackGroupingTracker.flushAndClean()
pc.eventsCh <- req

Expand All @@ -935,8 +945,6 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
}

func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
defer close(seek.doneCh)

state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
Expand Down Expand Up @@ -1589,26 +1597,24 @@ type seekByTimeRequest struct {

func (pc *partitionConsumer) runEventsLoop() {
defer func() {
load := pc.seekCh.Load()
if load != nil {
*load <- struct{}{}
}
pc.log.Debug("exiting events loop")
}()
pc.log.Debug("get into runEventsLoop")

go func() {
for {
select {
case <-pc.closeCh:
pc.log.Info("close consumer, exit reconnect")
return
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker(connectionClosed)
}
}
}()

for {
for i := range pc.eventsCh {
switch v := i.(type) {
select {
case <-pc.closeCh:
pc.log.Info("close consumer, exit reconnect")
return
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker(connectionClosed)
case event := <-pc.eventsCh:
switch v := event.(type) {
case *ackRequest:
pc.internalAck(v)
case *ackWithTxnRequest:
Expand Down Expand Up @@ -1686,6 +1692,16 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
cleanupSeekChFn := func() {
seekCh := pc.seekCh.Swap(nil)
if seekCh != nil {
*seekCh <- struct{}{}
}
}
defer func() {
cleanupSeekChFn()
}()

var maxRetry int

if pc.options.maxReconnectToBroker == nil {
Expand Down

0 comments on commit 69749e1

Please sign in to comment.