Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows Event Logs source #30

Merged
merged 11 commits into from
Sep 2, 2023
29 changes: 29 additions & 0 deletions cmd/kawad/config_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//go:build windows
// +build windows

package main

import (
"github.com/runreveal/kawa"
windowskawad "github.com/runreveal/kawa/cmd/kawad/internal/sources/windows"
"github.com/runreveal/kawa/cmd/kawad/internal/types"
"github.com/runreveal/kawa/x/windows"
"github.com/runreveal/lib/loader"
"golang.org/x/exp/slog"
)

func init() {
loader.Register("eventlog", func() loader.Builder[kawa.Source[types.Event]] {
return &EventLogConfig{}
})
}

type EventLogConfig struct {
Channel string `json:"channel"`
Query string `json:"query"`
}

func (c *EventLogConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring windows event log")
return windowskawad.NewEventLog(windows.WithChannel(c.Channel), windows.WithQuery(c.Query)), nil
}
49 changes: 49 additions & 0 deletions cmd/kawad/internal/sources/windows/eventlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//go:build windows
// +build windows

package windowskawad

import (
"context"
"encoding/json"

"github.com/runreveal/kawa"
"github.com/runreveal/kawa/cmd/kawad/internal/types"
"github.com/runreveal/kawa/x/windows"
)

type EventLog struct {
wrapped *windows.EventLogSource
}

func NewEventLog(opts ...windows.Option) *EventLog {
return &EventLog{wrapped: windows.NewEventLogSource(opts...)}
}

func (s *EventLog) Run(ctx context.Context) error {
return s.wrapped.Run(ctx)
}

func (s *EventLog) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) {
msg, ack, err := s.wrapped.Recv(ctx)
if err != nil {
return kawa.Message[types.Event]{}, nil, ctx.Err()
}

rawLog, err := json.Marshal(msg.Value)
if err != nil {
return kawa.Message[types.Event]{}, nil, ctx.Err()
}

eventMsg := kawa.Message[types.Event]{
Key: msg.Key,
Value: types.Event{
Timestamp: msg.Value.System.TimeCreated.SystemTime,
SourceType: "eventlog",
RawLog: rawLog,
}, Topic: msg.Topic,
Attributes: msg.Attributes,
}

return eventMsg, ack, err
}
15 changes: 15 additions & 0 deletions examples/windows/config_windows.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"pprof": "localhost:6060",
"sources": [
{
"type": "eventlog",
"channel": "Security",
"query": "*",
},
],
"destinations": [
{
"type": "printer",
},
],
}
98 changes: 98 additions & 0 deletions x/windows/event_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package windows

import (
"context"
"time"

"github.com/runreveal/kawa"
"golang.org/x/exp/slog"
)

type Option func(*EventLogSource)

func WithChannel(channel string) Option {
return func(s *EventLogSource) {
s.Channel = channel
}
}

func WithQuery(query string) Option {
return func(s *EventLogSource) {
s.Query = query
}
}

type EventLogSource struct {
msgC chan msgAck

Channel string
Query string

subscription *eventSubscription

Check failure on line 31 in x/windows/event_logs.go

View workflow job for this annotation

GitHub Actions / build

undefined: eventSubscription
}

type msgAck struct {
msg kawa.Message[EventLog]
ack func()
}

func NewEventLogSource(opts ...Option) *EventLogSource {
ret := &EventLogSource{}
for _, o := range opts {
o(ret)
}

if ret.Query == "" {
ret.Query = "*"
}

msgC := make(chan msgAck)
errorsChan := make(chan error)

eventSubscription := &eventSubscription{

Check failure on line 52 in x/windows/event_logs.go

View workflow job for this annotation

GitHub Actions / build

undefined: eventSubscription
Channel: ret.Channel,
Query: ret.Query, //[EventData[Data[@Name='LogonType']='2'] and System[(EventID=4624)]]", // Successful interactive logon events
SubscribeMethod: EvtSubscribeToFutureEvents,

Check failure on line 55 in x/windows/event_logs.go

View workflow job for this annotation

GitHub Actions / build

undefined: EvtSubscribeToFutureEvents
Errors: errorsChan,
Callback: msgC,
}

ret.msgC = msgC
ret.subscription = eventSubscription

return ret
}

func (s *EventLogSource) Run(ctx context.Context) error {
return s.recvLoop(ctx)
}

func (s *EventLogSource) recvLoop(ctx context.Context) error {

if err := s.subscription.create(); err != nil {
slog.Error("Failed to create event subscription: %s", err)
return nil
}
defer s.subscription.close()
defer close(s.subscription.Errors)

for {
select {
case err := <-s.subscription.Errors:
return err
case <-ctx.Done():
err := s.subscription.close()
return err
case <-time.After(60 * time.Second):
}
}
}

func (s *EventLogSource) Recv(ctx context.Context) (kawa.Message[EventLog], func(), error) {
select {
case <-ctx.Done():
return kawa.Message[EventLog]{}, nil, ctx.Err()
case pass := <-s.msgC:
return pass.msg, pass.ack, nil
}
}
160 changes: 160 additions & 0 deletions x/windows/system_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package windows

