Skip to content

Commit

Permalink
Merge pull request #45 from runreveal/alan/batcher-errors
Browse files Browse the repository at this point in the history
batcher: major error handling overhaul
  • Loading branch information
abraithwaite committed Jun 1, 2024
2 parents 4fa34af + 1eb0223 commit 2a9c7aa
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 77 deletions.
155 changes: 89 additions & 66 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package batch
import (
"context"
"errors"
"fmt"
"sync"
"time"

"log/slog"

"github.com/runreveal/kawa"
"github.com/segmentio/ksuid"
)

type Flusher[T any] interface {
Expand All @@ -21,6 +23,16 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
return ff(c, msgs)
}

// ErrorHandler is consulted when flushing a batch returns an error.
//
// HandleError is given the context the flush ran with, the error the flusher
// returned, and the batch of messages that was being flushed. When the batcher
// calls it after a failed flush, a nil result lets the batch's acks run as
// usual, while a non-nil result is forwarded on the batcher's flush error
// channel and the acks for that batch are skipped.
type ErrorHandler[T any] interface {
	HandleError(ctx context.Context, err error, msgs []kawa.Message[T]) error
}

// ErrorHandle is a function type that satisfies ErrorHandler, so an ordinary
// function or closure can be used wherever an ErrorHandler is required.
type ErrorHandle[T any] func(ctx context.Context, err error, msgs []kawa.Message[T]) error

// HandleError invokes the underlying function with the arguments unchanged
// and returns whatever it returns.
func (h ErrorHandle[T]) HandleError(ctx context.Context, err error, msgs []kawa.Message[T]) error {
	return h(ctx, err, msgs)
}

// Destination is a batching destination that will buffer messages until the
// FlushLength limit is reached or the FlushFrequency timer fires, whichever
// comes first.
Expand All @@ -31,12 +43,15 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
// getting read.
type Destination[T any] struct {
flusher Flusher[T]
flushq chan func()
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushcan map[string]context.CancelFunc
stopTimeout time.Duration

errorHandler ErrorHandler[T]
flusherr chan error

messages chan msgAck[T]
buf []msgAck[T]

Expand Down Expand Up @@ -78,8 +93,16 @@ func StopTimeout(d time.Duration) func(*Opts) {
}
}

// DiscardHandler returns an ErrorHandler that ignores the flush error and the
// affected messages and always reports nil.
func DiscardHandler[T any]() ErrorHandler[T] {
	var discard ErrorHandle[T] = func(context.Context, error, []kawa.Message[T]) error {
		return nil
	}
	return discard
}

// Raise returns an ErrorHandler that hands the flush error back unchanged,
// without inspecting the context or the affected messages.
func Raise[T any]() ErrorHandler[T] {
	var raise ErrorHandle[T] = func(_ context.Context, err error, _ []kawa.Message[T]) error {
		return err
	}
	return raise
}

