diff --git a/codec.go b/codec.go index 832c20be..6ab1b2eb 100644 --- a/codec.go +++ b/codec.go @@ -1,7 +1,42 @@ package goka +import ( + "io" + + "github.com/lovoo/goka/codec" +) + // Codec decodes and encodes from and to []byte type Codec interface { Encode(value interface{}) (data []byte, err error) Decode(data []byte) (value interface{}, err error) } + +type CodecP interface { + Codec + + DecodeP(data []byte) (value interface{}, closer io.Closer, err error) +} + +// CloserFunc implements io.Closer-interface for convenience when wrapping functions +type CloserFunc func() error + +func (f CloserFunc) Close() error { + return f() +} + +type codecWrapper struct { + Codec +} + +func (cw *codecWrapper) DecodeP(data []byte) (value interface{}, closer io.Closer, err error) { + val, err := cw.Codec.Decode(data) + return val, codec.NoopCloser, err +} + +func convertOrFakeCodec(c Codec) CodecP { + if cp, ok := c.(CodecP); ok { + return cp + } + return &codecWrapper{Codec: c} +} diff --git a/codec/closer.go b/codec/closer.go new file mode 100644 index 00000000..801fd845 --- /dev/null +++ b/codec/closer.go @@ -0,0 +1,9 @@ +package codec + +type nullCloser struct{} + +func (n *nullCloser) Close() error { return nil } + +// NoopCloser can be used for returning io.Closer interfaces, whose Close call does +// nothing. +var NoopCloser = new(nullCloser) diff --git a/context.go b/context.go index ce2ef9f9..dc5b633c 100644 --- a/context.go +++ b/context.go @@ -170,6 +170,8 @@ type cbContext struct { // tracking statistics for the output topic trackOutputStats func(ctx context.Context, topic string, size int) + deferreds []func() error + msg *message done bool counters struct { @@ -310,17 +312,25 @@ func (ctx *cbContext) Join(topic Table) interface{} { if !ok { ctx.Fail(fmt.Errorf("table %s not subscribed", topic)) } - data, err := v.st.Get(ctx.Key()) + data, getCloser, err := v.st.GetP(ctx.Key()) if err != nil { ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", ctx.Key(), topic, err)) } else if data == nil { return nil } - value, err := ctx.graph.codec(string(topic)).Decode(data) + if getCloser != nil { + ctx.addDeferred(getCloser.Close) + } + + value, decodeCloser, err := ctx.graph.codec(string(topic)).DecodeP(data) if err != nil { ctx.Fail(fmt.Errorf("error decoding value key %s of table %s: %v", ctx.Key(), topic, err)) } + if decodeCloser != nil { + ctx.addDeferred(decodeCloser.Close) + } + return value } @@ -332,10 +342,13 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} { if !ok { ctx.Fail(fmt.Errorf("topic %s not subscribed", topic)) } - val, err := v.Get(key) + val, getCloser, err := v.GetP(key) if err != nil { ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err)) } + if getCloser != nil { + ctx.addDeferred(getCloser.Close) + } return val } @@ -345,17 +358,26 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) { return nil, fmt.Errorf("Cannot access state in stateless processor") } - data, err := ctx.table.Get(key) + data, closer, err := ctx.table.Get(key) if err != nil { return nil, fmt.Errorf("error reading value: %v", err) - } else if data == nil { + } + if closer != nil { + ctx.addDeferred(closer.Close) + } + + if data == nil { return nil, nil } - value, err := ctx.graph.GroupTable().Codec().Decode(data) + value, decodeCloser, err := ctx.graph.GroupTable().Codec().DecodeP(data) if err != nil { return nil, fmt.Errorf("error decoding value: %v", err) } + if decodeCloser != nil { + ctx.addDeferred(decodeCloser.Close) + } + return value, nil } @@ -450,6 +472,14 @@ func (ctx *cbContext) tryCommit(err error) { if ctx.errors.ErrorOrNil() != nil { ctx.asyncFailer(fmt.Errorf("could not commit message with key '%s': %w", ctx.Key(), ctx.errors.ErrorOrNil())) } else { + + // execute deferred commit functions in reverse order + for i := len(ctx.deferreds) - 1; i >= 0; i-- { + if err := ctx.deferreds[i](); err != nil { + ctx.asyncFailer(fmt.Errorf("error executing context deferred: %w", err)) + } + } + ctx.commit() } @@ -483,3 +513,7 @@ func (ctx *cbContext) DeferCommit() func(err error) { }) } } + +func (ctx *cbContext) addDeferred(def func() error) { + ctx.deferreds = append(ctx.deferreds, def) +} diff --git a/context_test.go b/context_test.go index 973c173b..6e1b15cf 100644 --- a/context_test.go +++ b/context_test.go @@ -380,9 +380,9 @@ func TestContext_GetSetStateful(t *testing.T) { } ) - st.EXPECT().Get(key).Return(nil, nil) + st.EXPECT().GetP(key).Return(nil, nil, nil) st.EXPECT().Set(key, []byte(value)).Return(nil) - st.EXPECT().Get(key).Return([]byte(value), nil) + st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil) graph := DefineGroup(group, Persist(new(codec.String))) ctx := &cbContext{ @@ -537,11 +537,11 @@ func TestContext_Join(t *testing.T) { syncFailer: func(err error) { panic(err) }, } - st.EXPECT().Get(key).Return([]byte(value), nil) + st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil) v := ctx.Join(table) require.Equal(t, value, v) - st.EXPECT().Get(key).Return(nil, errSome) + st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome) require.Panics(t, func() { ctx.Join(table) }) require.Panics(t, func() { ctx.Join("other-table") }) @@ -586,11 +586,11 @@ func TestContext_Lookup(t *testing.T) { syncFailer: func(err error) { panic(err) }, } - st.EXPECT().Get(key).Return([]byte(value), nil) + st.EXPECT().GetP(key).Return([]byte(value), codec.NoopCloser, nil) v := ctx.Lookup(table, key) require.Equal(t, value, v) - st.EXPECT().Get(key).Return(nil, errSome) + st.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errSome) require.Panics(t, func() { ctx.Lookup(table, key) }) require.Panics(t, func() { ctx.Lookup("other-table", key) }) diff --git a/examples/3-messaging/blocker/blocker.go b/examples/3-messaging/blocker/blocker.go index 71d27d08..67c7ee92 100644 --- a/examples/3-messaging/blocker/blocker.go +++ b/examples/3-messaging/blocker/blocker.go @@ -3,6 +3,7 @@ package blocker import ( "context" "encoding/json" + "github.com/lovoo/goka" "github.com/lovoo/goka/examples/3-messaging/topicinit" ) diff --git a/examples/3-messaging/collector/collector.go b/examples/3-messaging/collector/collector.go index 73fd86e1..e6a68631 100644 --- a/examples/3-messaging/collector/collector.go +++ b/examples/3-messaging/collector/collector.go @@ -3,8 +3,9 @@ package collector import ( "context" "encoding/json" + "github.com/lovoo/goka" - "github.com/lovoo/goka/examples/3-messaging" + messaging "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/topicinit" ) diff --git a/examples/3-messaging/detector/detector.go b/examples/3-messaging/detector/detector.go index 706a62c3..b50642bd 100644 --- a/examples/3-messaging/detector/detector.go +++ b/examples/3-messaging/detector/detector.go @@ -3,8 +3,9 @@ package detector import ( "context" "encoding/json" + "github.com/lovoo/goka" - "github.com/lovoo/goka/examples/3-messaging" + messaging "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/blocker" "github.com/lovoo/goka/examples/3-messaging/topicinit" ) @@ -14,9 +15,7 @@ const ( maxRate = 0.5 ) -var ( - group goka.Group = "detector" -) +var group goka.Group = "detector" type Counters struct { Sent int diff --git a/examples/5-multiple/main.go b/examples/5-multiple/main.go index 26fc32d4..0f994a4b 100644 --- a/examples/5-multiple/main.go +++ b/examples/5-multiple/main.go @@ -118,8 +118,8 @@ func process(ctx goka.Context, msg interface{}) { func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server, - groupInitialized chan struct{}) error { - + groupInitialized chan struct{}, +) error { tmc := goka.NewTopicManagerConfig() tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) if err != nil { @@ -161,8 +161,8 @@ func runView(ctx context.Context, errg *multierr.ErrGroup, root *mux.Router, monitor *monitor.Server, - groupInitialized chan struct{}) error { - + groupInitialized chan struct{}, +) error { <-groupInitialized view, err := goka.NewView(brokers, @@ -183,7 +183,6 @@ func runView(ctx context.Context, server := &http.Server{Addr: ":0", Handler: root} errg.Go(func() error { - root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) { value, _ := view.Get(mux.Vars(r)["key"]) data, _ := json.Marshal(value) @@ -225,7 +224,6 @@ func pprofInit(root *mux.Router) { } func main() { - cfg := goka.DefaultConfig() cfg.Consumer.Offsets.Initial = sarama.OffsetOldest cfg.Version = sarama.V2_4_0_0 diff --git a/examples/7-redis/codec.go b/examples/7-redis/codec.go index d20dab77..7ce9237b 100644 --- a/examples/7-redis/codec.go +++ b/examples/7-redis/codec.go @@ -1,6 +1,8 @@ package main -import "encoding/json" +import ( + "encoding/json" +) type Codec struct{} diff --git a/examples/8-monitoring/main.go b/examples/8-monitoring/main.go index 1a634538..744603f4 100644 --- a/examples/8-monitoring/main.go +++ b/examples/8-monitoring/main.go @@ -103,12 +103,13 @@ func process(ctx goka.Context, msg interface{}) { ctx.SetValue(u) fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg) } + func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error { g := goka.DefineGroup(group+"-stateless", goka.Input(topic, new(codec.String), func(ctx goka.Context, msg interface{}) { - //ignored + // ignored }), ) p, err := goka.NewProcessor(brokers, g) @@ -124,8 +125,8 @@ func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error { func runJoinProcessor(ctx context.Context, monitor *monitor.Server, - joinGroupInitialized chan struct{}) error { - + joinGroupInitialized chan struct{}, +) error { g := goka.DefineGroup(joinGroup, goka.Input(topic, new(codec.String), @@ -160,11 +161,10 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server, actions *actions.Server, - joinGroupInitialized chan struct{}) error { - + joinGroupInitialized chan struct{}, +) error { // helper function that waits the configured number of times waitVisitor := func(ctx goka.Context, value interface{}) { - waitTime, ok := value.(int64) if !ok { return @@ -245,7 +245,6 @@ func runView(errg *multierr.ErrGroup, ctx context.Context, root *mux.Router, mon server := &http.Server{Addr: ":9095", Handler: root} errg.Go(func() error { - root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) { value, _ := view.Get(mux.Vars(r)["key"]) data, _ := json.Marshal(value) @@ -287,7 +286,6 @@ func pprofInit(root *mux.Router) { } func main() { - cfg := goka.DefaultConfig() cfg.Consumer.Offsets.Initial = sarama.OffsetOldest cfg.Version = sarama.V2_4_0_0 diff --git a/go.mod b/go.mod index 3d7402fe..874d3d3f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/lovoo/goka go 1.20 require ( - github.com/IBM/sarama v1.41.3 + github.com/IBM/sarama v1.41.2 github.com/go-stack/stack v1.8.1 github.com/golang/mock v1.6.0 github.com/gorilla/mux v1.8.0 diff --git a/go.sum b/go.sum index 96875f84..3f52aa7d 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c= +github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/graph.go b/graph.go index 4184b6af..ccbb89c1 100644 --- a/graph.go +++ b/graph.go @@ -67,7 +67,7 @@ type GroupGraph struct { visitors []Edge // those fields cache the info from above edges or are used to avoid naming/codec collisions - codecs map[string]Codec + codecs map[string]CodecP callbacks map[string]ProcessCallback outputStreamTopics map[Stream]struct{} @@ -121,11 +121,11 @@ func (gg *GroupGraph) OutputStreams() Edges { // AllEdges returns a list of all edges for the group graph. // This allows to modify a graph by cloning it's edges into a new one. // -// var existing Graph -// edges := existiting.AllEdges() -// // modify edges as required -// // recreate the modifiedg raph -// newGraph := DefineGroup(existing.Groug(), edges...) +// var existing Graph +// edges := existiting.AllEdges() +// // modify edges as required +// // recreate the modifiedg raph +// newGraph := DefineGroup(existing.Groug(), edges...) func (gg *GroupGraph) AllEdges() Edges { return chainEdges( gg.inputTables, @@ -153,7 +153,7 @@ func (gg *GroupGraph) copartitioned() Edges { return chainEdges(gg.inputStreams, gg.inputTables) } -func (gg *GroupGraph) codec(topic string) Codec { +func (gg *GroupGraph) codec(topic string) CodecP { return gg.codecs[topic] } @@ -170,7 +170,7 @@ func (gg *GroupGraph) joint(topic string) bool { func DefineGroup(group Group, edges ...Edge) *GroupGraph { gg := GroupGraph{ group: string(group), - codecs: make(map[string]Codec), + codecs: make(map[string]CodecP), callbacks: make(map[string]ProcessCallback), joinCheck: make(map[string]bool), outputStreamTopics: make(map[Stream]struct{}), @@ -264,7 +264,7 @@ func (gg *GroupGraph) Validate() error { type Edge interface { String() string Topic() string - Codec() Codec + Codec() CodecP } // Edges is a slice of edge objects. @@ -296,7 +296,7 @@ func (e Edges) Topics() []string { type topicDef struct { name string - codec Codec + codec CodecP } func (t *topicDef) Topic() string { @@ -307,7 +307,7 @@ func (t *topicDef) String() string { return fmt.Sprintf("%s/%T", t.name, t.codec) } -func (t *topicDef) Codec() Codec { +func (t *topicDef) Codec() CodecP { return t.codec } @@ -322,7 +322,7 @@ type inputStream struct { // the group and with the group table. // The group starts reading the topic from the newest offset. func Input(topic Stream, c Codec, cb ProcessCallback) Edge { - return &inputStream{&topicDef{string(topic), c}, cb} + return &inputStream{&topicDef{string(topic), convertOrFakeCodec(c)}, cb} } type inputStreams Edges @@ -347,7 +347,7 @@ func (is inputStreams) Topic() string { return strings.Join(topics, ",") } -func (is inputStreams) Codec() Codec { +func (is inputStreams) Codec() CodecP { if is == nil { return nil } @@ -375,9 +375,11 @@ type visitor struct { func (m *visitor) Topic() string { return m.name } -func (m *visitor) Codec() Codec { + +func (m *visitor) Codec() CodecP { return nil } + func (m *visitor) String() string { return fmt.Sprintf("visitor %s", m.name) } @@ -399,7 +401,7 @@ type loopStream inputStream // process the messages of the topic. Context.Loopback() is used to write // messages into this topic from any callback of the group. func Loop(c Codec, cb ProcessCallback) Edge { - return &loopStream{&topicDef{codec: c}, cb} + return &loopStream{&topicDef{codec: convertOrFakeCodec(c)}, cb} } func (s *loopStream) setGroup(group Group) { @@ -416,7 +418,7 @@ type inputTable struct { // The processing of input streams is blocked until all partitions of the table // are recovered. func Join(topic Table, c Codec) Edge { - return &inputTable{&topicDef{string(topic), c}} + return &inputTable{&topicDef{string(topic), convertOrFakeCodec(c)}} } type crossTable struct { @@ -429,7 +431,7 @@ type crossTable struct { // The processing of input streams is blocked until the table is fully // recovered. func Lookup(topic Table, c Codec) Edge { - return &crossTable{&topicDef{string(topic), c}} + return &crossTable{&topicDef{string(topic), convertOrFakeCodec(c)}} } type groupTable struct { @@ -448,7 +450,7 @@ type groupTable struct { // // The topic name is derived from the group name by appending "-table". func Persist(c Codec) Edge { - return &groupTable{&topicDef{codec: c}} + return &groupTable{&topicDef{codec: convertOrFakeCodec(c)}} } func (t *groupTable) setGroup(group Group) { @@ -465,7 +467,7 @@ type outputStream struct { // graph. // The topic does not have to be copartitioned with the input streams. func Output(topic Stream, c Codec) Edge { - return &outputStream{&topicDef{string(topic), c}} + return &outputStream{&topicDef{string(topic), convertOrFakeCodec(c)}} } // GroupTable returns the name of the group table of group. diff --git a/graph_test.go b/graph_test.go index bf98228a..3bbc6ef2 100644 --- a/graph_test.go +++ b/graph_test.go @@ -11,7 +11,7 @@ import ( ) var ( - c = new(codec.String) + c = convertOrFakeCodec(new(codec.String)) cb = func(ctx Context, msg interface{}) {} ) diff --git a/iterator.go b/iterator.go index 17d93534..b814d0e8 100644 --- a/iterator.go +++ b/iterator.go @@ -1,6 +1,7 @@ package goka import ( + "github.com/hashicorp/go-multierror" "github.com/lovoo/goka/storage" ) @@ -16,6 +17,8 @@ type Iterator interface { Key() string // Return the value of the current item // This value is already decoded with the view's codec (or nil, if it's nil) + // Note that the returned reference should not be changed and might be different after + // calling "Next", "Seek" or "Release". Value() (interface{}, error) // Release the iterator. After release, the iterator is not usable anymore Release() @@ -28,12 +31,16 @@ type Iterator interface { } type iterator struct { - iter storage.Iterator - codec Codec + iter storage.Iterator + + deferreds []func() error + deferredErrs error + codec CodecP } // Next advances the iterator to the next key. func (i *iterator) Next() bool { + i.runDeferred() return i.iter.Next() } @@ -50,19 +57,39 @@ func (i *iterator) Value() (interface{}, error) { } else if data == nil { return nil, nil } - return i.codec.Decode(data) + value, decodeCloser, err := i.codec.DecodeP(data) + if decodeCloser != nil { + i.deferreds = append(i.deferreds, decodeCloser.Close) + } + return value, err } // Err returns the possible iteration error. func (i *iterator) Err() error { + if i.deferredErrs != nil { + return i.deferredErrs + } return i.iter.Err() } // Releases releases the iterator. The iterator is not usable anymore after calling Release. func (i *iterator) Release() { + i.runDeferred() i.iter.Release() } func (i *iterator) Seek(key string) bool { + i.runDeferred() return i.iter.Seek([]byte(key)) } + +func (i *iterator) runDeferred() { + var err *multierror.Error + for idx, def := range i.deferreds { + err = multierror.Append(err, def()) + i.deferreds[idx] = nil // reset it for the garbage collector + } + i.deferredErrs = err.ErrorOrNil() + // reset length, but keep the slice + i.deferreds = i.deferreds[:0] +} diff --git a/iterator_test.go b/iterator_test.go index ea94a652..48cba55e 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -37,7 +37,7 @@ func TestIterator(t *testing.T) { it := &iterator{ iter: storage.NewMultiIterator([]storage.Iterator{iter}), - codec: new(codec.String), + codec: convertOrFakeCodec(&codec.String{}), } defer it.Release() count := 0 diff --git a/mockbuilder.go b/mockbuilder.go index b06f441d..557b7997 100644 --- a/mockbuilder.go +++ b/mockbuilder.go @@ -11,9 +11,7 @@ import ( "github.com/lovoo/goka/storage" ) -var ( - errProducerBuilder error = errors.New("building producer failed on purpose") -) +var errProducerBuilder error = errors.New("building producer failed on purpose") type builderMock struct { ctrl *gomock.Controller diff --git a/mocks.go b/mocks.go index 2156a138..9e78a4b0 100644 --- a/mocks.go +++ b/mocks.go @@ -5,36 +5,35 @@ package goka import ( - reflect "reflect" - sarama "github.com/IBM/sarama" gomock "github.com/golang/mock/gomock" + reflect "reflect" ) -// MockTopicManager is a mock of TopicManager interface. +// MockTopicManager is a mock of TopicManager interface type MockTopicManager struct { ctrl *gomock.Controller recorder *MockTopicManagerMockRecorder } -// MockTopicManagerMockRecorder is the mock recorder for MockTopicManager. +// MockTopicManagerMockRecorder is the mock recorder for MockTopicManager type MockTopicManagerMockRecorder struct { mock *MockTopicManager } -// NewMockTopicManager creates a new mock instance. +// NewMockTopicManager creates a new mock instance func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager { mock := &MockTopicManager{ctrl: ctrl} mock.recorder = &MockTopicManagerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder { return m.recorder } -// Close mocks base method. +// Close mocks base method func (m *MockTopicManager) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -42,13 +41,13 @@ func (m *MockTopicManager) Close() error { return ret0 } -// Close indicates an expected call of Close. +// Close indicates an expected call of Close func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockTopicManager)(nil).Close)) } -// EnsureStreamExists mocks base method. +// EnsureStreamExists mocks base method func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EnsureStreamExists", arg0, arg1) @@ -56,13 +55,13 @@ func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error { return ret0 } -// EnsureStreamExists indicates an expected call of EnsureStreamExists. +// EnsureStreamExists indicates an expected call of EnsureStreamExists func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureStreamExists", reflect.TypeOf((*MockTopicManager)(nil).EnsureStreamExists), arg0, arg1) } -// EnsureTableExists mocks base method. +// EnsureTableExists mocks base method func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EnsureTableExists", arg0, arg1) @@ -70,13 +69,13 @@ func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error { return ret0 } -// EnsureTableExists indicates an expected call of EnsureTableExists. +// EnsureTableExists indicates an expected call of EnsureTableExists func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureTableExists", reflect.TypeOf((*MockTopicManager)(nil).EnsureTableExists), arg0, arg1) } -// EnsureTopicExists mocks base method. +// EnsureTopicExists mocks base method func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EnsureTopicExists", arg0, arg1, arg2, arg3) @@ -84,13 +83,13 @@ func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 m return ret0 } -// EnsureTopicExists indicates an expected call of EnsureTopicExists. +// EnsureTopicExists indicates an expected call of EnsureTopicExists func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureTopicExists", reflect.TypeOf((*MockTopicManager)(nil).EnsureTopicExists), arg0, arg1, arg2, arg3) } -// GetOffset mocks base method. +// GetOffset mocks base method func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetOffset", arg0, arg1, arg2) @@ -99,13 +98,13 @@ func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64 return ret0, ret1 } -// GetOffset indicates an expected call of GetOffset. +// GetOffset indicates an expected call of GetOffset func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffset", reflect.TypeOf((*MockTopicManager)(nil).GetOffset), arg0, arg1, arg2) } -// Partitions mocks base method. +// Partitions mocks base method func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Partitions", arg0) @@ -114,36 +113,36 @@ func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error) { return ret0, ret1 } -// Partitions indicates an expected call of Partitions. +// Partitions indicates an expected call of Partitions func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Partitions", reflect.TypeOf((*MockTopicManager)(nil).Partitions), arg0) } -// MockProducer is a mock of Producer interface. +// MockProducer is a mock of Producer interface type MockProducer struct { ctrl *gomock.Controller recorder *MockProducerMockRecorder } -// MockProducerMockRecorder is the mock recorder for MockProducer. +// MockProducerMockRecorder is the mock recorder for MockProducer type MockProducerMockRecorder struct { mock *MockProducer } -// NewMockProducer creates a new mock instance. +// NewMockProducer creates a new mock instance func NewMockProducer(ctrl *gomock.Controller) *MockProducer { mock := &MockProducer{ctrl: ctrl} mock.recorder = &MockProducerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockProducer) EXPECT() *MockProducerMockRecorder { return m.recorder } -// Close mocks base method. +// Close mocks base method func (m *MockProducer) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -151,13 +150,13 @@ func (m *MockProducer) Close() error { return ret0 } -// Close indicates an expected call of Close. +// Close indicates an expected call of Close func (mr *MockProducerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockProducer)(nil).Close)) } -// Emit mocks base method. +// Emit mocks base method func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Emit", arg0, arg1, arg2) @@ -165,13 +164,13 @@ func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise { return ret0 } -// Emit indicates an expected call of Emit. +// Emit indicates an expected call of Emit func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Emit", reflect.TypeOf((*MockProducer)(nil).Emit), arg0, arg1, arg2) } -// EmitWithHeaders mocks base method. +// EmitWithHeaders mocks base method func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 Headers) *Promise { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EmitWithHeaders", arg0, arg1, arg2, arg3) @@ -179,36 +178,36 @@ func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 Head return ret0 } -// EmitWithHeaders indicates an expected call of EmitWithHeaders. +// EmitWithHeaders indicates an expected call of EmitWithHeaders func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EmitWithHeaders", reflect.TypeOf((*MockProducer)(nil).EmitWithHeaders), arg0, arg1, arg2, arg3) } -// MockBroker is a mock of Broker interface. +// MockBroker is a mock of Broker interface type MockBroker struct { ctrl *gomock.Controller recorder *MockBrokerMockRecorder } -// MockBrokerMockRecorder is the mock recorder for MockBroker. +// MockBrokerMockRecorder is the mock recorder for MockBroker type MockBrokerMockRecorder struct { mock *MockBroker } -// NewMockBroker creates a new mock instance. +// NewMockBroker creates a new mock instance func NewMockBroker(ctrl *gomock.Controller) *MockBroker { mock := &MockBroker{ctrl: ctrl} mock.recorder = &MockBrokerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockBroker) EXPECT() *MockBrokerMockRecorder { return m.recorder } -// Addr mocks base method. +// Addr mocks base method func (m *MockBroker) Addr() string { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Addr") @@ -216,13 +215,13 @@ func (m *MockBroker) Addr() string { return ret0 } -// Addr indicates an expected call of Addr. +// Addr indicates an expected call of Addr func (mr *MockBrokerMockRecorder) Addr() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addr", reflect.TypeOf((*MockBroker)(nil).Addr)) } -// Connected mocks base method. +// Connected mocks base method func (m *MockBroker) Connected() (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Connected") @@ -231,13 +230,13 @@ func (m *MockBroker) Connected() (bool, error) { return ret0, ret1 } -// Connected indicates an expected call of Connected. +// Connected indicates an expected call of Connected func (mr *MockBrokerMockRecorder) Connected() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connected", reflect.TypeOf((*MockBroker)(nil).Connected)) } -// CreateTopics mocks base method. +// CreateTopics mocks base method func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateTopics", arg0) @@ -246,13 +245,13 @@ func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.Cre return ret0, ret1 } -// CreateTopics indicates an expected call of CreateTopics. +// CreateTopics indicates an expected call of CreateTopics func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopics", reflect.TypeOf((*MockBroker)(nil).CreateTopics), arg0) } -// Open mocks base method. +// Open mocks base method func (m *MockBroker) Open(arg0 *sarama.Config) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Open", arg0) @@ -260,7 +259,7 @@ func (m *MockBroker) Open(arg0 *sarama.Config) error { return ret0 } -// Open indicates an expected call of Open. +// Open indicates an expected call of Open func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockBroker)(nil).Open), arg0) diff --git a/mockssarama.go b/mockssarama.go index 8eb6d673..55db7ff3 100644 --- a/mockssarama.go +++ b/mockssarama.go @@ -5,36 +5,35 @@ package goka import ( - reflect "reflect" - sarama "github.com/IBM/sarama" gomock "github.com/golang/mock/gomock" + reflect "reflect" ) -// MockClient is a mock of Client interface. +// MockClient is a mock of Client interface type MockClient struct { ctrl *gomock.Controller recorder *MockClientMockRecorder } -// MockClientMockRecorder is the mock recorder for MockClient. +// MockClientMockRecorder is the mock recorder for MockClient type MockClientMockRecorder struct { mock *MockClient } -// NewMockClient creates a new mock instance. +// NewMockClient creates a new mock instance func NewMockClient(ctrl *gomock.Controller) *MockClient { mock := &MockClient{ctrl: ctrl} mock.recorder = &MockClientMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockClient) EXPECT() *MockClientMockRecorder { return m.recorder } -// Broker mocks base method. +// Broker mocks base method func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Broker", arg0) @@ -43,13 +42,13 @@ func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error) { return ret0, ret1 } -// Broker indicates an expected call of Broker. +// Broker indicates an expected call of Broker func (mr *MockClientMockRecorder) Broker(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broker", reflect.TypeOf((*MockClient)(nil).Broker), arg0) } -// Brokers mocks base method. +// Brokers mocks base method func (m *MockClient) Brokers() []*sarama.Broker { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Brokers") @@ -57,13 +56,13 @@ func (m *MockClient) Brokers() []*sarama.Broker { return ret0 } -// Brokers indicates an expected call of Brokers. +// Brokers indicates an expected call of Brokers func (mr *MockClientMockRecorder) Brokers() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Brokers", reflect.TypeOf((*MockClient)(nil).Brokers)) } -// Close mocks base method. +// Close mocks base method func (m *MockClient) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -71,13 +70,13 @@ func (m *MockClient) Close() error { return ret0 } -// Close indicates an expected call of Close. +// Close indicates an expected call of Close func (mr *MockClientMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) } -// Closed mocks base method. +// Closed mocks base method func (m *MockClient) Closed() bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Closed") @@ -85,13 +84,13 @@ func (m *MockClient) Closed() bool { return ret0 } -// Closed indicates an expected call of Closed. +// Closed indicates an expected call of Closed func (mr *MockClientMockRecorder) Closed() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Closed", reflect.TypeOf((*MockClient)(nil).Closed)) } -// Config mocks base method. +// Config mocks base method func (m *MockClient) Config() *sarama.Config { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Config") @@ -99,13 +98,13 @@ func (m *MockClient) Config() *sarama.Config { return ret0 } -// Config indicates an expected call of Config. +// Config indicates an expected call of Config func (mr *MockClientMockRecorder) Config() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Config", reflect.TypeOf((*MockClient)(nil).Config)) } -// Controller mocks base method. +// Controller mocks base method func (m *MockClient) Controller() (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Controller") @@ -114,13 +113,13 @@ func (m *MockClient) Controller() (*sarama.Broker, error) { return ret0, ret1 } -// Controller indicates an expected call of Controller. +// Controller indicates an expected call of Controller func (mr *MockClientMockRecorder) Controller() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Controller", reflect.TypeOf((*MockClient)(nil).Controller)) } -// Coordinator mocks base method. +// Coordinator mocks base method func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Coordinator", arg0) @@ -129,13 +128,13 @@ func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error) { return ret0, ret1 } -// Coordinator indicates an expected call of Coordinator. +// Coordinator indicates an expected call of Coordinator func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Coordinator", reflect.TypeOf((*MockClient)(nil).Coordinator), arg0) } -// GetOffset mocks base method. +// GetOffset mocks base method func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetOffset", arg0, arg1, arg2) @@ -144,13 +143,13 @@ func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, erro return ret0, ret1 } -// GetOffset indicates an expected call of GetOffset. +// GetOffset indicates an expected call of GetOffset func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffset", reflect.TypeOf((*MockClient)(nil).GetOffset), arg0, arg1, arg2) } -// InSyncReplicas mocks base method. +// InSyncReplicas mocks base method func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InSyncReplicas", arg0, arg1) @@ -159,13 +158,13 @@ func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error) { return ret0, ret1 } -// InSyncReplicas indicates an expected call of InSyncReplicas. +// InSyncReplicas indicates an expected call of InSyncReplicas func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InSyncReplicas", reflect.TypeOf((*MockClient)(nil).InSyncReplicas), arg0, arg1) } -// InitProducerID mocks base method. +// InitProducerID mocks base method func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InitProducerID") @@ -174,13 +173,13 @@ func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error) { return ret0, ret1 } -// InitProducerID indicates an expected call of InitProducerID. +// InitProducerID indicates an expected call of InitProducerID func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitProducerID", reflect.TypeOf((*MockClient)(nil).InitProducerID)) } -// Leader mocks base method. +// Leader mocks base method func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Leader", arg0, arg1) @@ -189,13 +188,13 @@ func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error) { return ret0, ret1 } -// Leader indicates an expected call of Leader. +// Leader indicates an expected call of Leader func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Leader", reflect.TypeOf((*MockClient)(nil).Leader), arg0, arg1) } -// LeaderAndEpoch mocks base method. +// LeaderAndEpoch mocks base method func (m *MockClient) LeaderAndEpoch(arg0 string, arg1 int32) (*sarama.Broker, int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LeaderAndEpoch", arg0, arg1) @@ -205,13 +204,13 @@ func (m *MockClient) LeaderAndEpoch(arg0 string, arg1 int32) (*sarama.Broker, in return ret0, ret1, ret2 } -// LeaderAndEpoch indicates an expected call of LeaderAndEpoch. +// LeaderAndEpoch indicates an expected call of LeaderAndEpoch func (mr *MockClientMockRecorder) LeaderAndEpoch(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LeaderAndEpoch", reflect.TypeOf((*MockClient)(nil).LeaderAndEpoch), arg0, arg1) } -// LeastLoadedBroker mocks base method. +// LeastLoadedBroker mocks base method func (m *MockClient) LeastLoadedBroker() *sarama.Broker { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LeastLoadedBroker") @@ -219,13 +218,13 @@ func (m *MockClient) LeastLoadedBroker() *sarama.Broker { return ret0 } -// LeastLoadedBroker indicates an expected call of LeastLoadedBroker. +// LeastLoadedBroker indicates an expected call of LeastLoadedBroker func (mr *MockClientMockRecorder) LeastLoadedBroker() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LeastLoadedBroker", reflect.TypeOf((*MockClient)(nil).LeastLoadedBroker)) } -// OfflineReplicas mocks base method. +// OfflineReplicas mocks base method func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "OfflineReplicas", arg0, arg1) @@ -234,13 +233,13 @@ func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error) { return ret0, ret1 } -// OfflineReplicas indicates an expected call of OfflineReplicas. +// OfflineReplicas indicates an expected call of OfflineReplicas func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OfflineReplicas", reflect.TypeOf((*MockClient)(nil).OfflineReplicas), arg0, arg1) } -// Partitions mocks base method. +// Partitions mocks base method func (m *MockClient) Partitions(arg0 string) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Partitions", arg0) @@ -249,13 +248,13 @@ func (m *MockClient) Partitions(arg0 string) ([]int32, error) { return ret0, ret1 } -// Partitions indicates an expected call of Partitions. +// Partitions indicates an expected call of Partitions func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Partitions", reflect.TypeOf((*MockClient)(nil).Partitions), arg0) } -// RefreshBrokers mocks base method. +// RefreshBrokers mocks base method func (m *MockClient) RefreshBrokers(arg0 []string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RefreshBrokers", arg0) @@ -263,13 +262,13 @@ func (m *MockClient) RefreshBrokers(arg0 []string) error { return ret0 } -// RefreshBrokers indicates an expected call of RefreshBrokers. +// RefreshBrokers indicates an expected call of RefreshBrokers func (mr *MockClientMockRecorder) RefreshBrokers(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshBrokers", reflect.TypeOf((*MockClient)(nil).RefreshBrokers), arg0) } -// RefreshController mocks base method. +// RefreshController mocks base method func (m *MockClient) RefreshController() (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RefreshController") @@ -278,13 +277,13 @@ func (m *MockClient) RefreshController() (*sarama.Broker, error) { return ret0, ret1 } -// RefreshController indicates an expected call of RefreshController. +// RefreshController indicates an expected call of RefreshController func (mr *MockClientMockRecorder) RefreshController() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshController", reflect.TypeOf((*MockClient)(nil).RefreshController)) } -// RefreshCoordinator mocks base method. +// RefreshCoordinator mocks base method func (m *MockClient) RefreshCoordinator(arg0 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RefreshCoordinator", arg0) @@ -292,13 +291,13 @@ func (m *MockClient) RefreshCoordinator(arg0 string) error { return ret0 } -// RefreshCoordinator indicates an expected call of RefreshCoordinator. +// RefreshCoordinator indicates an expected call of RefreshCoordinator func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshCoordinator", reflect.TypeOf((*MockClient)(nil).RefreshCoordinator), arg0) } -// RefreshMetadata mocks base method. +// RefreshMetadata mocks base method func (m *MockClient) RefreshMetadata(arg0 ...string) error { m.ctrl.T.Helper() varargs := []interface{}{} @@ -310,13 +309,13 @@ func (m *MockClient) RefreshMetadata(arg0 ...string) error { return ret0 } -// RefreshMetadata indicates an expected call of RefreshMetadata. +// RefreshMetadata indicates an expected call of RefreshMetadata func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshMetadata", reflect.TypeOf((*MockClient)(nil).RefreshMetadata), arg0...) } -// RefreshTransactionCoordinator mocks base method. +// RefreshTransactionCoordinator mocks base method func (m *MockClient) RefreshTransactionCoordinator(arg0 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RefreshTransactionCoordinator", arg0) @@ -324,13 +323,13 @@ func (m *MockClient) RefreshTransactionCoordinator(arg0 string) error { return ret0 } -// RefreshTransactionCoordinator indicates an expected call of RefreshTransactionCoordinator. +// RefreshTransactionCoordinator indicates an expected call of RefreshTransactionCoordinator func (mr *MockClientMockRecorder) RefreshTransactionCoordinator(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshTransactionCoordinator", reflect.TypeOf((*MockClient)(nil).RefreshTransactionCoordinator), arg0) } -// Replicas mocks base method. +// Replicas mocks base method func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Replicas", arg0, arg1) @@ -339,13 +338,13 @@ func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error) { return ret0, ret1 } -// Replicas indicates an expected call of Replicas. +// Replicas indicates an expected call of Replicas func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Replicas", reflect.TypeOf((*MockClient)(nil).Replicas), arg0, arg1) } -// Topics mocks base method. +// Topics mocks base method func (m *MockClient) Topics() ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Topics") @@ -354,13 +353,13 @@ func (m *MockClient) Topics() ([]string, error) { return ret0, ret1 } -// Topics indicates an expected call of Topics. +// Topics indicates an expected call of Topics func (mr *MockClientMockRecorder) Topics() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Topics", reflect.TypeOf((*MockClient)(nil).Topics)) } -// TransactionCoordinator mocks base method. +// TransactionCoordinator mocks base method func (m *MockClient) TransactionCoordinator(arg0 string) (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TransactionCoordinator", arg0) @@ -369,13 +368,13 @@ func (m *MockClient) TransactionCoordinator(arg0 string) (*sarama.Broker, error) return ret0, ret1 } -// TransactionCoordinator indicates an expected call of TransactionCoordinator. +// TransactionCoordinator indicates an expected call of TransactionCoordinator func (mr *MockClientMockRecorder) TransactionCoordinator(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionCoordinator", reflect.TypeOf((*MockClient)(nil).TransactionCoordinator), arg0) } -// WritablePartitions mocks base method. +// WritablePartitions mocks base method func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WritablePartitions", arg0) @@ -384,36 +383,36 @@ func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error) { return ret0, ret1 } -// WritablePartitions indicates an expected call of WritablePartitions. +// WritablePartitions indicates an expected call of WritablePartitions func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritablePartitions", reflect.TypeOf((*MockClient)(nil).WritablePartitions), arg0) } -// MockClusterAdmin is a mock of ClusterAdmin interface. +// MockClusterAdmin is a mock of ClusterAdmin interface type MockClusterAdmin struct { ctrl *gomock.Controller recorder *MockClusterAdminMockRecorder } -// MockClusterAdminMockRecorder is the mock recorder for MockClusterAdmin. +// MockClusterAdminMockRecorder is the mock recorder for MockClusterAdmin type MockClusterAdminMockRecorder struct { mock *MockClusterAdmin } -// NewMockClusterAdmin creates a new mock instance. +// NewMockClusterAdmin creates a new mock instance func NewMockClusterAdmin(ctrl *gomock.Controller) *MockClusterAdmin { mock := &MockClusterAdmin{ctrl: ctrl} mock.recorder = &MockClusterAdminMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockClusterAdmin) EXPECT() *MockClusterAdminMockRecorder { return m.recorder } -// AlterClientQuotas mocks base method. +// AlterClientQuotas mocks base method func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, arg1 sarama.ClientQuotasOp, arg2 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AlterClientQuotas", arg0, arg1, arg2) @@ -421,13 +420,13 @@ func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, return ret0 } -// AlterClientQuotas indicates an expected call of AlterClientQuotas. +// AlterClientQuotas indicates an expected call of AlterClientQuotas func (mr *MockClusterAdminMockRecorder) AlterClientQuotas(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterClientQuotas", reflect.TypeOf((*MockClusterAdmin)(nil).AlterClientQuotas), arg0, arg1, arg2) } -// AlterConfig mocks base method. +// AlterConfig mocks base method func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]*string, arg3 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AlterConfig", arg0, arg1, arg2, arg3) @@ -435,13 +434,13 @@ func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 stri return ret0 } -// AlterConfig indicates an expected call of AlterConfig. +// AlterConfig indicates an expected call of AlterConfig func (mr *MockClusterAdminMockRecorder) AlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterConfig", reflect.TypeOf((*MockClusterAdmin)(nil).AlterConfig), arg0, arg1, arg2, arg3) } -// AlterPartitionReassignments mocks base method. +// AlterPartitionReassignments mocks base method func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int32) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AlterPartitionReassignments", arg0, arg1) @@ -449,13 +448,13 @@ func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int return ret0 } -// AlterPartitionReassignments indicates an expected call of AlterPartitionReassignments. +// AlterPartitionReassignments indicates an expected call of AlterPartitionReassignments func (mr *MockClusterAdminMockRecorder) AlterPartitionReassignments(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterPartitionReassignments", reflect.TypeOf((*MockClusterAdmin)(nil).AlterPartitionReassignments), arg0, arg1) } -// Close mocks base method. +// Close mocks base method func (m *MockClusterAdmin) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -463,13 +462,13 @@ func (m *MockClusterAdmin) Close() error { return ret0 } -// Close indicates an expected call of Close. +// Close indicates an expected call of Close func (mr *MockClusterAdminMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClusterAdmin)(nil).Close)) } -// Controller mocks base method. +// Controller mocks base method func (m *MockClusterAdmin) Controller() (*sarama.Broker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Controller") @@ -478,13 +477,13 @@ func (m *MockClusterAdmin) Controller() (*sarama.Broker, error) { return ret0, ret1 } -// Controller indicates an expected call of Controller. +// Controller indicates an expected call of Controller func (mr *MockClusterAdminMockRecorder) Controller() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Controller", reflect.TypeOf((*MockClusterAdmin)(nil).Controller)) } -// CreateACL mocks base method. +// CreateACL mocks base method func (m *MockClusterAdmin) CreateACL(arg0 sarama.Resource, arg1 sarama.Acl) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateACL", arg0, arg1) @@ -492,13 +491,13 @@ func (m *MockClusterAdmin) CreateACL(arg0 sarama.Resource, arg1 sarama.Acl) erro return ret0 } -// CreateACL indicates an expected call of CreateACL. +// CreateACL indicates an expected call of CreateACL func (mr *MockClusterAdminMockRecorder) CreateACL(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateACL", reflect.TypeOf((*MockClusterAdmin)(nil).CreateACL), arg0, arg1) } -// CreateACLs mocks base method. +// CreateACLs mocks base method func (m *MockClusterAdmin) CreateACLs(arg0 []*sarama.ResourceAcls) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateACLs", arg0) @@ -506,13 +505,13 @@ func (m *MockClusterAdmin) CreateACLs(arg0 []*sarama.ResourceAcls) error { return ret0 } -// CreateACLs indicates an expected call of CreateACLs. +// CreateACLs indicates an expected call of CreateACLs func (mr *MockClusterAdminMockRecorder) CreateACLs(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateACLs", reflect.TypeOf((*MockClusterAdmin)(nil).CreateACLs), arg0) } -// CreatePartitions mocks base method. +// CreatePartitions mocks base method func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]int32, arg3 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreatePartitions", arg0, arg1, arg2, arg3) @@ -520,13 +519,13 @@ func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]in return ret0 } -// CreatePartitions indicates an expected call of CreatePartitions. +// CreatePartitions indicates an expected call of CreatePartitions func (mr *MockClusterAdminMockRecorder) CreatePartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreatePartitions", reflect.TypeOf((*MockClusterAdmin)(nil).CreatePartitions), arg0, arg1, arg2, arg3) } -// CreateTopic mocks base method. +// CreateTopic mocks base method func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, arg2 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateTopic", arg0, arg1, arg2) @@ -534,13 +533,13 @@ func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, ar return ret0 } -// CreateTopic indicates an expected call of CreateTopic. +// CreateTopic indicates an expected call of CreateTopic func (mr *MockClusterAdminMockRecorder) CreateTopic(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockClusterAdmin)(nil).CreateTopic), arg0, arg1, arg2) } -// DeleteACL mocks base method. +// DeleteACL mocks base method func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama.MatchingAcl, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteACL", arg0, arg1) @@ -549,13 +548,13 @@ func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama return ret0, ret1 } -// DeleteACL indicates an expected call of DeleteACL. +// DeleteACL indicates an expected call of DeleteACL func (mr *MockClusterAdminMockRecorder) DeleteACL(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteACL", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteACL), arg0, arg1) } -// DeleteConsumerGroup mocks base method. +// DeleteConsumerGroup mocks base method func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteConsumerGroup", arg0) @@ -563,13 +562,13 @@ func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error { return ret0 } -// DeleteConsumerGroup indicates an expected call of DeleteConsumerGroup. +// DeleteConsumerGroup indicates an expected call of DeleteConsumerGroup func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroup(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteConsumerGroup", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteConsumerGroup), arg0) } -// DeleteConsumerGroupOffset mocks base method. +// DeleteConsumerGroupOffset mocks base method func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int32) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteConsumerGroupOffset", arg0, arg1, arg2) @@ -577,13 +576,13 @@ func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int return ret0 } -// DeleteConsumerGroupOffset indicates an expected call of DeleteConsumerGroupOffset. +// DeleteConsumerGroupOffset indicates an expected call of DeleteConsumerGroupOffset func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroupOffset(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteConsumerGroupOffset", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteConsumerGroupOffset), arg0, arg1, arg2) } -// DeleteRecords mocks base method. +// DeleteRecords mocks base method func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteRecords", arg0, arg1) @@ -591,13 +590,13 @@ func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) erro return ret0 } -// DeleteRecords indicates an expected call of DeleteRecords. +// DeleteRecords indicates an expected call of DeleteRecords func (mr *MockClusterAdminMockRecorder) DeleteRecords(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteRecords", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteRecords), arg0, arg1) } -// DeleteTopic mocks base method. +// DeleteTopic mocks base method func (m *MockClusterAdmin) DeleteTopic(arg0 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteTopic", arg0) @@ -605,13 +604,13 @@ func (m *MockClusterAdmin) DeleteTopic(arg0 string) error { return ret0 } -// DeleteTopic indicates an expected call of DeleteTopic. +// DeleteTopic indicates an expected call of DeleteTopic func (mr *MockClusterAdminMockRecorder) DeleteTopic(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteTopic), arg0) } -// DeleteUserScramCredentials mocks base method. +// DeleteUserScramCredentials mocks base method func (m *MockClusterAdmin) DeleteUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteUserScramCredentials", arg0) @@ -620,13 +619,13 @@ func (m *MockClusterAdmin) DeleteUserScramCredentials(arg0 []sarama.AlterUserScr return ret0, ret1 } -// DeleteUserScramCredentials indicates an expected call of DeleteUserScramCredentials. +// DeleteUserScramCredentials indicates an expected call of DeleteUserScramCredentials func (mr *MockClusterAdminMockRecorder) DeleteUserScramCredentials(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUserScramCredentials", reflect.TypeOf((*MockClusterAdmin)(nil).DeleteUserScramCredentials), arg0) } -// DescribeClientQuotas mocks base method. +// DescribeClientQuotas mocks base method func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterComponent, arg1 bool) ([]sarama.DescribeClientQuotasEntry, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeClientQuotas", arg0, arg1) @@ -635,13 +634,13 @@ func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterCompone return ret0, ret1 } -// DescribeClientQuotas indicates an expected call of DescribeClientQuotas. +// DescribeClientQuotas indicates an expected call of DescribeClientQuotas func (mr *MockClusterAdminMockRecorder) DescribeClientQuotas(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeClientQuotas", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeClientQuotas), arg0, arg1) } -// DescribeCluster mocks base method. +// DescribeCluster mocks base method func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeCluster") @@ -651,13 +650,13 @@ func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) { return ret0, ret1, ret2 } -// DescribeCluster indicates an expected call of DescribeCluster. +// DescribeCluster indicates an expected call of DescribeCluster func (mr *MockClusterAdminMockRecorder) DescribeCluster() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeCluster", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeCluster)) } -// DescribeConfig mocks base method. +// DescribeConfig mocks base method func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama.ConfigEntry, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeConfig", arg0) @@ -666,13 +665,13 @@ func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama. return ret0, ret1 } -// DescribeConfig indicates an expected call of DescribeConfig. +// DescribeConfig indicates an expected call of DescribeConfig func (mr *MockClusterAdminMockRecorder) DescribeConfig(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeConfig", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeConfig), arg0) } -// DescribeConsumerGroups mocks base method. +// DescribeConsumerGroups mocks base method func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.GroupDescription, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeConsumerGroups", arg0) @@ -681,13 +680,13 @@ func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.Grou return ret0, ret1 } -// DescribeConsumerGroups indicates an expected call of DescribeConsumerGroups. +// DescribeConsumerGroups indicates an expected call of DescribeConsumerGroups func (mr *MockClusterAdminMockRecorder) DescribeConsumerGroups(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeConsumerGroups", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeConsumerGroups), arg0) } -// DescribeLogDirs mocks base method. +// DescribeLogDirs mocks base method func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeLogDirs", arg0) @@ -696,13 +695,13 @@ func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.Des return ret0, ret1 } -// DescribeLogDirs indicates an expected call of DescribeLogDirs. +// DescribeLogDirs indicates an expected call of DescribeLogDirs func (mr *MockClusterAdminMockRecorder) DescribeLogDirs(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeLogDirs", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeLogDirs), arg0) } -// DescribeTopics mocks base method. +// DescribeTopics mocks base method func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadata, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeTopics", arg0) @@ -711,13 +710,13 @@ func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadat return ret0, ret1 } -// DescribeTopics indicates an expected call of DescribeTopics. +// DescribeTopics indicates an expected call of DescribeTopics func (mr *MockClusterAdminMockRecorder) DescribeTopics(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeTopics", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeTopics), arg0) } -// DescribeUserScramCredentials mocks base method. +// DescribeUserScramCredentials mocks base method func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*sarama.DescribeUserScramCredentialsResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeUserScramCredentials", arg0) @@ -726,13 +725,13 @@ func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*saram return ret0, ret1 } -// DescribeUserScramCredentials indicates an expected call of DescribeUserScramCredentials. +// DescribeUserScramCredentials indicates an expected call of DescribeUserScramCredentials func (mr *MockClusterAdminMockRecorder) DescribeUserScramCredentials(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeUserScramCredentials", reflect.TypeOf((*MockClusterAdmin)(nil).DescribeUserScramCredentials), arg0) } -// IncrementalAlterConfig mocks base method. +// IncrementalAlterConfig mocks base method func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]sarama.IncrementalAlterConfigsEntry, arg3 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IncrementalAlterConfig", arg0, arg1, arg2, arg3) @@ -740,13 +739,13 @@ func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType return ret0 } -// IncrementalAlterConfig indicates an expected call of IncrementalAlterConfig. +// IncrementalAlterConfig indicates an expected call of IncrementalAlterConfig func (mr *MockClusterAdminMockRecorder) IncrementalAlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncrementalAlterConfig", reflect.TypeOf((*MockClusterAdmin)(nil).IncrementalAlterConfig), arg0, arg1, arg2, arg3) } -// ListAcls mocks base method. +// ListAcls mocks base method func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcls, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListAcls", arg0) @@ -755,13 +754,13 @@ func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcl return ret0, ret1 } -// ListAcls indicates an expected call of ListAcls. +// ListAcls indicates an expected call of ListAcls func (mr *MockClusterAdminMockRecorder) ListAcls(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAcls", reflect.TypeOf((*MockClusterAdmin)(nil).ListAcls), arg0) } -// ListConsumerGroupOffsets mocks base method. +// ListConsumerGroupOffsets mocks base method func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string][]int32) (*sarama.OffsetFetchResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListConsumerGroupOffsets", arg0, arg1) @@ -770,13 +769,13 @@ func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string return ret0, ret1 } -// ListConsumerGroupOffsets indicates an expected call of ListConsumerGroupOffsets. +// ListConsumerGroupOffsets indicates an expected call of ListConsumerGroupOffsets func (mr *MockClusterAdminMockRecorder) ListConsumerGroupOffsets(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConsumerGroupOffsets", reflect.TypeOf((*MockClusterAdmin)(nil).ListConsumerGroupOffsets), arg0, arg1) } -// ListConsumerGroups mocks base method. +// ListConsumerGroups mocks base method func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListConsumerGroups") @@ -785,13 +784,13 @@ func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) { return ret0, ret1 } -// ListConsumerGroups indicates an expected call of ListConsumerGroups. +// ListConsumerGroups indicates an expected call of ListConsumerGroups func (mr *MockClusterAdminMockRecorder) ListConsumerGroups() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConsumerGroups", reflect.TypeOf((*MockClusterAdmin)(nil).ListConsumerGroups)) } -// ListPartitionReassignments mocks base method. +// ListPartitionReassignments mocks base method func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPartitionReassignments", arg0, arg1) @@ -800,13 +799,13 @@ func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) return ret0, ret1 } -// ListPartitionReassignments indicates an expected call of ListPartitionReassignments. +// ListPartitionReassignments indicates an expected call of ListPartitionReassignments func (mr *MockClusterAdminMockRecorder) ListPartitionReassignments(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPartitionReassignments", reflect.TypeOf((*MockClusterAdmin)(nil).ListPartitionReassignments), arg0, arg1) } -// ListTopics mocks base method. +// ListTopics mocks base method func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListTopics") @@ -815,13 +814,13 @@ func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) { return ret0, ret1 } -// ListTopics indicates an expected call of ListTopics. +// ListTopics indicates an expected call of ListTopics func (mr *MockClusterAdminMockRecorder) ListTopics() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopics", reflect.TypeOf((*MockClusterAdmin)(nil).ListTopics)) } -// RemoveMemberFromConsumerGroup mocks base method. +// RemoveMemberFromConsumerGroup mocks base method func (m *MockClusterAdmin) RemoveMemberFromConsumerGroup(arg0 string, arg1 []string) (*sarama.LeaveGroupResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RemoveMemberFromConsumerGroup", arg0, arg1) @@ -830,13 +829,13 @@ func (m *MockClusterAdmin) RemoveMemberFromConsumerGroup(arg0 string, arg1 []str return ret0, ret1 } -// RemoveMemberFromConsumerGroup indicates an expected call of RemoveMemberFromConsumerGroup. +// RemoveMemberFromConsumerGroup indicates an expected call of RemoveMemberFromConsumerGroup func (mr *MockClusterAdminMockRecorder) RemoveMemberFromConsumerGroup(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveMemberFromConsumerGroup", reflect.TypeOf((*MockClusterAdmin)(nil).RemoveMemberFromConsumerGroup), arg0, arg1) } -// UpsertUserScramCredentials mocks base method. +// UpsertUserScramCredentials mocks base method func (m *MockClusterAdmin) UpsertUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpsertUserScramCredentials", arg0) @@ -845,7 +844,7 @@ func (m *MockClusterAdmin) UpsertUserScramCredentials(arg0 []sarama.AlterUserScr return ret0, ret1 } -// UpsertUserScramCredentials indicates an expected call of UpsertUserScramCredentials. +// UpsertUserScramCredentials indicates an expected call of UpsertUserScramCredentials func (mr *MockClusterAdminMockRecorder) UpsertUserScramCredentials(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertUserScramCredentials", reflect.TypeOf((*MockClusterAdmin)(nil).UpsertUserScramCredentials), arg0) diff --git a/mockstorage.go b/mockstorage.go index 5eb29bee..656f73e5 100644 --- a/mockstorage.go +++ b/mockstorage.go @@ -5,36 +5,36 @@ package goka import ( - reflect "reflect" - gomock "github.com/golang/mock/gomock" storage "github.com/lovoo/goka/storage" + io "io" + reflect "reflect" ) -// MockStorage is a mock of Storage interface. +// MockStorage is a mock of Storage interface type MockStorage struct { ctrl *gomock.Controller recorder *MockStorageMockRecorder } -// MockStorageMockRecorder is the mock recorder for MockStorage. +// MockStorageMockRecorder is the mock recorder for MockStorage type MockStorageMockRecorder struct { mock *MockStorage } -// NewMockStorage creates a new mock instance. +// NewMockStorage creates a new mock instance func NewMockStorage(ctrl *gomock.Controller) *MockStorage { mock := &MockStorage{ctrl: ctrl} mock.recorder = &MockStorageMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockStorage) EXPECT() *MockStorageMockRecorder { return m.recorder } -// Close mocks base method. +// Close mocks base method func (m *MockStorage) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -42,13 +42,13 @@ func (m *MockStorage) Close() error { return ret0 } -// Close indicates an expected call of Close. +// Close indicates an expected call of Close func (mr *MockStorageMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStorage)(nil).Close)) } -// Delete mocks base method. +// Delete mocks base method func (m *MockStorage) Delete(arg0 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Delete", arg0) @@ -56,13 +56,13 @@ func (m *MockStorage) Delete(arg0 string) error { return ret0 } -// Delete indicates an expected call of Delete. +// Delete indicates an expected call of Delete func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStorage)(nil).Delete), arg0) } -// Get mocks base method. +// Get mocks base method func (m *MockStorage) Get(arg0 string) ([]byte, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", arg0) @@ -71,13 +71,13 @@ func (m *MockStorage) Get(arg0 string) ([]byte, error) { return ret0, ret1 } -// Get indicates an expected call of Get. +// Get indicates an expected call of Get func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStorage)(nil).Get), arg0) } -// GetOffset mocks base method. +// GetOffset mocks base method func (m *MockStorage) GetOffset(arg0 int64) (int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetOffset", arg0) @@ -86,13 +86,29 @@ func (m *MockStorage) GetOffset(arg0 int64) (int64, error) { return ret0, ret1 } -// GetOffset indicates an expected call of GetOffset. +// GetOffset indicates an expected call of GetOffset func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffset", reflect.TypeOf((*MockStorage)(nil).GetOffset), arg0) } -// Has mocks base method. +// GetP mocks base method +func (m *MockStorage) GetP(arg0 string) ([]byte, io.Closer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetP", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(io.Closer) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetP indicates an expected call of GetP +func (mr *MockStorageMockRecorder) GetP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetP", reflect.TypeOf((*MockStorage)(nil).GetP), arg0) +} + +// Has mocks base method func (m *MockStorage) Has(arg0 string) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Has", arg0) @@ -101,13 +117,13 @@ func (m *MockStorage) Has(arg0 string) (bool, error) { return ret0, ret1 } -// Has indicates an expected call of Has. +// Has indicates an expected call of Has func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Has", reflect.TypeOf((*MockStorage)(nil).Has), arg0) } -// Iterator mocks base method. +// Iterator mocks base method func (m *MockStorage) Iterator() (storage.Iterator, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Iterator") @@ -116,13 +132,13 @@ func (m *MockStorage) Iterator() (storage.Iterator, error) { return ret0, ret1 } -// Iterator indicates an expected call of Iterator. +// Iterator indicates an expected call of Iterator func (mr *MockStorageMockRecorder) Iterator() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Iterator", reflect.TypeOf((*MockStorage)(nil).Iterator)) } -// IteratorWithRange mocks base method. +// IteratorWithRange mocks base method func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IteratorWithRange", arg0, arg1) @@ -131,13 +147,13 @@ func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, er return ret0, ret1 } -// IteratorWithRange indicates an expected call of IteratorWithRange. +// IteratorWithRange indicates an expected call of IteratorWithRange func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IteratorWithRange", reflect.TypeOf((*MockStorage)(nil).IteratorWithRange), arg0, arg1) } -// MarkRecovered mocks base method. +// MarkRecovered mocks base method func (m *MockStorage) MarkRecovered() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MarkRecovered") @@ -145,13 +161,13 @@ func (m *MockStorage) MarkRecovered() error { return ret0 } -// MarkRecovered indicates an expected call of MarkRecovered. +// MarkRecovered indicates an expected call of MarkRecovered func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkRecovered", reflect.TypeOf((*MockStorage)(nil).MarkRecovered)) } -// Open mocks base method. +// Open mocks base method func (m *MockStorage) Open() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Open") @@ -159,13 +175,13 @@ func (m *MockStorage) Open() error { return ret0 } -// Open indicates an expected call of Open. +// Open indicates an expected call of Open func (mr *MockStorageMockRecorder) Open() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockStorage)(nil).Open)) } -// Set mocks base method. +// Set mocks base method func (m *MockStorage) Set(arg0 string, arg1 []byte) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Set", arg0, arg1) @@ -173,13 +189,13 @@ func (m *MockStorage) Set(arg0 string, arg1 []byte) error { return ret0 } -// Set indicates an expected call of Set. +// Set indicates an expected call of Set func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockStorage)(nil).Set), arg0, arg1) } -// SetOffset mocks base method. +// SetOffset mocks base method func (m *MockStorage) SetOffset(arg0 int64) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SetOffset", arg0) @@ -187,7 +203,7 @@ func (m *MockStorage) SetOffset(arg0 int64) error { return ret0 } -// SetOffset indicates an expected call of SetOffset. +// SetOffset indicates an expected call of SetOffset func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetOffset", reflect.TypeOf((*MockStorage)(nil).SetOffset), arg0) diff --git a/options.go b/options.go index d93711e7..380d40f2 100644 --- a/options.go +++ b/options.go @@ -398,7 +398,7 @@ type ViewOption func(*voptions, Table, Codec) type voptions struct { log logger clientID string - tableCodec Codec + tableCodec CodecP updateCallback UpdateCallback hasher func() hash.Hash32 autoreconnect bool diff --git a/partition_processor.go b/partition_processor.go index c9b19afe..955de948 100644 --- a/partition_processor.go +++ b/partition_processor.go @@ -3,6 +3,7 @@ package goka import ( "context" "fmt" + "io" "strings" "sync" "sync/atomic" @@ -579,8 +580,9 @@ func (pp *PartitionProcessor) processMessage(ctx context.Context, wg *sync.WaitG } var ( - m interface{} - err error + m interface{} + err error + decodeCloser io.Closer ) // decide whether to decode or ignore message @@ -602,10 +604,11 @@ func (pp *PartitionProcessor) processMessage(ctx context.Context, wg *sync.WaitG } // decode message - m, err = codec.Decode(msg.value) + m, decodeCloser, err = codec.DecodeP(msg.value) if err != nil { return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.key, msg.topic, msg.partition, err) } + msgContext.addDeferred(decodeCloser.Close) } cb := pp.callbacks[msg.topic] diff --git a/partition_table.go b/partition_table.go index 48bc7b6b..417ee709 100644 --- a/partition_table.go +++ b/partition_table.go @@ -3,11 +3,13 @@ package goka import ( "context" "fmt" + "io" "sync" "time" "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -567,11 +569,11 @@ func (p *PartitionTable) WaitRecovered() <-chan struct{} { } // Get returns the value for passed key -func (p *PartitionTable) Get(key string) ([]byte, error) { +func (p *PartitionTable) Get(key string) ([]byte, io.Closer, error) { if err := p.readyToRead(); err != nil { - return nil, err + return nil, codec.NoopCloser, err } - return p.st.Get(key) + return p.st.GetP(key) } // Has returns whether the storage contains passed key diff --git a/storage/append.go b/storage/append.go deleted file mode 100644 index f76ffd21..00000000 --- a/storage/append.go +++ /dev/null @@ -1,89 +0,0 @@ -package storage - -import ( - "fmt" - "io" - "os" - "path/filepath" -) - -type file struct { - file io.WriteCloser - recovered bool - - bytesWritten int64 -} - -// NewFile retuns a new on-disk storage. -func NewFile(path string, part int32) (Storage, error) { - if err := os.MkdirAll(path, os.ModePerm); err != nil { - return nil, fmt.Errorf("error creating storage directory: %v", err) - } - - f, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("part-%d", part)), os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) - if err != nil { - return nil, err - } - - return &file{file: f}, nil -} - -func (f *file) Recovered() bool { - return f.recovered -} - -func (f *file) MarkRecovered() error { - f.recovered = true - return nil -} - -func (f *file) Has(key string) (bool, error) { - return false, nil -} - -func (f *file) Get(key string) ([]byte, error) { - return nil, nil -} - -func (f *file) Set(key string, val []byte) error { - num, err := f.file.Write(val) - if err != nil { - return err - } - - f.bytesWritten += int64(num) - - if _, err := f.file.Write([]byte("\n")); err != nil { - return err - } - - return nil -} - -func (f *file) Delete(string) error { - return nil -} - -func (f *file) GetOffset(def int64) (int64, error) { - return def, nil -} - -func (f *file) SetOffset(val int64) error { - return nil -} - -func (f *file) Iterator() (Iterator, error) { - return new(NullIter), nil -} - -func (f *file) IteratorWithRange(start, limit []byte) (Iterator, error) { - return new(NullIter), nil -} - -func (f *file) Open() error { - return nil -} - -func (f *file) Close() error { - return f.file.Close() -} diff --git a/storage/closer.go b/storage/closer.go new file mode 100644 index 00000000..369db18b --- /dev/null +++ b/storage/closer.go @@ -0,0 +1,7 @@ +package storage + +type nopCloser struct{} + +func (n *nopCloser) Close() error { return nil } + +var noopCloser = new(nopCloser) diff --git a/storage/memory.go b/storage/memory.go index 85eb8ffd..dcb1dbaa 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "fmt" + "io" "sort" "strings" "sync" @@ -104,6 +105,11 @@ func (m *memory) Get(key string) ([]byte, error) { return value, nil } +func (m *memory) GetP(key string) ([]byte, io.Closer, error) { + val, err := m.Get(key) + return val, noopCloser, err +} + func (m *memory) Set(key string, value []byte) error { m.Lock() defer m.Unlock() diff --git a/storage/null.go b/storage/null.go index c3d81b87..d6366ab5 100644 --- a/storage/null.go +++ b/storage/null.go @@ -1,5 +1,7 @@ package storage +import "io" + // Null storage discards everything that it is given. This can be useful for // debugging. type Null struct { @@ -31,6 +33,10 @@ func (n *Null) Get(key string) ([]byte, error) { return nil, nil } +func (n *Null) GetP(key string) ([]byte, io.Closer, error) { + return nil, noopCloser, nil +} + // Set will do nothing and doesn't error. func (n *Null) Set(key string, val []byte) error { return nil diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 48154295..cdf8725d 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -3,8 +3,10 @@ package redis import ( "errors" "fmt" + "io" "strconv" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/storage" redis "gopkg.in/redis.v5" @@ -51,6 +53,12 @@ func (s *redisStorage) Get(key string) ([]byte, error) { return value, nil } +func (s *redisStorage) GetP(key string) ([]byte, io.Closer, error) { + val, err := s.Get(key) + + return val, codec.NoopCloser, err +} + func (s *redisStorage) GetOffset(defValue int64) (int64, error) { data, err := s.Get(offsetKey) if err != nil { diff --git a/storage/storage.go b/storage/storage.go index 0816a1b1..622755fc 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "io" "strconv" "sync" "sync/atomic" @@ -57,6 +58,8 @@ type Storage interface { // exist, a nil will be returned. Get(key string) ([]byte, error) + GetP(key string) ([]byte, io.Closer, error) + // Set stores a key-value pair. Set(key string, value []byte) error @@ -138,6 +141,11 @@ func (s *storage) Get(key string) ([]byte, error) { return value, nil } +func (s *storage) GetP(key string) ([]byte, io.Closer, error) { + val, err := s.Get(key) + return val, noopCloser, err +} + func (s *storage) GetOffset(defValue int64) (int64, error) { // if we're recovered, read offset from the storage, otherwise // read our local copy, as it is probably newer diff --git a/view.go b/view.go index a614a219..a1c0537b 100644 --- a/view.go +++ b/view.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "io" "sync" "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -75,7 +77,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) if err != nil { return nil, fmt.Errorf("Error creating sarama consumer for brokers %+v: %v", brokers, err) } - opts.tableCodec = codec + opts.tableCodec = convertOrFakeCodec(codec) tmgr, err := opts.builders.topicmgr(brokers) if err != nil { @@ -330,6 +332,41 @@ func (v *View) Topic() string { // Get returns the value for the key in the view, if exists. Nil if it doesn't. // Get can be called by multiple goroutines concurrently. // Get can only be called after Recovered returns true. +func (v *View) GetP(key string) (interface{}, io.Closer, error) { + if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) { + return nil, codec.NoopCloser, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values") + } + + // find partition where key is located + partTable, err := v.find(key) + if err != nil { + return nil, codec.NoopCloser, err + } + + // get key and return + data, storageCloser, err := partTable.Get(key) + if err != nil { + return nil, codec.NoopCloser, fmt.Errorf("error getting value (key %s): %v", key, err) + } + + if data == nil { + return nil, storageCloser, nil + } + + // decode value + value, codecCloser, err := v.opts.tableCodec.DecodeP(data) + + // drop the close-error, no way to handle that anyway + _ = storageCloser.Close() + + if err != nil { + return nil, codec.NoopCloser, fmt.Errorf("error decoding value (key %s): %v", key, err) + } + + // return value and the closer to close the codec pool + return value, codecCloser, nil +} + func (v *View) Get(key string) (interface{}, error) { if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) { return nil, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values") @@ -342,14 +379,18 @@ func (v *View) Get(key string) (interface{}, error) { } // get key and return - data, err := partTable.Get(key) + data, storageCloser, err := partTable.Get(key) if err != nil { return nil, fmt.Errorf("error getting value (key %s): %v", key, err) - } else if data == nil { + } + + defer storageCloser.Close() + + if data == nil { return nil, nil } - // decode value + // decode value (not pooled) value, err := v.opts.tableCodec.Decode(data) if err != nil { return nil, fmt.Errorf("error decoding value (key %s): %v", key, err) diff --git a/view_test.go b/view_test.go index df4c5085..8a6a923c 100644 --- a/view_test.go +++ b/view_test.go @@ -69,7 +69,7 @@ func createTestView(t *testing.T, consumer sarama.Consumer) (*View, *builderMock viewTestRecoveredMessages = 0 opts := &voptions{ log: defaultLogger, - tableCodec: new(codec.String), + tableCodec: convertOrFakeCodec(&codec.String{}), updateCallback: func(ctx UpdateContext, s storage.Storage, key string, value []byte) error { if err := DefaultUpdate(ctx, s, key, value); err != nil { return err @@ -189,10 +189,10 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) - bm.mst.EXPECT().Get(key).Return([]byte(strconv.FormatInt(value, 10)), nil) + bm.mst.EXPECT().GetP(key).Return([]byte(strconv.FormatInt(value, 10)), codec.NoopCloser, nil) ret, err := view.Get(key) require.NoError(t, err) @@ -216,9 +216,9 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) - bm.mst.EXPECT().Get(key).Return(nil, nil) + bm.mst.EXPECT().GetP(key).Return(nil, codec.NoopCloser, nil) ret, err := view.Get(key) require.NoError(t, err) @@ -243,9 +243,9 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) - bm.mst.EXPECT().Get(key).Return(nil, errRet) + bm.mst.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errRet) _, err := view.Get(key) require.Error(t, err)