From 8b6b522aaeab4dcd7d277c49ed00f4c308cb8c85 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 27 Aug 2024 15:15:24 +0200 Subject: [PATCH 1/9] [service] Improve rng tickfeeder --- base/rng/tickfeeder.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/base/rng/tickfeeder.go b/base/rng/tickfeeder.go index 42fd5e113..84f3dee2e 100644 --- a/base/rng/tickfeeder.go +++ b/base/rng/tickfeeder.go @@ -38,11 +38,15 @@ func tickFeeder(ctx *mgr.WorkerCtx) error { feeder := NewFeeder() defer feeder.CloseFeeder() - tickDuration := getTickFeederTickDuration() + ticker := time.NewTicker(getTickFeederTickDuration()) + defer ticker.Stop() for { - // wait for tick - time.Sleep(tickDuration) + select { + case <-ticker.C: + case <-ctx.Done(): + return nil + } // add tick value value = (value << 1) | (time.Now().UnixNano() % 2) @@ -64,13 +68,6 @@ func tickFeeder(ctx *mgr.WorkerCtx) error { case <-ctx.Done(): return nil } - } else { - // check if are done - select { - case <-ctx.Done(): - return nil - default: - } } } } From 57f08eccd78f287cf2e91374231c226b8ab7d7e6 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 27 Aug 2024 16:32:23 +0200 Subject: [PATCH 2/9] [service] Add worker info system --- go.mod | 1 + go.sum | 7 + service/core/api.go | 1 + service/core/core.go | 2 + service/debug.go | 64 ++++++ service/debug_test.go | 31 +++ service/mgr/manager.go | 12 +- service/mgr/worker.go | 25 ++- service/mgr/worker_info.go | 387 +++++++++++++++++++++++++++++++++++++ service/mgr/worker_test.go | 51 +++++ service/mgr/workermgr.go | 22 ++- 11 files changed, 590 insertions(+), 13 deletions(-) create mode 100644 service/debug.go create mode 100644 service/debug_test.go create mode 100644 service/mgr/worker_info.go create mode 100644 service/mgr/worker_test.go diff --git a/go.mod b/go.mod index d103c4d01..436df0948 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/maruel/panicparse/v2 v2.3.1 // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.5.1 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect diff --git a/go.sum b/go.sum index d5d5552d3..50df69d64 100644 --- a/go.sum +++ b/go.sum @@ -173,12 +173,16 @@ github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw= github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/magiconair/properties v1.7.4-0.20170902060319-8d7837e64d3c/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/maruel/panicparse/v2 v2.3.1 h1:NtJavmbMn0DyzmmSStE8yUsmPZrZmudPH7kplxBinOA= +github.com/maruel/panicparse/v2 v2.3.1/go.mod h1:s3UmQB9Fm/n7n/prcD2xBGDkwXD6y2LeZnhbEXvs9Dg= github.com/mat/besticon v3.12.0+incompatible h1:1KTD6wisfjfnX+fk9Kx/6VEZL+MAW1LhCkL9Q47H9Bg= github.com/mat/besticon v3.12.0+incompatible/go.mod h1:mA1auQYHt6CW5e7L9HJLmqVQC8SzNk2gVwouO0AbiEU= github.com/mattn/go-colorable v0.0.10-0.20170816031813-ad5389df28cd/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.2/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= @@ -205,6 +209,7 @@ github.com/mdlayher/socket v0.1.0/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5A github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos= github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= +github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ= github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= @@ -402,6 +407,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -409,6 +415,7 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/service/core/api.go b/service/core/api.go index 1684a4eb7..c4758cda3 100644 --- a/service/core/api.go +++ b/service/core/api.go @@ -151,6 +151,7 @@ func debugInfo(ar *api.Request) (data []byte, err error) { // Detailed information. updates.AddToDebugInfo(di) compat.AddToDebugInfo(di) + module.instance.AddWorkerInfoToDebugInfo(di) di.AddGoroutineStack() // Return data. diff --git a/service/core/core.go b/service/core/core.go index e14789d08..ecbcf9487 100644 --- a/service/core/core.go +++ b/service/core/core.go @@ -8,6 +8,7 @@ import ( "github.com/safing/portmaster/base/log" "github.com/safing/portmaster/base/metrics" + "github.com/safing/portmaster/base/utils/debug" _ "github.com/safing/portmaster/service/broadcasts" "github.com/safing/portmaster/service/mgr" _ "github.com/safing/portmaster/service/netenv" @@ -112,4 +113,5 @@ func New(instance instance) (*Core, error) { type instance interface { Shutdown() + AddWorkerInfoToDebugInfo(di *debug.Info) } diff --git a/service/debug.go b/service/debug.go new file mode 100644 index 000000000..46a66421c --- /dev/null +++ b/service/debug.go @@ -0,0 +1,64 @@ +package service + +import ( + "bytes" + "errors" + "fmt" + "io" + "runtime" + + "github.com/maruel/panicparse/v2/stack" + + "github.com/safing/portmaster/base/utils/debug" + "github.com/safing/portmaster/service/mgr" +) + +// GetWorkerInfo returns the worker info of all running workers. +func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) { + snapshot, _, err := stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts()) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("get stack: %w", err) + } + + infos := make([]*mgr.WorkerInfo, 0, 32) + for _, m := range i.serviceGroup.Modules() { + wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. + infos = append(infos, wi) + } + for _, m := range i.SpnGroup.Modules() { + wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. + infos = append(infos, wi) + } + + return mgr.MergeWorkerInfo(infos...), nil +} + +// AddWorkerInfoToDebugInfo adds the worker info of all running workers to the debug info. +func (i *Instance) AddWorkerInfoToDebugInfo(di *debug.Info) { + info, err := i.GetWorkerInfo() + if err != nil { + di.AddSection( + "Worker Status Failed", + debug.UseCodeSection, + err.Error(), + ) + return + } + + di.AddSection( + fmt.Sprintf("Worker Status: %d/%d (%d?)", info.Running, len(info.Workers), info.Missing+info.Other), + debug.UseCodeSection, + info.Format(), + ) +} + +func fullStack() []byte { + buf := make([]byte, 8096) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} diff --git a/service/debug_test.go b/service/debug_test.go new file mode 100644 index 000000000..52fbe0c7c --- /dev/null +++ b/service/debug_test.go @@ -0,0 +1,31 @@ +package service + +import ( + "testing" + "time" + + "github.com/safing/portmaster/base/notifications" + "github.com/safing/portmaster/service/mgr" +) + +func TestDebug(t *testing.T) { + // Create test instance with at least one worker. + i := &Instance{} + n, err := notifications.New(i) + if err != nil { + t.Fatal(err) + } + i.serviceGroup = mgr.NewGroup(n) + i.SpnGroup = mgr.NewExtendedGroup() + err = i.Start() + if err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) + + info, err := i.GetWorkerInfo() + if err != nil { + t.Fatal(err) + } + t.Log(info) +} diff --git a/service/mgr/manager.go b/service/mgr/manager.go index 2070a2909..68c17043b 100644 --- a/service/mgr/manager.go +++ b/service/mgr/manager.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "runtime" + "sync" "sync/atomic" "time" ) @@ -21,6 +22,10 @@ type Manager struct { workerCnt atomic.Int32 workersDone chan struct{} + + workers []*WorkerCtx + workersIndex int + workersLock sync.Mutex } // New returns a new manager. @@ -33,6 +38,7 @@ func newManager(name string) *Manager { name: name, logger: slog.Default().With(ManagerNameSLogKey, name), workersDone: make(chan struct{}), + workers: make([]*WorkerCtx, 4), } m.ctx, m.cancelCtx = context.WithCancel(context.Background()) return m @@ -196,11 +202,13 @@ func (m *Manager) waitForWorkers(max time.Duration, limit int32) (done bool) { } } -func (m *Manager) workerStart() { +func (m *Manager) workerStart(w *WorkerCtx) { + m.registerWorker(w) m.workerCnt.Add(1) } -func (m *Manager) workerDone() { +func (m *Manager) workerDone(w *WorkerCtx) { + m.unregisterWorker(w) if m.workerCnt.Add(-1) <= 1 { // Notify all waiters. for { diff --git a/service/mgr/worker.go b/service/mgr/worker.go index 510dedfef..ff06f05ad 100644 --- a/service/mgr/worker.go +++ b/service/mgr/worker.go @@ -21,6 +21,9 @@ var WorkerCtxContextKey = workerContextKey{} // WorkerCtx provides workers with the necessary environment for flow control // and logging. type WorkerCtx struct { + name string + workFunc func(w *WorkerCtx) error + ctx context.Context cancelCtx context.CancelFunc @@ -161,14 +164,16 @@ func (m *Manager) Go(name string, fn func(w *WorkerCtx) error) { } func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) { - m.workerStart() - defer m.workerDone() - w := &WorkerCtx{ - logger: m.logger.With("worker", name), + name: name, + workFunc: fn, + logger: m.logger.With("worker", name), } w.ctx = m.ctx + m.workerStart(w) + defer m.workerDone(w) + backoff := time.Second failCnt := 0 @@ -244,15 +249,17 @@ func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) { // - Panic catching. // - Flow control helpers. func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error { - m.workerStart() - defer m.workerDone() - // Create context. w := &WorkerCtx{ - ctx: m.Ctx(), - logger: m.logger.With("worker", name), + name: name, + workFunc: fn, + ctx: m.Ctx(), + logger: m.logger.With("worker", name), } + m.workerStart(w) + defer m.workerDone(w) + // Run worker. panicInfo, err := m.runWorker(w, fn) switch { diff --git a/service/mgr/worker_info.go b/service/mgr/worker_info.go new file mode 100644 index 000000000..9f281da23 --- /dev/null +++ b/service/mgr/worker_info.go @@ -0,0 +1,387 @@ +package mgr + +import ( + "bytes" + "errors" + "fmt" + "io" + "reflect" + "runtime" + "slices" + "strconv" + "strings" + "text/tabwriter" + + "github.com/maruel/panicparse/v2/stack" +) + +// WorkerInfoModule is used for interface checks on modules. +type WorkerInfoModule interface { + WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error) +} + +func (m *Manager) registerWorker(w *WorkerCtx) { + m.workersLock.Lock() + defer m.workersLock.Unlock() + + // Iterate forwards over the ring buffer. + end := (m.workersIndex - 1 + len(m.workers)) % len(m.workers) + for { + // Check if entry is available. + if m.workers[m.workersIndex] == nil { + m.workers[m.workersIndex] = w + return + } + // Check if we checked the whole ring buffer. + if m.workersIndex == end { + break + } + // Go to next index. + m.workersIndex = (m.workersIndex + 1) % len(m.workers) + } + + // Increase ring buffer. + newRingBuf := make([]*WorkerCtx, len(m.workers)*4) + copy(newRingBuf, m.workers) + // Add new entry. + m.workersIndex = len(m.workers) + newRingBuf[m.workersIndex] = w + m.workersIndex++ + // Switch to new ring buffer. + m.workers = newRingBuf +} + +func (m *Manager) unregisterWorker(w *WorkerCtx) { + m.workersLock.Lock() + defer m.workersLock.Unlock() + + // Iterate backwards over the ring buffer. + i := m.workersIndex + end := (i + 1) % len(m.workers) + for { + // Check if entry is the one we want to remove. + if m.workers[i] == w { + m.workers[i] = nil + return + } + // Check if we checked the whole ring buffer. + if i == end { + break + } + // Go to next index. + i = (i - 1 + len(m.workers)) % len(m.workers) + } +} + +// WorkerInfo holds status information about a managers workers. +type WorkerInfo struct { + Running int + Waiting int + + Other int + Missing int + + Workers []*WorkerInfoDetail +} + +// WorkerInfoDetail holds status information about a single worker. +type WorkerInfoDetail struct { + Count int + State string + Mgr string + Name string + Func string + CurrentLine string + ExtraInfo string +} + +// WorkerInfo returns status information for all running workers of this manager. +func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error) { + m.workersLock.Lock() + defer m.workersLock.Unlock() + + var err error + if s == nil { + s, _, err = stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts()) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("get stack: %w", err) + } + } + + wi := &WorkerInfo{ + Workers: make([]*WorkerInfoDetail, 0, len(m.workers)), + } + + // Go through all registered workers of manager. + for _, w := range m.workers { + // Ignore empty slots. + if w == nil { + continue + } + + // Setup worker detail struct. + wd := &WorkerInfoDetail{ + Count: 1, + Mgr: m.name, + } + if w.workerMgr != nil { + wd.Name = w.workerMgr.name + wd.Func = getFuncName(w.workerMgr.fn) + } else { + wd.Name = w.name + wd.Func = getFuncName(w.workFunc) + } + + // Search for stack of this worker. + goroutines: + for _, gr := range s.Goroutines { + for _, call := range gr.Stack.Calls { + // Check if the can find the worker function in a call stack. + fullFuncName := call.Func.ImportPath + "." + call.Func.Name + if fullFuncName == wd.Func { + wd.State = gr.State + + // Find most useful line for where the goroutine currently is at. + // Cut import path prefix to domain/user, eg. github.com/safing + importPathPrefix := call.ImportPath + splitted := strings.SplitN(importPathPrefix, "/", 3) + if len(splitted) == 3 { + importPathPrefix = splitted[0] + "/" + splitted[1] + "/" + } + // Find "last" call within that import path prefix. + for _, call = range gr.Stack.Calls { + if strings.HasPrefix(call.ImportPath, importPathPrefix) { + wd.CurrentLine = call.ImportPath + "/" + call.SrcName + ":" + strconv.Itoa(call.Line) + break + } + } + // Fall back to last call if no better line was found. + if wd.CurrentLine == "" { + wd.CurrentLine = gr.Stack.Calls[0].ImportPath + "/" + gr.Stack.Calls[0].SrcName + ":" + strconv.Itoa(gr.Stack.Calls[0].Line) + } + + // Add some extra info in some cases. + if wd.State == "sleep" { //nolint:goconst + wd.ExtraInfo = gr.SleepString() + } + + break goroutines + } + } + } + + // Summarize and add to list. + switch wd.State { + case "idle", "runnable", "running", "syscall", + "waiting", "dead", "enqueue", "copystack": + wi.Running++ + case "chan send", "chan receive", "select", "IO wait", + "panicwait", "semacquire", "semarelease", "sleep": + wi.Waiting++ + case "": + if w.workerMgr != nil { + wi.Waiting++ + wd.State = "scheduled" + wd.ExtraInfo = w.workerMgr.Status() + } else { + wi.Missing++ + wd.State = "missing" + } + default: + wi.Other++ + } + + wi.Workers = append(wi.Workers, wd) + } + + // Sort and return. + wi.clean() + return wi, nil +} + +// Format formats the worker information as a readable table. +func (wi *WorkerInfo) Format() string { + buf := bytes.NewBuffer(nil) + + // Add summary. + buf.WriteString(fmt.Sprintf( + "%d Workers: %d running, %d waiting\n\n", + len(wi.Workers), + wi.Running, + wi.Waiting, + )) + + // Build table. + tabWriter := tabwriter.NewWriter(buf, 4, 4, 3, ' ', 0) + fmt.Fprintf(tabWriter, "#\tState\tModule\tName\tWorker Func\tCurrent Line\tExtra Info\n") + + for _, wd := range wi.Workers { + fmt.Fprintf(tabWriter, + "%d\t%s\t%s\t%s\t%s\t%s\t%s\n", + wd.Count, + wd.State, + wd.Mgr, + wd.Name, + wd.Func, + wd.CurrentLine, + wd.ExtraInfo, + ) + } + _ = tabWriter.Flush() + + return buf.String() +} + +func getFuncName(fn func(w *WorkerCtx) error) string { + name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + return strings.TrimSuffix(name, "-fm") +} + +func fullStack() []byte { + buf := make([]byte, 8096) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} + +// MergeWorkerInfo merges multiple worker infos into one. +func MergeWorkerInfo(infos ...*WorkerInfo) *WorkerInfo { + // Calculate total registered workers. + var totalWorkers int + for _, status := range infos { + totalWorkers += len(status.Workers) + } + + // Merge all worker infos. + wi := &WorkerInfo{ + Workers: make([]*WorkerInfoDetail, 0, totalWorkers), + } + for _, info := range infos { + wi.Running += info.Running + wi.Waiting += info.Waiting + wi.Other += info.Other + wi.Missing += info.Missing + wi.Workers = append(wi.Workers, info.Workers...) + } + + // Sort and return. + wi.clean() + return wi +} + +func (wi *WorkerInfo) clean() { + // Check if there is anything to do. + if len(wi.Workers) <= 1 { + return + } + + // Sort for deduplication. + slices.SortFunc(wi.Workers, sortWorkerInfoDetail) + + // Count duplicate worker details. + current := wi.Workers[0] + for i := 1; i < len(wi.Workers); i++ { + if workerDetailsAreEqual(current, wi.Workers[i]) { + current.Count++ + } else { + current = wi.Workers[i] + } + } + // Deduplicate worker details. + wi.Workers = slices.CompactFunc(wi.Workers, workerDetailsAreEqual) + + // Sort for presentation. + slices.SortFunc(wi.Workers, sortWorkerInfoDetailByCount) +} + +// sortWorkerInfoDetail is a sort function to sort worker info details by their content. +func sortWorkerInfoDetail(a, b *WorkerInfoDetail) int { + switch { + case a.State != b.State: + return strings.Compare(a.State, b.State) + case a.Mgr != b.Mgr: + return strings.Compare(a.Mgr, b.Mgr) + case a.Name != b.Name: + return strings.Compare(a.Name, b.Name) + case a.Func != b.Func: + return strings.Compare(a.Func, b.Func) + case a.CurrentLine != b.CurrentLine: + return strings.Compare(a.CurrentLine, b.CurrentLine) + case a.ExtraInfo != b.ExtraInfo: + return strings.Compare(a.ExtraInfo, b.ExtraInfo) + case a.Count != b.Count: + return b.Count - a.Count + default: + return 0 + } +} + +// sortWorkerInfoDetailByCount is a sort function to sort worker info details by their count and then by content. +func sortWorkerInfoDetailByCount(a, b *WorkerInfoDetail) int { + stateA, stateB := goroutineStateOrder(a.State), goroutineStateOrder(b.State) + switch { + case stateA != stateB: + return stateA - stateB + case a.State != b.State: + return strings.Compare(a.State, b.State) + case a.Count != b.Count: + return b.Count - a.Count + case a.Mgr != b.Mgr: + return strings.Compare(a.Mgr, b.Mgr) + case a.Name != b.Name: + return strings.Compare(a.Name, b.Name) + case a.Func != b.Func: + return strings.Compare(a.Func, b.Func) + case a.CurrentLine != b.CurrentLine: + return strings.Compare(a.CurrentLine, b.CurrentLine) + case a.ExtraInfo != b.ExtraInfo: + return strings.Compare(a.ExtraInfo, b.ExtraInfo) + default: + return 0 + } +} + +// workerDetailsAreEqual is a deduplication function for worker details. +func workerDetailsAreEqual(a, b *WorkerInfoDetail) bool { + switch { + case a.State != b.State: + return false + case a.Mgr != b.Mgr: + return false + case a.Name != b.Name: + return false + case a.Func != b.Func: + return false + case a.CurrentLine != b.CurrentLine: + return false + case a.ExtraInfo != b.ExtraInfo: + return false + default: + return true + } +} + +func goroutineStateOrder(state string) int { + switch state { + case "runnable", "running", "syscall": + return 0 // Active. + case "idle", "waiting", "dead", "enqueue", "copystack": + return 1 // Active-ish. + case "semacquire", "semarelease", "sleep", "panicwait": + return 2 // Bad (practice) blocking. + case "chan send", "chan receive", "select": + return 3 // Potentially bad (practice), but normal blocking. + case "IO wait": + return 4 // Normal blocking. + case "scheduled": + return 5 // Not running. + case "missing", "": + return 6 // Warning of undetected workers. + default: + return 9 + } +} diff --git a/service/mgr/worker_test.go b/service/mgr/worker_test.go new file mode 100644 index 000000000..f9eb9c3e5 --- /dev/null +++ b/service/mgr/worker_test.go @@ -0,0 +1,51 @@ +package mgr + +import ( + "fmt" + "testing" + "time" +) + +func TestWorkerInfo(t *testing.T) { //nolint:paralleltest + mgr := New("test") + mgr.Go("test func one", testFunc1) + mgr.Go("test func two", testFunc2) + mgr.Go("test func three", testFunc3) + defer mgr.Cancel() + + time.Sleep(100 * time.Millisecond) + + info, err := mgr.WorkerInfo(nil) + if err != nil { + t.Fatal(err) + } + if info.Waiting != 3 { + t.Errorf("expected three waiting workers") + } + + fmt.Printf("%+v\n", info) +} + +func testFunc1(ctx *WorkerCtx) error { + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + } + return nil +} + +func testFunc2(ctx *WorkerCtx) error { + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + } + return nil +} + +func testFunc3(ctx *WorkerCtx) error { + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + } + return nil +} diff --git a/service/mgr/workermgr.go b/service/mgr/workermgr.go index 55ba3b3ac..138ab1670 100644 --- a/service/mgr/workermgr.go +++ b/service/mgr/workermgr.go @@ -125,14 +125,15 @@ func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, errorFn run: make(chan struct{}, 1), selectAction: make(chan struct{}, 1), } + wCtx.workerMgr = s go s.taskMgr() return s } func (s *WorkerMgr) taskMgr() { - s.mgr.workerStart() - defer s.mgr.workerDone() + s.mgr.workerStart(s.ctx) + defer s.mgr.workerDone(s.ctx) // If the task manager ends, end all descendants too. defer s.ctx.cancelCtx() @@ -229,6 +230,23 @@ manage: } } +// Status returns the current status of the worker manager. +func (s *WorkerMgr) Status() string { + s.actionLock.Lock() + defer s.actionLock.Unlock() + + switch { + case s.delay != nil: + return "delayed" + case s.repeat != nil: + return "repeated every " + s.repeat.interval.String() + case s.keepAlive != nil: + return "on demand" + default: + return "created" + } +} + // Go executes the worker immediately. // If the worker is currently being executed, // the next execution will commence afterwards. From 7f0b5ca1492799ae64c9d8dd43eb1fa3ecf8110a Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 27 Aug 2024 16:42:22 +0200 Subject: [PATCH 3/9] [service] Fix starting and stopping of SPN --- service/instance.go | 2 +- spn/access/module.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/service/instance.go b/service/instance.go index 8eab70eb4..ad6e9dab9 100644 --- a/service/instance.go +++ b/service/instance.go @@ -619,7 +619,7 @@ func (i *Instance) shutdown(exitCode int) { // Stopping returns whether the instance is shutting down. func (i *Instance) Stopping() bool { - return i.ctx.Err() == nil + return i.ctx.Err() != nil } // Stopped returns a channel that is triggered when the instance has shut down. diff --git a/spn/access/module.go b/spn/access/module.go index 2b0765865..d49524f93 100644 --- a/spn/access/module.go +++ b/spn/access/module.go @@ -86,22 +86,24 @@ func start() error { enabled := config.GetAsBool("spn/enable", false) if enabled() { + log.Info("spn: starting SPN") module.mgr.Go("ensure SPN is started", module.instance.SPNGroup().EnsureStartedWorker) } else { + log.Info("spn: stopping SPN") module.mgr.Go("ensure SPN is stopped", module.instance.SPNGroup().EnsureStoppedWorker) } return false, nil }) + // Load tokens from database. + loadTokens() + // Check if we need to enable SPN now. enabled := config.GetAsBool("spn/enable", false) if enabled() { module.mgr.Go("ensure SPN is started", module.instance.SPNGroup().EnsureStartedWorker) } - // Load tokens from database. - loadTokens() - // Register new task. module.updateAccountWorkerMgr.Delay(1 * time.Minute) } From 78e4a40750565d8b19c7340abe584564f79bc32b Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:50:45 +0200 Subject: [PATCH 4/9] [service] Fix SPN build --- spn/debug.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 spn/debug.go diff --git a/spn/debug.go b/spn/debug.go new file mode 100644 index 000000000..824c41da9 --- /dev/null +++ b/spn/debug.go @@ -0,0 +1,60 @@ +package spn + +import ( + "bytes" + "errors" + "fmt" + "io" + "runtime" + + "github.com/maruel/panicparse/v2/stack" + + "github.com/safing/portmaster/base/utils/debug" + "github.com/safing/portmaster/service/mgr" +) + +// GetWorkerInfo returns the worker info of all running workers. +func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) { + snapshot, _, err := stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts()) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("get stack: %w", err) + } + + infos := make([]*mgr.WorkerInfo, 0, 32) + for _, m := range i.serviceGroup.Modules() { + wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. + infos = append(infos, wi) + } + + return mgr.MergeWorkerInfo(infos...), nil +} + +// AddWorkerInfoToDebugInfo adds the worker info of all running workers to the debug info. +func (i *Instance) AddWorkerInfoToDebugInfo(di *debug.Info) { + info, err := i.GetWorkerInfo() + if err != nil { + di.AddSection( + "Worker Status Failed", + debug.UseCodeSection, + err.Error(), + ) + return + } + + di.AddSection( + fmt.Sprintf("Worker Status: %d/%d (%d?)", info.Running, len(info.Workers), info.Missing+info.Other), + debug.UseCodeSection, + info.Format(), + ) +} + +func fullStack() []byte { + buf := make([]byte, 8096) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} From 113dc0438a103687b8b9eb290c4ece2732f14cd3 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:51:13 +0200 Subject: [PATCH 5/9] [service] Update golangci-lint --- .github/workflows/go.yml | 2 +- .golangci.yml | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 04a0ea64b..992f0c62e 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -40,7 +40,7 @@ jobs: - name: Run golangci-lint uses: golangci/golangci-lint-action@v4 with: - version: v1.57.1 + version: v1.60.3 only-new-issues: true args: -c ./.golangci.yml --timeout 15m diff --git a/.golangci.yml b/.golangci.yml index d6892cbcb..12967f606 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -18,11 +18,13 @@ linters: - gocyclo - goerr113 - gomnd + - gomoddirectives - ifshort - interfacebloat - interfacer - ireturn - lll + - mnd - musttag - nestif - nilnil @@ -31,16 +33,15 @@ linters: - nolintlint - nonamedreturns - nosnakecase + - perfsprint # TODO(ppacher): we should re-enanble this one to avoid costly fmt.* calls in the hot-path - revive - tagliatelle + - testifylint - testpackage - varnamelen - whitespace - wrapcheck - wsl - - perfsprint # TODO(ppacher): we should re-enanble this one to avoid costly fmt.* calls in the hot-path - - testifylint - - gomoddirectives linters-settings: revive: From c6ddaf8e1ed04f41034c7b463fe9c3c7583b45ab Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:52:30 +0200 Subject: [PATCH 6/9] [service] Fix startup race condition --- service/profile/config-update.go | 2 +- service/profile/module.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/service/profile/config-update.go b/service/profile/config-update.go index acbe7b85b..f3e8161c8 100644 --- a/service/profile/config-update.go +++ b/service/profile/config-update.go @@ -23,7 +23,7 @@ var ( cfgFilterLists []string ) -func registerConfigUpdater() error { +func registerGlobalConfigProfileUpdater() error { module.instance.Config().EventConfigChange.AddCallback("update global config profile", func(wc *mgr.WorkerCtx, s struct{}) (cancel bool, err error) { return false, updateGlobalConfigProfile(wc.Ctx()) }) diff --git a/service/profile/module.go b/service/profile/module.go index 01659c761..911ef99c2 100644 --- a/service/profile/module.go +++ b/service/profile/module.go @@ -61,10 +61,6 @@ func prep() error { return err } - if err := registerConfigUpdater(); err != nil { - return err - } - if err := registerMigrations(); err != nil { return err } @@ -118,6 +114,12 @@ func start() error { module.mgr.Go("clean active profiles", cleanActiveProfiles) + // Register config callback when starting, as it depends on the updates module, + // but the config system will already submit events earlier. + if err := registerGlobalConfigProfileUpdater(); err != nil { + return err + } + err = updateGlobalConfigProfile(module.mgr.Ctx()) if err != nil { log.Warningf("profile: error during loading global profile from configuration: %s", err) From de4cb5b34fb6a0411c17355fcce71abb9538f728 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:55:01 +0200 Subject: [PATCH 7/9] [service] Submit state clear event outside of lock --- service/mgr/states.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/mgr/states.go b/service/mgr/states.go index c80d6771a..f887a1152 100644 --- a/service/mgr/states.go +++ b/service/mgr/states.go @@ -149,11 +149,11 @@ func (m *StateMgr) Remove(id string) { // Clear removes all states. func (m *StateMgr) Clear() { m.statesLock.Lock() - defer m.statesLock.Unlock() - m.states = nil + m.statesLock.Unlock() - m.statesEventMgr.Submit(m.export()) + // Submit event without lock, because callbacks might come back to change states. + defer m.statesEventMgr.Submit(m.Export()) } // Export returns the current states. From 4b2e4f208f8c76c645f208f7393098db6b626a3a Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:55:32 +0200 Subject: [PATCH 8/9] [service] Run event callbacks asynchronously --- service/mgr/events.go | 54 +++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/service/mgr/events.go b/service/mgr/events.go index 03436dc62..359f9a92e 100644 --- a/service/mgr/events.go +++ b/service/mgr/events.go @@ -2,7 +2,6 @@ package mgr import ( - "fmt" "slices" "sync" "sync/atomic" @@ -107,6 +106,12 @@ func (em *EventMgr[T]) Submit(event T) { // Run callbacks. for _, ec := range em.callbacks { + // Check if callback was canceled. + if ec.canceled.Load() { + anyCanceled = true + continue + } + // Execute callback. var ( cancel bool @@ -114,29 +119,38 @@ func (em *EventMgr[T]) Submit(event T) { ) if em.mgr != nil { // Prefer executing in worker. - wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error { - cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope. + name := "event " + em.name + " callback " + ec.name + em.mgr.Go(name, func(w *WorkerCtx) error { + cancel, err = ec.callback(w, event) + // Handle error and cancelation. + if err != nil { + w.Warn( + "event callback failed", + "event", em.name, + "callback", ec.name, + "err", err, + ) + } + if cancel { + ec.canceled.Store(true) + } return nil }) - if wkrErr != nil { - err = fmt.Errorf("callback execution failed: %w", wkrErr) - } } else { cancel, err = ec.callback(nil, event) - } - - // Handle error and cancelation. - if err != nil && em.mgr != nil { - em.mgr.Warn( - "event callback failed", - "event", em.name, - "callback", ec.name, - "err", err, - ) - } - if cancel { - ec.canceled.Store(true) - anyCanceled = true + // Handle error and cancelation. + if err != nil && em.mgr != nil { + em.mgr.Warn( + "event callback failed", + "event", em.name, + "callback", ec.name, + "err", err, + ) + } + if cancel { + ec.canceled.Store(true) + anyCanceled = true + } } } From 57e81fb6fb5d7ae1bee610c6b7d5dd1fe676e340 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 28 Aug 2024 11:55:40 +0200 Subject: [PATCH 9/9] [service] Make linter happy --- service/debug_test.go | 2 ++ service/mgr/group.go | 1 + service/mgr/worker_info.go | 10 ++++++---- service/mgr/workermgr.go | 1 + 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/service/debug_test.go b/service/debug_test.go index 52fbe0c7c..7e2ee495a 100644 --- a/service/debug_test.go +++ b/service/debug_test.go @@ -9,6 +9,8 @@ import ( ) func TestDebug(t *testing.T) { + t.Parallel() + // Create test instance with at least one worker. i := &Instance{} n, err := notifications.New(i) diff --git a/service/mgr/group.go b/service/mgr/group.go index 035bcf05e..0019c28c0 100644 --- a/service/mgr/group.go +++ b/service/mgr/group.go @@ -30,6 +30,7 @@ const ( groupStateInvalid ) +//nolint:goconst func groupStateToString(state int32) string { switch state { case groupStateOff: diff --git a/service/mgr/worker_info.go b/service/mgr/worker_info.go index 9f281da23..7f0441895 100644 --- a/service/mgr/worker_info.go +++ b/service/mgr/worker_info.go @@ -176,7 +176,8 @@ func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error) { "waiting", "dead", "enqueue", "copystack": wi.Running++ case "chan send", "chan receive", "select", "IO wait", - "panicwait", "semacquire", "semarelease", "sleep": + "panicwait", "semacquire", "semarelease", "sleep", + "sync.Mutex.Lock": wi.Waiting++ case "": if w.workerMgr != nil { @@ -213,10 +214,10 @@ func (wi *WorkerInfo) Format() string { // Build table. tabWriter := tabwriter.NewWriter(buf, 4, 4, 3, ' ', 0) - fmt.Fprintf(tabWriter, "#\tState\tModule\tName\tWorker Func\tCurrent Line\tExtra Info\n") + _, _ = fmt.Fprintf(tabWriter, "#\tState\tModule\tName\tWorker Func\tCurrent Line\tExtra Info\n") for _, wd := range wi.Workers { - fmt.Fprintf(tabWriter, + _, _ = fmt.Fprintf(tabWriter, "%d\t%s\t%s\t%s\t%s\t%s\t%s\n", wd.Count, wd.State, @@ -365,13 +366,14 @@ func workerDetailsAreEqual(a, b *WorkerInfoDetail) bool { } } +//nolint:goconst func goroutineStateOrder(state string) int { switch state { case "runnable", "running", "syscall": return 0 // Active. case "idle", "waiting", "dead", "enqueue", "copystack": return 1 // Active-ish. - case "semacquire", "semarelease", "sleep", "panicwait": + case "semacquire", "semarelease", "sleep", "panicwait", "sync.Mutex.Lock": return 2 // Bad (practice) blocking. case "chan send", "chan receive", "select": return 3 // Potentially bad (practice), but normal blocking. diff --git a/service/mgr/workermgr.go b/service/mgr/workermgr.go index 138ab1670..7abaddb32 100644 --- a/service/mgr/workermgr.go +++ b/service/mgr/workermgr.go @@ -196,6 +196,7 @@ manage: workerMgr: s, logger: s.ctx.logger, } + //nolint:fatcontext // Every run gets a new context. wCtx.ctx, wCtx.cancelCtx = context.WithCancel(s.ctx.ctx) panicInfo, err := s.mgr.runWorker(wCtx, s.fn)