diff --git a/README.md b/README.md index 5dfe44d..110c1e5 100644 --- a/README.md +++ b/README.md @@ -205,12 +205,15 @@ more important than message volume. - syslog - scanner - journald + - mqtt + - windows event logs # Supported destinations - s3 / r2 - printer - runreveal + - mqtt # Configuring the Daemon @@ -269,6 +272,25 @@ Do not read events from the same topic that an MQTT destination is sending to ot } ``` +### Windows Event Logs +Listen for new windows event logs on the specified channel. + +Windows event log collection only works on Windows machines. Use the Windows build to run Kawad on a Windows machine. Kawad will need to be run as an administrator to have access to the event log stream. + +The source config needs a required channel and an optional query. +The channel is the windows event log full name, e.g. to log the operational logs for the TaskScheduler the channel would be 'Microsoft-Windows-TaskScheduler/Operational'. +The query is a filter that can be used to limit the logs that are collected to specific events. View [Microsoft documentation](https://techcommunity.microsoft.com/t5/ask-the-directory-services-team/advanced-xml-filtering-in-the-windows-event-viewer/ba-p/399761) on how filtering works and how to create one to use. + +The following example shows how to log every Security event on the machine. + +``` +{ + "type": "eventlog", + "channel": "Security", + "query": "*" + } +``` + ## Destination Configuration diff --git a/cmd/kawad/config_windows.go b/cmd/kawad/config_windows.go new file mode 100644 index 0000000..b90b728 --- /dev/null +++ b/cmd/kawad/config_windows.go @@ -0,0 +1,29 @@ +//go:build windows +// +build windows + +package main + +import ( + "github.com/runreveal/kawa" + windowskawad "github.com/runreveal/kawa/cmd/kawad/internal/sources/windows" + "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/x/windows" + "github.com/runreveal/lib/loader" + "golang.org/x/exp/slog" +) + +func init() { + loader.Register("eventlog", func() loader.Builder[kawa.Source[types.Event]] { + return &EventLogConfig{} + }) +} + +type EventLogConfig struct { + Channel string `json:"channel"` + Query string `json:"query"` +} + +func (c *EventLogConfig) Configure() (kawa.Source[types.Event], error) { + slog.Info("configuring windows event log") + return windowskawad.NewEventLog(windows.WithChannel(c.Channel), windows.WithQuery(c.Query)), nil +} diff --git a/cmd/kawad/internal/sources/windows/eventlog.go b/cmd/kawad/internal/sources/windows/eventlog.go new file mode 100644 index 0000000..ae2f8c1 --- /dev/null +++ b/cmd/kawad/internal/sources/windows/eventlog.go @@ -0,0 +1,52 @@ +//go:build windows +// +build windows + +package windowskawad + +import ( + "context" + "encoding/json" + + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/x/windows" +) + +type EventLog struct { + wrapped *windows.EventLogSource +} + +func NewEventLog(opts ...windows.Option) *EventLog { + return &EventLog{ + wrapped: windows.NewEventLogSource(opts...), + } +} + +func (s *EventLog) Run(ctx context.Context) error { + return s.wrapped.Run(ctx) +} + +func (s *EventLog) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { + msg, ack, err := s.wrapped.Recv(ctx) + if err != nil { + return kawa.Message[types.Event]{}, nil, ctx.Err() + } + + rawLog, err := json.Marshal(msg.Value) + if err != nil { + return kawa.Message[types.Event]{}, nil, ctx.Err() + } + + eventMsg := kawa.Message[types.Event]{ + Key: msg.Key, + Value: types.Event{ + Timestamp: msg.Value.System.TimeCreated.SystemTime, + SourceType: "eventlog", + RawLog: rawLog, + }, + Topic: msg.Topic, + Attributes: msg.Attributes, + } + + return eventMsg, ack, err +} diff --git a/examples/windows/config_windows.json b/examples/windows/config_windows.json new file mode 100644 index 0000000..d24906a --- /dev/null +++ b/examples/windows/config_windows.json @@ -0,0 +1,15 @@ +{ + "pprof": "localhost:6060", + "sources": [ + { + "type": "eventlog", + "channel": "Security", + "query": "*", //"*[EventData[Data[@Name='LogonType']='2'] and System[(EventID=4624)]]" + }, + ], + "destinations": [ + { + "type": "printer", + }, + ], +} \ No newline at end of file diff --git a/x/windows/event_logs.go b/x/windows/event_logs.go new file mode 100644 index 0000000..1a7daf5 --- /dev/null +++ b/x/windows/event_logs.go @@ -0,0 +1,101 @@ +//go:build windows +// +build windows + +package windows + +import ( + "context" + "time" + + "github.com/runreveal/kawa" + "golang.org/x/exp/slog" +) + +type Option func(*EventLogSource) + +func WithChannel(channel string) Option { + return func(s *EventLogSource) { + s.Channel = channel + } +} + +func WithQuery(query string) Option { + return func(s *EventLogSource) { + s.Query = query + } +} + +type EventLogSource struct { + msgC chan msgAck + + Channel string + Query string + + subscription *eventSubscription +} + +type msgAck struct { + msg kawa.Message[EventLog] + ack func() +} + +func NewEventLogSource(opts ...Option) *EventLogSource { + ret := &EventLogSource{} + for _, o := range opts { + o(ret) + } + + if ret.Query == "" { + ret.Query = "*" + } + + msgC := make(chan msgAck) + errorsChan := make(chan error) + + eventSubscription := &eventSubscription{ + Channel: ret.Channel, + Query: ret.Query, //[EventData[Data[@Name='LogonType']='2'] and System[(EventID=4624)]]", // Successful interactive logon events + SubscribeMethod: evtSubscribeToFutureEvents, + Errors: errorsChan, + Callback: msgC, + } + + ret.msgC = msgC + ret.subscription = eventSubscription + + return ret +} + +func (s *EventLogSource) Run(ctx context.Context) error { + return s.recvLoop(ctx) +} + +func (s *EventLogSource) recvLoop(ctx context.Context) error { + + if err := s.subscription.create(); err != nil { + slog.Error("Failed to create event subscription: %s", err) + return nil + } + defer s.subscription.close() + defer close(s.subscription.Errors) + + for { + select { + case err := <-s.subscription.Errors: + return err + case <-ctx.Done(): + err := s.subscription.close() + return err + case <-time.After(60 * time.Second): + } + } +} + +func (s *EventLogSource) Recv(ctx context.Context) (kawa.Message[EventLog], func(), error) { + select { + case <-ctx.Done(): + return kawa.Message[EventLog]{}, nil, ctx.Err() + case pass := <-s.msgC: + return pass.msg, pass.ack, nil + } +} diff --git a/x/windows/system_windows.go b/x/windows/system_windows.go new file mode 100644 index 0000000..2dee31a --- /dev/null +++ b/x/windows/system_windows.go @@ -0,0 +1,172 @@ +//go:build windows +// +build windows + +package windows + +import ( + "encoding/xml" + "fmt" + "syscall" + "unsafe" + + "github.com/runreveal/kawa" + sysWindows "golang.org/x/sys/windows" +) + +const ( + // EvtSubscribeToFutureEvents instructs the + // subscription to only receive events that occur + // after the subscription has been made + evtSubscribeToFutureEvents = 1 + + // EvtSubscribeStartAtOldestRecord instructs the + // subscription to receive all events (past and future) + // that match the query + evtSubscribeStartAtOldestRecord = 2 + + // evtSubscribeActionError defines a action + // code that may be received by the winAPICallback. + // ActionError defines that an internal error occurred + // while obtaining an event for the callback + evtSubscribeActionError = 0 + + // evtSubscribeActionDeliver defines a action + // code that may be received by the winAPICallback. + // ActionDeliver defines that the internal API was + // successful in obtaining an event that matched + // the subscription query + evtSubscribeActionDeliver = 1 + + // evtRenderEventXML instructs procEvtRender + // to render the event details as a XML string + evtRenderEventXML = 1 +) + +var ( + modwevtapi = sysWindows.NewLazySystemDLL("wevtapi.dll") + + procEvtSubscribe = modwevtapi.NewProc("EvtSubscribe") + procEvtRender = modwevtapi.NewProc("EvtRender") + procEvtClose = modwevtapi.NewProc("EvtClose") +) + +// EventSubscription is a subscription to +// Windows Events, it defines details about the +// subscription including the channel and query +type eventSubscription struct { + Channel string + Query string + SubscribeMethod int + Errors chan error + Callback chan msgAck + + winAPIHandle sysWindows.Handle +} + +// Create will setup an event subscription in the +// windows kernel with the provided channel and +// event query +func (evtSub *eventSubscription) create() error { + if evtSub.winAPIHandle != 0 { + return fmt.Errorf("windows_events: subscription already created in kernel") + } + + winChannel, err := sysWindows.UTF16PtrFromString(evtSub.Channel) + if err != nil { + return fmt.Errorf("windows_events: bad channel name: %s", err) + } + + winQuery, err := sysWindows.UTF16PtrFromString(evtSub.Query) + if err != nil { + return fmt.Errorf("windows_events: bad query string: %s", err) + } + + handle, _, err := procEvtSubscribe.Call( + 0, + 0, + uintptr(unsafe.Pointer(winChannel)), + uintptr(unsafe.Pointer(winQuery)), + 0, + 0, + syscall.NewCallback(evtSub.winAPICallback), + uintptr(evtSub.SubscribeMethod), + ) + + if handle == 0 { + return fmt.Errorf("windows_events: failed to subscribe to events: %s", err) + } + + evtSub.winAPIHandle = sysWindows.Handle(handle) + return nil +} + +// Close tells the windows kernel to let go +// of the event subscription handle as we +// are now done with it +func (evtSub *eventSubscription) close() error { + if evtSub.winAPIHandle == 0 { + return fmt.Errorf("windows_events: no subscription to close") + } + + if returnCode, _, err := procEvtClose.Call(uintptr(evtSub.winAPIHandle)); returnCode == 0 { + return fmt.Errorf("windows_events: encountered error while closing event handle: %s", err) + } + + evtSub.winAPIHandle = 0 + return nil +} + +// winAPICallback receives the callback from the windows +// kernel when an event matching the query and channel is +// received. It will query the kernel to get the event rendered +// as a XML string, the XML string is then unmarshaled to an +// `Event` and the custom callback invoked +func (evtSub *eventSubscription) winAPICallback(action, userContext, event uintptr) uintptr { + switch action { + case evtSubscribeActionError: + evtSub.Errors <- fmt.Errorf("windows_events: encountered error during callback: Win32 Error %x", uint16(event)) + + case evtSubscribeActionDeliver: + renderSpace := make([]uint16, 4096) + bufferUsed := uint16(0) + propertyCount := uint16(0) + + returnCode, _, err := procEvtRender.Call( + 0, + event, + evtRenderEventXML, + 4096, + uintptr(unsafe.Pointer(&renderSpace[0])), + uintptr(unsafe.Pointer(&bufferUsed)), + uintptr(unsafe.Pointer(&propertyCount)), + ) + + if returnCode == 0 { + evtSub.Errors <- fmt.Errorf("windows_event: failed to render event data: %s", err) + } else { + dataUTF8 := sysWindows.UTF16ToString(renderSpace) + xEvt := xmlEvent{} + err := xml.Unmarshal([]byte(dataUTF8), &xEvt) + + if err != nil { + evtSub.Errors <- fmt.Errorf("windows_event: failed to unmarshal event xml: %s", err) + } else { + // take dataParsed and convert back to json object for sending to server + jsonEvt := xEvt.ToJSONEvent() + msg := msgAck{ + msg: kawa.Message[EventLog]{ + Value: *jsonEvt, + Topic: evtSub.Channel, + }, + ack: nil, + } + evtSub.Callback <- msg + } + } + + default: + evtSub.Errors <- fmt.Errorf("windows_events: encountered error during callback: unsupported action code %x", uint16(action)) + } + + return 0 +} diff --git a/x/windows/windows_event.go b/x/windows/windows_event.go new file mode 100644 index 0000000..c259c43 --- /dev/null +++ b/x/windows/windows_event.go @@ -0,0 +1,178 @@ +//go:build windows +// +build windows + +package windows + +import ( + "bytes" + "encoding/xml" + "io" + "time" +) + +type xmlMap map[string]interface{} + +type xmlMapEntry struct { + XMLName xml.Name + Value string `xml:",chardata"` + InnerXML string `xml:",innerxml"` +} + +func (m *xmlMap) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { + *m = xmlMap{} + for { + var e xmlMapEntry + + err := d.Decode(&e) + if err == io.EOF { + break + } else if err != nil { + return err + } + if e.InnerXML != "" { + var sm xmlMap + r := bytes.NewBuffer([]byte(e.InnerXML)) + dec := xml.NewDecoder(r) + err := sm.UnmarshalXML(dec, xml.StartElement{}) + + if err == io.EOF { + break + } else if err != nil { + return err + } + (*m)[e.XMLName.Local] = sm + } + if e.Value != "" { + (*m)[e.XMLName.Local] = e.Value + } + } + return nil +} + +type Data struct { + Name string `xml:"Name,attr"` + Value string `xml:",innerxml"` +} //`xml:"Data"` + +type xmlEvent struct { + // seems to always have the same format + // if not consider using XMLMap + EventData struct { + Data []Data + } `xml:"EventData,omitempty"` + // Using XMLMap type because we don't know what is inside (a priori) + UserData xmlMap + System struct { + Provider struct { + Name string `xml:"Name,attr"` + Guid string `xml:"Guid,attr"` + } `xml:"Provider"` + EventID string `xml:"EventID"` + Version string `xml:"Version"` + Level string `xml:"Level"` + Task string `xml:"Task"` + Opcode string `xml:"Opcode"` + Keywords string `xml:"Keywords"` + TimeCreated struct { + SystemTime time.Time `xml:"SystemTime,attr"` + } `xml:"TimeCreated"` + EventRecordID string `xml:"EventRecordID"` + Correlation struct { + } `xml:"Correlation"` + Execution struct { + ProcessID string `xml:"ProcessID,attr"` + ThreadID string `xml:"ThreadID,attr"` + } `xml:"Execution"` + Channel string `xml:"Channel"` + Computer string `xml:"Computer"` + Security struct { + UserID string `xml:"UserID,attr"` + } `xml:"Security"` + } `xml:"System"` +} + +// ToMap converts an XMLEvent to an accurate structure to be serialized +// where EventData / UserData does not appear if empty +func (xe *xmlEvent) ToMap() *map[string]interface{} { + m := make(map[string]interface{}) + m["Event"] = make(map[string]interface{}) + if len(xe.EventData.Data) > 0 { + m["Event"].(map[string]interface{})["EventData"] = make(map[string]interface{}) + for _, d := range xe.EventData.Data { + m["Event"].(map[string]interface{})["EventData"].(map[string]interface{})[d.Name] = d.Value + } + } + if len(xe.UserData) > 0 { + m["Event"].(map[string]interface{})["UserData"] = xe.UserData + } + m["Event"].(map[string]interface{})["System"] = xe.System + return &m +} + +func (xe *xmlEvent) ToJSONEvent() *EventLog { + event := newEventLog() + for _, d := range xe.EventData.Data { + if d.Name != "" { + event.EventDataMap[d.Name] = d.Value + } else { + event.EventData = append(event.EventData, d.Value) + } + } + event.UserData = xe.UserData + event.System.Provider.Name = xe.System.Provider.Name + event.System.Provider.Guid = xe.System.Provider.Guid + event.System.EventID = xe.System.EventID + event.System.Version = xe.System.Version + event.System.Level = xe.System.Level + event.System.Task = xe.System.Task + event.System.Opcode = xe.System.Opcode + event.System.Keywords = xe.System.Keywords + event.System.TimeCreated.SystemTime = xe.System.TimeCreated.SystemTime + event.System.EventRecordID = xe.System.EventRecordID + event.System.Correlation = xe.System.Correlation + event.System.Execution.ProcessID = xe.System.Execution.ProcessID + event.System.Execution.ThreadID = xe.System.Execution.ThreadID + event.System.Channel = xe.System.Channel + event.System.Computer = xe.System.Computer + event.System.Security.UserID = xe.System.Security.UserID + return &event +} + +type EventLog struct { + EventDataMap map[string]string `xml:"EventData" json:"eventDataMap,omitempty"` + EventData []string ` json:"eventData,omitempty"` + UserData map[string]interface{} ` json:"userData,omitempty"` + System struct { + Provider struct { + Name string `xml:"Name,attr" json:"name"` + Guid string `xml:"Guid,attr" json:"guid"` + } `xml:"Provider" json:"provider"` + EventID string `xml:"EventID" json:"eventId"` + Version string `xml:"Version" json:"version"` + Level string `xml:"Level" json:"level"` + Task string `xml:"Task" json:"task"` + Opcode string `xml:"Opcode" json:"opcode"` + Keywords string `xml:"Keywords" json:"keywords"` + TimeCreated struct { + SystemTime time.Time `xml:"SystemTime,attr" json:"systemTime"` + } `xml:"TimeCreated" json:"timeCreated"` + EventRecordID string `xml:"EventRecordID" json:"eventRecordId"` + Correlation struct { + } `xml:"Correlation" json:"correlation"` + Execution struct { + ProcessID string `xml:"ProcessID,attr" json:"processId"` + ThreadID string `xml:"ThreadID,attr" json:"threadId"` + } `xml:"Execution" json:"execution"` + Channel string `xml:"Channel" json:"channel"` + Computer string `xml:"Computer" json:"computer"` + Security struct { + UserID string `xml:"UserID,attr" json:"userId"` + } `xml:"Security" json:"security"` + } `xml:"System" json:"system"` +} + +// NewJSONEvent creates a new JSONEvent structure +func newEventLog() (el EventLog) { + el.EventDataMap = make(map[string]string) + return el +}