Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pooling in codecs, partition_table, view etc. #445

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package goka

import "io"

// Codec decodes and encodes from and to []byte
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
DecodeP(data []byte) (value interface{}, closer io.Closer, err error)
frairon marked this conversation as resolved.
Show resolved Hide resolved
}

// FuncCloser implements io.Closer-interface for convenience
type FuncCloser func() error
frairon marked this conversation as resolved.
Show resolved Hide resolved

func (f FuncCloser) Close() error {
return f()
}
11 changes: 11 additions & 0 deletions codec/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package codec

type Closer interface {
Close()
}
frairon marked this conversation as resolved.
Show resolved Hide resolved

type nullCloser struct{}

func (n *nullCloser) Close() error { return nil }

var NoopCloser = new(nullCloser)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for, if all the code calling DecodeP is doing a nil check on the io.Closer return.

15 changes: 15 additions & 0 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package codec

import (
"fmt"
"io"
"strconv"
)

Expand All @@ -23,6 +24,11 @@ func (d *Bytes) Decode(data []byte) (interface{}, error) {
return data, nil
}

// Decode of defaultCodec simply returns the data
func (d *Bytes) DecodeP(data []byte) (interface{}, io.Closer, error) {
frairon marked this conversation as resolved.
Show resolved Hide resolved
return data, NoopCloser, nil
}

// String is a commonly used codec to encode and decode string <-> []byte
type String struct{}

Expand All @@ -40,6 +46,10 @@ func (c *String) Decode(data []byte) (interface{}, error) {
return string(data), nil
}

func (c *String) DecodeP(data []byte) (interface{}, io.Closer, error) {
return string(data), NoopCloser, nil
}

// Int64 is a commonly used codec to encode and decode string <-> []byte
type Int64 struct{}

Expand All @@ -60,3 +70,8 @@ func (c *Int64) Decode(data []byte) (interface{}, error) {
}
return intVal, nil
}

func (c *Int64) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, NoopCloser, err
}
46 changes: 40 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type cbContext struct {
// tracking statistics for the output topic
trackOutputStats func(ctx context.Context, topic string, size int)

deferreds []func() error
frairon marked this conversation as resolved.
Show resolved Hide resolved

msg *message
done bool
counters struct {
Expand Down Expand Up @@ -310,17 +312,25 @@ func (ctx *cbContext) Join(topic Table) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("table %s not subscribed", topic))
}
data, err := v.st.Get(ctx.Key())
data, getCloser, err := v.st.GetP(ctx.Key())
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", ctx.Key(), topic, err))
frairon marked this conversation as resolved.
Show resolved Hide resolved
} else if data == nil {
return nil
}

value, err := ctx.graph.codec(string(topic)).Decode(data)
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}

value, decodeCloser, err := ctx.graph.codec(string(topic)).DecodeP(data)
if err != nil {
ctx.Fail(fmt.Errorf("error decoding value key %s of table %s: %v", ctx.Key(), topic, err))
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value
}

Expand All @@ -332,10 +342,13 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("topic %s not subscribed", topic))
}
val, err := v.Get(key)
val, getCloser, err := v.GetP(key)
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err))
}
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}
return val
}

Expand All @@ -345,17 +358,26 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
return nil, fmt.Errorf("Cannot access state in stateless processor")
}

data, err := ctx.table.Get(key)
data, closer, err := ctx.table.Get(key)
if err != nil {
return nil, fmt.Errorf("error reading value: %v", err)
} else if data == nil {
}
if closer != nil {
ctx.addDeferred(closer.Close)
}

if data == nil {
return nil, nil
}

value, err := ctx.graph.GroupTable().Codec().Decode(data)
value, decodeCloser, err := ctx.graph.GroupTable().Codec().DecodeP(data)
if err != nil {
return nil, fmt.Errorf("error decoding value: %v", err)
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value, nil
}

Expand Down Expand Up @@ -450,6 +472,14 @@ func (ctx *cbContext) tryCommit(err error) {
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(fmt.Errorf("could not commit message with key '%s': %w", ctx.Key(), ctx.errors.ErrorOrNil()))
} else {

// execute deferred commit functions in reverse order
for i := len(ctx.deferreds) - 1; i >= 0; i-- {
if err := ctx.deferreds[i](); err != nil {
ctx.asyncFailer(fmt.Errorf("error executing context deferred: %w", err))
}
}

ctx.commit()
}

Expand Down Expand Up @@ -483,3 +513,7 @@ func (ctx *cbContext) DeferCommit() func(err error) {
})
}
}

func (ctx *cbContext) addDeferred(def func() error) {
frairon marked this conversation as resolved.
Show resolved Hide resolved
ctx.deferreds = append(ctx.deferreds, def)
}
12 changes: 6 additions & 6 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ func TestContext_GetSetStateful(t *testing.T) {
}
)

st.EXPECT().Get(key).Return(nil, nil)
st.EXPECT().GetP(key).Return(nil, nil, nil)
st.EXPECT().Set(key, []byte(value)).Return(nil)
st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)

graph := DefineGroup(group, Persist(new(codec.String)))
ctx := &cbContext{
Expand Down Expand Up @@ -537,11 +537,11 @@ func TestContext_Join(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Join(table)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Join(table) })

require.Panics(t, func() { ctx.Join("other-table") })
Expand Down Expand Up @@ -586,11 +586,11 @@ func TestContext_Lookup(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Lookup(table, key)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Lookup(table, key) })
require.Panics(t, func() { ctx.Lookup("other-table", key) })

Expand Down
6 changes: 6 additions & 0 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -58,6 +59,11 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down
13 changes: 13 additions & 0 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package blocker
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -28,6 +31,11 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type BlockValue struct {
Blocked bool
}
Expand All @@ -42,6 +50,11 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
Expand Down
10 changes: 9 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package collector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -27,6 +30,11 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func collect(ctx goka.Context, msg interface{}) {
var ml []messaging.Message
if v := ctx.Value(); v != nil {
Expand Down
14 changes: 10 additions & 4 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package detector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand All @@ -14,9 +17,7 @@ const (
maxRate = 0.5
)

var (
group goka.Group = "detector"
)
var group goka.Group = "detector"

type Counters struct {
Sent int
Expand All @@ -34,6 +35,11 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func getValue(ctx goka.Context) *Counters {
if v := ctx.Value(); v != nil {
return v.(*Counters)
Expand Down
12 changes: 12 additions & 0 deletions examples/3-messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package messaging

import (
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Expand All @@ -28,6 +30,11 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type MessageListCodec struct{}

func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) {
Expand All @@ -39,3 +46,8 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, &m)
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Loading