diff --git a/cmd/kawa/config.go b/cmd/kawa/config.go index 4e62964..40e1d9a 100644 --- a/cmd/kawa/config.go +++ b/cmd/kawa/config.go @@ -7,6 +7,7 @@ import ( "github.com/runreveal/kawa/internal/destinations" "github.com/runreveal/kawa/internal/destinations/runreveal" s3 "github.com/runreveal/kawa/internal/destinations/s3" + "github.com/runreveal/kawa/internal/destinations/sqs" "github.com/runreveal/kawa/internal/sources" "github.com/runreveal/kawa/internal/sources/journald" "github.com/runreveal/kawa/internal/sources/syslog" @@ -33,6 +34,9 @@ func init() { loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} }) + loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] { + return &SQSConfig{} + }) loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] { return &S3Config{} }) @@ -93,6 +97,16 @@ type S3Config struct { BatchSize int `json:"batchSize"` } +type SQSConfig struct { + QueueURL string `json:"queueURL"` + Region string `json:"region"` + + AccessKeyID string `json:"accessKeyID"` + AccessSecretKey string `json:"accessSecretKey"` + + BatchSize int `json:"batchSize"` +} + func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { slog.Info("configuring s3") return s3.New( @@ -106,6 +120,17 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { ), nil } +func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) { + slog.Info("configuring sqs") + return sqs.New( + sqs.WithQueueURL(c.QueueURL), + sqs.WithRegion(c.Region), + sqs.WithAccessKeyID(c.AccessKeyID), + sqs.WithAccessSecretKey(c.AccessSecretKey), + sqs.WithBatchSize(c.BatchSize), + ), nil +} + type JournaldConfig struct { } diff --git a/internal/destinations/s3/s3.go b/internal/destinations/s3/s3.go index de713d9..d2d51c1 100644 --- a/internal/destinations/s3/s3.go +++ b/internal/destinations/s3/s3.go @@ -1,4 +1,4 @@ -package runreveal +package s3 import ( "bytes" diff 
--git a/internal/destinations/sqs/sqs.go b/internal/destinations/sqs/sqs.go new file mode 100644 index 0000000..be3f6e3 --- /dev/null +++ b/internal/destinations/sqs/sqs.go @@ -0,0 +1,124 @@ +package sqs + +import ( + "context" + "errors" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + awssqs "github.com/aws/aws-sdk-go/service/sqs" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + batch "github.com/runreveal/kawa/x/batcher" + "github.com/segmentio/ksuid" +) + +type Option func(*sqs) + +func WithQueueURL(queueURL string) Option { + return func(s *sqs) { + s.queueURL = queueURL + } +} + +func WithRegion(region string) Option { + return func(s *sqs) { + s.region = region + } +} + +func WithAccessKeyID(accessKeyID string) Option { + return func(s *sqs) { + s.accessKeyID = accessKeyID + } +} + +func WithAccessSecretKey(accessSecretKey string) Option { + return func(s *sqs) { + s.accessSecretKey = accessSecretKey + } +} + +func WithBatchSize(batchSize int) Option { + return func(s *sqs) { + s.batchSize = batchSize + } +} + +type sqs struct { + batcher *batch.Destination[types.Event] + + queueURL string + region string + + accessKeyID string + accessSecretKey string + + batchSize int +} + +func New(opts ...Option) *sqs { + ret := &sqs{} + for _, o := range opts { + o(ret) + } + if ret.batchSize == 0 { + ret.batchSize = 10 + } + ret.batcher = batch.NewDestination[types.Event](ret, + batch.FlushLength(ret.batchSize), + batch.FlushFrequency(5*time.Second), + ) + return ret +} + +func (s *sqs) Run(ctx context.Context) error { + if s.queueURL == "" { + return errors.New("missing queue url") + } + + return s.batcher.Run(ctx) +} + +func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { + return s.batcher.Send(ctx, ack, msgs...) 
+} + +// Flush sends the given messages of type kawa.Message[types.Event] to an sqs queue +func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { + + var config = &aws.Config{} + if s.accessKeyID != "" && s.accessSecretKey != "" { + config.Credentials = credentials.NewStaticCredentials(s.accessKeyID, s.accessSecretKey, "") + } + if s.region != "" { + config.Region = aws.String(s.region) + } + sess, err := session.NewSession(config) + if err != nil { + return err + } + sqsClient := awssqs.New(sess) + + var entries = []*awssqs.SendMessageBatchRequestEntry{} + for _, msg := range msgs { + entries = append(entries, &awssqs.SendMessageBatchRequestEntry{ + Id: aws.String(ksuid.New().String()), + MessageBody: aws.String(string(msg.Value.RawLog)), + }) + } + + sendMessageInput := &awssqs.SendMessageBatchInput{ + QueueUrl: aws.String(s.queueURL), + Entries: entries, + } + + // Send the batch of messages to the SQS queue + _, err = sqsClient.SendMessageBatch(sendMessageInput) + if err != nil { + return err + } + return nil +}