Skip to content

Commit

Permalink
Add support for pooling in codecs, partition_table, view etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Jan 1, 2024
1 parent 2f90eeb commit 87c2b2c
Show file tree
Hide file tree
Showing 31 changed files with 493 additions and 326 deletions.
9 changes: 9 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package goka

import "io"

// Codec decodes and encodes from and to []byte
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
DecodeP(data []byte) (value interface{}, closer io.Closer, err error)
}

type FuncCloser func() error

func (f FuncCloser) Close() error {
return f()
}
11 changes: 11 additions & 0 deletions codec/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package codec

type Closer interface {
Close()
}

type nullCloser struct{}

func (n *nullCloser) Close() error { return nil }

var NoopCloser = new(nullCloser)
15 changes: 15 additions & 0 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package codec

import (
"fmt"
"io"
"strconv"
)

Expand All @@ -23,6 +24,11 @@ func (d *Bytes) Decode(data []byte) (interface{}, error) {
return data, nil
}

// Decode of defaultCodec simply returns the data
func (d *Bytes) DecodeP(data []byte) (interface{}, io.Closer, error) {
return data, NoopCloser, nil
}

// String is a commonly used codec to encode and decode string <-> []byte
type String struct{}

Expand All @@ -40,6 +46,10 @@ func (c *String) Decode(data []byte) (interface{}, error) {
return string(data), nil
}

func (c *String) DecodeP(data []byte) (interface{}, io.Closer, error) {
return string(data), NoopCloser, nil
}

// Int64 is a commonly used codec to encode and decode string <-> []byte
type Int64 struct{}

Expand All @@ -60,3 +70,8 @@ func (c *Int64) Decode(data []byte) (interface{}, error) {
}
return intVal, nil
}

func (c *Int64) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, NoopCloser, err
}
46 changes: 40 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type cbContext struct {
// tracking statistics for the output topic
trackOutputStats func(ctx context.Context, topic string, size int)

deferreds []func() error

msg *message
done bool
counters struct {
Expand Down Expand Up @@ -310,17 +312,25 @@ func (ctx *cbContext) Join(topic Table) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("table %s not subscribed", topic))
}
data, err := v.st.Get(ctx.Key())
data, getCloser, err := v.st.GetP(ctx.Key())
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", ctx.Key(), topic, err))
} else if data == nil {
return nil
}

value, err := ctx.graph.codec(string(topic)).Decode(data)
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}

value, decodeCloser, err := ctx.graph.codec(string(topic)).DecodeP(data)
if err != nil {
ctx.Fail(fmt.Errorf("error decoding value key %s of table %s: %v", ctx.Key(), topic, err))
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value
}

Expand All @@ -332,10 +342,13 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("topic %s not subscribed", topic))
}
val, err := v.Get(key)
val, getCloser, err := v.GetP(key)
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err))
}
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}
return val
}

Expand All @@ -345,17 +358,26 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
return nil, fmt.Errorf("Cannot access state in stateless processor")
}

data, err := ctx.table.Get(key)
data, closer, err := ctx.table.Get(key)
if err != nil {
return nil, fmt.Errorf("error reading value: %v", err)
} else if data == nil {
}
if closer != nil {
ctx.addDeferred(closer.Close)
}

if data == nil {
return nil, nil
}

value, err := ctx.graph.GroupTable().Codec().Decode(data)
value, decodeCloser, err := ctx.graph.GroupTable().Codec().DecodeP(data)
if err != nil {
return nil, fmt.Errorf("error decoding value: %v", err)
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value, nil
}

Expand Down Expand Up @@ -450,6 +472,14 @@ func (ctx *cbContext) tryCommit(err error) {
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(fmt.Errorf("could not commit message with key '%s': %w", ctx.Key(), ctx.errors.ErrorOrNil()))
} else {

// execute deferred commit functions in reverse order
for i := len(ctx.deferreds) - 1; i >= 0; i-- {
if err := ctx.deferreds[i](); err != nil {
ctx.asyncFailer(fmt.Errorf("error executing context deferred: %w", err))
}
}

ctx.commit()
}

Expand Down Expand Up @@ -483,3 +513,7 @@ func (ctx *cbContext) DeferCommit() func(err error) {
})
}
}

func (ctx *cbContext) addDeferred(def func() error) {
ctx.deferreds = append(ctx.deferreds, def)
}
12 changes: 6 additions & 6 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ func TestContext_GetSetStateful(t *testing.T) {
}
)

st.EXPECT().Get(key).Return(nil, nil)
st.EXPECT().GetP(key).Return(nil, nil, nil)
st.EXPECT().Set(key, []byte(value)).Return(nil)
st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)

graph := DefineGroup(group, Persist(new(codec.String)))
ctx := &cbContext{
Expand Down Expand Up @@ -537,11 +537,11 @@ func TestContext_Join(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Join(table)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Join(table) })

require.Panics(t, func() { ctx.Join("other-table") })
Expand Down Expand Up @@ -586,11 +586,11 @@ func TestContext_Lookup(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Lookup(table, key)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Lookup(table, key) })
require.Panics(t, func() { ctx.Lookup("other-table", key) })

Expand Down
6 changes: 6 additions & 0 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -58,6 +59,11 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down
13 changes: 13 additions & 0 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package blocker
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -28,6 +31,11 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type BlockValue struct {
Blocked bool
}
Expand All @@ -42,6 +50,11 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
Expand Down
10 changes: 9 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package collector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -27,6 +30,11 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func collect(ctx goka.Context, msg interface{}) {
var ml []messaging.Message
if v := ctx.Value(); v != nil {
Expand Down
14 changes: 10 additions & 4 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package detector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand All @@ -14,9 +17,7 @@ const (
maxRate = 0.5
)

var (
group goka.Group = "detector"
)
var group goka.Group = "detector"

type Counters struct {
Sent int
Expand All @@ -34,6 +35,11 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func getValue(ctx goka.Context) *Counters {
if v := ctx.Value(); v != nil {
return v.(*Counters)
Expand Down
12 changes: 12 additions & 0 deletions examples/3-messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package messaging

import (
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Expand All @@ -28,6 +30,11 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type MessageListCodec struct{}

func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) {
Expand All @@ -39,3 +46,8 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, &m)
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Loading

0 comments on commit 87c2b2c

Please sign in to comment.