Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pooling in codecs, partition_table, view etc. #445

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,42 @@
package goka

import (
"io"

"github.com/lovoo/goka/codec"
)

// Codec decodes and encodes from and to []byte
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
}

type CodecP interface {
Codec

DecodeP(data []byte) (value interface{}, closer io.Closer, err error)
frairon marked this conversation as resolved.
Show resolved Hide resolved
}

// CloserFunc implements io.Closer-interface for convenience when wrapping functions
type CloserFunc func() error

func (f CloserFunc) Close() error {
return f()
}

type codecWrapper struct {
Codec
}

func (cw *codecWrapper) DecodeP(data []byte) (value interface{}, closer io.Closer, err error) {
val, err := cw.Codec.Decode(data)
return val, codec.NoopCloser, err
}

func convertOrFakeCodec(c Codec) CodecP {
if cp, ok := c.(CodecP); ok {
return cp
}
return &codecWrapper{Codec: c}
}
9 changes: 9 additions & 0 deletions codec/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package codec

type nullCloser struct{}

func (n *nullCloser) Close() error { return nil }

// NoopCloser can be used for returning io.Closer interfaces, whose Close call does
// nothing.
var NoopCloser = new(nullCloser)
46 changes: 40 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type cbContext struct {
// tracking statistics for the output topic
trackOutputStats func(ctx context.Context, topic string, size int)

deferreds []func() error
frairon marked this conversation as resolved.
Show resolved Hide resolved

msg *message
done bool
counters struct {
Expand Down Expand Up @@ -310,17 +312,25 @@ func (ctx *cbContext) Join(topic Table) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("table %s not subscribed", topic))
}
data, err := v.st.Get(ctx.Key())
data, getCloser, err := v.st.GetP(ctx.Key())
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", ctx.Key(), topic, err))
frairon marked this conversation as resolved.
Show resolved Hide resolved
} else if data == nil {
return nil
}

value, err := ctx.graph.codec(string(topic)).Decode(data)
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}

value, decodeCloser, err := ctx.graph.codec(string(topic)).DecodeP(data)
if err != nil {
ctx.Fail(fmt.Errorf("error decoding value key %s of table %s: %v", ctx.Key(), topic, err))
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value
}

Expand All @@ -332,10 +342,13 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} {
if !ok {
ctx.Fail(fmt.Errorf("topic %s not subscribed", topic))
}
val, err := v.Get(key)
val, getCloser, err := v.GetP(key)
if err != nil {
ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err))
}
if getCloser != nil {
ctx.addDeferred(getCloser.Close)
}
return val
}

Expand All @@ -345,17 +358,26 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
return nil, fmt.Errorf("Cannot access state in stateless processor")
}

data, err := ctx.table.Get(key)
data, closer, err := ctx.table.Get(key)
if err != nil {
return nil, fmt.Errorf("error reading value: %v", err)
} else if data == nil {
}
if closer != nil {
ctx.addDeferred(closer.Close)
}

if data == nil {
return nil, nil
}

value, err := ctx.graph.GroupTable().Codec().Decode(data)
value, decodeCloser, err := ctx.graph.GroupTable().Codec().DecodeP(data)
if err != nil {
return nil, fmt.Errorf("error decoding value: %v", err)
}
if decodeCloser != nil {
ctx.addDeferred(decodeCloser.Close)
}

return value, nil
}

Expand Down Expand Up @@ -450,6 +472,14 @@ func (ctx *cbContext) tryCommit(err error) {
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(fmt.Errorf("could not commit message with key '%s': %w", ctx.Key(), ctx.errors.ErrorOrNil()))
} else {

// execute deferred commit functions in reverse order
for i := len(ctx.deferreds) - 1; i >= 0; i-- {
if err := ctx.deferreds[i](); err != nil {
ctx.asyncFailer(fmt.Errorf("error executing context deferred: %w", err))
}
}

ctx.commit()
}

Expand Down Expand Up @@ -483,3 +513,7 @@ func (ctx *cbContext) DeferCommit() func(err error) {
})
}
}

func (ctx *cbContext) addDeferred(def func() error) {
frairon marked this conversation as resolved.
Show resolved Hide resolved
ctx.deferreds = append(ctx.deferreds, def)
}
12 changes: 6 additions & 6 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ func TestContext_GetSetStateful(t *testing.T) {
}
)