import (
"encoding/xml"
"fmt"
"syscall"
"unsafe"

"github.com/runreveal/kawa"
"golang.org/x/sys/windows"
)

const (
// EvtSubscribeToFutureEvents instructs the
// subscription to only receive events that occur
// after the subscription has been made
EvtSubscribeToFutureEvents = 1

// EvtSubscribeStartAtOldestRecord instructs the
// subscription to receive all events (past and future)
// that match the query
EvtSubscribeStartAtOldestRecord = 2

// evtSubscribeActionError defines a action
// code that may be received by the winAPICallback.
// ActionError defines that an internal error occurred
// while obtaining an event for the callback
evtSubscribeActionError = 0

// evtSubscribeActionDeliver defines a action
// code that may be received by the winAPICallback.
// ActionDeliver defines that the internal API was
// successful in obtaining an event that matched
// the subscription query
evtSubscribeActionDeliver = 1

// evtRenderEventXML instructs procEvtRender
// to render the event details as a XML string
evtRenderEventXML = 1
)

var (
modwevtapi = windows.NewLazySystemDLL("wevtapi.dll")

procEvtSubscribe = modwevtapi.NewProc("EvtSubscribe")
procEvtRender = modwevtapi.NewProc("EvtRender")
procEvtClose = modwevtapi.NewProc("EvtClose")
)

// EventSubscription is a subscription to
// Windows Events, it defines details about the
// subscription including the channel and query
type eventSubscription struct {
Channel string
Query string
SubscribeMethod int
Errors chan error
Callback chan msgAck

winAPIHandle windows.Handle
}

// Create will setup an event subscription in the
// windows kernel with the provided channel and
// event query
func (evtSub *eventSubscription) create() error {
if evtSub.winAPIHandle != 0 {
return fmt.Errorf("windows_events: subscription already created in kernel")
}

winChannel, err := windows.UTF16PtrFromString(evtSub.Channel)
if err != nil {
return fmt.Errorf("windows_events: bad channel name: %s", err)
}

winQuery, err := windows.UTF16PtrFromString(evtSub.Query)
if err != nil {
return fmt.Errorf("windows_events: bad query string: %s", err)
}

handle, _, err := procEvtSubscribe.Call(
0,
0,
uintptr(unsafe.Pointer(winChannel)),
uintptr(unsafe.Pointer(winQuery)),
0,
0,
syscall.NewCallback(evtSub.winAPICallback),
uintptr(evtSub.SubscribeMethod),
)

if handle == 0 {
return fmt.Errorf("windows_events: failed to subscribe to events: %s", err)
}

evtSub.winAPIHandle = windows.Handle(handle)
return nil
}

// Close tells the windows kernel to let go
// of the event subscription handle as we
// are now done with it
func (evtSub *eventSubscription) close() error {
if evtSub.winAPIHandle == 0 {
return fmt.Errorf("windows_events: no subscription to close")
}

if returnCode, _, err := procEvtClose.Call(uintptr(evtSub.winAPIHandle)); returnCode == 0 {
return fmt.Errorf("windows_events: encountered error while closing event handle: %s", err)
}

evtSub.winAPIHandle = 0
return nil
}

// winAPICallback receives the callback from the windows
// kernel when an event matching the query and channel is
// received. It will query the kernel to get the event rendered
// as a XML string, the XML string is then unmarshaled to an
// `Event` and the custom callback invoked
func (evtSub *eventSubscription) winAPICallback(action, userContext, event uintptr) uintptr {
switch action {
case evtSubscribeActionError:
evtSub.Errors <- fmt.Errorf("windows_events: encountered error during callback: Win32 Error %x", uint16(event))

case evtSubscribeActionDeliver:
renderSpace := make([]uint16, 4096)
bufferUsed := uint16(0)
propertyCount := uint16(0)

returnCode, _, err := procEvtRender.Call(0, event, evtRenderEventXML, 4096, uintptr(unsafe.Pointer(&renderSpace[0])), uintptr(unsafe.Pointer(&bufferUsed)), uintptr(unsafe.Pointer(&propertyCount)))

if returnCode == 0 {
evtSub.Errors <- fmt.Errorf("windows_event: failed to render event data: %s", err)
} else {
dataUTF8 := windows.UTF16ToString(renderSpace)
xEvt := xmlEvent{}
err := xml.Unmarshal([]byte(dataUTF8), &xEvt)

if err != nil {
evtSub.Errors <- fmt.Errorf("windows_event: failed to unmarshal event xml: %s", err)
} else {
// take dataParsed and convert back to json object for sending to server
jsonEvt := xEvt.ToJSONEvent()
msg := msgAck{
msg: kawa.Message[EventLog]{
Value: *jsonEvt,
},
ack: nil,
}
evtSub.Callback <- msg
}
}

default:
evtSub.Errors <- fmt.Errorf("windows_events: encountered error during callback: unsupported action code %x", uint16(action))
}

return 0
}
Loading
Loading