From c1f7942dd4d5c277529b8395700625b619407241 Mon Sep 17 00:00:00 2001 From: frairon Date: Sun, 7 Jan 2024 07:21:40 +0100 Subject: [PATCH] revert breaking API changes, wrap CodecP internally --- codec.go | 33 ++++++++++++++--- codec/closer.go | 6 ++-- codec/codec.go | 15 -------- examples/2-clicks/main.go | 6 ---- examples/3-messaging/blocker/blocker.go | 12 ------- examples/3-messaging/collector/collector.go | 7 ---- examples/3-messaging/detector/detector.go | 7 ---- examples/3-messaging/message.go | 12 ------- examples/5-multiple/main.go | 6 ---- examples/7-redis/codec.go | 8 ----- examples/8-monitoring/main.go | 6 ---- graph.go | 40 +++++++++++---------- graph_test.go | 2 +- integrationtest/processor_test.go | 5 --- iterator.go | 2 +- iterator_test.go | 2 +- options.go | 2 +- partition_table.go | 3 +- storage/closer.go | 2 +- storage/memory.go | 2 +- storage/null.go | 2 +- storage/redis/redis.go | 3 +- storage/storage.go | 2 +- view.go | 11 +++--- view_test.go | 8 ++--- 25 files changed, 74 insertions(+), 130 deletions(-) diff --git a/codec.go b/codec.go index d3d621c7..6ab1b2eb 100644 --- a/codec.go +++ b/codec.go @@ -1,17 +1,42 @@ package goka -import "io" +import ( + "io" + + "github.com/lovoo/goka/codec" +) // Codec decodes and encodes from and to []byte type Codec interface { Encode(value interface{}) (data []byte, err error) Decode(data []byte) (value interface{}, err error) +} + +type CodecP interface { + Codec + DecodeP(data []byte) (value interface{}, closer io.Closer, err error) } -// FuncCloser implements io.Closer-interface for convenience -type FuncCloser func() error +// CloserFunc implements io.Closer-interface for convenience when wrapping functions +type CloserFunc func() error -func (f FuncCloser) Close() error { +func (f CloserFunc) Close() error { return f() } + +type codecWrapper struct { + Codec +} + +func (cw *codecWrapper) DecodeP(data []byte) (value interface{}, closer io.Closer, err error) { + val, err := cw.Codec.Decode(data) + return val, codec.NoopCloser, err +} + +func convertOrFakeCodec(c Codec) CodecP { + if cp, ok := c.(CodecP); ok { + return cp + } + return &codecWrapper{Codec: c} +} diff --git a/codec/closer.go b/codec/closer.go index 62224e13..801fd845 100644 --- a/codec/closer.go +++ b/codec/closer.go @@ -1,11 +1,9 @@ package codec -type Closer interface { - Close() -} - type nullCloser struct{} func (n *nullCloser) Close() error { return nil } +// NoopCloser can be used for returning io.Closer interfaces, whose Close call does +// nothing. var NoopCloser = new(nullCloser) diff --git a/codec/codec.go b/codec/codec.go index bb815b16..23187a83 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -2,7 +2,6 @@ package codec import ( "fmt" - "io" "strconv" ) @@ -24,11 +23,6 @@ func (d *Bytes) Decode(data []byte) (interface{}, error) { return data, nil } -// Decode of defaultCodec simply returns the data -func (d *Bytes) DecodeP(data []byte) (interface{}, io.Closer, error) { - return data, NoopCloser, nil -} - // String is a commonly used codec to encode and decode string <-> []byte type String struct{} @@ -46,10 +40,6 @@ func (c *String) Decode(data []byte) (interface{}, error) { return string(data), nil } -func (c *String) DecodeP(data []byte) (interface{}, io.Closer, error) { - return string(data), NoopCloser, nil -} - // Int64 is a commonly used codec to encode and decode string <-> []byte type Int64 struct{} @@ -70,8 +60,3 @@ func (c *Int64) Decode(data []byte) (interface{}, error) { } return intVal, nil } - -func (c *Int64) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, NoopCloser, err -} diff --git a/examples/2-clicks/main.go b/examples/2-clicks/main.go index ba825759..51a0e888 100644 --- a/examples/2-clicks/main.go +++ b/examples/2-clicks/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "net/http" "time" @@ -59,11 +58,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) { return &c, nil } -func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := jc.Decode(data) - return dec, codec.NoopCloser, err -} - func runEmitter() { emitter, err := goka.NewEmitter(brokers, topic, new(codec.String)) diff --git a/examples/3-messaging/blocker/blocker.go b/examples/3-messaging/blocker/blocker.go index 5e172ab6..67c7ee92 100644 --- a/examples/3-messaging/blocker/blocker.go +++ b/examples/3-messaging/blocker/blocker.go @@ -3,10 +3,8 @@ package blocker import ( "context" "encoding/json" - "io" "github.com/lovoo/goka" - "github.com/lovoo/goka/codec" "github.com/lovoo/goka/examples/3-messaging/topicinit" ) @@ -31,11 +29,6 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) { return &m, json.Unmarshal(data, &m) } -func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} - type BlockValue struct { Blocked bool } @@ -50,11 +43,6 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) { return &m, json.Unmarshal(data, &m) } -func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} - func block(ctx goka.Context, msg interface{}) { var s *BlockValue if v := ctx.Value(); v == nil { diff --git a/examples/3-messaging/collector/collector.go b/examples/3-messaging/collector/collector.go index 594ac3ef..e6a68631 100644 --- a/examples/3-messaging/collector/collector.go +++ b/examples/3-messaging/collector/collector.go @@ -3,10 +3,8 @@ package collector import ( "context" "encoding/json" - "io" "github.com/lovoo/goka" - "github.com/lovoo/goka/codec" messaging "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/topicinit" ) @@ -30,11 +28,6 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) { return m, err } -func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} - func collect(ctx goka.Context, msg interface{}) { var ml []messaging.Message if v := ctx.Value(); v != nil { diff --git a/examples/3-messaging/detector/detector.go b/examples/3-messaging/detector/detector.go index 1cfd6089..b50642bd 100644 --- a/examples/3-messaging/detector/detector.go +++ b/examples/3-messaging/detector/detector.go @@ -3,10 +3,8 @@ package detector import ( "context" "encoding/json" - "io" "github.com/lovoo/goka" - "github.com/lovoo/goka/codec" messaging "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/blocker" "github.com/lovoo/goka/examples/3-messaging/topicinit" @@ -35,11 +33,6 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) { return &m, json.Unmarshal(data, &m) } -func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} - func getValue(ctx goka.Context) *Counters { if v := ctx.Value(); v != nil { return v.(*Counters) diff --git a/examples/3-messaging/message.go b/examples/3-messaging/message.go index 81e48f07..17c61a5f 100644 --- a/examples/3-messaging/message.go +++ b/examples/3-messaging/message.go @@ -2,10 +2,8 @@ package messaging import ( "encoding/json" - "io" "github.com/lovoo/goka" - "github.com/lovoo/goka/codec" ) var ( @@ -30,11 +28,6 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) { return &m, json.Unmarshal(data, &m) } -func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} - type MessageListCodec struct{} func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) { @@ -46,8 +39,3 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) { err := json.Unmarshal(data, &m) return m, err } - -func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} diff --git a/examples/5-multiple/main.go b/examples/5-multiple/main.go index 0ed33634..0f994a4b 100644 --- a/examples/5-multiple/main.go +++ b/examples/5-multiple/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "net/http" "net/http/pprof" @@ -68,11 +67,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) { return &c, nil } -func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := jc.Decode(data) - return dec, codec.NoopCloser, err -} - func runEmitter(ctx context.Context) (rerr error) { emitterA, err := goka.NewEmitter(brokers, inputA, new(codec.String)) if err != nil { diff --git a/examples/7-redis/codec.go b/examples/7-redis/codec.go index db2119a9..7ce9237b 100644 --- a/examples/7-redis/codec.go +++ b/examples/7-redis/codec.go @@ -2,9 +2,6 @@ package main import ( "encoding/json" - "io" - - "github.com/lovoo/goka/codec" ) type Codec struct{} @@ -21,8 +18,3 @@ func (c *Codec) Decode(data []byte) (interface{}, error) { err := json.Unmarshal(data, event) return event, err } - -func (c *Codec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := c.Decode(data) - return dec, codec.NoopCloser, err -} diff --git a/examples/8-monitoring/main.go b/examples/8-monitoring/main.go index f2605ae9..744603f4 100644 --- a/examples/8-monitoring/main.go +++ b/examples/8-monitoring/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "net/http" "net/http/pprof" @@ -64,11 +63,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) { return &c, nil } -func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) { - dec, err := jc.Decode(data) - return dec, codec.NoopCloser, err -} - func runEmitter(ctx context.Context) (rerr error) { emitter, err := goka.NewEmitter(brokers, topic, new(codec.String)) diff --git a/graph.go b/graph.go index 4184b6af..ccbb89c1 100644 --- a/graph.go +++ b/graph.go @@ -67,7 +67,7 @@ type GroupGraph struct { visitors []Edge // those fields cache the info from above edges or are used to avoid naming/codec collisions - codecs map[string]Codec + codecs map[string]CodecP callbacks map[string]ProcessCallback outputStreamTopics map[Stream]struct{} @@ -121,11 +121,11 @@ func (gg *GroupGraph) OutputStreams() Edges { // AllEdges returns a list of all edges for the group graph. // This allows to modify a graph by cloning it's edges into a new one. // -// var existing Graph -// edges := existiting.AllEdges() -// // modify edges as required -// // recreate the modifiedg raph -// newGraph := DefineGroup(existing.Groug(), edges...) +// var existing Graph +// edges := existiting.AllEdges() +// // modify edges as required +// // recreate the modifiedg raph +// newGraph := DefineGroup(existing.Groug(), edges...) func (gg *GroupGraph) AllEdges() Edges { return chainEdges( gg.inputTables, @@ -153,7 +153,7 @@ func (gg *GroupGraph) copartitioned() Edges { return chainEdges(gg.inputStreams, gg.inputTables) } -func (gg *GroupGraph) codec(topic string) Codec { +func (gg *GroupGraph) codec(topic string) CodecP { return gg.codecs[topic] } @@ -170,7 +170,7 @@ func (gg *GroupGraph) joint(topic string) bool { func DefineGroup(group Group, edges ...Edge) *GroupGraph { gg := GroupGraph{ group: string(group), - codecs: make(map[string]Codec), + codecs: make(map[string]CodecP), callbacks: make(map[string]ProcessCallback), joinCheck: make(map[string]bool), outputStreamTopics: make(map[Stream]struct{}), @@ -264,7 +264,7 @@ func (gg *GroupGraph) Validate() error { type Edge interface { String() string Topic() string - Codec() Codec + Codec() CodecP } // Edges is a slice of edge objects. @@ -296,7 +296,7 @@ func (e Edges) Topics() []string { type topicDef struct { name string - codec Codec + codec CodecP } func (t *topicDef) Topic() string { @@ -307,7 +307,7 @@ func (t *topicDef) String() string { return fmt.Sprintf("%s/%T", t.name, t.codec) } -func (t *topicDef) Codec() Codec { +func (t *topicDef) Codec() CodecP { return t.codec } @@ -322,7 +322,7 @@ type inputStream struct { // the group and with the group table. // The group starts reading the topic from the newest offset. func Input(topic Stream, c Codec, cb ProcessCallback) Edge { - return &inputStream{&topicDef{string(topic), c}, cb} + return &inputStream{&topicDef{string(topic), convertOrFakeCodec(c)}, cb} } type inputStreams Edges @@ -347,7 +347,7 @@ func (is inputStreams) Topic() string { return strings.Join(topics, ",") } -func (is inputStreams) Codec() Codec { +func (is inputStreams) Codec() CodecP { if is == nil { return nil } @@ -375,9 +375,11 @@ type visitor struct { func (m *visitor) Topic() string { return m.name } -func (m *visitor) Codec() Codec { + +func (m *visitor) Codec() CodecP { return nil } + func (m *visitor) String() string { return fmt.Sprintf("visitor %s", m.name) } @@ -399,7 +401,7 @@ type loopStream inputStream // process the messages of the topic. Context.Loopback() is used to write // messages into this topic from any callback of the group. func Loop(c Codec, cb ProcessCallback) Edge { - return &loopStream{&topicDef{codec: c}, cb} + return &loopStream{&topicDef{codec: convertOrFakeCodec(c)}, cb} } func (s *loopStream) setGroup(group Group) { @@ -416,7 +418,7 @@ type inputTable struct { // The processing of input streams is blocked until all partitions of the table // are recovered. func Join(topic Table, c Codec) Edge { - return &inputTable{&topicDef{string(topic), c}} + return &inputTable{&topicDef{string(topic), convertOrFakeCodec(c)}} } type crossTable struct { @@ -429,7 +431,7 @@ type crossTable struct { // The processing of input streams is blocked until the table is fully // recovered. func Lookup(topic Table, c Codec) Edge { - return &crossTable{&topicDef{string(topic), c}} + return &crossTable{&topicDef{string(topic), convertOrFakeCodec(c)}} } type groupTable struct { @@ -448,7 +450,7 @@ type groupTable struct { // // The topic name is derived from the group name by appending "-table". func Persist(c Codec) Edge { - return &groupTable{&topicDef{codec: c}} + return &groupTable{&topicDef{codec: convertOrFakeCodec(c)}} } func (t *groupTable) setGroup(group Group) { @@ -465,7 +467,7 @@ type outputStream struct { // graph. // The topic does not have to be copartitioned with the input streams. func Output(topic Stream, c Codec) Edge { - return &outputStream{&topicDef{string(topic), c}} + return &outputStream{&topicDef{string(topic), convertOrFakeCodec(c)}} } // GroupTable returns the name of the group table of group. diff --git a/graph_test.go b/graph_test.go index bf98228a..3bbc6ef2 100644 --- a/graph_test.go +++ b/graph_test.go @@ -11,7 +11,7 @@ import ( ) var ( - c = new(codec.String) + c = convertOrFakeCodec(new(codec.String)) cb = func(ctx Context, msg interface{}) {} ) diff --git a/integrationtest/processor_test.go b/integrationtest/processor_test.go index 9c3c4f1c..c2c020d0 100644 --- a/integrationtest/processor_test.go +++ b/integrationtest/processor_test.go @@ -3,7 +3,6 @@ package integrationtest import ( "context" "fmt" - "io" "strings" "testing" "time" @@ -23,10 +22,6 @@ func (fc *failingDecode) Decode(_ []byte) (interface{}, error) { return nil, fmt.Errorf("Error decoding") } -func (fc *failingDecode) DecodeP(_ []byte) (interface{}, io.Closer, error) { - return nil, codec.NoopCloser, fmt.Errorf("Error decoding") -} - func (fc *failingDecode) Encode(msg interface{}) ([]byte, error) { return fc.codec.Encode(msg) } diff --git a/iterator.go b/iterator.go index 26fcada2..b814d0e8 100644 --- a/iterator.go +++ b/iterator.go @@ -35,7 +35,7 @@ type iterator struct { deferreds []func() error deferredErrs error - codec Codec + codec CodecP } // Next advances the iterator to the next key. diff --git a/iterator_test.go b/iterator_test.go index ea94a652..48cba55e 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -37,7 +37,7 @@ func TestIterator(t *testing.T) { it := &iterator{ iter: storage.NewMultiIterator([]storage.Iterator{iter}), - codec: new(codec.String), + codec: convertOrFakeCodec(&codec.String{}), } defer it.Release() count := 0 diff --git a/options.go b/options.go index d93711e7..380d40f2 100644 --- a/options.go +++ b/options.go @@ -398,7 +398,7 @@ type ViewOption func(*voptions, Table, Codec) type voptions struct { log logger clientID string - tableCodec Codec + tableCodec CodecP updateCallback UpdateCallback hasher func() hash.Hash32 autoreconnect bool diff --git a/partition_table.go b/partition_table.go index 2ce23680..417ee709 100644 --- a/partition_table.go +++ b/partition_table.go @@ -9,6 +9,7 @@ import ( "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -570,7 +571,7 @@ func (p *PartitionTable) WaitRecovered() <-chan struct{} { // Get returns the value for passed key func (p *PartitionTable) Get(key string) ([]byte, io.Closer, error) { if err := p.readyToRead(); err != nil { - return nil, storage.NoopCloser, err + return nil, codec.NoopCloser, err } return p.st.GetP(key) } diff --git a/storage/closer.go b/storage/closer.go index 0e578f8a..369db18b 100644 --- a/storage/closer.go +++ b/storage/closer.go @@ -4,4 +4,4 @@ type nopCloser struct{} func (n *nopCloser) Close() error { return nil } -var NoopCloser = new(nopCloser) +var noopCloser = new(nopCloser) diff --git a/storage/memory.go b/storage/memory.go index a54d89d2..dcb1dbaa 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -107,7 +107,7 @@ func (m *memory) Get(key string) ([]byte, error) { func (m *memory) GetP(key string) ([]byte, io.Closer, error) { val, err := m.Get(key) - return val, NoopCloser, err + return val, noopCloser, err } func (m *memory) Set(key string, value []byte) error { diff --git a/storage/null.go b/storage/null.go index f84e99d2..d6366ab5 100644 --- a/storage/null.go +++ b/storage/null.go @@ -34,7 +34,7 @@ func (n *Null) Get(key string) ([]byte, error) { } func (n *Null) GetP(key string) ([]byte, io.Closer, error) { - return nil, NoopCloser, nil + return nil, noopCloser, nil } // Set will do nothing and doesn't error. diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 9af53569..cdf8725d 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -6,6 +6,7 @@ import ( "io" "strconv" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/storage" redis "gopkg.in/redis.v5" @@ -55,7 +56,7 @@ func (s *redisStorage) Get(key string) ([]byte, error) { func (s *redisStorage) GetP(key string) ([]byte, io.Closer, error) { val, err := s.Get(key) - return val, storage.NoopCloser, err + return val, codec.NoopCloser, err } func (s *redisStorage) GetOffset(defValue int64) (int64, error) { diff --git a/storage/storage.go b/storage/storage.go index 19ce7f51..622755fc 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -143,7 +143,7 @@ func (s *storage) Get(key string) ([]byte, error) { func (s *storage) GetP(key string) ([]byte, io.Closer, error) { val, err := s.Get(key) - return val, NoopCloser, err + return val, noopCloser, err } func (s *storage) GetOffset(defValue int64) (int64, error) { diff --git a/view.go b/view.go index 4c35e51b..a1c0537b 100644 --- a/view.go +++ b/view.go @@ -9,6 +9,7 @@ import ( "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -76,7 +77,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) if err != nil { return nil, fmt.Errorf("Error creating sarama consumer for brokers %+v: %v", brokers, err) } - opts.tableCodec = codec + opts.tableCodec = convertOrFakeCodec(codec) tmgr, err := opts.builders.topicmgr(brokers) if err != nil { @@ -333,19 +334,19 @@ func (v *View) Topic() string { // Get can only be called after Recovered returns true. func (v *View) GetP(key string) (interface{}, io.Closer, error) { if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) { - return nil, storage.NoopCloser, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values") + return nil, codec.NoopCloser, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values") } // find partition where key is located partTable, err := v.find(key) if err != nil { - return nil, storage.NoopCloser, err + return nil, codec.NoopCloser, err } // get key and return data, storageCloser, err := partTable.Get(key) if err != nil { - return nil, storage.NoopCloser, fmt.Errorf("error getting value (key %s): %v", key, err) + return nil, codec.NoopCloser, fmt.Errorf("error getting value (key %s): %v", key, err) } if data == nil { @@ -359,7 +360,7 @@ func (v *View) GetP(key string) (interface{}, io.Closer, error) { _ = storageCloser.Close() if err != nil { - return nil, storage.NoopCloser, fmt.Errorf("error decoding value (key %s): %v", key, err) + return nil, codec.NoopCloser, fmt.Errorf("error decoding value (key %s): %v", key, err) } // return value and the closer to close the codec pool diff --git a/view_test.go b/view_test.go index cd70e253..8a6a923c 100644 --- a/view_test.go +++ b/view_test.go @@ -69,7 +69,7 @@ func createTestView(t *testing.T, consumer sarama.Consumer) (*View, *builderMock viewTestRecoveredMessages = 0 opts := &voptions{ log: defaultLogger, - tableCodec: new(codec.String), + tableCodec: convertOrFakeCodec(&codec.String{}), updateCallback: func(ctx UpdateContext, s storage.Storage, key string, value []byte) error { if err := DefaultUpdate(ctx, s, key, value); err != nil { return err @@ -189,7 +189,7 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) bm.mst.EXPECT().GetP(key).Return([]byte(strconv.FormatInt(value, 10)), codec.NoopCloser, nil) @@ -216,7 +216,7 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) bm.mst.EXPECT().GetP(key).Return(nil, codec.NoopCloser, nil) @@ -243,7 +243,7 @@ func TestView_Get(t *testing.T) { state: newPartitionTableState().SetState(State(PartitionRunning)), }, } - view.opts.tableCodec = &codec.Int64{} + view.opts.tableCodec = convertOrFakeCodec(&codec.Int64{}) view.state.SetState(State(ViewStateRunning)) bm.mst.EXPECT().GetP(key).Return(nil, codec.NoopCloser, errRet)