st.EXPECT().Get(key).Return(nil, nil)
st.EXPECT().GetP(key).Return(nil, nil, nil)
st.EXPECT().Set(key, []byte(value)).Return(nil)
st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)

graph := DefineGroup(group, Persist(new(codec.String)))
ctx := &cbContext{
Expand Down Expand Up @@ -537,11 +537,11 @@ func TestContext_Join(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Join(table)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Join(table) })

require.Panics(t, func() { ctx.Join("other-table") })
Expand Down Expand Up @@ -586,11 +586,11 @@ func TestContext_Lookup(t *testing.T) {
syncFailer: func(err error) { panic(err) },
}

st.EXPECT().Get(key).Return([]byte(value), nil)
st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil)
v := ctx.Lookup(table, key)
require.Equal(t, value, v)

st.EXPECT().Get(key).Return(nil, errSome)
st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome)
require.Panics(t, func() { ctx.Lookup(table, key) })
require.Panics(t, func() { ctx.Lookup("other-table", key) })

Expand Down
1 change: 1 addition & 0 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocker
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand Down
3 changes: 2 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package collector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand Down
7 changes: 3 additions & 4 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package detector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand All @@ -14,9 +15,7 @@ const (
maxRate = 0.5
)

var (
group goka.Group = "detector"
)
var group goka.Group = "detector"

type Counters struct {
Sent int
Expand Down
10 changes: 4 additions & 6 deletions examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func process(ctx goka.Context, msg interface{}) {
func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
groupInitialized chan struct{}) error {

groupInitialized chan struct{},
) error {
tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
Expand Down Expand Up @@ -161,8 +161,8 @@ func runView(ctx context.Context,
errg *multierr.ErrGroup,
root *mux.Router,
monitor *monitor.Server,
groupInitialized chan struct{}) error {

groupInitialized chan struct{},
) error {
<-groupInitialized

view, err := goka.NewView(brokers,
Expand All @@ -183,7 +183,6 @@ func runView(ctx context.Context,

server := &http.Server{Addr: ":0", Handler: root}
errg.Go(func() error {

root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
value, _ := view.Get(mux.Vars(r)["key"])
data, _ := json.Marshal(value)
Expand Down Expand Up @@ -225,7 +224,6 @@ func pprofInit(root *mux.Router) {
}

func main() {

cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Version = sarama.V2_4_0_0
Expand Down
4 changes: 3 additions & 1 deletion examples/7-redis/codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import "encoding/json"
import (
"encoding/json"
)

type Codec struct{}

Expand Down
14 changes: 6 additions & 8 deletions examples/8-monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ func process(ctx goka.Context, msg interface{}) {
ctx.SetValue(u)
fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error {
g := goka.DefineGroup(group+"-stateless",
goka.Input(topic,
new(codec.String),
func(ctx goka.Context, msg interface{}) {
//ignored
// ignored
}),
)
p, err := goka.NewProcessor(brokers, g)
Expand All @@ -124,8 +125,8 @@ func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error {

func runJoinProcessor(ctx context.Context,
monitor *monitor.Server,
joinGroupInitialized chan struct{}) error {

joinGroupInitialized chan struct{},
) error {
g := goka.DefineGroup(joinGroup,
goka.Input(topic,
new(codec.String),
Expand Down Expand Up @@ -160,11 +161,10 @@ func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
actions *actions.Server,
joinGroupInitialized chan struct{}) error {

joinGroupInitialized chan struct{},
) error {
// helper function that waits the configured number of times
waitVisitor := func(ctx goka.Context, value interface{}) {

waitTime, ok := value.(int64)
if !ok {
return
Expand Down Expand Up @@ -245,7 +245,6 @@ func runView(errg *multierr.ErrGroup, ctx context.Context, root *mux.Router, mon
server := &http.Server{Addr: ":9095", Handler: root}

errg.Go(func() error {

root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
value, _ := view.Get(mux.Vars(r)["key"])
data, _ := json.Marshal(value)
Expand Down Expand Up @@ -287,7 +286,6 @@ func pprofInit(root *mux.Router) {
}

func main() {

cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Version = sarama.V2_4_0_0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/lovoo/goka
go 1.20

require (
github.com/IBM/sarama v1.41.3
github.com/IBM/sarama v1.41.2
github.com/go-stack/stack v1.8.1
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c=
github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
Loading
Loading