From 7e3b913af8339a9c55c93650884ea67f334b95cb Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Sun, 6 Aug 2023 17:15:35 -0400 Subject: [PATCH 1/8] Added mqtt destination --- cmd/kawa/config.go | 31 ++++++ examples/mqtt_config.json | 28 ++++++ go.mod | 7 +- go.sum | 11 +++ internal/destinations/mqtt/mqtt.go | 149 +++++++++++++++++++++++++++++ 5 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 examples/mqtt_config.json create mode 100644 internal/destinations/mqtt/mqtt.go diff --git a/cmd/kawa/config.go b/cmd/kawa/config.go index 4e62964..b0715f8 100644 --- a/cmd/kawa/config.go +++ b/cmd/kawa/config.go @@ -5,6 +5,7 @@ import ( "github.com/runreveal/kawa" "github.com/runreveal/kawa/internal/destinations" + "github.com/runreveal/kawa/internal/destinations/mqtt" "github.com/runreveal/kawa/internal/destinations/runreveal" s3 "github.com/runreveal/kawa/internal/destinations/s3" "github.com/runreveal/kawa/internal/sources" @@ -39,6 +40,9 @@ func init() { loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] { return &RunRevealConfig{} }) + loader.Register("mqtt", func() loader.Builder[kawa.Destination[types.Event]] { + return &MQTTConfig{} + }) } type ScannerConfig struct { @@ -81,6 +85,33 @@ func (c *RunRevealConfig) Configure() (kawa.Destination[types.Event], error) { ), nil } +type MQTTConfig struct { + Broker string `json:"broker"` + ClientID string `json:"clientID"` + Topic string `json:"topic"` + + UserName string `json:"userName"` + Password string `json:"password"` + + QOS byte `json:"qos"` + Retained bool `json:"retained"` + BatchSize int `json:"batchSize"` +} + +func (c *MQTTConfig) Configure() (kawa.Destination[types.Event], error) { + slog.Info("configuring mqtt") + return mqtt.New( + mqtt.WithBroker(c.Broker), + mqtt.WithClientID(c.ClientID), + mqtt.WithQOS(c.QOS), + mqtt.WithBatchSize(c.BatchSize), + mqtt.WithTopic(c.Topic), + mqtt.WithRetained(c.Retained), + mqtt.WithUserName(c.UserName), + mqtt.WithPassword(c.Password), + ), nil +} + type S3Config struct { BucketName string `json:"bucketName"` PathPrefix string `json:"pathPrefix"` diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json new file mode 100644 index 0000000..966544d --- /dev/null +++ b/examples/mqtt_config.json @@ -0,0 +1,28 @@ +{ + // this is an example config file for kawa + // it is parsed using hujson so you can use comments and trailing commas, but + // is otherwise identical to JSON + "sources": [ + { + "type": "syslog", + "addr": "0.0.0.0:5514", + // content-type tells the source how to parse logs received on this + // instance of syslog. We may explore using the syslog tag to indicate + // the schema as well down the line. + // "contentType": "application/json; rrtype=nginx-json", + }, + ], + "destinations": [ + { + "type": "mqtt", + "broker": "mqtt://mqtt.example.com:1883", + "clientID": "kawa", + "userName": "", + "password": "", + "topic": "kawa", + + } + ], + } + + \ No newline at end of file diff --git a/go.mod b/go.mod index 8bc5e65..bfc28e9 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -27,7 +29,8 @@ require ( github.com/tidwall/gjson v1.14.4 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect - golang.org/x/net v0.7.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.11.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 47970e1..cad5712 100644 --- a/go.sum +++ b/go.sum @@ -6,7 +6,11 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -55,8 +59,12 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -66,6 +74,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -74,6 +84,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go new file mode 100644 index 0000000..d5691c1 --- /dev/null +++ b/internal/destinations/mqtt/mqtt.go @@ -0,0 +1,149 @@ +package mqtt + +import ( + "context" + "encoding/json" + "errors" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + batch "github.com/runreveal/kawa/x/batcher" +) + +type Option func(*mqtt) + +func WithBroker(broker string) Option { + return func(m *mqtt) { + m.broker = broker + } +} + +func WithClientID(clientID string) Option { + return func(m *mqtt) { + m.clientID = clientID + } +} + +func WithTopic(topic string) Option { + return func(m *mqtt) { + m.topic = topic + } +} + +func WithQOS(qos byte) Option { + return func(m *mqtt) { + m.qos = qos + } +} + +func WithRetained(retained bool) Option { + return func(m *mqtt) { + m.retained = retained + } +} + +func WithBatchSize(batchSize int) Option { + return func(m *mqtt) { + m.batchSize = batchSize + } +} + +func WithUserName(userName string) Option { + return func(m *mqtt) { + m.userName = userName + } +} + +func WithPassword(password string) Option { + return func(m *mqtt) { + m.password = password + } +} + +type mqtt struct { + batcher *batch.Destination[types.Event] + + broker string + clientID string + topic string + + userName string + password string + + qos byte + retained bool + + client MQTT.Client + + batchSize int +} + +func New(opts ...Option) *mqtt { + ret := &mqtt{} + for _, o := range opts { + o(ret) + } + if ret.topic == "" { + ret.topic = "#" + } + if ret.qos == 0 { + ret.qos = 1 + } + if ret.batchSize == 0 { + ret.batchSize = 100 + } + + ret.batcher = batch.NewDestination[types.Event](ret, + batch.FlushLength(ret.batchSize), + batch.FlushFrequency(5*time.Second), + ) + return ret +} + +func (m *mqtt) Run(ctx context.Context) error { + if m.broker == "" { + return errors.New("missing broker") + } + if m.clientID == "" { + return errors.New("missing clientID") + } + + return m.batcher.Run(ctx) +} + +func (m *mqtt) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { + return m.batcher.Send(ctx, ack, msgs...) +} + +// Flush sends the given messages of type kawa.Message[type.Event] to an MQTT topic +func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { + opts := MQTT.NewClientOptions().AddBroker(m.broker).SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) + client := MQTT.NewClient(opts) + // m.client = client + + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + defer client.Disconnect(250) + + for _, msg := range msgs { + jsonData, err := json.Marshal(msg.Value) + if err != nil { + return err + } + + sendTopic := m.topic + if msg.Topic != "" { + sendTopic = msg.Topic + } + + token := client.Publish(sendTopic, m.qos, m.retained, jsonData) + token.Wait() + if token.Error() != nil { + return token.Error() + } + } + return nil +} From 773b6b221c03761f0712c6d46d2b70226b7c9dc9 Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Sun, 6 Aug 2023 20:46:40 -0400 Subject: [PATCH 2/8] Added mqtt source --- cmd/kawa/config.go | 31 +++++ examples/mqtt_config.json | 36 +++--- internal/destinations/mqtt/mqtt.go | 10 +- internal/sources/mqtt/mqtt.go | 189 +++++++++++++++++++++++++++++ 4 files changed, 242 insertions(+), 24 deletions(-) create mode 100644 internal/sources/mqtt/mqtt.go diff --git a/cmd/kawa/config.go b/cmd/kawa/config.go index b0715f8..f531abe 100644 --- a/cmd/kawa/config.go +++ b/cmd/kawa/config.go @@ -10,6 +10,7 @@ import ( s3 "github.com/runreveal/kawa/internal/destinations/s3" "github.com/runreveal/kawa/internal/sources" "github.com/runreveal/kawa/internal/sources/journald" + mqttsrc "github.com/runreveal/kawa/internal/sources/mqtt" "github.com/runreveal/kawa/internal/sources/syslog" "github.com/runreveal/kawa/internal/types" "github.com/runreveal/lib/loader" @@ -30,6 +31,9 @@ func init() { loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] { return &JournaldConfig{} }) + loader.Register("mqtt", func() loader.Builder[kawa.Source[types.Event]] { + return &MQTTSrcConfig{} + }) loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} @@ -112,6 +116,33 @@ func (c *MQTTConfig) Configure() (kawa.Destination[types.Event], error) { ), nil } +type MQTTSrcConfig struct { + Broker string `json:"broker"` + ClientID string `json:"clientID"` + Topic string `json:"topic"` + + UserName string `json:"userName"` + Password string `json:"password"` + + QOS byte `json:"qos"` + Retained bool `json:"retained"` + BatchSize int `json:"batchSize"` +} + +func (c *MQTTSrcConfig) Configure() (kawa.Source[types.Event], error) { + slog.Info("configuring mqtt") + return mqttsrc.New( + mqttsrc.WithBroker(c.Broker), + mqttsrc.WithClientID(c.ClientID), + mqttsrc.WithQOS(c.QOS), + mqttsrc.WithBatchSize(c.BatchSize), + mqttsrc.WithTopic(c.Topic), + mqttsrc.WithRetained(c.Retained), + mqttsrc.WithUserName(c.UserName), + mqttsrc.WithPassword(c.Password), + ), nil +} + type S3Config struct { BucketName string `json:"bucketName"` PathPrefix string `json:"pathPrefix"` diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json index 966544d..c35caba 100644 --- a/examples/mqtt_config.json +++ b/examples/mqtt_config.json @@ -3,25 +3,27 @@ // it is parsed using hujson so you can use comments and trailing commas, but // is otherwise identical to JSON "sources": [ - { - "type": "syslog", - "addr": "0.0.0.0:5514", - // content-type tells the source how to parse logs received on this - // instance of syslog. We may explore using the syslog tag to indicate - // the schema as well down the line. - // "contentType": "application/json; rrtype=nginx-json", - }, + { + "type": "mqtt", + "broker": "mqtt://broker.mqtt:1883", + "clientID": "kawa_src", + "userName": "", + "password": "", + "topic": "iot/#", + }, ], "destinations": [ - { - "type": "mqtt", - "broker": "mqtt://mqtt.example.com:1883", - "clientID": "kawa", - "userName": "", - "password": "", - "topic": "kawa", - - } + { + "type": "mqtt", + "broker": "mqtt://broker.mqtt:1883", + "clientID": "kawa_dst", + "userName": "", + "password": "", + "topic": "kawa/dest", + }, + { + "type":"printer" + } ], } diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go index d5691c1..02aaaaa 100644 --- a/internal/destinations/mqtt/mqtt.go +++ b/internal/destinations/mqtt/mqtt.go @@ -119,7 +119,8 @@ func (m *mqtt) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types. // Flush sends the given messages of type kawa.Message[type.Event] to an MQTT topic func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { - opts := MQTT.NewClientOptions().AddBroker(m.broker).SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) + opts := MQTT.NewClientOptions().AddBroker(m.broker). + SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) client := MQTT.NewClient(opts) // m.client = client @@ -134,12 +135,7 @@ func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) erro return err } - sendTopic := m.topic - if msg.Topic != "" { - sendTopic = msg.Topic - } - - token := client.Publish(sendTopic, m.qos, m.retained, jsonData) + token := client.Publish(m.topic, m.qos, m.retained, jsonData) token.Wait() if token.Error() != nil { return token.Error() diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go new file mode 100644 index 0000000..cf3284c --- /dev/null +++ b/internal/sources/mqtt/mqtt.go @@ -0,0 +1,189 @@ +package mqtt + +import ( + "context" + "encoding/json" + "fmt" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + "golang.org/x/exp/slog" +) + +type mqttMsg struct { + Payload string `json:"payload"` + Topic string `json:"topic"` + Duplicate bool `json:"duplicate"` + MessageID uint16 `json:"messageID"` + QOS byte `json:"qos"` + Retained bool `json:"retained"` +} + +type mqtt struct { + msgC chan msgAck + + broker string + clientID string + topic string + + userName string + password string + + qos byte + retained bool + + client MQTT.Client + + batchSize int +} + +type msgAck struct { + msg kawa.Message[types.Event] + ack func() +} + +type Option func(*mqtt) + +func WithBroker(broker string) Option { + return func(m *mqtt) { + m.broker = broker + } +} + +func WithClientID(clientID string) Option { + return func(m *mqtt) { + m.clientID = clientID + } +} + +func WithTopic(topic string) Option { + return func(m *mqtt) { + m.topic = topic + } +} + +func WithQOS(qos byte) Option { + return func(m *mqtt) { + m.qos = qos + } +} + +func WithRetained(retained bool) Option { + return func(m *mqtt) { + m.retained = retained + } +} + +func WithBatchSize(batchSize int) Option { + return func(m *mqtt) { + m.batchSize = batchSize + } +} + +func WithUserName(userName string) Option { + return func(m *mqtt) { + m.userName = userName + } +} + +func WithPassword(password string) Option { + return func(m *mqtt) { + m.password = password + } +} + +func New(opts ...Option) *mqtt { + ret := &mqtt{ + msgC: make(chan msgAck), + } + + for _, o := range opts { + o(ret) + } + if ret.topic == "" { + ret.topic = "#" + } + if ret.qos == 0 { + ret.qos = 1 + } + + return ret +} + +func (m *mqtt) Run(ctx context.Context) error { + return m.recvLoop(ctx) +} + +func (m *mqtt) recvLoop(ctx context.Context) error { + // Open file to check and save high watermark + opts := MQTT.NewClientOptions().AddBroker(m.broker). + SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) + //SetAutoReconnect(true).SetConnectRetry(true) + + client := MQTT.NewClient(opts) + + newMessage := func(client MQTT.Client, message MQTT.Message) { + rawMsg := mqttMsg{ + Payload: string(message.Payload()), + Topic: message.Topic(), + Duplicate: message.Duplicate(), + MessageID: message.MessageID(), + QOS: message.Qos(), + Retained: message.Retained(), + } + + rawMsgBts, err := json.Marshal(rawMsg) + if err != nil { + slog.Error(fmt.Sprintf("unmarshaling %+v", err)) + } + + select { + case m.msgC <- msgAck{ + msg: kawa.Message[types.Event]{ + Value: types.Event{ + Timestamp: time.Now().UTC(), + SourceType: "mqtt", + RawLog: rawMsgBts, + }, + Key: fmt.Sprintf("%d", rawMsg.MessageID), + Topic: rawMsg.Topic, + }, + ack: message.Ack, + }: + case <-ctx.Done(): + return + } + } + + if token := client.Connect(); token.Wait() && token.Error() != nil { + return fmt.Errorf("mqtt connect error: %s", token.Error()) + } + + token := client.Subscribe(m.topic, m.qos, newMessage) + token.Wait() + if token.Error() != nil { + return fmt.Errorf("mqtt subscribe error: %s", token.Error()) + } + + defer client.Unsubscribe(m.topic) + defer client.Disconnect(250) + + for { + select { + case <-time.After(60 * time.Second): + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (s *mqtt) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { + select { + case <-ctx.Done(): + return kawa.Message[types.Event]{}, nil, ctx.Err() + case pass := <-s.msgC: + return pass.msg, pass.ack, nil + } +} From a45899b27ca8150d24b7ef19fb78af82b2ac120c Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Sun, 6 Aug 2023 21:02:50 -0400 Subject: [PATCH 3/8] Remove unused fields --- internal/destinations/mqtt/mqtt.go | 2 -- internal/sources/mqtt/mqtt.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go index 02aaaaa..5dbf70d 100644 --- a/internal/destinations/mqtt/mqtt.go +++ b/internal/destinations/mqtt/mqtt.go @@ -75,8 +75,6 @@ type mqtt struct { qos byte retained bool - client MQTT.Client - batchSize int } diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go index cf3284c..92b1d0c 100644 --- a/internal/sources/mqtt/mqtt.go +++ b/internal/sources/mqtt/mqtt.go @@ -34,8 +34,6 @@ type mqtt struct { qos byte retained bool - client MQTT.Client - batchSize int } From fde5985da50ab6a85f1383b3e9925b0380fa9e37 Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Mon, 7 Aug 2023 21:02:55 -0400 Subject: [PATCH 4/8] Updated config and readme --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index 7ebe8b2..62f353d 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,26 @@ Read from stdin. Useful for testing or doing something you probably shouldn't. } ``` +## MQTT +MQTT will listen on the supplied topic for new events. + +broker, clientID, and topic are required to receive data. +clientID must be unique from any other mqtt destinations or sources + +``` +{ + "type": "mqtt", + "broker": "mqtt://broker.mqtt:1883", + "clientID": "kawa_src", + "userName": "", + "password": "", + "topic": "kawa/src", + + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included +} +``` + # Destination Configuration ## RunReveal @@ -152,6 +172,27 @@ Printer will print the results to stdout. Useful for testing and development. } ``` +## MQTT +MQTT will send events to the supplied topic. + +broker, clientID, and topic are required to send data. +clientID must be unique from any other mqtt destinations or sources + +``` +{ + "type": "mqtt", + "broker": "mqtt://broker.mqtt:1883", + "clientID": "kawa_dst", + "userName": "", + "password": "", + "topic": "kawa/dest", + + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included + "batchSize": 100, // Optional defaults to 100 if not included +} +``` + # Source / Destination Wishlist - Kafka - redis From fd5379929b392348744e3524716c37677e67c8de Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Mon, 7 Aug 2023 21:04:00 -0400 Subject: [PATCH 5/8] Updated config and readme --- README.md | 8 ++++++-- cmd/kawa/config.go | 1 - examples/mqtt_config.json | 7 +++++++ internal/destinations/mqtt/mqtt.go | 16 ++++++---------- internal/sources/mqtt/mqtt.go | 19 ++++--------------- 5 files changed, 23 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 62f353d..12722e7 100644 --- a/README.md +++ b/README.md @@ -120,8 +120,11 @@ Read from stdin. Useful for testing or doing something you probably shouldn't. ## MQTT MQTT will listen on the supplied topic for new events. -broker, clientID, and topic are required to receive data. +broker and clientID are required to receive data. clientID must be unique from any other mqtt destinations or sources +If topic is not supplied, it will default to the wildcard `#`. + +Do not read events from the same topic that an MQTT destination is sending to otherwise kawa will create an infinite loop and eventually crash. ``` { @@ -175,8 +178,9 @@ Printer will print the results to stdout. Useful for testing and development. ## MQTT MQTT will send events to the supplied topic. -broker, clientID, and topic are required to send data. +broker and clientID are required to send data. clientID must be unique from any other mqtt destinations or sources +If topic is not supplied, it will default to the wildcard `#`. ``` { diff --git a/cmd/kawa/config.go b/cmd/kawa/config.go index f531abe..f9da18a 100644 --- a/cmd/kawa/config.go +++ b/cmd/kawa/config.go @@ -135,7 +135,6 @@ func (c *MQTTSrcConfig) Configure() (kawa.Source[types.Event], error) { mqttsrc.WithBroker(c.Broker), mqttsrc.WithClientID(c.ClientID), mqttsrc.WithQOS(c.QOS), - mqttsrc.WithBatchSize(c.BatchSize), mqttsrc.WithTopic(c.Topic), mqttsrc.WithRetained(c.Retained), mqttsrc.WithUserName(c.UserName), diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json index c35caba..956b38c 100644 --- a/examples/mqtt_config.json +++ b/examples/mqtt_config.json @@ -10,6 +10,9 @@ "userName": "", "password": "", "topic": "iot/#", + + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included }, ], "destinations": [ @@ -20,6 +23,10 @@ "userName": "", "password": "", "topic": "kawa/dest", + + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included + "batchSize": 100, // Optional defaults to 100 if not included }, { "type":"printer" diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go index 5dbf70d..39af23c 100644 --- a/internal/destinations/mqtt/mqtt.go +++ b/internal/destinations/mqtt/mqtt.go @@ -79,19 +79,15 @@ type mqtt struct { } func New(opts ...Option) *mqtt { - ret := &mqtt{} + ret := &mqtt{ + qos: 1, + retained: false, + batchSize: 100, + topic: "#", + } for _, o := range opts { o(ret) } - if ret.topic == "" { - ret.topic = "#" - } - if ret.qos == 0 { - ret.qos = 1 - } - if ret.batchSize == 0 { - ret.batchSize = 100 - } ret.batcher = batch.NewDestination[types.Event](ret, batch.FlushLength(ret.batchSize), diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go index 92b1d0c..66bdb7b 100644 --- a/internal/sources/mqtt/mqtt.go +++ b/internal/sources/mqtt/mqtt.go @@ -33,8 +33,6 @@ type mqtt struct { qos byte retained bool - - batchSize int } type msgAck struct { @@ -74,12 +72,6 @@ func WithRetained(retained bool) Option { } } -func WithBatchSize(batchSize int) Option { - return func(m *mqtt) { - m.batchSize = batchSize - } -} - func WithUserName(userName string) Option { return func(m *mqtt) { m.userName = userName @@ -94,18 +86,15 @@ func WithPassword(password string) Option { func New(opts ...Option) *mqtt { ret := &mqtt{ - msgC: make(chan msgAck), + msgC: make(chan msgAck), + qos: 1, + retained: false, + topic: "#", } for _, o := range opts { o(ret) } - if ret.topic == "" { - ret.topic = "#" - } - if ret.qos == 0 { - ret.qos = 1 - } return ret } From 65998cf963a06c7e89c79715cd0f3dd223ae7a0a Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Mon, 7 Aug 2023 23:41:43 -0400 Subject: [PATCH 6/8] Fixed some issues with implementation --- README.md | 4 +-- cmd/kawa/config.go | 6 +++-- examples/mqtt_config.json | 4 +-- internal/destinations/mqtt/mqtt.go | 42 +++++++++++++++++------------- internal/sources/mqtt/mqtt.go | 13 +++++---- 5 files changed, 38 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 12722e7..5410fcb 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ Do not read events from the same topic that an MQTT destination is sending to ot "password": "", "topic": "kawa/src", - "qos": 1, // Optional defaults to 1 if not included + "qos": 1, // Optional defaults to 0 if not included "retained": false, // Optional defaults to false if not included } ``` @@ -191,7 +191,7 @@ If topic is not supplied, it will default to the wildcard `#`. "password": "", "topic": "kawa/dest", - "qos": 1, // Optional defaults to 1 if not included + "qos": 1, // Optional defaults to 0 if not included "retained": false, // Optional defaults to false if not included "batchSize": 100, // Optional defaults to 100 if not included } diff --git a/cmd/kawa/config.go b/cmd/kawa/config.go index f9da18a..23b2fab 100644 --- a/cmd/kawa/config.go +++ b/cmd/kawa/config.go @@ -104,7 +104,7 @@ type MQTTConfig struct { func (c *MQTTConfig) Configure() (kawa.Destination[types.Event], error) { slog.Info("configuring mqtt") - return mqtt.New( + mqttDst := mqtt.New( mqtt.WithBroker(c.Broker), mqtt.WithClientID(c.ClientID), mqtt.WithQOS(c.QOS), @@ -113,7 +113,9 @@ func (c *MQTTConfig) Configure() (kawa.Destination[types.Event], error) { mqtt.WithRetained(c.Retained), mqtt.WithUserName(c.UserName), mqtt.WithPassword(c.Password), - ), nil + ) + + return mqttDst, nil } type MQTTSrcConfig struct { diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json index 956b38c..e01906c 100644 --- a/examples/mqtt_config.json +++ b/examples/mqtt_config.json @@ -11,7 +11,7 @@ "password": "", "topic": "iot/#", - "qos": 1, // Optional defaults to 1 if not included + "qos": 1, // Optional defaults to 0 if not included "retained": false, // Optional defaults to false if not included }, ], @@ -24,7 +24,7 @@ "password": "", "topic": "kawa/dest", - "qos": 1, // Optional defaults to 1 if not included + "qos": 1, // Optional defaults to 0 if not included "retained": false, // Optional defaults to false if not included "batchSize": 100, // Optional defaults to 100 if not included }, diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go index 39af23c..026bed8 100644 --- a/internal/destinations/mqtt/mqtt.go +++ b/internal/destinations/mqtt/mqtt.go @@ -28,7 +28,11 @@ func WithClientID(clientID string) Option { func WithTopic(topic string) Option { return func(m *mqtt) { - m.topic = topic + if topic == "" { + m.topic = "#" + } else { + m.topic = topic + } } } @@ -46,7 +50,12 @@ func WithRetained(retained bool) Option { func WithBatchSize(batchSize int) Option { return func(m *mqtt) { - m.batchSize = batchSize + if batchSize > 0 { + m.batchSize = batchSize + } else { + m.batchSize = 100 + } + } } @@ -64,6 +73,7 @@ func WithPassword(password string) Option { type mqtt struct { batcher *batch.Destination[types.Event] + client MQTT.Client broker string clientID string @@ -79,12 +89,7 @@ type mqtt struct { } func New(opts ...Option) *mqtt { - ret := &mqtt{ - qos: 1, - retained: false, - batchSize: 100, - topic: "#", - } + ret := &mqtt{} for _, o := range opts { o(ret) } @@ -104,6 +109,16 @@ func (m *mqtt) Run(ctx context.Context) error { return errors.New("missing clientID") } + opts := MQTT.NewClientOptions().AddBroker(m.broker). + SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) + client := MQTT.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + m.client = client + return m.batcher.Run(ctx) } @@ -113,15 +128,6 @@ func (m *mqtt) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types. // Flush sends the given messages of type kawa.Message[type.Event] to an MQTT topic func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { - opts := MQTT.NewClientOptions().AddBroker(m.broker). - SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) - client := MQTT.NewClient(opts) - // m.client = client - - if token := client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - defer client.Disconnect(250) for _, msg := range msgs { jsonData, err := json.Marshal(msg.Value) @@ -129,7 +135,7 @@ func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) erro return err } - token := client.Publish(m.topic, m.qos, m.retained, jsonData) + token := m.client.Publish(m.topic, m.qos, m.retained, jsonData) token.Wait() if token.Error() != nil { return token.Error() diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go index 66bdb7b..8d0abe6 100644 --- a/internal/sources/mqtt/mqtt.go +++ b/internal/sources/mqtt/mqtt.go @@ -56,7 +56,11 @@ func WithClientID(clientID string) Option { func WithTopic(topic string) Option { return func(m *mqtt) { - m.topic = topic + if topic == "" { + m.topic = "#" + } else { + m.topic = topic + } } } @@ -86,10 +90,7 @@ func WithPassword(password string) Option { func New(opts ...Option) *mqtt { ret := &mqtt{ - msgC: make(chan msgAck), - qos: 1, - retained: false, - topic: "#", + msgC: make(chan msgAck), } for _, o := range opts { @@ -104,10 +105,8 @@ func (m *mqtt) Run(ctx context.Context) error { } func (m *mqtt) recvLoop(ctx context.Context) error { - // Open file to check and save high watermark opts := MQTT.NewClientOptions().AddBroker(m.broker). SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) - //SetAutoReconnect(true).SetConnectRetry(true) client := MQTT.NewClient(opts) From 8511ec6432a61ff9ec06c247825bcaf838b13c78 Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Tue, 8 Aug 2023 00:02:00 -0400 Subject: [PATCH 7/8] Added better error handling --- internal/sources/mqtt/mqtt.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go index 8d0abe6..4a6fc4f 100644 --- a/internal/sources/mqtt/mqtt.go +++ b/internal/sources/mqtt/mqtt.go @@ -9,7 +9,6 @@ import ( MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/runreveal/kawa" "github.com/runreveal/kawa/internal/types" - "golang.org/x/exp/slog" ) type mqttMsg struct { @@ -105,10 +104,7 @@ func (m *mqtt) Run(ctx context.Context) error { } func (m *mqtt) recvLoop(ctx context.Context) error { - opts := MQTT.NewClientOptions().AddBroker(m.broker). - SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) - - client := MQTT.NewClient(opts) + errc := make(chan error) newMessage := func(client MQTT.Client, message MQTT.Message) { rawMsg := mqttMsg{ @@ -122,7 +118,7 @@ func (m *mqtt) recvLoop(ctx context.Context) error { rawMsgBts, err := json.Marshal(rawMsg) if err != nil { - slog.Error(fmt.Sprintf("unmarshaling %+v", err)) + errc <- fmt.Errorf("unmarshaling mqtt %+v", err) } select { @@ -143,6 +139,15 @@ func (m *mqtt) recvLoop(ctx context.Context) error { } } + connLost := func(client MQTT.Client, err error) { + errc <- err + } + + opts := MQTT.NewClientOptions().AddBroker(m.broker). + SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password).SetConnectionLostHandler(connLost) + + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { return fmt.Errorf("mqtt connect error: %s", token.Error()) } @@ -159,6 +164,8 @@ func (m *mqtt) recvLoop(ctx context.Context) error { for { select { case <-time.After(60 * time.Second): + case err := <-errc: + return err case <-ctx.Done(): return ctx.Err() } From f00ec6dffd525512d18e91ccc88497d4b9cb3247 Mon Sep 17 00:00:00 2001 From: Michael Harpe Date: Mon, 28 Aug 2023 11:04:37 -0400 Subject: [PATCH 8/8] Update mqtt to follow the pattern of the others --- README.md | 5 +- cmd/kawad/config.go | 64 +++++ cmd/kawad/internal/destinations/mqtt/mqtt.go | 31 +++ cmd/kawad/internal/sources/mqtt/mqtt.go | 41 ++++ examples/mqtt_config.json | 67 +++-- internal/destinations/mqtt/mqtt.go | 145 ----------- internal/sources/mqtt/mqtt.go | 182 -------------- x/mqtt/mqtt.go | 246 +++++++++++++++++++ 8 files changed, 415 insertions(+), 366 deletions(-) create mode 100644 cmd/kawad/internal/destinations/mqtt/mqtt.go create mode 100644 cmd/kawad/internal/sources/mqtt/mqtt.go delete mode 100644 internal/destinations/mqtt/mqtt.go delete mode 100644 internal/sources/mqtt/mqtt.go create mode 100644 x/mqtt/mqtt.go diff --git a/README.md b/README.md index c63b4e6..5dfe44d 100644 --- a/README.md +++ b/README.md @@ -264,7 +264,7 @@ Do not read events from the same topic that an MQTT destination is sending to ot "password": "", "topic": "kawa/src", - "qos": 1, // Optional defaults to 0 if not included + "qos": 1, // Optional defaults to 1 if not included "retained": false, // Optional defaults to false if not included } ``` @@ -325,9 +325,8 @@ If topic is not supplied, it will default to the wildcard `#`. "password": "", "topic": "kawa/dest", - "qos": 1, // Optional defaults to 0 if not included + "qos": 1, // Optional defaults to 1 if not included "retained": false, // Optional defaults to false if not included - "batchSize": 100, // Optional defaults to 100 if not included } ``` diff --git a/cmd/kawad/config.go b/cmd/kawad/config.go index 2978867..7fddbfe 100644 --- a/cmd/kawad/config.go +++ b/cmd/kawad/config.go @@ -4,13 +4,16 @@ import ( "os" "github.com/runreveal/kawa" + mqttDstkawad "github.com/runreveal/kawa/cmd/kawad/internal/destinations/mqtt" "github.com/runreveal/kawa/cmd/kawad/internal/destinations/printer" "github.com/runreveal/kawa/cmd/kawad/internal/destinations/runreveal" s3kawad "github.com/runreveal/kawa/cmd/kawad/internal/destinations/s3" "github.com/runreveal/kawa/cmd/kawad/internal/sources/journald" + mqttSrckawad "github.com/runreveal/kawa/cmd/kawad/internal/sources/mqtt" "github.com/runreveal/kawa/cmd/kawad/internal/sources/scanner" "github.com/runreveal/kawa/cmd/kawad/internal/sources/syslog" "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/x/mqtt" "github.com/runreveal/kawa/x/s3" "github.com/runreveal/lib/loader" "golang.org/x/exp/slog" @@ -30,6 +33,9 @@ func init() { loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] { return &JournaldConfig{} }) + loader.Register("mqtt", func() loader.Builder[kawa.Source[types.Event]] { + return &MQTTSrcConfig{} + }) loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} @@ -40,6 +46,10 @@ func init() { loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] { return &RunRevealConfig{} }) + loader.Register("mqtt", func() loader.Builder[kawa.Destination[types.Event]] { + return &MQTTDestConfig{} + }) + } type ScannerConfig struct { @@ -114,3 +124,57 @@ func (c *JournaldConfig) Configure() (kawa.Source[types.Event], error) { slog.Info("configuring journald") return journald.New(), nil } + +type MQTTDestConfig struct { + Broker string `json:"broker"` + ClientID string `json:"clientID"` + Topic string `json:"topic"` + + UserName string `json:"userName"` + Password string `json:"password"` + + QOS byte `json:"qos"` + Retained bool `json:"retained"` +} + +func (c *MQTTDestConfig) Configure() (kawa.Destination[types.Event], error) { + slog.Info("configuring mqtt dest") + mqttDst := mqttDstkawad.NewMQTT( + mqtt.WithBroker(c.Broker), + mqtt.WithClientID(c.ClientID), + mqtt.WithQOS(c.QOS), + mqtt.WithTopic(c.Topic), + mqtt.WithRetained(c.Retained), + mqtt.WithUserName(c.UserName), + mqtt.WithPassword(c.Password), + ) + + return mqttDst, nil +} + +type MQTTSrcConfig struct { + Broker string `json:"broker"` + ClientID string `json:"clientID"` + Topic string `json:"topic"` + + UserName string `json:"userName"` + Password string `json:"password"` + + QOS byte `json:"qos"` + Retained bool `json:"retained"` +} + +func (c *MQTTSrcConfig) Configure() (kawa.Source[types.Event], error) { + slog.Info("configuring mqtt src") + mqttSrc := mqttSrckawad.NewMQTT( + mqtt.WithBroker(c.Broker), + mqtt.WithClientID(c.ClientID), + mqtt.WithQOS(c.QOS), + mqtt.WithTopic(c.Topic), + mqtt.WithRetained(c.Retained), + mqtt.WithUserName(c.UserName), + mqtt.WithPassword(c.Password), + ) + + return mqttSrc, nil +} diff --git a/cmd/kawad/internal/destinations/mqtt/mqtt.go b/cmd/kawad/internal/destinations/mqtt/mqtt.go new file mode 100644 index 0000000..f996b94 --- /dev/null +++ b/cmd/kawad/internal/destinations/mqtt/mqtt.go @@ -0,0 +1,31 @@ +package mqttDstkawad + +import ( + "context" + + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/x/mqtt" +) + +type MQTT struct { + wrapped *mqtt.Destination +} + +func NewMQTT(opts ...mqtt.OptFunc) *MQTT { + return &MQTT{wrapped: mqtt.NewDestination(opts...)} +} + +func (p *MQTT) Run(ctx context.Context) error { + return p.wrapped.Run(ctx) +} + +func (p *MQTT) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error { + for _, m := range msg { + err := p.wrapped.Send(ctx, ack, kawa.Message[[]byte]{Value: m.Value.RawLog}) + if err != nil { + return err + } + } + return nil +} diff --git a/cmd/kawad/internal/sources/mqtt/mqtt.go b/cmd/kawad/internal/sources/mqtt/mqtt.go new file mode 100644 index 0000000..29b5634 --- /dev/null +++ b/cmd/kawad/internal/sources/mqtt/mqtt.go @@ -0,0 +1,41 @@ +package mqttSrckawad + +import ( + "context" + "time" + + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/x/mqtt" +) + +type MQTT struct { + wrapped *mqtt.Source +} + +func NewMQTT(opts ...mqtt.OptFunc) *MQTT { + return &MQTT{wrapped: mqtt.NewSource(opts...)} +} + +func (s *MQTT) Run(ctx context.Context) error { + return s.wrapped.Run(ctx) +} + +func (s *MQTT) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { + msg, ack, err := s.wrapped.Recv(ctx) + if err != nil { + return kawa.Message[types.Event]{}, nil, ctx.Err() + } + + eventMsg := kawa.Message[types.Event]{ + Key: msg.Key, + Value: types.Event{ + Timestamp: time.Now(), + SourceType: "mqtt", + RawLog: msg.Value, + }, Topic: msg.Topic, + Attributes: msg.Attributes, + } + + return eventMsg, ack, err +} diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json index e01906c..dff6a60 100644 --- a/examples/mqtt_config.json +++ b/examples/mqtt_config.json @@ -1,37 +1,32 @@ { - // this is an example config file for kawa - // it is parsed using hujson so you can use comments and trailing commas, but - // is otherwise identical to JSON - "sources": [ - { - "type": "mqtt", - "broker": "mqtt://broker.mqtt:1883", - "clientID": "kawa_src", - "userName": "", - "password": "", - "topic": "iot/#", - - "qos": 1, // Optional defaults to 0 if not included - "retained": false, // Optional defaults to false if not included - }, - ], - "destinations": [ - { - "type": "mqtt", - "broker": "mqtt://broker.mqtt:1883", - "clientID": "kawa_dst", - "userName": "", - "password": "", - "topic": "kawa/dest", - - "qos": 1, // Optional defaults to 0 if not included - "retained": false, // Optional defaults to false if not included - "batchSize": 100, // Optional defaults to 100 if not included - }, - { - "type":"printer" - } - ], - } - - \ No newline at end of file + // this is an example config file for kawa + // it is parsed using hujson so you can use comments and trailing commas, but + // is otherwise identical to JSON + "sources": [ + { + "type": "mqtt", + "broker": "mqtt://broker.localhost:1883", + "clientID": "kawa_src", + "userName": "", + "password": "", + "topic": "kawad/src", + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included + }, + ], + "destinations": [ + { + "type": "mqtt", + "broker": "mqtt://broker.localhost:1883", + "clientID": "kawa_dst", + "userName": "", + "password": "", + "topic": "kawad/dest", + "qos": 1, // Optional defaults to 1 if not included + "retained": false, // Optional defaults to false if not included + }, + { + "type": "printer" + } + ], +} \ No newline at end of file diff --git a/internal/destinations/mqtt/mqtt.go b/internal/destinations/mqtt/mqtt.go deleted file mode 100644 index 026bed8..0000000 --- a/internal/destinations/mqtt/mqtt.go +++ /dev/null @@ -1,145 +0,0 @@ -package mqtt - -import ( - "context" - "encoding/json" - "errors" - "time" - - MQTT "github.com/eclipse/paho.mqtt.golang" - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/internal/types" - batch "github.com/runreveal/kawa/x/batcher" -) - -type Option func(*mqtt) - -func WithBroker(broker string) Option { - return func(m *mqtt) { - m.broker = broker - } -} - -func WithClientID(clientID string) Option { - return func(m *mqtt) { - m.clientID = clientID - } -} - -func WithTopic(topic string) Option { - return func(m *mqtt) { - if topic == "" { - m.topic = "#" - } else { - m.topic = topic - } - } -} - -func WithQOS(qos byte) Option { - return func(m *mqtt) { - m.qos = qos - } -} - -func WithRetained(retained bool) Option { - return func(m *mqtt) { - m.retained = retained - } -} - -func WithBatchSize(batchSize int) Option { - return func(m *mqtt) { - if batchSize > 0 { - m.batchSize = batchSize - } else { - m.batchSize = 100 - } - - } -} - -func WithUserName(userName string) Option { - return func(m *mqtt) { - m.userName = userName - } -} - -func WithPassword(password string) Option { - return func(m *mqtt) { - m.password = password - } -} - -type mqtt struct { - batcher *batch.Destination[types.Event] - client MQTT.Client - - broker string - clientID string - topic string - - userName string - password string - - qos byte - retained bool - - batchSize int -} - -func New(opts ...Option) *mqtt { - ret := &mqtt{} - for _, o := range opts { - o(ret) - } - - ret.batcher = batch.NewDestination[types.Event](ret, - batch.FlushLength(ret.batchSize), - batch.FlushFrequency(5*time.Second), - ) - return ret -} - -func (m *mqtt) Run(ctx context.Context) error { - if m.broker == "" { - return errors.New("missing broker") - } - if m.clientID == "" { - return errors.New("missing clientID") - } - - opts := MQTT.NewClientOptions().AddBroker(m.broker). - SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password) - client := MQTT.NewClient(opts) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - - m.client = client - - return m.batcher.Run(ctx) -} - -func (m *mqtt) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { - return m.batcher.Send(ctx, ack, msgs...) -} - -// Flush sends the given messages of type kawa.Message[type.Event] to an MQTT topic -func (m *mqtt) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { - - for _, msg := range msgs { - jsonData, err := json.Marshal(msg.Value) - if err != nil { - return err - } - - token := m.client.Publish(m.topic, m.qos, m.retained, jsonData) - token.Wait() - if token.Error() != nil { - return token.Error() - } - } - return nil -} diff --git a/internal/sources/mqtt/mqtt.go b/internal/sources/mqtt/mqtt.go deleted file mode 100644 index 4a6fc4f..0000000 --- a/internal/sources/mqtt/mqtt.go +++ /dev/null @@ -1,182 +0,0 @@ -package mqtt - -import ( - "context" - "encoding/json" - "fmt" - "time" - - MQTT "github.com/eclipse/paho.mqtt.golang" - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/internal/types" -) - -type mqttMsg struct { - Payload string `json:"payload"` - Topic string `json:"topic"` - Duplicate bool `json:"duplicate"` - MessageID uint16 `json:"messageID"` - QOS byte `json:"qos"` - Retained bool `json:"retained"` -} - -type mqtt struct { - msgC chan msgAck - - broker string - clientID string - topic string - - userName string - password string - - qos byte - retained bool -} - -type msgAck struct { - msg kawa.Message[types.Event] - ack func() -} - -type Option func(*mqtt) - -func WithBroker(broker string) Option { - return func(m *mqtt) { - m.broker = broker - } -} - -func WithClientID(clientID string) Option { - return func(m *mqtt) { - m.clientID = clientID - } -} - -func WithTopic(topic string) Option { - return func(m *mqtt) { - if topic == "" { - m.topic = "#" - } else { - m.topic = topic - } - } -} - -func WithQOS(qos byte) Option { - return func(m *mqtt) { - m.qos = qos - } -} - -func WithRetained(retained bool) Option { - return func(m *mqtt) { - m.retained = retained - } -} - -func WithUserName(userName string) Option { - return func(m *mqtt) { - m.userName = userName - } -} - -func WithPassword(password string) Option { - return func(m *mqtt) { - m.password = password - } -} - -func New(opts ...Option) *mqtt { - ret := &mqtt{ - msgC: make(chan msgAck), - } - - for _, o := range opts { - o(ret) - } - - return ret -} - -func (m *mqtt) Run(ctx context.Context) error { - return m.recvLoop(ctx) -} - -func (m *mqtt) recvLoop(ctx context.Context) error { - errc := make(chan error) - - newMessage := func(client MQTT.Client, message MQTT.Message) { - rawMsg := mqttMsg{ - Payload: string(message.Payload()), - Topic: message.Topic(), - Duplicate: message.Duplicate(), - MessageID: message.MessageID(), - QOS: message.Qos(), - Retained: message.Retained(), - } - - rawMsgBts, err := json.Marshal(rawMsg) - if err != nil { - errc <- fmt.Errorf("unmarshaling mqtt %+v", err) - } - - select { - case m.msgC <- msgAck{ - msg: kawa.Message[types.Event]{ - Value: types.Event{ - Timestamp: time.Now().UTC(), - SourceType: "mqtt", - RawLog: rawMsgBts, - }, - Key: fmt.Sprintf("%d", rawMsg.MessageID), - Topic: rawMsg.Topic, - }, - ack: message.Ack, - }: - case <-ctx.Done(): - return - } - } - - connLost := func(client MQTT.Client, err error) { - errc <- err - } - - opts := MQTT.NewClientOptions().AddBroker(m.broker). - SetClientID(m.clientID).SetUsername(m.userName).SetPassword(m.password).SetConnectionLostHandler(connLost) - - client := MQTT.NewClient(opts) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - return fmt.Errorf("mqtt connect error: %s", token.Error()) - } - - token := client.Subscribe(m.topic, m.qos, newMessage) - token.Wait() - if token.Error() != nil { - return fmt.Errorf("mqtt subscribe error: %s", token.Error()) - } - - defer client.Unsubscribe(m.topic) - defer client.Disconnect(250) - - for { - select { - case <-time.After(60 * time.Second): - case err := <-errc: - return err - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func (s *mqtt) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - select { - case <-ctx.Done(): - return kawa.Message[types.Event]{}, nil, ctx.Err() - case pass := <-s.msgC: - return pass.msg, pass.ack, nil - } -} diff --git a/x/mqtt/mqtt.go b/x/mqtt/mqtt.go new file mode 100644 index 0000000..6b114d4 --- /dev/null +++ b/x/mqtt/mqtt.go @@ -0,0 +1,246 @@ +package mqtt + +import ( + "context" + "errors" + "fmt" + "strconv" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/runreveal/kawa" +) + +type OptFunc func(*Opts) + +type Opts struct { + broker string + clientID string + topic string + + userName string + password string + + qos byte + retained bool +} + +func WithBroker(broker string) func(*Opts) { + return func(opts *Opts) { + opts.broker = broker + } +} + +func WithClientID(clientID string) func(*Opts) { + return func(opts *Opts) { + opts.clientID = clientID + } +} + +func WithTopic(topic string) func(*Opts) { + return func(opts *Opts) { + if topic == "" { + opts.topic = "#" + } else { + opts.topic = topic + } + } +} + +func WithQOS(qos byte) func(*Opts) { + return func(opts *Opts) { + opts.qos = qos + } +} + +func WithRetained(retained bool) func(*Opts) { + return func(opts *Opts) { + opts.retained = retained + } +} + +func WithUserName(userName string) func(*Opts) { + return func(opts *Opts) { + opts.userName = userName + } +} + +func WithPassword(password string) func(*Opts) { + return func(opts *Opts) { + opts.password = password + } +} + +type Destination struct { + client MQTT.Client + cfg Opts +} + +type Source struct { + msgC chan msgAck + cfg Opts +} + +type msgAck struct { + msg kawa.Message[[]byte] + ack func() +} + +func loadOpts(opts []OptFunc) Opts { + cfg := Opts{ + topic: "#", + retained: false, + qos: 1, + } + + for _, o := range opts { + o(&cfg) + } + return cfg +} + +func NewSource(opts ...OptFunc) *Source { + cfg := loadOpts(opts) + + ret := &Source{ + msgC: make(chan msgAck), + cfg: cfg, + } + + return ret +} + +func NewDestination(opts ...OptFunc) *Destination { + cfg := loadOpts(opts) + ret := &Destination{ + cfg: cfg, + } + + return ret +} + +func clientConnect(opts Opts, onLost MQTT.ConnectionLostHandler) (MQTT.Client, error) { + + if opts.broker == "" { + return nil, errors.New("mqtt: missing broker") + } + if opts.clientID == "" { + return nil, errors.New("mqtt: missing clientID") + } + + clientOpts := MQTT.NewClientOptions().AddBroker(opts.broker).SetClientID(opts.clientID).SetConnectionLostHandler(onLost) + + if opts.userName != "" { + clientOpts = clientOpts.SetUsername(opts.userName) + } + if opts.password != "" { + clientOpts = clientOpts.SetPassword(opts.password) + } + + client := MQTT.NewClient(clientOpts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("mqtt connect error: %s", token.Error()) + } + + return client, nil +} + +func (dest *Destination) Run(ctx context.Context) error { + var err error + errc := make(chan error) + + connLost := func(client MQTT.Client, err error) { + errc <- err + } + + dest.client, err = clientConnect(dest.cfg, connLost) + if err != nil { + return err + } + +loop: + for { + select { + case err = <-errc: + break loop + case <-ctx.Done(): + err = ctx.Err() + break loop + } + } + + dest.client.Disconnect(1000) + return err +} + +func (dest *Destination) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error { + for _, msg := range msgs { + + token := dest.client.Publish(dest.cfg.topic, dest.cfg.qos, dest.cfg.retained, string(msg.Value)) + token.Wait() + if token.Error() != nil { + return token.Error() + } + } + return nil +} + +func (src *Source) Run(ctx context.Context) error { + return src.recvLoop(ctx) +} + +func (src *Source) recvLoop(ctx context.Context) error { + errc := make(chan error) + + newMessage := func(client MQTT.Client, message MQTT.Message) { + select { + case src.msgC <- msgAck{ + msg: kawa.Message[[]byte]{ + Value: message.Payload(), + Key: strconv.FormatUint(uint64(message.MessageID()), 10), + Topic: message.Topic(), + }, + ack: message.Ack, + }: + case <-ctx.Done(): + return + } + } + + connLost := func(client MQTT.Client, err error) { + errc <- err + } + + client, err := clientConnect(src.cfg, connLost) + if err != nil { + return err + } + + token := client.Subscribe(src.cfg.topic, src.cfg.qos, newMessage) + token.Wait() + if token.Error() != nil { + return fmt.Errorf("mqtt subscribe error: %s", token.Error()) + } + + defer client.Unsubscribe(src.cfg.topic) + defer client.Disconnect(250) + + for { + select { + // case <-time.After(60 * time.Second): + case err := <-errc: + return err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (src *Source) Recv(ctx context.Context) (kawa.Message[[]byte], func(), error) { + select { + case <-ctx.Done(): + return kawa.Message[[]byte]{}, nil, ctx.Err() + case pass := <-src.msgC: + return pass.msg, pass.ack, nil + } +}