Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcher: major error handling overhaul #45

Merged
merged 2 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 89 additions & 66 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package batch
import (
"context"
"errors"
"fmt"
"sync"
"time"

"log/slog"

"github.com/runreveal/kawa"
"github.com/segmentio/ksuid"
)

type Flusher[T any] interface {
Expand All @@ -21,6 +23,16 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
return ff(c, msgs)
}

type ErrorHandler[T any] interface {
HandleError(context.Context, error, []kawa.Message[T]) error
}

type ErrorHandle[T any] func(context.Context, error, []kawa.Message[T]) error

func (ef ErrorHandle[T]) HandleError(c context.Context, err error, msgs []kawa.Message[T]) error {
return ef(c, err, msgs)
}

// Destination is a batching destination that will buffer messages until the
// FlushLength limit is reached or the FlushFrequency timer fires, whichever
// comes first.
Expand All @@ -31,12 +43,15 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
// getting read.
type Destination[T any] struct {
flusher Flusher[T]
flushq chan func()
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushcan map[string]context.CancelFunc
stopTimeout time.Duration

errorHandler ErrorHandler[T]
flusherr chan error

messages chan msgAck[T]
buf []msgAck[T]

Expand Down Expand Up @@ -78,8 +93,16 @@ func StopTimeout(d time.Duration) func(*Opts) {
}
}

func DiscardHandler[T any]() ErrorHandler[T] {
return ErrorHandle[T](func(context.Context, error, []kawa.Message[T]) error { return nil })
}

func Raise[T any]() ErrorHandler[T] {
return ErrorHandle[T](func(_ context.Context, err error, _ []kawa.Message[T]) error { return err })
}

// NewDestination instantiates a new batcher.
func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *Destination[T] {
cfg := Opts{
FlushLength: 100,
FlushFrequency: 1 * time.Second,
Expand All @@ -95,18 +118,24 @@ func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
if cfg.FlushParallelism < 1 {
panic("FlushParallelism must be greater than or equal to 1")
}
if e == nil {
panic("ErrorHandler must not be nil")
}
if cfg.StopTimeout < 0 {
cfg.StopTimeout = 0
}

return &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan func(), cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flushq: make(chan struct{}, cfg.FlushParallelism),
flusher: f,
flushcan: make(map[string]context.CancelFunc),
flushfreq: cfg.FlushFrequency,
stopTimeout: cfg.StopTimeout,

errorHandler: e,
flusherr: make(chan error, cfg.FlushParallelism),

messages: make(chan msgAck[T]),
}

Expand All @@ -131,7 +160,7 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Mess

for _, m := range msgs {
select {
case d.messages <- msgAck[T]{msg: m, ack: callMe}:
case d.messages <- msgAck[T]{msg: m, ack: callMe}: // Here
case <-ctx.Done():
// TODO: one more flush?
return ctx.Err()
Expand All @@ -147,7 +176,6 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Mess
// Upon cancellation, Run will flush any remaining messages in the buffer and
// return any flush errors that occur
func (d *Destination[T]) Run(ctx context.Context) error {
var err error
var epoch uint64
epochC := make(chan uint64)
setTimer := true
Expand All @@ -160,16 +188,17 @@ func (d *Destination[T]) Run(ctx context.Context) error {
}
d.syncMu.Unlock()

var err error
loop:
for {
select {
case msg := <-d.messages:
case msg := <-d.messages: // Here
d.count++
if setTimer {
// copy the epoch to send on the chan after the timer fires
epc := epoch
time.AfterFunc(d.flushfreq, func() {
epochC <- epc
epochC <- epc // Here
})
setTimer = false
}
Expand All @@ -186,58 +215,35 @@ loop:
d.flush(ctx)
setTimer = true
}
case err = <-d.flusherr:
slog.Error("flush error", "error", err)
break loop

case <-ctx.Done():
if len(d.buf) > 0 {
// optimistic final flush.
// launched in a goroutine to avoid deadlock acuqiring a flushq slot
// might be better to return and not try to write this batch.
go d.flush(context.Background())
}
// on shutdown, don't attempt final flush even if buffer is not empty
break loop
case err = <-d.flusherr:
break loop
}
}

// we're done, no flushes in flight
if len(d.flushq) == 0 {
return err
}

slog.Info("stopping batcher. waiting for remaining flushes to finish.", "len", len(d.flushq))
timeout:
for {
// Wait for flushes to finish
select {
case <-time.After(d.stopTimeout):
break timeout
case e2 := <-d.flusherr:
if e2 != nil {
slog.Info("flush error", "error", err)
if err == nil {
err = e2
}
}
for i := 10 * time.Millisecond; i < d.stopTimeout; i = i + 10*time.Millisecond {
if len(d.flushq) == 0 {
return err
}
time.Sleep(10 * time.Millisecond)
}
if len(d.flushq) == 0 {
return err
}

drain:
// flushes still active after timeout
// cancel them.
for {
select {
case cncl := <-d.flushq:
cncl()
default:
break drain
}
d.syncMu.Lock()
for k, v := range d.flushcan {
v()
fmt.Println("timeout cancel for id", k)
}
err = errDeadlock
return err
d.syncMu.Unlock()
return errDeadlock
}

var errDeadlock = errors.New("batcher: flushes timed out waiting for completion after context stopped.")
Expand All @@ -249,46 +255,63 @@ func (d *Destination[T]) flush(ctx context.Context) {
// If we did, then the flusher would likely be canceled before it could
// finish flushing.
flctx, cancel := context.WithCancel(context.Background())

id := ksuid.New().String()
d.syncMu.Lock()
d.flushcan[id] = cancel
d.syncMu.Unlock()

// block until a slot is available, or until a timeout is reached in the
// parent context
select {
case d.flushq <- cancel:
case d.flushq <- struct{}{}:
case <-ctx.Done():
cancel()
return
}

// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
go func() {
d.doflush(flctx, msgs)
msgs, acks := make([]kawa.Message[T], len(d.buf)), make([]func(), len(d.buf))
for i, m := range d.buf {
msgs[i] = m.msg
acks[i] = m.ack
}
go func(id string, msgs []kawa.Message[T], acks []func()) {
d.doflush(flctx, msgs, acks)
// clear flush slot
cncl := <-d.flushq
<-d.flushq
// clear cancel
d.syncMu.Lock()
cncl := d.flushcan[id]
delete(d.flushcan, id)
d.syncMu.Unlock()
cncl()
}()
}(id, msgs, acks)
// Clear the buffer for the next batch
d.buf = d.buf[:0]
}

func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
func (d *Destination[T]) doflush(ctx context.Context, msgs []kawa.Message[T], acks []func()) {
// This not ideal.
kawaMsgs := make([]kawa.Message[T], 0, len(msgs))
for _, m := range msgs {
kawaMsgs = append(kawaMsgs, m.msg)
}
// kawaMsgs := make([]kawa.Message[T], 0, len(msgs))
// for _, m := range msgs {
// kawaMsgs = append(kawaMsgs, m.msg)
// }

err := d.flusher.Flush(ctx, kawaMsgs)
err := d.flusher.Flush(ctx, msgs)
if err != nil {
slog.Debug("flush err", "error", err)
}
if err != nil {
d.flusherr <- err
return
err := d.errorHandler.HandleError(ctx, err, msgs)
if err != nil {
d.flusherr <- err
// if error handler returns an error, then we exit
return
}
}

for _, m := range msgs {
if m.ack != nil {
m.ack()
for _, ack := range acks {
if ack != nil {
ack()
}
}
}
Expand Down
Loading
Loading