
Continue polling for jobs when only some of the waiting worker priorities will be fully utilised#348

Merged
ZimbiX merged 7 commits into master from make-poller-more-easily-satisfied
Mar 16, 2022

Conversation

@ZimbiX (Member) commented Mar 3, 2022

Resolves #345.

Basically, this is all about polling again immediately if there are a decent number of jobs we need to get through. But 'decent number of jobs' needs a good definition.

Previously, all idle worker priorities needed to be fully utilised by the jobs from a poll in order to not sleep before the next poll. This change makes it so that sleeping is skipped if any waiting worker priority is fully satisfied by the jobs from the poll.

Determining whether a worker priority would be fully utilised turned out to be far less trivial than I first thought it'd be. You have to take into account that jobs with a higher priority (a smaller number) than a worker's priority also match it.
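To illustrate why this needs an allocation pass, here's a rough standalone sketch (the method name and shape are mine for illustration, not the PR's actual code). Because a job can be taken by any waiting worker whose priority number is greater than or equal to the job's, you can't just count jobs per priority; you have to simulate handing them out:

```ruby
# Hypothetical sketch: allocate polled jobs to waiting worker priorities.
# A job matches any worker whose priority number is >= the job's priority
# (smaller number = more urgent).
def allocate_jobs(waiting_worker_priorities, job_priorities)
  # waiting_worker_priorities: e.g. { 1 => 2, 2 => 2 } (priority => count)
  remaining = waiting_worker_priorities.dup
  allocations = Hash.new(0)
  job_priorities.sort.each do |job_priority|
    # Give the job to the most urgent worker priority that can still take it
    slot = remaining.keys.sort.find { |p| p >= job_priority && remaining[p] > 0 }
    next unless slot
    remaining[slot] -= 1
    allocations[slot] += 1
  end
  allocations
end

# Two p1 workers and two p2 workers: four p1 jobs fully utilise BOTH
# priorities, because p1 jobs also satisfy p2 workers.
allocate_jobs({ 1 => 2, 2 => 2 }, [1, 1, 1, 1]) # => { 1 => 2, 2 => 2 }
```

The inverse doesn't hold: p2 jobs can never satisfy the p1 workers, which is why the direction of the comparison matters.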

I'm sure there's a more functional programming way to do this, but I couldn't think of it!

This was kinda hard to explain, so wording suggestions more than welcome.

A simpler alternative would be to just sleep if there are no jobs returned from the poll. That'd incur an extra poll when only a small number of jobs come in, but might be worth the ability to avoid this PR's increase in code complexity? I've kinda had to reimplement allocating jobs to workers!

Manually tested:

➜ ruby -r bundler/setup -r ./app.rb -e 'DB[:que_jobs].truncate; [1, 2].each { |priority| 10.times { MyJob.enqueue(job_options: { priority: priority }) } }' && bundle exec que --worker-priorities 1,2 --poll-interval 2 --minimum-buffer-size 0 --maximum-buffer-size 0 --log-level debug --log-internals ./app.rb | grep -E '(newly_locked|Running MyJob)'
I, [2022-03-04T00:09:57.981221 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:57.981175Z","queue":"default","locked":2,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503879,805503880]}
I, [2022-03-04T00:09:57.983762 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:57.988989 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.036137 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.036092Z","queue":"default","locked":2,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503881,805503882]}
I, [2022-03-04T00:09:58.038124 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.038463 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.091545 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.091499Z","queue":"default","locked":2,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503883,805503884]}
I, [2022-03-04T00:09:58.092894 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.093421 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.147326 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.147236Z","queue":"default","locked":2,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503885,805503886]}
I, [2022-03-04T00:09:58.150473 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.150893 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.202083 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.202014Z","queue":"default","locked":2,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503887,805503888]}
I, [2022-03-04T00:09:58.205669 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.205723 #2692386]  INFO -- : Running MyJob - priority: 1
I, [2022-03-04T00:09:58.258079 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.257993Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503889]}
I, [2022-03-04T00:09:58.260309 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.313026 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.312945Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503890]}
I, [2022-03-04T00:09:58.315538 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.366900 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.366867Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503891]}
I, [2022-03-04T00:09:58.368784 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.420966 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.420890Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503892]}
I, [2022-03-04T00:09:58.423198 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.474958 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.474882Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503893]}
I, [2022-03-04T00:09:58.477672 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.529960 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.529878Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503894]}
I, [2022-03-04T00:09:58.532743 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.585029 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.584941Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503895]}
I, [2022-03-04T00:09:58.586900 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.639576 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.639492Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503896]}
I, [2022-03-04T00:09:58.642056 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.694544 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.694449Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503897]}
I, [2022-03-04T00:09:58.697122 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.749006 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.748908Z","queue":"default","locked":1,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[805503898]}
I, [2022-03-04T00:09:58.751463 #2692386]  INFO -- : Running MyJob - priority: 2
I, [2022-03-04T00:09:58.802323 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:09:58.802238Z","queue":"default","locked":0,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[]}
I, [2022-03-04T00:10:00.826135 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:10:00.826047Z","queue":"default","locked":0,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[]}
I, [2022-03-04T00:10:02.851622 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:10:02.851551Z","queue":"default","locked":0,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[]}
I, [2022-03-04T00:10:04.874819 #2692386]  INFO -- : {"lib":"que","hostname":"ZimbiX-GreenSync","pid":2692386,"thread":2360,"internal_event":"poller_polled","object_id":2420,"t":"2022-03-03T13:10:04.874719Z","queue":"default","locked":0,"priorities":{"2":1,"1":1},"held_locks":[],"newly_locked":[]}
^C

@ZimbiX ZimbiX requested a review from oeoeaio March 3, 2022 13:28
@ZimbiX ZimbiX force-pushed the make-poller-more-easily-satisfied branch from 1c4bc57 to ef2a67d on March 3, 2022 13:37
Comment thread on lib/que/poller.rb:
worker_job_counts[worker_priority] += 1
end
worker_job_counts[worker_priority] == waiting_workers_count
end
Contributor:

Yeah, I see how this is more complicated than it first sounded, nice work.

I want to understand the relationship between the buffer size, the available_priorities hash and what is returned by the poller a bit better. I'm wondering whether a better way to determine whether a poll is satisfied is if we collect at least enough jobs to fill whatever buffer space is available (and maybe at least one priority?, but that feels like an anti-optimisation given the complexity of implementing it). My understanding at the moment is that the counts in job_buffer.available_priorities sum to the number of available workers plus the available space in the buffer. If we can fill the buffer then I think that represents a satisfactory poll, regardless of whether some workers didn't get a job.

Contributor:

Just had another thought, maybe all that we care about is that the lowest priority worker queue is full?

The available space in the buffer is added to the lowest priority worker queue when constructing the available_priorities hash. So if the lowest priority worker queue is full then the buffer will be full and all available workers of the lowest priority will have a job to do. I think that is a pretty good metric to use for last_poll_satisfied.
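That relationship can be sketched as follows (illustrative names, not Que's exact internals); the assumption here is that free buffer space is folded into the numerically-largest, i.e. lowest, priority bucket:

```ruby
# Rough sketch of how the available_priorities hash appears to be built:
# each waiting worker contributes one slot at its priority, and any free
# buffer space is added to the lowest-priority (numerically largest) bucket.
def available_priorities(waiting_workers_by_priority, free_buffer_space)
  result = waiting_workers_by_priority.dup
  lowest = result.keys.max # largest number = lowest priority
  result[lowest] += free_buffer_space
  result
end

# Five p1 workers, five p50 workers, buffer space of 10:
available_priorities({ 1 => 5, 50 => 5 }, 10) # => { 1 => 5, 50 => 15 }
```

Note the counts sum to workers plus buffer space (5 + 5 + 10 = 20), matching the description above.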

@oeoeaio (Contributor) Mar 3, 2022:

Just thinking this out:

Five available p1 workers, five available p50 workers, available buffer space of 10, the available priorities will be
{ p1: 5, p50: 15 }

| p1 jobs | p2 jobs | result             | p50 satisfied? |
| ------- | ------- | ------------------ | -------------- |
| 100     | 3       | { p1: 5, p50: 15 } | yes            |
| 3       | 100     | { p1: 3, p50: 15 } | yes            |
| 10      | 10      | { p1: 5, p50: 15 } | yes            |
| 9       | 10      | { p1: 5, p50: 14 } | no             |

I think I am ok with the last one not being satisfied, the main thing I want to fix here is not polling again immediately when there are still HEAPS of jobs to work.

@oeoeaio (Contributor) Mar 3, 2022:

But, maybe that doesn't actually simplify anything because to determine whether the lowest priority has been satisfied we need to do exactly what you have done and allocate jobs to higher priorities.

Another thought: a fuzzier way of doing this would be to just compare the total number of jobs to the lowest-priority availability, without being too exact about where the jobs will actually end up.

Just thinking this out:

Five available p1 workers, five available p50 workers, available buffer space of 10, the available priorities will be
{ p1: 5, p50: 15 }

| p1 jobs | p2 jobs | total jobs | total jobs >= p50 availability? (i.e. last_poll_satisfied?) |
| ------- | ------- | ---------- | ----------------------------------------------------------- |
| 100     | 3       | 20         | yes                                                         |
| 3       | 100     | 18         | yes                                                         |
| 10      | 10      | 20         | yes                                                         |
| 9       | 10      | 19         | yes                                                         |
| 4       | 10      | 14         | no                                                          |
| 10      | 4       | 14         | no                                                          |

That looks pretty good to me. Will see if I can break it.

@oeoeaio (Contributor) Mar 3, 2022:

Haven't tested it, and I think I still need to deal with the nil priority, but something like this (noting that the lowest priority is the numerically largest key):

```ruby
def poll_satisfied?(priorities, jobs)
  # Lowest priority = numerically largest key; nil ("any") still needs handling
  lowest_priority = priorities.keys.max
  jobs.count >= priorities[lowest_priority]
end
```
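As a sanity check, a standalone variant of this heuristic (taking a job count rather than the jobs array, treating the numerically largest key as the lowest priority, and ignoring the nil/"any" case; illustrative only) reproduces the table above:

```ruby
# Hedged, standalone version of the heuristic: a poll is "satisfied" when it
# returned at least as many jobs as the lowest-priority availability count.
def poll_satisfied?(priorities, jobs_count)
  lowest_priority = priorities.keys.max # largest number = lowest priority
  jobs_count >= priorities[lowest_priority]
end

# Availability from the example: five p1 workers, five p50 workers, buffer 10
priorities = { 1 => 5, 50 => 15 }

poll_satisfied?(priorities, 20) # => true  (100 p1 jobs + 3 p2 jobs row)
poll_satisfied?(priorities, 14) # => false (4 p1 jobs + 10 p2 jobs row)
```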

@oeoeaio (Contributor) Mar 4, 2022:

If the number of low priority workers plus the buffer size is greater than the total number of non-lowest-priority workers (as is the case with the default settings priorities: 10,20,50,any,any,any and buffer: 8) then it's very difficult to think of a case where you get more than one potentially unnecessary poll occasionally.

@oeoeaio (Contributor) Mar 4, 2022:

Just to clarify, #349 never intended to answer the question of whether any requested priority is full of jobs. It's a different question, which I think provides a close enough approximation.

Contributor:

My main concerns with this algorithm are the ones you raised yourself: that it's complex, a bit impenetrable and that we're re-implementing worker allocation which introduces a coupling we'd probably rather not have. Happy to go with it though if you don't think #349 is appropriate.

Member (Author):

Ah, I see. I suppose for a rough algorithm, it is better that it err on the side of caution - sometimes polling again unnecessarily, rather than mistakenly sleeping.

Your code is way more readable than mine. And yeah, I can't see that there's much of a drawback either. So I'm leaning towards going with that. But I'd like to let it sit for a while to see how I feel about it later.

> if there is a steady stream of jobs one could argue that polling continuously is actually the preferred behaviour, even if workers are not being fully utilised?

I would have thought the notify would handle most/all of that case; but I'm not totally across it.

Contributor:

> I'd like to let it sit for a while to see how I feel about it later.

Sure, sounds good.

> I would have thought the notify would handle most/all of that case; but I'm not totally across it.

Oh yeah! I completely forgot about that, you’re probably right.

@ZimbiX (Member, Author) commented Mar 16, 2022:

Ah crap, wrong button!!


Development

Successfully merging this pull request may close these issues.

Empty job buffer not refilled when any worker priority > highest job priority
