Skip to content

Commit

Permalink
allow to configure multiple queues
Browse files Browse the repository at this point in the history
Added option to configure multiple nfqueues.

Post with detailed information about the performance:
#1104

After using -queues 1:6 , you need to configure the rules manually:
(for TCP)
nft insert rule inet mangle output tcp flags syn / fin,syn,rst,ack queue to numgen inc mod 6

TODO:
 - Configure queues in the fw automatically based on the queues defined.
 - Investigate if we need to use runtime.LockOSThread() in NewQueue().
 - Allow to use multiple instances of the daemon:
    * One daemon acts as the main daemon, connected to the server (UI) and
    managing the rules and notifications.
    * The other daemons only intercept and apply verdicts on packets, with
    the rules loaded from a central directory (/etc/opensnitchd/rules)

FIXME:
 - There's a deadlock repeating the packets when a connection is waiting
   for approval.
 - Investigate the high mem consumption under heavy load.
  • Loading branch information
gustavo-iniguez-goya committed Apr 5, 2024
1 parent 2ec37ed commit f032575
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"runtime"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -68,6 +70,7 @@ var (
ebpfModPath = "" // /usr/lib/opensnitchd/ebpf
noLiveReload = false
queueNum = 0
queues = ""
repeatQueueNum int //will be set later to queueNum + 1
workers = 16
debug = false
Expand All @@ -91,7 +94,7 @@ var (
queue = (*netfilter.Queue)(nil)
repeatQueue = (*netfilter.Queue)(nil)
repeatPktChan = (<-chan netfilter.Packet)(nil)
pktChan = (<-chan netfilter.Packet)(nil)
pktChan = [](<-chan netfilter.Packet)(nil)
wrkChan = (chan netfilter.Packet)(nil)
sigChan = (chan os.Signal)(nil)
exitChan = (chan bool)(nil)
Expand All @@ -106,6 +109,7 @@ func init() {
flag.StringVar(&procmonMethod, "process-monitor-method", procmonMethod, "How to search for processes path. Options: ftrace, audit (experimental), ebpf (experimental), proc (default)")
flag.StringVar(&uiSocket, "ui-socket", uiSocket, "Path the UI gRPC service listener (https://github.com/grpc/grpc/blob/master/doc/naming.md).")
flag.IntVar(&queueNum, "queue-num", queueNum, "Netfilter queue number.")
flag.StringVar(&queues, "queues", queues, "Netfilter total queues. Format: -queues 1:10 (starts 10 queues)")
flag.IntVar(&workers, "workers", workers, "Number of concurrent workers.")
flag.BoolVar(&noLiveReload, "no-live-reload", debug, "Disable rules live reloading.")

Expand Down Expand Up @@ -154,16 +158,10 @@ func overwriteLogging() bool {
func setupQueues() {
// prepare the queue
var err error
queue, err = netfilter.NewQueue(uint16(queueNum))
if err != nil {
msg := fmt.Sprintf("Error creating queue #%d: %s", queueNum, err)
uiClient.SendWarningAlert(msg)
log.Warning("Is opensnitchd already running?")
log.Fatal(msg)
}
pktChan = queue.Packets()

repeatQueueNum = queueNum + 1
// use upper range numbers for the repeating queue, not to interfere with
// the queue ranges.
repeatQueueNum = 32000 - queueNum

repeatQueue, err = netfilter.NewQueue(uint16(repeatQueueNum))
if err != nil {
Expand All @@ -173,6 +171,25 @@ func setupQueues() {
log.Warning(msg)
}
repeatPktChan = repeatQueue.Packets()

// the format to specify multiple queues is 1:10
qs := strings.SplitN(queues, ":", 2)
lowb := uint64(0)
upb := uint64(1)
if len(qs) > 1 {
lowb, err = strconv.ParseUint(qs[0], 10, 16)
if err != nil {
lowb = 0
}
upb, err = strconv.ParseUint(qs[1], 10, 16)
if err != nil {
upb = lowb + 1
}
}
for i := lowb; i < upb; i++ {
q, _ := netfilter.NewQueue(uint16(i))
pktChan = append(pktChan, q.Packets())
}
}

func setupLogging() {
Expand Down Expand Up @@ -258,12 +275,9 @@ func worker(id int) {
case <-ctx.Done():
goto Exit
default:
pkt, ok := <-wrkChan
if !ok {
log.Debug("worker channel closed %d", id)
goto Exit
}
pkt := <-wrkChan
onPacket(pkt)

}
}
Exit:
Expand All @@ -273,7 +287,7 @@ Exit:
func setupWorkers() {
log.Debug("Starting %d workers ...", workers)
// setup the workers
wrkChan = make(chan netfilter.Packet)
wrkChan = make(chan netfilter.Packet, workers)
for i := 0; i < workers; i++ {
go worker(i)
}
Expand Down Expand Up @@ -638,18 +652,26 @@ func main() {
initSystemdResolvedMonitor()

log.Info("Running on netfilter queue #%d ...", queueNum)
for {
select {
case <-ctx.Done():
goto Exit
case pkt, ok := <-pktChan:
if !ok {
goto Exit
for _, p := range pktChan {
go func(c <-chan netfilter.Packet) {
for {
select {
case <-ctx.Done():
return
case pkt, ok := <-c:
if !ok {
return
}
wrkChan <- pkt
}
}
wrkChan <- pkt
}
}(p)
}
Exit:
select {
case <-sigChan:
case <-ctx.Done():
}

close(wrkChan)
doCleanup(queue, repeatQueue)
os.Exit(0)
Expand Down

0 comments on commit f032575

Please sign in to comment.