Skip to content

Commit

Permalink
Factor consumer main loop into its own method.
Browse files Browse the repository at this point in the history
Refs #783
  • Loading branch information
coleifer committed Jan 9, 2024
1 parent 383331e commit 8cfe4cf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/simple/cons.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ PYTHONPATH=".:$PYTHONPATH"
export PYTHONPATH
WORKER_CLASS=${1:-thread}
export WORKER_CLASS
python ../../huey/bin/huey_consumer.py main.huey --workers=4 -k $WORKER_CLASS -C -S
python ../../huey/bin/huey_consumer.py main.huey --workers=4 -k $WORKER_CLASS -S
49 changes: 30 additions & 19 deletions huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from huey.utils import time_clock


class ConsumerStopped(Exception): pass


class BaseProcess(object):
process_name = 'BaseProcess'

Expand Down Expand Up @@ -431,31 +434,14 @@ def run(self):
Run the consumer.
"""
self.start()
timeout = self._stop_flag_timeout
health_check_ts = time_clock()

while True:
try:
self.stop_flag.wait(timeout=timeout)
except KeyboardInterrupt:
self._logger.info('Received SIGINT')
self.stop(graceful=True)
except:
self._logger.exception('Error in consumer.')
self.stop()
else:
if self._received_signal:
self.stop(graceful=self._graceful)

if self.stop_flag.is_set():
health_check_ts = self.loop(health_check_ts)
except ConsumerStopped:
break

if self._health_check:
now = time_clock()
if now >= health_check_ts + self._health_check_interval:
health_check_ts = now
self.check_worker_health()

self.huey.notify_interrupted_tasks()

if self._restart:
Expand All @@ -465,6 +451,31 @@ def run(self):
else:
self._logger.info('Consumer exiting.')

def loop(self, health_check_ts=None):
try:
self.stop_flag.wait(timeout=self._stop_flag_timeout)
except KeyboardInterrupt:
self._logger.info('Received SIGINT')
self.stop(graceful=True)
except:
self._logger.exception('Error in consumer.')
self.stop()
else:
if self._received_signal:
self.stop(graceful=self._graceful)

if self.stop_flag.is_set():
# Flag to caller that the main consumer loop should shut down.
raise ConsumerStopped

if self._health_check and health_check_ts:
now = time_clock()
if now >= health_check_ts + self._health_check_interval:
health_check_ts = now
self.check_worker_health()

return health_check_ts

def check_worker_health(self):
"""
Check the health of the worker processes. Workers that have died will
Expand Down

0 comments on commit 8cfe4cf

Please sign in to comment.