Skip to content

Commit

Permalink
Refactor inbox processing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammadnawzad committed Feb 28, 2024
1 parent 7cb0dc2 commit 88a10c6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
6 changes: 2 additions & 4 deletions lib/inboxable/polling_receiver_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,22 @@ def perform

def perform_activerecord
Inboxable.inbox_model.pending
.where(last_attempted_at: [..Time.zone.now, nil])
.find_in_batches(batch_size: ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i)
.each do |batch|
batch.each do |inbox|
inbox.processor_class_name.constantize.perform_async(inbox.id)
inbox.update(last_attempted_at: 1.minute.from_now, status: :processed, allow_processing: false)
inbox.update(last_attempted_at: Time.zone.now, status: :processing, allow_processing: false)
end
end
end

def perform_mongoid
batch_size = ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i
Inboxable.inbox_model.pending
.any_of({ last_attempted_at: ..Time.zone.now }, { last_attempted_at: nil })
.each_slice(batch_size) do |batch|
batch.each do |inbox|
inbox.processor_class_name.constantize.perform_async(inbox.id.to_s)
inbox.update(last_attempted_at: 1.minute.from_now, status: :processed, allow_processing: false)
inbox.update(last_attempted_at: Time.zone.now, status: :processing, allow_processing: false)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/templates/activerecrod_inbox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Inbox < ApplicationRecord
after_commit :process, if: :allow_processing?

# Scopes and Enums
enum status: { pending: 0, processed: 1, failed: 2 }
enum status: { pending: 0, processed: 1, failed: 2, processing: 3 }

def increment_attempt
self.attempts = attempts + 1
Expand All @@ -25,6 +25,6 @@ def check_threshold_reach
end

def check_publishing
self.allow_processing = false if processed?
self.allow_processing = false unless pending?
end
end
4 changes: 2 additions & 2 deletions lib/templates/mongoid_inbox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Inbox

attr_accessor :allow_processing

as_enum :status, %i[pending processed failed], field: { type: String, default: 'pending' }, map: :string
as_enum :status, %i[pending processed failed processing], field: { type: String, default: 'pending' }, map: :string

statuses.each_key do |key|
scope key, -> { where(status_cd: key) }
Expand Down Expand Up @@ -45,6 +45,6 @@ def check_threshold_reach
end

def check_publishing
self.allow_processing = false if processed?
self.allow_processing = false unless pending?
end
end

0 comments on commit 88a10c6

Please sign in to comment.