20 changes: 17 additions & 3 deletions lib/que/poller.rb
@@ -146,8 +146,6 @@ def poll(

return unless should_poll?

expected_count = priorities.inject(0){|s,(_,c)| s + c}

jobs =
connection.execute_prepared(
:poll_jobs,
@@ -159,7 +157,7 @@
)

@last_polled_at = Time.now
@last_poll_satisfied = expected_count == jobs.count
@last_poll_satisfied = any_priority_satisfied?(priorities, jobs)

Que.internal_log :poller_polled, self do
{
@@ -265,5 +263,21 @@ def cleanup(connection)
SQL
end
end

private

def any_priority_satisfied?(priorities, jobs)
job_priorities = jobs.map { |job| job.fetch(:priority) }.sort
worker_job_counts = Hash.new(0)
priorities.any? do |worker_priority, waiting_workers_count|
waiting_workers_count.times do
return false if job_priorities.empty?
break if job_priorities.first > worker_priority
job_priorities.shift
worker_job_counts[worker_priority] += 1
end
worker_job_counts[worker_priority] == waiting_workers_count
end
Contributor

Yeah, I see how this is more complicated than it first sounded, nice work.

I want to understand the relationship between the buffer size, the available_priorities hash and what is returned by the poller a bit better. I'm wondering whether a better way to determine whether a poll is satisfied is if we collect at least enough jobs to fill whatever buffer space is available (and maybe at least one priority?, but that feels like an anti-optimisation given the complexity of implementing it). My understanding at the moment is that the counts in job_buffer.available_priorities sum to the number of available workers plus the available space in the buffer. If we can fill the buffer then I think that represents a satisfactory poll, regardless of whether some workers didn't get a job.

Contributor

Just had another thought, maybe all that we care about is that the lowest priority worker queue is full?

The available space in the buffer is added to the lowest priority worker queue when constructing the available_priorities hash. So if the lowest priority worker queue is full then the buffer will be full and all available workers of the lowest priority will have a job to do. I think that is a pretty good metric to use for last_poll_satisfied.
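
A minimal sketch of that construction (hypothetical helper and input names, not Que's actual internals), assuming a hash of waiting worker counts keyed by priority and a count of spare buffer slots:

```ruby
# Sketch only: how an available-priorities hash like { 1 => 5, 50 => 15 }
# could be built. `waiting_workers` and `buffer_space` are assumed inputs.
def available_priorities(waiting_workers, buffer_space)
  counts = waiting_workers.dup
  # Spare buffer slots are credited to the lowest-priority queue, i.e. the
  # largest numeric priority (in Que, smaller numbers are more urgent).
  lowest = counts.keys.max
  counts[lowest] += buffer_space
  counts
end

available_priorities({ 1 => 5, 50 => 5 }, 10)  # => { 1 => 5, 50 => 15 }
```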

Contributor (@oeoeaio, Mar 3, 2022)

Just thinking this out:

Five available p1 workers and five available p50 workers, with available buffer space of 10, give an available priorities hash of `{ p1: 5, p50: 15 }`.

| p1 jobs | p50 jobs | result | p50 satisfied? |
| --- | --- | --- | --- |
| 100 | 3 | `{ p1: 5, p50: 15 }` | yes |
| 3 | 100 | `{ p1: 3, p50: 15 }` | yes |
| 10 | 10 | `{ p1: 5, p50: 15 }` | yes |
| 9 | 10 | `{ p1: 5, p50: 14 }` | no |

I think I am ok with the last one not being satisfied, the main thing I want to fix here is not polling again immediately when there are still HEAPS of jobs to work.
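
The result column above can be reproduced with a small standalone allocator (a sketch of the same greedy idea, not code from this PR): the most urgent worker queue takes matching jobs first, and whatever remains flows down to the lower-priority queues.

```ruby
# Sketch: greedily allocate polled job priorities to waiting worker queues,
# most urgent queue first. Returns { worker_priority => jobs_taken }.
def allocate(worker_counts, job_priorities)
  remaining = job_priorities.sort
  worker_counts.sort.to_h do |worker_priority, wanted|
    taken = 0
    wanted.times do
      # A job can only go to a worker whose priority is at least as low
      # (numerically as large) as the job's priority.
      break if remaining.empty? || remaining.first > worker_priority
      remaining.shift
      taken += 1
    end
    [worker_priority, taken]
  end
end

workers = { 1 => 5, 50 => 15 }
allocate(workers, [1] * 9 + [50] * 10)  # => { 1 => 5, 50 => 14 }
```

In the last case the p50 queue gets only 14 of its 15 slots filled, so it is unsatisfied even though plenty of jobs were returned.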

Contributor (@oeoeaio, Mar 3, 2022)

But, maybe that doesn't actually simplify anything because to determine whether the lowest priority has been satisfied we need to do exactly what you have done and allocate jobs to higher priorities.

Another thought: a fuzzier way of doing this would be to just compare the number of jobs to the lowest-priority availability, without being too exact about where the jobs will actually end up?

Just thinking this out:

Five available p1 workers and five available p50 workers, with available buffer space of 10, give an available priorities hash of `{ p1: 5, p50: 15 }`.

| p1 jobs | p50 jobs | total jobs returned | total jobs > p50 availability? (i.e. last_poll_satisfied?) |
| --- | --- | --- | --- |
| 100 | 3 | 20 | yes |
| 3 | 100 | 18 | yes |
| 10 | 10 | 20 | yes |
| 9 | 10 | 19 | yes |
| 4 | 10 | 14 | no |
| 10 | 4 | 14 | no |

That looks pretty good to me. Will see if I can break it.
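
The fuzzy check in the table above can be sketched as follows (hypothetical helper, assuming all priority keys are numeric; Que's nil "any" priority would need extra handling):

```ruby
# Sketch of the fuzzy heuristic: a poll is satisfied if it returned at least
# as many jobs as the lowest-priority availability. The lowest-priority queue
# is the one with the largest numeric key, since in Que smaller numbers are
# more urgent. Assumes numeric keys only; nil ("any") is not handled here.
def fuzzy_poll_satisfied?(available, jobs_count)
  jobs_count >= available[available.keys.max]
end

available = { 1 => 5, 50 => 15 }
fuzzy_poll_satisfied?(available, 19)  # => true
fuzzy_poll_satisfied?(available, 14)  # => false
```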

Contributor (@oeoeaio, Mar 3, 2022)

Haven't tested it, think I still need to deal with the nil priority but something like this:

```ruby
def poll_satisfied?(priorities, jobs)
  # The lowest-priority queue has the largest numeric key (in Que, smaller
  # numbers are more urgent), so this wants keys.max, not keys.min.
  lowest_priority = priorities.keys.max
  jobs.count >= priorities[lowest_priority]
end
```

Contributor (@oeoeaio, Mar 4, 2022)

If the number of low-priority workers plus the buffer size is greater than the total number of non-lowest-priority workers (as is the case with the default settings, `priorities: 10,20,50,any,any,any` and `buffer: 8`), then it's very difficult to think of a case where you get more than one potentially unnecessary poll occasionally.

Contributor (@oeoeaio, Mar 4, 2022)

Just to clarify, #349 never intended to answer the question of whether any requested priority is full of jobs. It's a different question, which I think provides a close enough approximation.

Contributor

My main concerns with this algorithm are the ones you raised yourself: that it's complex, a bit impenetrable and that we're re-implementing worker allocation which introduces a coupling we'd probably rather not have. Happy to go with it though if you don't think #349 is appropriate.

Member Author

Ah, I see. I suppose for a rough algorithm, it is better that it err on the side of caution - sometimes polling again unnecessarily, rather than mistakenly sleeping.

Your code is way more readable than mine. And yeah, I can't see that there's much of a drawback either. So I'm leaning towards going with that. But I'd like to let it sit for a while to see how I feel about it later.

> if there is a steady stream of jobs one could argue that polling continuously is actually the preferred behaviour, even if workers are not being fully utilised?

I would have thought the notify would handle most/all of that case; but I'm not totally across it.

Contributor

> I'd like to let it sit for a while to see how I feel about it later.

Sure, sounds good.

> I would have thought the notify would handle most/all of that case; but I'm not totally across it.

Oh yeah! I completely forgot about that, you’re probably right.

end
end
end
56 changes: 44 additions & 12 deletions lib/que/poller.spec.rb
@@ -255,29 +255,61 @@ def assert_poll(priorities:, locked:)
assert poller.should_poll?
end

it "should be true if the last poll returned a full complement of jobs" do
jobs = 5.times.map { Que::Job.enqueue }
it "should be true if the jobs returned from the last poll satisfied all priority requests" do
job_ids_p10 = 3.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }

result = poller.poll(priorities: {500 => 3}, held_locks: Set.new)
assert_equal 3, result.length
result = poller.poll(priorities: { 10 => 3, 20 => 2 }, held_locks: Set.new)
assert_equal (job_ids_p10 + job_ids_p20), result.map(&:id)

assert_equal true, poller.should_poll?
end

it "should be false if the last poll didn't return a full complement of jobs" do
jobs = 5.times.map { Que::Job.enqueue }
it "should be true if the jobs returned from the last poll satisfied any priority request" do
job_ids_p10 = 2.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }

result = poller.poll(priorities: {500 => 7}, held_locks: Set.new)
assert_equal 5, result.length
result = poller.poll(priorities: { 10 => 2, 20 => 1 }, held_locks: Set.new)
assert_equal (job_ids_p10 + [job_ids_p20.first]), result.map(&:id)

assert_equal true, poller.should_poll?
end

it "should be true if the jobs returned from the last poll satisfied any priority request and were slightly higher priority than each priority requested" do
job_ids_p10 = 2.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }

result = poller.poll(priorities: { 11 => 2, 21 => 1 }, held_locks: Set.new)
assert_equal (job_ids_p10 + [job_ids_p20.first]), result.map(&:id)

assert_equal true, poller.should_poll?
end

it "should be true if the jobs returned from the last poll satisfied any priority request and a lower priority request was upgraded to high priority" do
job_ids_p10 = 5.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }

result = poller.poll(priorities: { 10 => 3, 20 => 2 }, held_locks: Set.new)
assert_equal job_ids_p10, result.map(&:id)

assert_equal true, poller.should_poll?
end

it "should be false if the jobs returned from the last poll didn't satisfy any priority request" do
job_ids_p10 = 5.times.map { Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] }
job_ids_p20 = 2.times.map { Que::Job.enqueue(job_options: { priority: 20 }).que_attrs[:id] }

result = poller.poll(priorities: { 10 => 6, 20 => 3 }, held_locks: Set.new)
assert_equal (job_ids_p10 + job_ids_p20), result.map(&:id)

assert_equal false, poller.should_poll?
end

it "should be true if the last poll didn't return a full complement of jobs, but the poll_interval has elapsed" do
jobs = 5.times.map { Que::Job.enqueue }
it "should be true if the jobs returned from the last poll didn't satisfy any priority request, but the poll_interval has elapsed" do
job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] }

result = poller.poll(priorities: {500 => 7}, held_locks: Set.new)
assert_equal 5, result.length
result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new)
assert_equal job_ids, result.map(&:id)

poller.instance_variable_set(:@last_polled_at, Time.now - 30)
assert_equal true, poller.should_poll?