Skip to content

Commit

Permalink
Add localised queue signalling on free slots
Browse files Browse the repository at this point in the history
Instead of a global free slot condition variable, be smarter and signal only the connected start processor of the free slot.

This on its own doesn't do very much and saves a very small number of CPU cycles, but it does enable the flexibility to pass on this signal to the internal job queue condition variable. This is useful as since we added queue categorisation, a waiting queue no longer means it is empty and it could just mean there were no free slots for that specific subqueue, so we should signal it when there's a potential change in job slot distribution.
  • Loading branch information
Bo98 committed Sep 12, 2024
1 parent b5e69f3 commit d698969
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 7 deletions.
10 changes: 8 additions & 2 deletions src/job_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@

# A variation of `Thread::Queue` that allows us to prioritise certain types of jobs.
class JobQueue
def initialize(queue_type, logger)
def initialize(queue_type)
@mutex = Mutex.new
@queue = Hash.new { |h, k| h[k] = [] }
@queue_type = queue_type
@condvar = ConditionVariable.new
@logger = logger
end

def <<(job)
Expand All @@ -20,6 +19,13 @@ def <<(job)
end
end

def signal_free(group)
# No need to signal for groups without slot limits.
return if group == :default

@condvar.signal
end

def pop
@mutex.synchronize do
loop do
Expand Down
10 changes: 8 additions & 2 deletions src/orka_start_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ class OrkaStartProcessor < ThreadRunner

def initialize(queue_type, name)
super("#{self.class.name} (#{name})")
@queue = JobQueue.new(queue_type, method(:log))
@queue = JobQueue.new(queue_type)
@orka_free_condvar = ConditionVariable.new
end

def pausable?
true
end

def signal_free(group)
@orka_free_condvar.signal
@queue.signal_free(group)
end

def run
log "Started #{name}."

Expand Down Expand Up @@ -64,7 +70,7 @@ def run
state.orka_mutex.synchronize do
until state.free_slot?(job)
log "Job #{job.runner_name} is waiting for a free slot."
state.orka_free_condvar.wait(state.orka_mutex)
@orka_free_condvar.wait(state.orka_mutex)
end

if paused?
Expand Down
2 changes: 1 addition & 1 deletion src/orka_stop_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def run
log "VM for job #{job.runner_name} already deleted!"
end
job.orka_vm_id = nil
state.orka_free_condvar.broadcast
state.orka_start_processors[job.queue_type].signal_free(job.group)
log "VM for job #{job.runner_name} deleted."
end
end
Expand Down
3 changes: 1 addition & 2 deletions src/shared_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def to_s

attr_reader :config,
:orka_client,
:orka_mutex, :orka_free_condvar, :github_mutex, :github_metadata_condvar,
:orka_mutex, :github_mutex, :github_metadata_condvar,
:orka_start_processors, :orka_stop_processor, :orka_timeout_processor, :github_watcher,
:github_runner_metadata,
:jobs, :expired_jobs
Expand All @@ -75,7 +75,6 @@ def initialize
@orka_client = OrkaAPI::Client.new(@config.orka_base_url, token: @config.orka_token)

@orka_mutex = Mutex.new
@orka_free_condvar = ConditionVariable.new
@github_mutex = Mutex.new
@github_metadata_condvar = ConditionVariable.new
@file_mutex = Mutex.new
Expand Down

0 comments on commit d698969

Please sign in to comment.