diff --git a/README.md b/README.md index db9bdd2..efa123c 100644 --- a/README.md +++ b/README.md @@ -18,21 +18,8 @@ kawa ("Kaa-Wah") is an opinionated framework for scalable, reliable stream processing. -kawad ("Kaa-Wah-Dee") is a daemon for collecting system logs and metrics. - # Installation -## Kawad - -Find the package for your OS and architecture on the releases page. Download -that file to the machine, and install somewhere visible on your $path. - - curl -L https://github.com/runreveal/kawa/releases/download//kawa-linux-amd64.tar.gz | sudo tar --directory /usr/local/bin -xz - -Copy an example config from the examples/ directory, then run it! There is -also an example for deploying as a systemd service. Additionally, we'll have -kubernetes examples soon. - ## Kawa Add the library to your project as you would any other Go library: @@ -47,9 +34,7 @@ See https://blog.runreveal.com/kawa-the-event-processor-for-the-grug-brained-dev # Roadmap -- Ensure that consumers of kawa aren't subject to all the dependencies of the - kawa program. -- Related: consider breaking apart the library from the daemon. +- Ensure that consumers of kawa aren't subject to all the dependencies of the plugins. - Event Routing and/or Multiple Processors in kawa program - Dynamic Sources (e.g. Kafka Consumer Groups) @@ -59,45 +44,6 @@ This is nascent software, subject to breaking changes as we reach a good working set of APIs, interfaces and data models. Please try it out and help shape the direction of the project by giving us feedback! -# Getting started using Kawad - -An example use case might be shipping your nginx logs to s3. Save the following -config.json, and fill in the config file. - -``` -{ - "sources": [ - { - "type": "syslog", - "addr": "0.0.0.0:5514", - "contentType": "application/json; rrtype=nginx-json", - }, - ], - "destinations": [ - { - "type": "s3", - "bucketName": "{{YOUR-S3-BUCKET-NAME}}", - "bucketRegion": "us-east-2", - }, - ], -} -``` - -Next, add the following line to your nginx server config. - -``` -server { - access_log syslog:server=127.0.0.1:5514; - # ... other config ... -} -``` - -Run it! - -``` -$ kawa run --config config.json -``` - # Development & Extension The source and destination interfaces are designed for simplicity of use and diff --git a/cmd/kawad/config.go b/cmd/kawad/config.go deleted file mode 100644 index 7ca4432..0000000 --- a/cmd/kawad/config.go +++ /dev/null @@ -1,181 +0,0 @@ -package main - -import ( - "os" - "time" - - "log/slog" - - "github.com/runreveal/kawa" - mqttDstkawad "github.com/runreveal/kawa/cmd/kawad/internal/destinations/mqtt" - "github.com/runreveal/kawa/cmd/kawad/internal/destinations/printer" - "github.com/runreveal/kawa/cmd/kawad/internal/destinations/runreveal" - s3kawad "github.com/runreveal/kawa/cmd/kawad/internal/destinations/s3" - "github.com/runreveal/kawa/cmd/kawad/internal/sources/journald" - mqttSrckawad "github.com/runreveal/kawa/cmd/kawad/internal/sources/mqtt" - "github.com/runreveal/kawa/cmd/kawad/internal/sources/scanner" - "github.com/runreveal/kawa/cmd/kawad/internal/sources/syslog" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/mqtt" - "github.com/runreveal/kawa/x/s3" - "github.com/runreveal/lib/loader" - // We could register and configure these in a separate package - // using the init() function. - // That would make it easy to "dynamically" enable and disable them at - // compile time since it would simply be updating the import list. -) - -func init() { - loader.Register("scanner", func() loader.Builder[kawa.Source[types.Event]] { - return &ScannerConfig{} - }) - loader.Register("syslog", func() loader.Builder[kawa.Source[types.Event]] { - return &SyslogConfig{} - }) - loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] { - return &JournaldConfig{} - }) - loader.Register("mqtt", func() loader.Builder[kawa.Source[types.Event]] { - return &MQTTSrcConfig{} - }) - loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { - return &PrinterConfig{} - }) - loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] { - return &S3Config{} - }) - loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] { - return &RunRevealConfig{} - }) - loader.Register("mqtt", func() loader.Builder[kawa.Destination[types.Event]] { - return &MQTTDestConfig{} - }) - -} - -type ScannerConfig struct { -} - -func (c *ScannerConfig) Configure() (kawa.Source[types.Event], error) { - slog.Info("configuring scanner") - return scanner.NewScanner(os.Stdin), nil -} - -type SyslogConfig struct { - Addr string `json:"addr"` - ContentType string `json:"contentType"` -} - -func (c *SyslogConfig) Configure() (kawa.Source[types.Event], error) { - slog.Info("configuring syslog") - return syslog.NewSyslogSource(syslog.SyslogCfg{ - Addr: c.Addr, - ContentType: c.ContentType, - }), nil -} - -type PrinterConfig struct { -} - -func (c *PrinterConfig) Configure() (kawa.Destination[types.Event], error) { - slog.Info("configuring printer") - return printer.NewPrinter(os.Stdout), nil -} - -type RunRevealConfig struct { - WebhookURL string `json:"webhookURL"` - BatchSize int `json:"batchSize"` - FlushFreq time.Duration `json:"flushFreq"` -} - -func (c *RunRevealConfig) Configure() (kawa.Destination[types.Event], error) { - slog.Info("configuring runreveal") - return runreveal.New( - runreveal.WithWebhookURL(c.WebhookURL), - runreveal.WithBatchSize(c.BatchSize), - runreveal.WithFlushFrequency(c.FlushFreq), - ), nil -} - -type S3Config struct { - BucketName string `json:"bucketName"` - PathPrefix string `json:"pathPrefix"` - BucketRegion string `json:"bucketRegion"` - - CustomEndpoint string `json:"customEndpoint"` - AccessKeyID string `json:"accessKeyID"` - AccessSecretKey string `json:"accessSecretKey"` - - BatchSize int `json:"batchSize"` -} - -func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { - slog.Info("configuring s3") - return s3kawad.NewS3( - s3.WithBucketName(c.BucketName), - s3.WithBucketRegion(c.BucketRegion), - s3.WithPathPrefix(c.PathPrefix), - s3.WithCustomEndpoint(c.CustomEndpoint), - s3.WithAccessKeyID(c.AccessKeyID), - s3.WithAccessSecretKey(c.AccessSecretKey), - s3.WithBatchSize(c.BatchSize), - ), nil -} - -type JournaldConfig struct { -} - -func (c *JournaldConfig) Configure() (kawa.Source[types.Event], error) { - slog.Info("configuring journald") - return journald.New(), nil -} - -type MQTTDestConfig struct { - Broker string `json:"broker"` - ClientID string `json:"clientID"` - Topic string `json:"topic"` - - UserName string `json:"userName"` - Password string `json:"password"` - - QOS byte `json:"qos"` - Retained bool `json:"retained"` -} - -func (c *MQTTDestConfig) Configure() (kawa.Destination[types.Event], error) { - slog.Info("configuring mqtt dest") - return mqttDstkawad.NewMQTT( - mqtt.WithBroker(c.Broker), - mqtt.WithClientID(c.ClientID), - mqtt.WithQOS(c.QOS), - mqtt.WithTopic(c.Topic), - mqtt.WithRetained(c.Retained), - mqtt.WithUserName(c.UserName), - mqtt.WithPassword(c.Password), - ) -} - -type MQTTSrcConfig struct { - Broker string `json:"broker"` - ClientID string `json:"clientID"` - Topic string `json:"topic"` - - UserName string `json:"userName"` - Password string `json:"password"` - - QOS byte `json:"qos"` - Retained bool `json:"retained"` -} - -func (c *MQTTSrcConfig) Configure() (kawa.Source[types.Event], error) { - slog.Info("configuring mqtt src") - return mqttSrckawad.NewMQTT( - mqtt.WithBroker(c.Broker), - mqtt.WithClientID(c.ClientID), - mqtt.WithQOS(c.QOS), - mqtt.WithTopic(c.Topic), - mqtt.WithRetained(c.Retained), - mqtt.WithUserName(c.UserName), - mqtt.WithPassword(c.Password), - ) -} diff --git a/cmd/kawad/config_windows.go b/cmd/kawad/config_windows.go deleted file mode 100644 index ec97528..0000000 --- a/cmd/kawad/config_windows.go +++ /dev/null @@ -1,30 +0,0 @@ -//go:build windows -// +build windows - -package main - -import ( - "log/slog" - - "github.com/runreveal/kawa" - windowskawad "github.com/runreveal/kawa/cmd/kawad/internal/sources/windows" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/windows" - "github.com/runreveal/lib/loader" -) - -func init() { - loader.Register("eventlog", func() loader.Builder[kawa.Source[types.Event]] { - return &EventLogConfig{} - }) -} - -type EventLogConfig struct { - Channel string `json:"channel"` - Query string `json:"query"` -} - -func (c *EventLogConfig) Configure() (kawa.Source[types.Event], error) { - slog.Info("configuring windows event log") - return windowskawad.NewEventLog(windows.WithChannel(c.Channel), windows.WithQuery(c.Query)), nil -} diff --git a/cmd/kawad/internal/README.md b/cmd/kawad/internal/README.md deleted file mode 100644 index f9254db..0000000 --- a/cmd/kawad/internal/README.md +++ /dev/null @@ -1 +0,0 @@ -The internal directory contains the kawa implementations for kawad. diff --git a/cmd/kawad/internal/destinations/mqtt/mqtt.go b/cmd/kawad/internal/destinations/mqtt/mqtt.go deleted file mode 100644 index acf960e..0000000 --- a/cmd/kawad/internal/destinations/mqtt/mqtt.go +++ /dev/null @@ -1,35 +0,0 @@ -package mqttDstkawad - -import ( - "context" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/mqtt" -) - -type MQTT struct { - wrapped *mqtt.Destination -} - -func NewMQTT(opts ...mqtt.OptFunc) (*MQTT, error) { - dst, err := mqtt.NewDestination(opts...) - if err != nil { - return nil, err - } - return &MQTT{wrapped: dst}, nil -} - -func (p *MQTT) Run(ctx context.Context) error { - return p.wrapped.Run(ctx) -} - -func (p *MQTT) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error { - for _, m := range msg { - err := p.wrapped.Send(ctx, ack, kawa.Message[[]byte]{Value: m.Value.RawLog}) - if err != nil { - return err - } - } - return nil -} diff --git a/cmd/kawad/internal/destinations/printer/printer.go b/cmd/kawad/internal/destinations/printer/printer.go deleted file mode 100644 index 296c5a7..0000000 --- a/cmd/kawad/internal/destinations/printer/printer.go +++ /dev/null @@ -1,28 +0,0 @@ -package printer - -import ( - "context" - "io" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/printer" -) - -type Printer struct { - wrapped *printer.Printer -} - -func NewPrinter(writer io.Writer) *Printer { - return &Printer{wrapped: printer.NewPrinter(writer)} -} - -func (p *Printer) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error { - for _, m := range msg { - err := p.wrapped.Send(ctx, ack, kawa.Message[[]byte]{Value: m.Value.RawLog}) - if err != nil { - return err - } - } - return nil -} diff --git a/cmd/kawad/internal/destinations/runreveal/runreveal.go b/cmd/kawad/internal/destinations/runreveal/runreveal.go deleted file mode 100644 index 1d8d1dd..0000000 --- a/cmd/kawad/internal/destinations/runreveal/runreveal.go +++ /dev/null @@ -1,124 +0,0 @@ -package runreveal - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "time" - - "log/slog" - - "github.com/carlmjohnson/requests" - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - batch "github.com/runreveal/kawa/x/batcher" -) - -type Option func(*RunReveal) - -func WithWebhookURL(url string) Option { - return func(r *RunReveal) { - r.webhookURL = url - } -} - -func WithHTTPClient(httpc *http.Client) Option { - return func(r *RunReveal) { - r.httpc = httpc - } -} - -func WithBatchSize(size int) Option { - return func(r *RunReveal) { - r.batchSize = size - } -} - -func WithFlushFrequency(t time.Duration) Option { - return func(r *RunReveal) { - r.flushFreq = t - } -} - -type RunReveal struct { - httpc *http.Client - batcher *batch.Destination[types.Event] - - batchSize int - flushFreq time.Duration - webhookURL string - reqConf requests.Config -} - -func New(opts ...Option) *RunReveal { - ret := &RunReveal{ - httpc: http.DefaultClient, - } - for _, o := range opts { - o(ret) - } - - if ret.batchSize <= 0 { - ret.batchSize = 100 - } - if ret.flushFreq <= 0 { - ret.flushFreq = 15 * time.Second - } - - ret.batcher = batch.NewDestination[types.Event](ret, - batch.FlushLength(ret.batchSize), - batch.FlushFrequency(ret.flushFreq), - batch.FlushParallelism(2), - ) - return ret -} - -func (r *RunReveal) Run(ctx context.Context) error { - if r.webhookURL == "" { - return errors.New("missing webhook url") - } - - r.reqConf = func(rb *requests.Builder) { - rb. - UserAgent("kawa"). - Accept("application/json"). - BaseURL(r.webhookURL). - Header("Content-Type", "application/json") - } - - return r.batcher.Run(ctx) -} - -func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { - return r.batcher.Send(ctx, ack, msgs...) -} - -func (r *RunReveal) newReq() *requests.Builder { - return requests.New(r.reqConf) -} - -// Flush sends the given messages of type kawa.Message[type.Event] to the RunReveal api -func (r *RunReveal) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { - - slog.Debug("sending batch to runreveal", "count", len(msgs)) - - batch := make([]json.RawMessage, len(msgs)) - var err error - for i, msg := range msgs { - batch[i], err = json.Marshal(msg.Value) - if err != nil { - slog.Error("error marshalling event", "err", err) - continue - } - } - - // Send events to the webhookURL using POST - err = r.newReq().BodyJSON(batch).Fetch(ctx) - if err != nil { - slog.Error("error sending batch to runreveal", "err", err) - return err - } - // TODO: retries - return nil -} diff --git a/cmd/kawad/internal/destinations/s3/s3.go b/cmd/kawad/internal/destinations/s3/s3.go deleted file mode 100644 index e821006..0000000 --- a/cmd/kawad/internal/destinations/s3/s3.go +++ /dev/null @@ -1,27 +0,0 @@ -package s3kawad - -import ( - "context" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/s3" -) - -type S3 struct { - wrapped *s3.S3 -} - -func NewS3(opts ...s3.Option) *S3 { - return &S3{wrapped: s3.New(opts...)} -} - -func (p *S3) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error { - for _, m := range msg { - err := p.wrapped.Send(ctx, ack, kawa.Message[[]byte]{Value: m.Value.RawLog}) - if err != nil { - return err - } - } - return nil -} diff --git a/cmd/kawad/internal/queue/queue.go b/cmd/kawad/internal/queue/queue.go deleted file mode 100644 index 3985b3e..0000000 --- a/cmd/kawad/internal/queue/queue.go +++ /dev/null @@ -1,110 +0,0 @@ -package queue - -import ( - "context" - "errors" - - "log/slog" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/multi" - "github.com/runreveal/lib/await" -) - -type Option func(*Queue) - -type Source struct { - Name string - Source kawa.Source[types.Event] -} - -type Destination struct { - Name string - Destination kawa.Destination[types.Event] -} - -func WithSources(srcs map[string]Source) Option { - return func(q *Queue) { - q.Sources = srcs - } -} - -func WithDestinations(dsts map[string]Destination) Option { - return func(q *Queue) { - q.Destinations = dsts - } -} - -type Queue struct { - Sources map[string]Source - Destinations map[string]Destination -} - -var ( - ErrNoSources = errors.New("no sources configured") - ErrNoDestinations = errors.New("no destinations configured") -) - -func (q *Queue) Validate() error { - if len(q.Sources) == 0 { - return ErrNoSources - } - if len(q.Destinations) == 0 { - return ErrNoDestinations - } - return nil -} - -func New(opts ...Option) *Queue { - var q Queue - - for _, opt := range opts { - opt(&q) - } - - return &q -} - -func (q *Queue) Run(ctx context.Context) error { - if err := q.Validate(); err != nil { - return err - } - w := await.New(await.WithSignals) - - var srcs []kawa.Source[types.Event] - for _, s := range q.Sources { - if r, ok := s.Source.(await.Runner); ok { - w.AddNamed(r, s.Name) - } - srcs = append(srcs, s.Source) - } - - var dsts []kawa.Destination[types.Event] - for _, d := range q.Destinations { - if r, ok := d.Destination.(await.Runner); ok { - w.AddNamed(r, d.Name) - } - dsts = append(dsts, d.Destination) - } - multiDst := multi.NewMultiDestination(dsts) - multiSrc := multi.NewMultiSource(srcs) - - w.AddNamed(multiSrc, "multi-source") - - p, err := kawa.New(kawa.Config[types.Event, types.Event]{ - Source: multiSrc, - Destination: multiDst, - Handler: kawa.Pipe[types.Event](), - // NOTE(alan): don't increase parallelism on this processor until we've - // verified thread safety thread-safe story. - }, kawa.Parallelism(1)) - if err != nil { - return err - } - w.AddNamed(p, "processor") - slog.Info("running queue") - err = w.Run(ctx) - slog.Error("stopping", "error", err) - return err -} diff --git a/cmd/kawad/internal/sources/journald/journald.go b/cmd/kawad/internal/sources/journald/journald.go deleted file mode 100644 index 7453fce..0000000 --- a/cmd/kawad/internal/sources/journald/journald.go +++ /dev/null @@ -1,233 +0,0 @@ -package journald - -import ( - "bufio" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "os" - "os/exec" - "strconv" - "strings" - "sync" - "time" - - "log/slog" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" -) - -type Journald struct { - msgC chan kawa.MsgAck[types.Event] -} - -func New() *Journald { - return &Journald{ - msgC: make(chan kawa.MsgAck[types.Event]), - } -} - -func (s *Journald) Run(ctx context.Context) error { - return s.recvLoop(ctx) -} - -func (s *Journald) recvLoop(ctx context.Context) error { - // Open file to check and save high watermark - hwmFile, err := os.OpenFile("/tmp/kawad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644)) - if err != nil { - return err - } - defer hwmFile.Close() - - // Read high watermark from file - bts, err := io.ReadAll(hwmFile) - if err != nil { - return err - } - - // Save high watermark to file - ack := func(cursor string) { - var err error - defer func() { - if err != nil { - slog.Error(fmt.Sprintf("writing high watermark: %+v", err)) - } - }() - err = hwmFile.Truncate(0) - if err != nil { - return - } - _, err = hwmFile.Seek(0, 0) - if err != nil { - return - } - _, err = hwmFile.WriteString(cursor) - if err != nil { - return - } - } - - args := []string{ - "journalctl", "-b", "-af", "-o", "json", - } - if len(bts) > 0 { - // Resume reading from the location of a previous invocation - args = append(args, "--after-cursor", string(bts)) - } else { - // Read all logs for this boot - args = append(args, "--since", "1970-01-01 00:00:00") - } - slog.Debug(fmt.Sprintf("running: `%s`", strings.Join(args, " "))) - - cmd := exec.Command(args[0], args[1:]...) - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - if err := cmd.Start(); err != nil { - return err - } - scanner := bufio.NewScanner(stdout) - var wg sync.WaitGroup - - slog.Info("reading journald") - -loop: - for scanner.Scan() { - bts := make([]byte, len(scanner.Bytes())) - copy(bts, scanner.Bytes()) - - // Parse timestamp from log - log := autoGeneratedJournal{} - var ts time.Time - if err := json.Unmarshal(bts, &log); err != nil { - slog.Error(fmt.Sprintf("unmarshaling: %+v", err)) - continue - } else { - ts, err = parseUnixMicroseconds(log.RealtimeTimestamp) - if err != nil { - slog.Error(fmt.Sprintf("parsing timestamp: %+v", err)) - } - } - - wg.Add(1) - select { - case s.msgC <- kawa.MsgAck[types.Event]{ - Msg: kawa.Message[types.Event]{ - Value: types.Event{ - Timestamp: ts, - SourceType: "journald", - RawLog: bts, - }, - }, - Ack: func() { - ack(log.Cursor) - wg.Done() - }, - }: - case <-ctx.Done(): - break loop - } - } - if err := scanner.Err(); err != nil { - return fmt.Errorf("scanning: %+w", err) - } - - slog.Info("waiting for journald to exit") - - c := make(chan struct{}) - go func() { - wg.Wait() - close(c) - }() - - select { - // We've received all the logs - case <-c: - case <-ctx.Done(): - return ctx.Err() - } - return cmd.Wait() -} - -func (s *Journald) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - select { - case <-ctx.Done(): - return kawa.Message[types.Event]{}, nil, ctx.Err() - case pass := <-s.msgC: - return pass.Msg, pass.Ack, nil - } -} - -func parseUnixMicroseconds(s string) (time.Time, error) { - microseconds, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return time.Time{}, err - } - - // Convert microseconds to seconds and remainder microseconds - sec := microseconds / 1e6 - nsec := (microseconds % 1e6) * 1e3 - - // Create a new time.Time value - return time.Unix(sec, nsec), nil -} - -type journalMsg []byte - -func (jm *journalMsg) UnmarshalJSON(b []byte) error { - if len(b) == 0 { - return errors.New("unexpected end of JSON input for journalMsg") - } - var err error - switch b[0] { - case '"': - var s string - err = json.Unmarshal(b, &s) - if err != nil { - return err - } - *jm = []byte(s) - case '[': - var bts []byte - err = json.Unmarshal(b, &bts) - if err != nil { - return err - } - *jm = bts - default: - err = fmt.Errorf("unexpected character in journalMsg: %s. expecting string or list", string(b[0])) - } - return err -} - -// There are other fields, but these should be on just about every journald event -type autoGeneratedJournal struct { - Message journalMsg `json:"MESSAGE"` - // Unix Timestamp in Microseconds since epoch as string - RealtimeTimestamp string `json:"__REALTIME_TIMESTAMP"` - SyslogIdentifier string `json:"SYSLOG_IDENTIFIER"` - Hostname string `json:"_HOSTNAME"` - Cursor string `json:"__CURSOR"` - - // BootID string `json:"_BOOT_ID"` - // CapEffective string `json:"_CAP_EFFECTIVE"` - // Cmdline string `json:"_CMDLINE"` - // Comm string `json:"_COMM"` - // Exe string `json:"_EXE"` - // Gid string `json:"_GID"` - // MachineID string `json:"_MACHINE_ID"` - // MonotonicTimestamp string `json:"__MONOTONIC_TIMESTAMP"` - // Pid string `json:"_PID"` - // Priority string `json:"PRIORITY"` - // SelinuxContext string `json:"_SELINUX_CONTEXT"` - // SyslogFacility string `json:"SYSLOG_FACILITY"` - // SystemdCgroup string `json:"_SYSTEMD_CGROUP"` - // SystemdSlice string `json:"_SYSTEMD_SLICE"` - // SystemdUnit string `json:"_SYSTEMD_UNIT"` - // Transport string `json:"_TRANSPORT"` - // UID string `json:"_UID"` -} diff --git a/cmd/kawad/internal/sources/mqtt/mqtt.go b/cmd/kawad/internal/sources/mqtt/mqtt.go deleted file mode 100644 index 834fccd..0000000 --- a/cmd/kawad/internal/sources/mqtt/mqtt.go +++ /dev/null @@ -1,45 +0,0 @@ -package mqttSrckawad - -import ( - "context" - "time" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/mqtt" -) - -type MQTT struct { - wrapped *mqtt.Source -} - -func NewMQTT(opts ...mqtt.OptFunc) (*MQTT, error) { - src, err := mqtt.NewSource(opts...) - if err != nil { - return nil, err - } - return &MQTT{wrapped: src}, nil -} - -func (s *MQTT) Run(ctx context.Context) error { - return s.wrapped.Run(ctx) -} - -func (s *MQTT) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - msg, ack, err := s.wrapped.Recv(ctx) - if err != nil { - return kawa.Message[types.Event]{}, nil, ctx.Err() - } - - eventMsg := kawa.Message[types.Event]{ - Key: msg.Key, - Value: types.Event{ - Timestamp: time.Now(), - SourceType: "mqtt", - RawLog: msg.Value, - }, Topic: msg.Topic, - Attributes: msg.Attributes, - } - - return eventMsg, ack, err -} diff --git a/cmd/kawad/internal/sources/scanner/scanner.go b/cmd/kawad/internal/sources/scanner/scanner.go deleted file mode 100644 index 4015c46..0000000 --- a/cmd/kawad/internal/sources/scanner/scanner.go +++ /dev/null @@ -1,39 +0,0 @@ -package scanner - -import ( - "context" - "io" - "time" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/scanner" -) - -type Scanner struct { - wrapped *scanner.Scanner -} - -func NewScanner(reader io.Reader) *Scanner { - return &Scanner{ - wrapped: scanner.NewScanner(reader), - } -} - -func (s *Scanner) Run(ctx context.Context) error { - return s.wrapped.Run(ctx) -} - -func (s *Scanner) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - msg, ack, err := s.wrapped.Recv(ctx) - if err != nil { - return kawa.Message[types.Event]{}, nil, err - } - return kawa.Message[types.Event]{ - Value: types.Event{ - Timestamp: time.Now(), - SourceType: "scanner", - RawLog: msg.Value, - }, - }, ack, nil -} diff --git a/cmd/kawad/internal/sources/syslog/syslog.go b/cmd/kawad/internal/sources/syslog/syslog.go deleted file mode 100644 index b523882..0000000 --- a/cmd/kawad/internal/sources/syslog/syslog.go +++ /dev/null @@ -1,95 +0,0 @@ -package syslog - -import ( - "context" - "fmt" - "time" - - "log/slog" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "gopkg.in/mcuadros/go-syslog.v2" -) - -type SyslogCfg struct { - Addr string `json:"addr"` - ContentType string `json:"contentType"` -} - -type SyslogSource struct { - cfg SyslogCfg - server *syslog.Server - syslogPartsC syslog.LogPartsChannel -} - -func NewSyslogSource(cfg SyslogCfg) *SyslogSource { - server := syslog.NewServer() - channel := make(syslog.LogPartsChannel) - handler := syslog.NewChannelHandler(channel) - server.SetFormat(syslog.RFC3164) - server.SetHandler(handler) - return &SyslogSource{ - cfg: cfg, - server: server, - syslogPartsC: channel, - } -} - -func (s *SyslogSource) Run(ctx context.Context) error { - slog.Info(fmt.Sprintf("starting syslog server on socket %s", s.cfg.Addr)) - err := s.server.ListenUDP(s.cfg.Addr) - if err != nil { - return err - } - err = s.server.Boot() - if err != nil { - return err - } - - done := make(chan struct{}) - go func() { - s.server.Wait() - close(done) - }() - - select { - case <-ctx.Done(): - slog.Info("stopping syslog server") - err := s.server.Kill() - return err - case <-done: - } - return nil -} - -func (s *SyslogSource) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - select { - case logParts := <-s.syslogPartsC: - if content, ok := logParts["content"]; ok { - rawLog := []byte(content.(string)) - - ts := time.Now().UTC() - if timestamp, ok := logParts["timestamp"]; ok { - if ts, ok = timestamp.(time.Time); !ok { - ts = time.Now().UTC() - } - } - - msg := kawa.Message[types.Event]{ - Value: types.Event{ - Timestamp: ts, - SourceType: "syslog", - RawLog: rawLog, - ContentType: s.cfg.ContentType, - }, - } - return msg, nil, nil - } else { - fmt.Println("warn: found syslog without 'content' key") - } - case <-ctx.Done(): - return kawa.Message[types.Event]{}, nil, ctx.Err() - } - panic("unreachable!") -} diff --git a/cmd/kawad/internal/sources/windows/eventlog.go b/cmd/kawad/internal/sources/windows/eventlog.go deleted file mode 100644 index ae2f8c1..0000000 --- a/cmd/kawad/internal/sources/windows/eventlog.go +++ /dev/null @@ -1,52 +0,0 @@ -//go:build windows -// +build windows - -package windowskawad - -import ( - "context" - "encoding/json" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/kawa/x/windows" -) - -type EventLog struct { - wrapped *windows.EventLogSource -} - -func NewEventLog(opts ...windows.Option) *EventLog { - return &EventLog{ - wrapped: windows.NewEventLogSource(opts...), - } -} - -func (s *EventLog) Run(ctx context.Context) error { - return s.wrapped.Run(ctx) -} - -func (s *EventLog) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { - msg, ack, err := s.wrapped.Recv(ctx) - if err != nil { - return kawa.Message[types.Event]{}, nil, ctx.Err() - } - - rawLog, err := json.Marshal(msg.Value) - if err != nil { - return kawa.Message[types.Event]{}, nil, ctx.Err() - } - - eventMsg := kawa.Message[types.Event]{ - Key: msg.Key, - Value: types.Event{ - Timestamp: msg.Value.System.TimeCreated.SystemTime, - SourceType: "eventlog", - RawLog: rawLog, - }, - Topic: msg.Topic, - Attributes: msg.Attributes, - } - - return eventMsg, ack, err -} diff --git a/cmd/kawad/internal/types/types.go b/cmd/kawad/internal/types/types.go deleted file mode 100644 index b344bca..0000000 --- a/cmd/kawad/internal/types/types.go +++ /dev/null @@ -1,10 +0,0 @@ -package types - -import "time" - -type Event struct { - Timestamp time.Time `json:"ts"` - SourceType string `json:"sourceType"` - ContentType string `json:"contentType"` - RawLog []byte `json:"rawLog"` -} diff --git a/cmd/kawad/main.go b/cmd/kawad/main.go deleted file mode 100644 index 227ce92..0000000 --- a/cmd/kawad/main.go +++ /dev/null @@ -1,170 +0,0 @@ -package main - -import ( - "context" - "fmt" - "os" - "path" - "path/filepath" - - "log/slog" - - "github.com/runreveal/kawa" - "github.com/runreveal/kawa/cmd/kawad/internal/queue" - "github.com/runreveal/kawa/cmd/kawad/internal/types" - "github.com/runreveal/lib/await" - "github.com/runreveal/lib/loader" - "github.com/spf13/cobra" -) - -var ( - version = "dev" -) - -func init() { - replace := func(groups []string, a slog.Attr) slog.Attr { - // Remove the directory from the source's filename. - if a.Key == slog.SourceKey { - source := a.Value.Any().(*slog.Source) - source.File = filepath.Base(source.File) - } - return a - } - level := slog.LevelInfo - if _, ok := os.LookupEnv("KAWA_DEBUG"); ok { - level = slog.LevelDebug - } - - h := slog.NewTextHandler( - os.Stderr, - &slog.HandlerOptions{ - Level: level, - AddSource: true, - ReplaceAttr: replace, - }, - ) - - slogger := slog.New(h) - slog.SetDefault(slogger) -} - -func main() { - slog.Info(fmt.Sprintf("starting %s", path.Base(os.Args[0])), "version", version) - rootCmd := NewRootCommand() - kawaCmd := NewRunCommand() - rootCmd.AddCommand(kawaCmd) - - if err := rootCmd.Execute(); err != nil { - slog.Error(fmt.Sprintf("%+v", err)) - os.Exit(1) - } -} - -// Build the cobra command that handles our command line tool. -func NewRootCommand() *cobra.Command { - rootCmd := &cobra.Command{ - Use: path.Base(os.Args[0]), - Short: `kawa is an all-in-one event ingestion daemon`, - Long: `kawa is an all-in-one event ingestion daemon. -It is designed to be a single binary that can be deployed to a server and -configured to receive events from a variety of sources and send them to a -variety of destinations.`, - RunE: func(cmd *cobra.Command, args []string) error { - return cmd.Help() - }, - } - return rootCmd -} - -type MonConfig struct { - Addr string `json:"addr"` - PProf struct { - Path string `json:"path"` - } `json:"pprof"` - Metrics struct { - Path string `json:"path"` - } `json:"metrics"` -} - -type Config struct { - Sources map[string]loader.Loader[kawa.Source[types.Event]] `json:"sources"` - Destinations map[string]loader.Loader[kawa.Destination[types.Event]] `json:"destinations"` - - Monitoring MonConfig `json:"monitoring"` -} - -// Build the cobra command that handles our command line tool. -func NewRunCommand() *cobra.Command { - // Use configuration defined outside the main package 🎉 - var config Config - var configFile string - - cmd := &cobra.Command{ - Use: "run", - Short: "run the all-in-one event ingestion daemon", - RunE: func(cmd *cobra.Command, args []string) error { - bts, err := os.ReadFile(configFile) - if err != nil { - return err - } - err = loader.LoadConfig(bts, &config) - if err != nil { - return err - } - - w := await.New(await.WithSignals) - - // if config.Monitoring.Addr != "" { - // mux := http.NewServeMux() - // if config.Monitoring.PProf.Path != "" { - // prefix := config.Monitoring.PProf.Path - // http.HandleFunc(prefix, pprof.Index) - // http.HandleFunc(prefix+"cmdline", pprof.Cmdline) - // http.HandleFunc(prefix+"profile", pprof.Profile) - // http.HandleFunc(prefix+"symbol", pprof.Symbol) - // http.HandleFunc(prefix+"trace", pprof.Trace) - // } - // if config.Monitoring.Metrics.Path != "" { - // mux.Handle(config.Monitoring.Metrics.Path, promhttp.Handler()) - // } - // server := &http.Server{Addr: config.Monitoring.Addr, Handler: mux} - // w.AddNamed(await.ListenAndServe(server), "monitoring") - // } - - slog.Info(fmt.Sprintf("config: %+v", config)) - - ctx := context.Background() - srcs := map[string]queue.Source{} - for k, v := range config.Sources { - src, err := v.Configure() - if err != nil { - return err - } - srcs[k] = queue.Source{Name: k, Source: src} - } - - dsts := map[string]queue.Destination{} - for k, v := range config.Destinations { - dst, err := v.Configure() - if err != nil { - return err - } - dsts[k] = queue.Destination{Name: k, Destination: dst} - } - - q := queue.New(queue.WithSources(srcs), queue.WithDestinations(dsts)) - w.AddNamed(q, "queue") - err = w.Run(ctx) - slog.Error(fmt.Sprintf("closing: %+v", err)) - return err - }, - } - - cmd.Flags().StringVar(&configFile, "config", "config.json", "where to load the configuration from") - err := cmd.MarkFlagRequired("config") - if err != nil { - panic(err) - } - - return cmd -} diff --git a/examples/config.json b/examples/config.json deleted file mode 100644 index 1f6604f..0000000 --- a/examples/config.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - // this is an example config file for kawa - // it is parsed using hujson so you can use comments and trailing commas, but - // is otherwise identical to JSON - "sources": [ - { - "type": "syslog", - "addr": "0.0.0.0:5514", - // content-type tells the source how to parse logs received on this - // instance of syslog. We may explore using the syslog tag to indicate - // the schema as well down the line. - "contentType": "application/json; rrtype=nginx-json", - }, - { - "type": "journald", - }, - ], - "destinations": [ - { - "type": "s3", - "bucketName": "the-lumber-mill", - "bucketRegion": "us-west-2", - }, - { - "type": "runreveal", - // Replace this webhook URL with your own, created on https://www.runreveal.com - // as a "Kawa" type source - "webhookURL": "https://example.runreveal.com/sources/kawa/webhook/0123456789", - // You can also use environment variables by referencing them with a - // dollar sign. The value must be quoted, start with a dollar sign and be - // a valid environment variable name - // "webhookURL": "$WEBHOOK_URL", - }, - ], -} - diff --git a/examples/kawad.service b/examples/kawad.service deleted file mode 100644 index 295f16f..0000000 --- a/examples/kawad.service +++ /dev/null @@ -1,14 +0,0 @@ -[Unit] -Description=Kawad Collector -After=network.target - -[Service] -Type=simple -# adjust the locations to your environment as necessary -ExecStart=/usr/local/kawad run --config /etc/kawad/config.json -MemoryMax=1G -Restart=always - -[Install] -WantedBy=multi-user.target - diff --git a/examples/mqtt_config.json b/examples/mqtt_config.json deleted file mode 100644 index dff6a60..0000000 --- a/examples/mqtt_config.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - // this is an example config file for kawa - // it is parsed using hujson so you can use comments and trailing commas, but - // is otherwise identical to JSON - "sources": [ - { - "type": "mqtt", - "broker": "mqtt://broker.localhost:1883", - "clientID": "kawa_src", - "userName": "", - "password": "", - "topic": "kawad/src", - "qos": 1, // Optional defaults to 1 if not included - "retained": false, // Optional defaults to false if not included - }, - ], - "destinations": [ - { - "type": "mqtt", - "broker": "mqtt://broker.localhost:1883", - "clientID": "kawa_dst", - "userName": "", - "password": "", - "topic": "kawad/dest", - "qos": 1, // Optional defaults to 1 if not included - "retained": false, // Optional defaults to false if not included - }, - { - "type": "printer" - } - ], -} \ No newline at end of file diff --git a/examples/nginx_json.conf b/examples/nginx_json.conf deleted file mode 100644 index 89c1857..0000000 --- a/examples/nginx_json.conf +++ /dev/null @@ -1,80 +0,0 @@ -# Put this file into /etc/nginx/conf.d/ and reload or restart nginx. -# You can then use the log_format in your vhost config. -# -# Example: -# server { -# listen 80; -# server_name example.com; -# access_log syslog:server=127.0.0.1:5514 json_combined; -# location / { -# ... location settings -# } -# } - - -map $upstream_bytes_received $upstreamBytesReceived { - default $upstream_bytes_received; - "" 0; -} - -map $upstream_bytes_sent $upstreamBytesSent { - default $upstream_bytes_sent; - "" 0; -} - -map $upstream_response_time $upstreamResponseTime { - default $upstream_response_time; - "" 0; -} - -map $upstream_response_length $upstreamResponseLength { - default $upstream_response_length; - "" 0; -} - -map $status $statusCode { - default $status; - "" 0; -} - -map $body_bytes_sent $bodyBytesSent { - default $body_bytes_sent; - "" 0; -} - -map $request_time $requestTime { - default $request_time; - "" 0; -} - -log_format json_combined escape=json -'{' - '"ts":"$time_iso8601",' - '"remote_addr":"$remote_addr",' - '"remote_user":"$remote_user",' - '"request_time":$requestTime,' - '"request":{' - '"http_referrer":"$http_referer",' - '"http_user_agent":"$http_user_agent",' - '"method":"$request_method",' - '"scheme":"$scheme",' - '"host":"$host",' - '"server_addr":"$server_addr",' - '"uri":"$uri",' - '"query":"$query_string",' - '"request_uri":"$request_uri",' - '"xfwd":"$proxy_add_x_forwarded_for"' - '},' - '"upstream":{' - '"addr":"$upstream_addr",' - '"bytes_received":$upstreamBytesReceived,' - '"bytes_sent":$upstreamBytesSent,' - '"response_time":$upstreamResponseTime,' - '"response_length":$upstreamResponseLength' - '},' - '"response":{' - '"status":$statusCode,' - '"body_bytes_sent":$bodyBytesSent' - '}' -'}'; - diff --git a/examples/windows/config_windows.json b/examples/windows/config_windows.json deleted file mode 100644 index d24906a..0000000 --- a/examples/windows/config_windows.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "pprof": "localhost:6060", - "sources": [ - { - "type": "eventlog", - "channel": "Security", - "query": "*", //"*[EventData[Data[@Name='LogonType']='2'] and System[(EventID=4624)]]" - }, - ], - "destinations": [ - { - "type": "printer", - }, - ], -} \ No newline at end of file