Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fea: support job count limit #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ cmdstalk -help
# -cmd="": Command to run in worker.
# -per-tube=1: Number of workers per tube.
# -tubes=[default]: Comma separated list of tubes.
# -max-jobs=0: Maximum number of items to process before exitting. Zero for no limit.

# Watch three specific tubes.
cmdstalk -cmd="/path/to/your/worker --your=flags --here" -tubes="one,two,three"
Expand Down
35 changes: 30 additions & 5 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package broker

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -41,8 +42,10 @@ type Broker struct {
// Tube name this broker will service.
Tube string

log *log.Logger
results chan<- *JobResult
log *log.Logger
results chan<- *JobResult
jobReceived chan<- struct{}
ctx context.Context
}

type JobResult struct {
Expand Down Expand Up @@ -71,13 +74,15 @@ type JobResult struct {
}

// New broker instance.
func New(address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) {
func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult, jobReceived chan<- struct{}) (b Broker) {
b.Address = address
b.Tube = tube
b.Cmd = cmd

b.log = log.New(os.Stdout, fmt.Sprintf("[%s:%d] ", tube, slot), log.LstdFlags)
b.results = results
b.jobReceived = jobReceived
b.ctx = ctx
return
}

Expand All @@ -94,17 +99,28 @@ func (b *Broker) Run(ticks chan bool) {
b.log.Println("watching", b.Tube)
ts := beanstalk.NewTubeSet(conn, b.Tube)

b.log.Println("starting reserve loop (waiting for job)")
for {
if ticks != nil {
if _, ok := <-ticks; !ok {
break
}
}

b.log.Println("reserve (waiting for job)")
id, body := bs.MustReserveWithoutTimeout(ts)
if isCancelled(b.ctx) {
break
}

id, body, err := bs.MustReserveWithTimeout(ts, 1*time.Second)
if err == bs.ErrTimeout {
// Doing this to be able to gracefully handle cancelled context.
continue
}

job := bs.NewJob(id, body, conn)

b.jobReceived <- struct{}{}

t, err := job.Timeouts()
if err != nil {
b.log.Panic(err)
Expand Down Expand Up @@ -154,6 +170,15 @@ func (b *Broker) Run(ticks chan bool) {
b.log.Println("broker finished")
}

func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func (b *Broker) executeJob(job bs.Job, shellCmd string) (result *JobResult, err error) {
result = &JobResult{JobId: job.Id, Executed: true}

Expand Down
52 changes: 41 additions & 11 deletions broker/broker_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package broker

import (
"context"
"log"
"sync"
"time"

"github.com/kr/beanstalk"
Expand All @@ -18,19 +20,27 @@ const (
// created. The `perTube` option determines how many brokers are started for
// each tube.
type BrokerDispatcher struct {
address string
cmd string
conn *beanstalk.Conn
perTube uint64
tubeSet map[string]bool
address string
cmd string
conn *beanstalk.Conn
perTube uint64
tubeSet map[string]bool
jobReceived chan<- struct{}
ctx context.Context
wg sync.WaitGroup
}

func NewBrokerDispatcher(address, cmd string, perTube uint64) *BrokerDispatcher {
func NewBrokerDispatcher(parentCtx context.Context, address, cmd string, perTube, maxJobs uint64) *BrokerDispatcher {
ctx, cancel := context.WithCancel(parentCtx)
jobReceived := make(chan struct{})
go limittedCountGenerator(maxJobs, cancel, jobReceived)
return &BrokerDispatcher{
address: address,
cmd: cmd,
perTube: perTube,
tubeSet: make(map[string]bool),
address: address,
cmd: cmd,
perTube: perTube,
tubeSet: make(map[string]bool),
jobReceived: jobReceived,
ctx: ctx,
}
}

Expand Down Expand Up @@ -72,13 +82,33 @@ func (bd *BrokerDispatcher) RunAllTubes() (err error) {
return
}

// limittedCountGenerator creates a channel that returns a boolean channel with
// nlimit true's and false otherwise. If nlimit is 0 it the channel will always
// be containing true.
func limittedCountGenerator(nlimit uint64, cancel context.CancelFunc, eventHappened <-chan struct{}) {
ngenerated := uint64(1)
for range eventHappened {
if nlimit != 0 && ngenerated == nlimit {
log.Println("reached job limit. quitting.")
cancel()
}
ngenerated++
}
}

func (bd *BrokerDispatcher) runBroker(tube string, slot uint64) {
bd.wg.Add(1)
go func() {
b := New(bd.address, tube, slot, bd.cmd, nil)
defer bd.wg.Done()
b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil, bd.jobReceived)
b.Run(nil)
}()
}

func (bd *BrokerDispatcher) Wait() {
bd.wg.Wait()
}

func (bd *BrokerDispatcher) watchNewTubes() (err error) {
tubes, err := bd.conn.ListTubes()
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions bs/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package bs

import (
"errors"
"time"

"github.com/kr/beanstalk"
Expand All @@ -16,18 +17,22 @@ const (
DeadlineSoonDelay = 1 * time.Second
)

var (
ErrTimeout = errors.New("timeout for reserving a job")
)

// reserve-with-timeout until there's a job or something panic-worthy.
// Handles beanstalk.ErrTimeout by retrying immediately.
// Handles beanstalk.ErrDeadline by sleeping DeadlineSoonDelay before retry.
// panics for other errors.
func MustReserveWithoutTimeout(ts *beanstalk.TubeSet) (id uint64, body []byte) {
var err error
func MustReserveWithTimeout(ts *beanstalk.TubeSet, timeout time.Duration) (id uint64, body []byte, err error) {
for {
id, body, err = ts.Reserve(1 * time.Hour)
id, body, err = ts.Reserve(timeout)
if err == nil {
return
} else if err.(beanstalk.ConnError).Err == beanstalk.ErrTimeout {
continue
err = ErrTimeout
return
} else if err.(beanstalk.ConnError).Err == beanstalk.ErrDeadline {
time.Sleep(DeadlineSoonDelay)
continue
Expand Down
4 changes: 4 additions & 0 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Options struct {

// The beanstalkd tubes to watch.
Tubes TubeList

// Maximum number of jobs to process before exitting.
MaxJobs uint64
}

// TubeList is a list of beanstalkd tube names.
Expand All @@ -54,6 +57,7 @@ func ParseFlags() (o Options, err error) {
flag.BoolVar(&o.All, "all", false, "Listen to all tubes, instead of -tubes=...")
flag.StringVar(&o.Cmd, "cmd", "", "Command to run in worker.")
flag.Uint64Var(&o.PerTube, "per-tube", 1, "Number of workers per tube.")
flag.Uint64Var(&o.MaxJobs, "max-jobs", 0, "Maximum number of items to process before exitting. Zero for no limit.")
flag.Var(&o.Tubes, "tubes", "Comma separated list of tubes.")
flag.Parse()

Expand Down
20 changes: 16 additions & 4 deletions cmdstalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,34 @@
package main

import (
"context"
"log"
"os"
"os/signal"

"github.com/99designs/cmdstalk/broker"
"github.com/99designs/cmdstalk/cli"
)

func main() {
opts := cli.MustParseFlags()

bd := broker.NewBrokerDispatcher(opts.Address, opts.Cmd, opts.PerTube)
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-c
log.Println("received interrupt. quitting.")
cancel()
}()

bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube, opts.MaxJobs)

if opts.All {
bd.RunAllTubes()
} else {
bd.RunTubes(opts.Tubes)
}

// TODO: wire up to SIGTERM handler etc.
exitChan := make(chan bool)
<-exitChan
bd.Wait()
}