// NewDestination instantiates a new batcher.
func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *Destination[T] {
cfg := Opts{
FlushLength: 100,
FlushFrequency: 1 * time.Second,
Expand All @@ -95,18 +118,24 @@ func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
if cfg.FlushParallelism < 1 {
panic("FlushParallelism must be greater than or equal to 1")
}
if e == nil {
panic("ErrorHandler must not be nil")
}
if cfg.StopTimeout < 0 {
cfg.StopTimeout = 0
}

return &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan func(), cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flushq: make(chan struct{}, cfg.FlushParallelism),
flusher: f,
flushcan: make(map[string]context.CancelFunc),
flushfreq: cfg.FlushFrequency,
stopTimeout: cfg.StopTimeout,

errorHandler: e,
flusherr: make(chan error, cfg.FlushParallelism),

messages: make(chan msgAck[T]),
}

Expand All @@ -131,7 +160,7 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Mess

for _, m := range msgs {
select {
case d.messages <- msgAck[T]{msg: m, ack: callMe}:
case d.messages <- msgAck[T]{msg: m, ack: callMe}: // Here
case <-ctx.Done():
// TODO: one more flush?
return ctx.Err()
Expand All @@ -147,7 +176,6 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Mess
// Upon cancellation, Run will flush any remaining messages in the buffer and
// return any flush errors that occur
func (d *Destination[T]) Run(ctx context.Context) error {
var err error
var epoch uint64
epochC := make(chan uint64)
setTimer := true
Expand All @@ -160,16 +188,17 @@ func (d *Destination[T]) Run(ctx context.Context) error {
}
d.syncMu.Unlock()

var err error
loop:
for {
select {
case msg := <-d.messages:
case msg := <-d.messages: // Here
d.count++
if setTimer {
// copy the epoch to send on the chan after the timer fires
epc := epoch
time.AfterFunc(d.flushfreq, func() {
epochC <- epc
epochC <- epc // Here
})
setTimer = false
}
Expand All @@ -186,58 +215,35 @@ loop:
d.flush(ctx)
setTimer = true
}
case err = <-d.flusherr:
slog.Error("flush error", "error", err)
break loop

case <-ctx.Done():
if len(d.buf) > 0 {
// optimistic final flush.
// launched in a goroutine to avoid deadlock acquiring a flushq slot
// might be better to return and not try to write this batch.
go d.flush(context.Background())
}
// on shutdown, don't attempt final flush even if buffer is not empty
break loop
case err = <-d.flusherr:
break loop
}
}

// we're done, no flushes in flight
if len(d.flushq) == 0 {
return err
}

slog.Info("stopping batcher. waiting for remaining flushes to finish.", "len", len(d.flushq))
timeout:
for {
// Wait for flushes to finish
select {
case <-time.After(d.stopTimeout):
break timeout
case e2 := <-d.flusherr:
if e2 != nil {
slog.Info("flush error", "error", err)
if err == nil {
err = e2
}
}
for i := 10 * time.Millisecond; i < d.stopTimeout; i = i + 10*time.Millisecond {
if len(d.flushq) == 0 {
return err
}
time.Sleep(10 * time.Millisecond)
}
if len(d.flushq) == 0 {
return err
}

drain:
// flushes still active after timeout
// cancel them.
for {
select {
case cncl := <-d.flushq:
cncl()
default:
break drain
}
d.syncMu.Lock()
for k, v := range d.flushcan {
v()
fmt.Println("timeout cancel for id", k)
}
err = errDeadlock
return err
d.syncMu.Unlock()
return errDeadlock
}

// errDeadlock is the sentinel error Run returns when flushes are still
// occupying flushq slots after the stop timeout has been waited out and the
// outstanding flushes have been canceled.
var errDeadlock = errors.New("batcher: flushes timed out waiting for completion after context stopped.")
Expand All @@ -249,46 +255,63 @@ func (d *Destination[T]) flush(ctx context.Context) {
// If we did, then the flusher would likely be canceled before it could
// finish flushing.
flctx, cancel := context.WithCancel(context.Background())

id := ksuid.New().String()
d.syncMu.Lock()
d.flushcan[id] = cancel
d.syncMu.Unlock()

// block until a slot is available, or until a timeout is reached in the
// parent context
select {
case d.flushq <- cancel:
case d.flushq <- struct{}{}:
case <-ctx.Done():
cancel()
return
}

// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
go func() {
d.doflush(flctx, msgs)
msgs, acks := make([]kawa.Message[T], len(d.buf)), make([]func(), len(d.buf))
for i, m := range d.buf {
msgs[i] = m.msg
acks[i] = m.ack
}
go func(id string, msgs []kawa.Message[T], acks []func()) {
d.doflush(flctx, msgs, acks)
// clear flush slot
cncl := <-d.flushq
<-d.flushq
// clear cancel
d.syncMu.Lock()
cncl := d.flushcan[id]
delete(d.flushcan, id)
d.syncMu.Unlock()
cncl()
}()
}(id, msgs, acks)
// Clear the buffer for the next batch
d.buf = d.buf[:0]
}

func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
func (d *Destination[T]) doflush(ctx context.Context, msgs []kawa.Message[T], acks []func()) {
// This is not ideal.
kawaMsgs := make([]kawa.Message[T], 0, len(msgs))
for _, m := range msgs {
kawaMsgs = append(kawaMsgs, m.msg)
}
// kawaMsgs := make([]kawa.Message[T], 0, len(msgs))
// for _, m := range msgs {
// kawaMsgs = append(kawaMsgs, m.msg)
// }

err := d.flusher.Flush(ctx, kawaMsgs)
err := d.flusher.Flush(ctx, msgs)
if err != nil {
slog.Debug("flush err", "error", err)
}
if err != nil {
d.flusherr <- err
return
err := d.errorHandler.HandleError(ctx, err, msgs)
if err != nil {
d.flusherr <- err
// if error handler returns an error, then we exit
return
}
}

for _, m := range msgs {
if m.ack != nil {
m.ack()
for _, ack := range acks {
if ack != nil {
ack()
}
}
}
Expand Down
Loading

0 comments on commit 2a9c7aa

Please sign in to comment.