Skip to content

Commit

Permalink
Add an sqs destination.
Browse files Browse the repository at this point in the history
  • Loading branch information
ejcx committed Aug 13, 2023
1 parent ac1c23b commit 9caa7b9
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 1 deletion.
25 changes: 25 additions & 0 deletions cmd/kawa/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/runreveal/kawa/internal/destinations"
"github.com/runreveal/kawa/internal/destinations/runreveal"
s3 "github.com/runreveal/kawa/internal/destinations/s3"
"github.com/runreveal/kawa/internal/destinations/sqs"
"github.com/runreveal/kawa/internal/sources"
"github.com/runreveal/kawa/internal/sources/journald"
"github.com/runreveal/kawa/internal/sources/syslog"
Expand All @@ -33,6 +34,9 @@ func init() {
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] {
return &SQSConfig{}
})
loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] {
return &S3Config{}
})
Expand Down Expand Up @@ -93,6 +97,16 @@ type S3Config struct {
BatchSize int `json:"batchSize"`
}

type SQSConfig struct {
QueueURL string `json:"queueURL"`
Region string `json:"region"`

AccessKeyID string `json:"accessKeyID"`
AccessSecretKey string `json:"accessSecretKey"`

BatchSize int `json:"batchSize"`
}

func (c *S3Config) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring s3")
return s3.New(
Expand All @@ -106,6 +120,17 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) {
), nil
}

func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring sqs")
return sqs.New(
sqs.WithQueueURL(c.QueueURL),
sqs.WithRegion(c.Region),
sqs.WithAccessKeyID(c.AccessKeyID),
sqs.WithAccessSecretKey(c.AccessSecretKey),
sqs.WithBatchSize(c.BatchSize),
), nil
}

type JournaldConfig struct {
}

Expand Down
2 changes: 1 addition & 1 deletion internal/destinations/s3/s3.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package runreveal
package s3

import (
"bytes"
Expand Down
124 changes: 124 additions & 0 deletions internal/destinations/sqs/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sqs

import (
"context"
"errors"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
awssqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
batch "github.com/runreveal/kawa/x/batcher"
"github.com/segmentio/ksuid"
)

type Option func(*sqs)

func WithQueueURL(queueURL string) Option {
return func(s *sqs) {
s.queueURL = queueURL
}
}

func WithRegion(region string) Option {
return func(s *sqs) {
s.region = region
}
}

func WithAccessKeyID(accessKeyID string) Option {
return func(s *sqs) {
s.accessKeyID = accessKeyID
}
}

func WithAccessSecretKey(accessSecretKey string) Option {
return func(s *sqs) {
s.accessSecretKey = accessSecretKey
}
}

func WithBatchSize(batchSize int) Option {
return func(s *sqs) {
s.batchSize = batchSize
}
}

type sqs struct {
batcher *batch.Destination[types.Event]

queueURL string
region string

accessKeyID string
accessSecretKey string

batchSize int
}

func New(opts ...Option) *sqs {
ret := &sqs{}
for _, o := range opts {
o(ret)
}
if ret.batchSize == 0 {
ret.batchSize = 100
}
ret.batcher = batch.NewDestination[types.Event](ret,
batch.FlushLength(ret.batchSize),
batch.FlushFrequency(5*time.Second),
)
return ret
}

func (s *sqs) Run(ctx context.Context) error {
if s.queueURL == "" {
return errors.New("missing queue url")
}

return s.batcher.Run(ctx)
}

func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error {
return s.batcher.Send(ctx, ack, msgs...)
}

// Flush sends the given messages of type kawa.Message[type.Event] to an sqs queue
func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error {

var config = &aws.Config{}
if s.accessKeyID != "" && s.accessSecretKey != "" {
config.Credentials = credentials.NewStaticCredentials(s.accessKeyID, s.accessSecretKey, "")
}
if s.region != "" {
config.Region = aws.String(s.region)
}
sess, err := session.NewSession(config)
if err != nil {
return err
}
sqsClient := awssqs.New(sess)

var entries = []*awssqs.SendMessageBatchRequestEntry{}
for _, msg := range msgs {
entries = append(entries, &awssqs.SendMessageBatchRequestEntry{
Id: aws.String(ksuid.New().String()),
MessageBody: aws.String(string(msg.Value.RawLog)),
})
}

sendMessageInput := &awssqs.SendMessageBatchInput{
QueueUrl: aws.String(s.queueURL),
Entries: entries,
}

// Upload the file to S3
_, err = sqsClient.SendMessageBatch(sendMessageInput)
if err != nil {
return err
}
return nil
}

0 comments on commit 9caa7b9

Please sign in to comment.