diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index bcd854dd..ffc9aebe 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,11 +7,19 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - ruby_version: [2.5.x, 2.6.x, 2.7.x] - gemfile: ["4.2", "5.2", "6.0"] - postgres_version: [9, 10, 11, 12] - exclude: - - { gemfile: "4.2", ruby_version: "2.7.x" } + ruby_version: ['2.7', '3.0', '3.1'] + rails_gemfile: ['6.0', '6.1'] + postgres_version: ['14'] + include: + # Postgres versions + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '9' } + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '10' } + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '11' } + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '12' } + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '13' } + - { ruby_version: '3.1', rails_gemfile: '6.1', postgres_version: '14' } + exclude: [] + name: "Test: Ruby ${{ matrix.ruby_version }}, Rails ${{ matrix.rails_gemfile }}, PostgreSQL ${{ matrix.postgres_version }}" services: db: image: postgres:${{ matrix.postgres_version }} @@ -24,16 +32,16 @@ jobs: --health-timeout 5s --health-retries 5 steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - name: Set up Ruby - uses: actions/setup-ruby@v1 + uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby_version }} - name: Test with Rake env: PGHOST: 127.0.0.1 PGUSER: postgres - BUNDLE_GEMFILE: spec/gemfiles/Gemfile.${{ matrix.gemfile }} + BUNDLE_GEMFILE: spec/gemfiles/Gemfile-rails-${{ matrix.rails_gemfile }} run: | sudo apt-get -yqq install libpq-dev postgresql-client createdb que-test diff --git a/.ruby-version b/.ruby-version new file mode 100644 index 00000000..94ff29cc --- /dev/null +++ b/.ruby-version @@ -0,0 +1 @@ +3.1.1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 26b93edd..141ddcc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ +- [2.0.0.beta1](#200beta1) - [1.4.0 \(2022-03-23\)](#140-2022-03-23) - [1.3.1 \(2022-02-25\)](#131-2022-02-25) - [1.3.0 \(2022-02-25\)](#130-2022-02-25) @@ -51,6 +52,59 @@ +## 2.0.0.beta1 + +**Preliminary release of Ruby 3 support** + +**Notable changes**: + +* Support for Ruby 3 introduced +* Database schema has changed to split the job arguments `args` column into `args` and `kwargs` columns, for reliable args and kwargs splitting for Ruby 3. + - The job schema version is now 2. Note that job schema version is distinct from database schema version and Que version. The `job_schema_version` column of the `que_jobs` table no longer defaults and has a not null constraint, so when manually inserting jobs into the table, this must be specified as `2`. If you have a gem that needs to support multiple Que versions, best not to blindly use the value of `Que.job_schema_version`; instead have different code paths depending on the value of `Que.job_schema_version`. You could also use this to know whether keyword arguments are in `args` or `kwargs`. +* Passing a hash literal as the last job argument to be splatted into job keyword arguments is no longer supported. +* Dropped support for providing job options as top-level keyword arguments to `Job.enqueue`, i.e. `queue`, `priority`, `run_at`, `job_class`, and `tags`. Job options now need to be nested under the `job_options` keyword argument instead. See [#336](https://github.com/que-rb/que/pull/336) +* Dropped support for Ruby < 2.7 +* Dropped support for Rails < 6.0 +* The `#by_args` method on the Job model (for both Sequel and ActiveRecord) now searches based on both args and kwargs, but it performs a subset match instead of an exact match. For instance, if your job was scheduled with `'a', 'b', 'c', foo: 'bar', baz: 1`, `by_args('a', 'b', baz: 1)` would find and return the job. +* This release contains a database migration. You will need to migrate Que to the latest database schema version (6). For example, on ActiveRecord and Rails 6: + +```ruby +class UpdateQueTablesToVersion6 < ActiveRecord::Migration[6.0] + def up + Que.migrate!(version: 6) + end + + def down + Que.migrate!(version: 5) + end +end +``` + +**Recommended upgrade process**: + +When using Que 2.x, a job enqueued with Ruby 2.7 will run as expected on Ruby 3. We recommend: + +1. Upgrade your project to the latest 1.x version of Que (1.3.1+) + - IMPORTANT: adds support for zero downtime upgrade to Que 2.x, see changelog below +2. Upgrade your project to Ruby 2.7 and Rails 6.x if it is not already +3. Upgrade your project to Que 2.x but stay on Ruby 2.7 + - IMPORTANT: You will need to continue to run Que 1.x workers until all jobs enqueued using Que 1.x (i.e. with a `job_schema_version` of `1`) have been finished. See below +4. Upgrade your project to Ruby 3 + +*NOTES:* + +* If you were already running Ruby 2.7 and were not passing a hash literal as the last job argument, you *may* be able to upgrade a running system without draining the queue, though this is not recommended. +* For all other cases, you will need to follow the recommended process above or first completely drain the queue (stop enqueuing new jobs and finish processing any jobs in the database, including cleaning out any expired jobs) before upgrading. + +**Deploying Que 1.x and 2.x workers simultaneously**: + +To run workers with two different versions of Que, you'll probably need to temporarily duplicate your gem bundle, with the Que version being the only difference. e.g.: + +- Copy your `Gemfile` and `Gemfile.lock` into a directory called `que-1-gemfile` +- Set a suitable Que version in each `Gemfile` +- Update the bundle at `que-1-gemfile/Gemfile.lock` using `BUNDLE_GEMFILE=que-1-gemfile/Gemfile bundle` +- Create a second deployment of Que, but with your `que` command prefixed with `BUNDLE_GEMFILE=que-1-gemfile/Gemfile` + ## 1.4.0 (2022-03-23) - **Fixed** @@ -61,7 +115,7 @@ * It became used in 1.0.0.beta4, and that changelog entry has been updated to reflect this. - **Documentation**: + Reformatted the changelog to be more consistent, including adding links to all issue/PR numbers. [#347](https://github.com/que-rb/que/pull/347) - + + ## 1.3.1 (2022-02-25) Unfortunately, v1.3.0 was broken. Follow its upgrade instructions, but use this version instead. diff --git a/Dockerfile b/Dockerfile index 3d03c7cc..eaacc9bd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ruby:2.7.5-slim-buster@sha256:4cbbe2fba099026b243200aa8663f56476950cc64ccd91d7aaccddca31e445b5 AS base +FROM ruby:3.1.1-slim-buster@sha256:2ada3e4fe7b1703c9333ad4eb9fc12c1d4d60bce0f981281b2151057e928d9ad AS base # Install libpq-dev in our base layer, as it's needed in all environments RUN apt-get update \ diff --git a/Gemfile b/Gemfile index aef741c0..894108f1 100644 --- a/Gemfile +++ b/Gemfile @@ -27,9 +27,4 @@ group :test do gem 'pg_examiner', '~> 0.5.2' end -platforms :rbx do - gem 'rubysl', '~> 2.0' - gem 'json', '~> 1.8' -end - gemspec diff --git a/README.md b/README.md index 1523b8d1..ca6d79a7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Que ![tests](https://github.com/que-rb/que/workflows/tests/badge.svg) -**This README and the rest of the docs on the master branch all refer to Que 1.0. If you're using version 0.x, please refer to the docs on [the 0.x branch](https://github.com/que-rb/que/tree/0.x).** +**This README and the rest of the docs on the master branch all refer to Que 2.x. For older versions, please refer to the docs on the respective branches: [1.x](https://github.com/que-rb/que/tree/1.x), or [0.x](https://github.com/que-rb/que/tree/0.x).** *TL;DR: Que is a high-performance job queue that improves the reliability of your application by protecting your jobs with the same [ACID guarantees](https://en.wikipedia.org/wiki/ACID) as the rest of your data.* @@ -23,9 +23,9 @@ Que's secondary goal is performance. The worker process is multithreaded, so tha Compatibility: -- MRI Ruby 2.2+ +- MRI Ruby 2.7+ - PostgreSQL 9.5+ -- Rails 4.1+ (optional) +- Rails 6.0+ (optional) **Please note** - Que's job table undergoes a lot of churn when it is under high load, and like any heavily-written table, is susceptible to bloat and slowness if Postgres isn't able to clean it up. The most common cause of this is long-running transactions, so it's recommended to try to keep all transactions against the database housing Que's job table as short as possible. This is good advice to remember for any high-activity database, but bears emphasizing when using tables that undergo a lot of writes. @@ -54,12 +54,12 @@ gem install que First, create the queue schema in a migration. For example: ```ruby -class CreateQueSchema < ActiveRecord::Migration[5.0] +class CreateQueSchema < ActiveRecord::Migration[6.0] def up # Whenever you use Que in a migration, always specify the version you're # migrating to. If you're unsure what the current version is, check the # changelog. - Que.migrate!(version: 5) + Que.migrate!(version: 6) end def down @@ -117,10 +117,10 @@ end You can also add options to run the job after a specific time, or with a specific priority: ```ruby -ChargeCreditCard.enqueue card.id, user_id: current_user.id, run_at: 1.day.from_now, priority: 5 +ChargeCreditCard.enqueue(card.id, user_id: current_user.id, job_options: { run_at: 1.day.from_now, priority: 5 }) ``` ## Running the Que Worker -In order to process jobs, you must start a separate worker process outside of your main server. +In order to process jobs, you must start a separate worker process outside of your main server. ```bash bundle exec que @@ -142,7 +142,7 @@ You may need to pass que a file path to require so that it can load your app. Qu If you're using ActiveRecord to dump your database's schema, please [set your schema_format to :sql](http://guides.rubyonrails.org/migrations.html#types-of-schema-dumps) so that Que's table structure is managed correctly. This is a good idea regardless, as the `:ruby` schema format doesn't support many of PostgreSQL's advanced features. -Pre-1.0, the default queue name needed to be configured in order for Que to work out of the box with Rails. In 1.0 the default queue name is now 'default', as Rails expects, but when Rails enqueues some types of jobs it may try to use another queue name that isn't worked by default. You can either: +Pre-1.0, the default queue name needed to be configured in order for Que to work out of the box with Rails. As of 1.0 the default queue name is now 'default', as Rails expects, but when Rails enqueues some types of jobs it may try to use another queue name that isn't worked by default. You can either: - [Configure Rails](https://guides.rubyonrails.org/configuring.html) to send all internal job types to the 'default' queue by adding the following to `config/application.rb`: diff --git a/bin/command_line_interface.rb b/bin/command_line_interface.rb index e902c62e..dfbd22a2 100644 --- a/bin/command_line_interface.rb +++ b/bin/command_line_interface.rb @@ -220,7 +220,7 @@ def parse( locker = begin - Que::Locker.new(options) + Que::Locker.new(**options) rescue => e output.puts(e.message) return 1 diff --git a/bin/command_line_interface.spec.rb b/bin/command_line_interface.spec.rb index 63c6596e..d5746d85 100644 --- a/bin/command_line_interface.spec.rb +++ b/bin/command_line_interface.spec.rb @@ -43,7 +43,7 @@ def assert_successful_invocation( default_require_file: Que::CommandLineInterface::RAILS_ENVIRONMENT_FILE ) - BlockJob.enqueue(queue: queue_name, priority: 1) + BlockJob.enqueue(job_options: { queue: queue_name, priority: 1 }) thread = Thread.new do diff --git a/docker-compose.yml b/docker-compose.yml index 6c55a3bd..aac19533 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,13 +10,14 @@ services: - db volumes: - .:/work - - ruby-2.7.5-gem-cache:/usr/local/bundle + - ruby-3.1.1-gem-cache:/usr/local/bundle - ~/.docker-rc.d/:/.docker-rc.d/:ro working_dir: /work entrypoint: /work/scripts/docker-entrypoint command: bash environment: DATABASE_URL: postgres://que:que@db/que-test + USE_RAILS: ~ db: image: "postgres:${POSTGRES_VERSION-13}" @@ -43,4 +44,4 @@ services: volumes: db-data: ~ - ruby-2.7.5-gem-cache: ~ + ruby-3.1.1-gem-cache: ~ diff --git a/docs/README.md b/docs/README.md index 2b555ac2..302a7b24 100644 --- a/docs/README.md +++ b/docs/README.md @@ -128,8 +128,8 @@ There are other docs to read if you're using [Sequel](#using-sequel) or [plain P After you've connected Que to the database, you can manage the jobs table. You'll want to migrate to a specific version in a migration file, to ensure that they work the same way even when you upgrade Que in the future: ```ruby -# Update the schema to version #5. -Que.migrate!(version: 5) +# Update the schema to version #6. +Que.migrate!(version: 6) # Remove Que's jobs table entirely. Que.migrate!(version: 0) @@ -441,7 +441,7 @@ que -q default -q credit_cards Then you can set jobs to be enqueued in that queue specifically: ```ruby -ProcessCreditCard.enqueue current_user.id, queue: 'credit_cards' +ProcessCreditCard.enqueue(current_user.id, job_options: { queue: 'credit_cards' }) # Or: @@ -455,7 +455,7 @@ end In some cases, the ProcessCreditCard class may not be defined in the application that is enqueueing the job. In that case, you can specify the job class as a string: ```ruby -Que.enqueue current_user.id, job_class: 'ProcessCreditCard', queue: 'credit_cards' +Que.enqueue(current_user.id, job_options: { job_class: 'ProcessCreditCard', queue: 'credit_cards' }) ``` ## Shutting Down Safely @@ -549,7 +549,7 @@ require 'que' Sequel.migration do up do Que.connection = self - Que.migrate!(version: 5) + Que.migrate!(version: 6) end down do Que.connection = self @@ -585,7 +585,7 @@ Sequel automatically wraps model persistance actions (create, update, destroy) i ## Using Que With ActiveJob -You can include `Que::ActiveJob::JobExtensions` into your `ApplicationJob` subclass to get support for all of Que's +You can include `Que::ActiveJob::JobExtensions` into your `ApplicationJob` subclass to get support for all of Que's [helper methods](#job-helper-methods). These methods will become no-ops if you use a queue adapter that isn't Que, so if you like to use a different adapter in development they shouldn't interfere. Additionally, including `Que::ActiveJob::JobExtensions` lets you define a run() method that supports keyword arguments. diff --git a/lib/que.rb b/lib/que.rb index ff49474d..a29dce7e 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -29,6 +29,7 @@ def SQL.[]=(k,v); super(k, v.strip.gsub(/\s+/, ' ').freeze); end require_relative 'que/utils/logging' require_relative 'que/utils/middleware' require_relative 'que/utils/queue_management' + require_relative 'que/utils/ruby2_keywords' require_relative 'que/utils/transactions' require_relative 'que/version' @@ -61,6 +62,7 @@ class << self include Utils::Logging include Utils::Middleware include Utils::QueueManagement + include Utils::Ruby2Keywords include Utils::Transactions extend Forwardable diff --git a/lib/que/active_job/extensions.rb b/lib/que/active_job/extensions.rb index edc81aac..448e3852 100644 --- a/lib/que/active_job/extensions.rb +++ b/lib/que/active_job/extensions.rb @@ -12,8 +12,10 @@ def run(*args) end def perform(*args) + args, kwargs = Que.split_out_ruby2_keywords(args) + Que.internal_log(:active_job_perform, self) do - {args: args} + {args: args, kwargs: kwargs} end _run( @@ -21,7 +23,12 @@ def perform(*args) que_filter_args( args.map { |a| a.is_a?(Hash) ? a.deep_symbolize_keys : a } ) - ) + ), + kwargs: Que.recursively_freeze( + que_filter_args( + kwargs.deep_symbolize_keys, + ) + ), ) end @@ -53,37 +60,46 @@ def que_filter_args(thing) # A module that we mix into ActiveJob's wrapper for Que::Job, to maintain # backwards-compatibility with internal changes we make. module WrapperExtensions - # The Rails adapter (built against a pre-1.0 version of this gem) - # assumes that it can access a job's id via job.attrs["job_id"]. So, - # oblige it. - def attrs - {"job_id" => que_attrs[:id]} + module ClassMethods + # We've dropped support for job options supplied as top-level keywords, but ActiveJob's QueAdapter still uses them. So we have to move them into the job_options hash ourselves. + def enqueue(args, priority:, queue:, run_at: nil) + super(args, job_options: { priority: priority, queue: queue, run_at: run_at }) + end end - def run(args) - # Our ActiveJob extensions expect to be able to operate on the actual - # job object, but there's no way to access it through ActiveJob. So, - # scope it to the current thread. It's a bit messy, but it's the best - # option under the circumstances (doesn't require hacking ActiveJob in - # any more extensive way). + module InstanceMethods + # The Rails adapter (built against a pre-1.0 version of this gem) + # assumes that it can access a job's id via job.attrs["job_id"]. So, + # oblige it. + def attrs + {"job_id" => que_attrs[:id]} + end + + def run(args) + # Our ActiveJob extensions expect to be able to operate on the actual + # job object, but there's no way to access it through ActiveJob. So, + # scope it to the current thread. It's a bit messy, but it's the best + # option under the circumstances (doesn't require hacking ActiveJob in + # any more extensive way). - # There's no reason this logic should ever nest, because it wouldn't - # make sense to run a worker inside of a job, but even so, assert that - # nothing absurd is going on. - Que.assert NilClass, Thread.current[:que_current_job] + # There's no reason this logic should ever nest, because it wouldn't + # make sense to run a worker inside of a job, but even so, assert that + # nothing absurd is going on. + Que.assert NilClass, Thread.current[:que_current_job] - begin - Thread.current[:que_current_job] = self + begin + Thread.current[:que_current_job] = self - # We symbolize the args hash but ActiveJob doesn't like that :/ - super(args.deep_stringify_keys) - ensure - # Also assert that the current job state was only removed now, but - # unset the job first so that an assertion failure doesn't mess up - # the state any more than it already has. - current = Thread.current[:que_current_job] - Thread.current[:que_current_job] = nil - Que.assert(self, current) + # We symbolize the args hash but ActiveJob doesn't like that :/ + super(args.deep_stringify_keys) + ensure + # Also assert that the current job state was only removed now, but + # unset the job first so that an assertion failure doesn't mess up + # the state any more than it already has. + current = Thread.current[:que_current_job] + Thread.current[:que_current_job] = nil + Que.assert(self, current) + end end end end @@ -92,6 +108,7 @@ def run(args) class ActiveJob::QueueAdapters::QueAdapter class JobWrapper < Que::Job - prepend Que::ActiveJob::WrapperExtensions + extend Que::ActiveJob::WrapperExtensions::ClassMethods + prepend Que::ActiveJob::WrapperExtensions::InstanceMethods end end diff --git a/lib/que/active_job/extensions.spec.rb b/lib/que/active_job/extensions.spec.rb index 18721e6e..85717751 100644 --- a/lib/que/active_job/extensions.spec.rb +++ b/lib/que/active_job/extensions.spec.rb @@ -6,8 +6,9 @@ describe "running jobs via ActiveJob" do before do class TestJobClass < ActiveJob::Base - def perform(*args) + def perform(*args, **kwargs) $args = args + $kwargs = kwargs end end end @@ -17,13 +18,9 @@ def perform(*args) $args = nil end - def execute_raw(*args) - TestJobClass.perform_later(*args) - end - - def execute(*args) + def execute(&perform_later_block) worker # Make sure worker is initialized. - execute_raw(*args) + perform_later_block.call assert_equal 1, active_jobs_dataset.count attrs = active_jobs_dataset.first! @@ -35,15 +32,34 @@ def execute(*args) end it "should pass its arguments to the run method" do - execute(1, 2) + execute { TestJobClass.perform_later(1, 2) } assert_equal [1, 2], $args end it "should handle argument types appropriately" do - execute(symbol_arg: 1, "string_arg" => 2) + execute { TestJobClass.perform_later(symbol_arg: 1, "string_arg" => 2) } + assert_equal( + {symbol_arg: 1, "string_arg" => 2}, + $kwargs, + ) + end + + it 'configures jobs with supported job options' do + run_at = Time.now.round + 60 + attrs = execute do + TestJobClass.new.enqueue( + queue: 'test_queue', + priority: 10, + wait_until: run_at, + ) + end assert_equal( - [{symbol_arg: 1, "string_arg" => 2}], - $args, + { + queue: 'test_queue', + priority: 10, + run_at: run_at, + }, + attrs.slice(:queue, :priority, :run_at), ) end @@ -55,11 +71,11 @@ def execute(*args) job = QueJob.first job.update(finished_at: Time.now) - execute(job_object: job) + execute { TestJobClass.perform_later(job_object: job) } assert_equal( - [{job_object: job}], - $args, + {job_object: job}, + $kwargs, ) end @@ -82,7 +98,7 @@ def execute(*args) } ) - execute(5, 6) + execute { TestJobClass.perform_later(5, 6) } assert_equal [5, 6], $args assert_instance_of ActiveJob::QueueAdapters::QueAdapter::JobWrapper, passed_1 @@ -104,7 +120,9 @@ def perform(*args) end end - assert_raises(CustomExceptionSubclass) { execute(5, 6) } + assert_raises(CustomExceptionSubclass) do + execute { TestJobClass.perform_later(5, 6) } + end assert_equal 1, active_jobs_dataset.count assert_equal [5, 6], active_jobs_dataset.get(:args).first[:arguments] @@ -127,7 +145,7 @@ def perform(*args) end end - execute_raw(3, 4) + TestJobClass.perform_later(3, 4) assert_equal [3, 4], $args assert_raises(PG::InFailedSqlTransaction) { Que.execute "SELECT 1" } @@ -146,7 +164,9 @@ def perform(*args) end end - error = assert_raises(RuntimeError) { execute_raw(3, 4) } + error = assert_raises(RuntimeError) do + TestJobClass.perform_later(3, 4) + end assert_equal "Oopsie!", error.message assert_equal error, notified_error end diff --git a/lib/que/active_record/model.rb b/lib/que/active_record/model.rb index 9f26ff56..ab8b33d6 100644 --- a/lib/que/active_record/model.rb +++ b/lib/que/active_record/model.rb @@ -39,8 +39,8 @@ def by_tag(tag) where("que_jobs.data @> ?", JSON.dump(tags: [tag])) end - def by_args(*args) - where("que_jobs.args @> ?", JSON.dump(args)) + def by_args(*args, **kwargs) + where("que_jobs.args @> ? AND que_jobs.kwargs @> ?", JSON.dump(args), JSON.dump(kwargs)) end end end diff --git a/lib/que/active_record/model.spec.rb b/lib/que/active_record/model.spec.rb index c3b83435..fadfa2b7 100644 --- a/lib/que/active_record/model.spec.rb +++ b/lib/que/active_record/model.spec.rb @@ -8,8 +8,8 @@ require "que/active_record/model" end - def enqueue_job(*args) - Que::Job.enqueue(*args).que_attrs[:id] + def enqueue_job(*args, **kwargs) + Que::Job.enqueue(*args, **kwargs).que_attrs[:id] end def assert_ids(*expected) @@ -100,8 +100,8 @@ def assert_ids(*expected) describe "by_job_class" do it "should return a dataset of jobs with that job class" do - a = enqueue_job(job_class: "CustomJobClass") - b = enqueue_job(job_class: "BlockJob") + a = enqueue_job(job_options: { job_class: "CustomJobClass" }) + b = enqueue_job(job_options: { job_class: "BlockJob" }) c = enqueue_job assert_ids(a) { |ds| ds.by_job_class("CustomJobClass") } @@ -112,8 +112,8 @@ def assert_ids(*expected) end it "should be compatible with ActiveModel job classes" do - a = enqueue_job({job_class: "WrappedJobClass"}, {job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper"}) - b = enqueue_job({job_class: "OtherWrappedJobClass"}, {job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper"}) + a = enqueue_job({job_class: "WrappedJobClass"}, job_options: { job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper" }) + b = enqueue_job({job_class: "OtherWrappedJobClass"}, job_options: { job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper" }) enqueue_job assert_ids(a) { |ds| ds.by_job_class("WrappedJobClass") } @@ -125,7 +125,7 @@ def assert_ids(*expected) describe "by_queue" do it "should return a dataset of jobs in that queue" do a = enqueue_job - b = enqueue_job(queue: "other_queue") + b = enqueue_job(job_options: { queue: "other_queue" }) assert_ids(a) { |ds| ds.by_queue("default") } assert_ids(b) { |ds| ds.by_queue("other_queue") } @@ -135,8 +135,8 @@ def assert_ids(*expected) describe "by_tag" do it "should return a dataset of jobs with the given tag" do - a = enqueue_job(tags: ["tag_1"]) - b = enqueue_job(tags: ["tag_2"]) + a = enqueue_job(job_options: { tags: ["tag_1"] }) + b = enqueue_job(job_options: { tags: ["tag_2"] }) assert_ids(a) { |ds| ds.by_tag("tag_1") } assert_ids(b) { |ds| ds.by_tag("tag_2") } diff --git a/lib/que/job.enqueue_spec.rb b/lib/que/job.enqueue_spec.rb index acfeb7b6..bdb87817 100644 --- a/lib/que/job.enqueue_spec.rb +++ b/lib/que/job.enqueue_spec.rb @@ -4,26 +4,19 @@ describe Que::Job, '.enqueue' do def assert_enqueue( - args, expected_queue: 'default', expected_priority: 100, expected_run_at: Time.now, expected_job_class: Que::Job, expected_result_class: nil, expected_args: [], + expected_kwargs: {}, expected_tags: nil, - expected_job_schema_version: Que.job_schema_version + expected_job_schema_version: Que.job_schema_version, + &enqueue_block ) - assert_equal 0, jobs_dataset.count - - result = - if args.respond_to?(:call) - args.call - else - Que.enqueue(*args) - end - + result = enqueue_block.call assert_equal 1, jobs_dataset.count assert_kind_of Que::Job, result @@ -31,6 +24,7 @@ def assert_enqueue( assert_equal expected_priority, result.que_attrs[:priority] assert_equal expected_args, result.que_attrs[:args] + assert_equal expected_kwargs, result.que_attrs[:kwargs] if expected_tags.nil? assert_equal({}, result.que_attrs[:data]) @@ -50,48 +44,48 @@ def assert_enqueue( end it "should be able to queue a job" do - assert_enqueue [] + assert_enqueue { Que.enqueue } end it "should be able to queue a job with arguments" do - assert_enqueue [1, 'two'], - expected_args: [1, 'two'] + assert_enqueue(expected_args: [1, 'two']) { Que.enqueue(1, 'two') } end it "should be able to queue a job with complex arguments" do - assert_enqueue [ - 1, - 'two', - { + assert_enqueue( + expected_args: [ + 1, + 'two', + ], + expected_kwargs: { string: "string", integer: 5, array: [1, "two", {three: 3}], hash: {one: 1, two: 'two', three: [3]}, }, - ], - expected_args: [ - 1, - 'two', - { + ) do + Que.enqueue( + 1, + 'two', string: "string", integer: 5, array: [1, "two", {three: 3}], hash: {one: 1, two: 'two', three: [3]}, - }, - ] + ) + end end it "should be able to handle a namespaced job class" do - assert_enqueue \ - -> { NamespacedJobNamespace::NamespacedJob.enqueue 1 }, + assert_enqueue( expected_args: [1], - expected_job_class: NamespacedJobNamespace::NamespacedJob + expected_job_class: NamespacedJobNamespace::NamespacedJob, + ) { NamespacedJobNamespace::NamespacedJob.enqueue(1) } end it "should error appropriately on an anonymous job subclass" do klass = Class.new(Que::Job) - error = assert_raises(Que::Error) { klass.enqueue 1 } + error = assert_raises(Que::Error) { klass.enqueue(1) } assert_equal \ "Can't enqueue an anonymous subclass of Que::Job", @@ -99,72 +93,65 @@ def assert_enqueue( end it "should be able to queue a job with a specific queue name" do - assert_enqueue [1, {queue: 'special_queue_name'}], + assert_enqueue( expected_args: [1], - expected_queue: 'special_queue_name' + expected_queue: 'special_queue_name', + ) { Que.enqueue(1, job_options: { queue: 'special_queue_name' }) } end it "should be able to queue a job with a specific time to run" do - assert_enqueue [1, {run_at: Time.now + 60}], + assert_enqueue( expected_args: [1], - expected_run_at: Time.now + 60 + expected_run_at: Time.now + 60, + ) { Que.enqueue(1, job_options: { run_at: Time.now + 60 }) } end it "should be able to queue a job with a specific priority" do - assert_enqueue [1, {priority: 4}], + assert_enqueue( expected_args: [1], - expected_priority: 4 + expected_priority: 4, + ) { Que.enqueue(1, job_options: { priority: 4 }) } end - it "should be able to queue a job with options in addition to args" do - assert_enqueue \ - [1, {string: "string", run_at: Time.now + 60, priority: 4}], - expected_args: [1, {string: "string"}], + it "should be able to queue a job with options in addition to args and kwargs" do + assert_enqueue( + expected_args: [1], + expected_kwargs: { string: "string" }, expected_run_at: Time.now + 60, - expected_priority: 4 - end - - it "should be able to use an explicit `job_options` keyword to avoid conflicts with job keyword args" do - assert_enqueue \ - [1, {string: "string", priority: 10, job_options: { priority: 15 }}], - expected_args: [1, {string: "string", priority: 10}], - expected_priority: 15 + expected_priority: 4, + ) { Que.enqueue(1, string: "string", job_options: { run_at: Time.now + 60, priority: 4 }) } end - it "should fall back to using job options specified at the top level if not specified in job_options" do - assert_enqueue \ - [1, {string: "string", run_at: Time.now + 60, priority: 10, job_options: { priority: 15 }}], - expected_args: [1, {string: "string", priority: 10}], - expected_run_at: Time.now + 60, - expected_priority: 15 + it "should no longer fall back to using job options specified at the top level if not specified in job_options" do + assert_enqueue( + expected_args: [1], + expected_kwargs: { string: "string", run_at: Time.utc(2050).to_s, priority: 10 }, + expected_run_at: Time.now, + expected_priority: 15, + ) { Que.enqueue(1, string: "string", run_at: Time.utc(2050), priority: 10, job_options: { priority: 15 }) } end describe "when enqueuing a job with tags" do it "should be able to specify tags on a case-by-case basis" do - assert_enqueue \ - [1, {string: "string", tags: ["tag_1", "tag_2"]}], - expected_args: [1, {string: "string"}], - expected_tags: ["tag_1", "tag_2"] - end - - it "should be able to use multiple hashes to avoid conflicts with keywords" do - assert_enqueue \ - [1, {string: "string", tags: ["tag_1", "tag_2"]}, {}], - expected_args: [1, {string: "string", tags: ["tag_1", "tag_2"]}], - expected_tags: nil + assert_enqueue( + expected_args: [1], + expected_kwargs: { string: "string" }, + expected_tags: ["tag_1", "tag_2"], + ) { Que.enqueue(1, string: "string", job_options: { tags: ["tag_1", "tag_2"] }) } end - it "should be able to use an explicit `job_options` keyword to avoid conflicts with job keyword args" do - assert_enqueue \ - [1, {string: "string", tags: ["tag_1", "tag_2"], job_options: { tags: ["tag_3", "tag_4"] }}], - expected_args: [1, {string: "string", tags: ["tag_1", "tag_2"]}], - expected_tags: ["tag_3", "tag_4"] + it "should no longer fall back to using tags specified at the top level if not specified in job_options" do + assert_enqueue( + expected_args: [1], + expected_kwargs: { string: "string", tags: ["tag_1", "tag_2"] }, + expected_tags: nil, + ) { Que.enqueue(1, string: "string", tags: ["tag_1", "tag_2"]) } end it "should raise an error if passing too many tags" do error = assert_raises(Que::Error) do - Que::Job.enqueue 1, string: "string", tags: %w[a b c d e f] + Que::Job.enqueue 1, string: "string", job_options: { tags: %w[a b c d e f] } end assert_equal \ @@ -175,7 +162,7 @@ def assert_enqueue( it "should raise an error if any of the tags are too long" do error = assert_raises(Que::Error) do - Que::Job.enqueue 1, string: "string", tags: ["a" * 101] + Que::Job.enqueue 1, string: "string", job_options: { tags: ["a" * 101] } end assert_equal \ @@ -187,11 +174,12 @@ def assert_enqueue( it "should respect a job class defined as a string" do class MyJobClass < Que::Job; end - assert_enqueue \ - ['argument', {other_arg: "other_arg", job_class: 'MyJobClass'}], - expected_args: ['argument', {other_arg: "other_arg"}], + assert_enqueue( + expected_args: ['argument'], + expected_kwargs: { other_arg: "other_arg" }, expected_job_class: MyJobClass, expected_result_class: Que::Job + ) { Que.enqueue('argument', other_arg: "other_arg", job_options: { job_class: 'MyJobClass' }) } end describe "when there's a hierarchy of job classes" do @@ -218,48 +206,48 @@ class QueueSubclassJob < QueueDefaultJob describe "priority" do it "should respect a default priority in a job class" do - assert_enqueue \ - -> { PriorityDefaultJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_priority: 3, expected_job_class: PriorityDefaultJob + ) { PriorityDefaultJob.enqueue(1) } - assert_enqueue \ - -> { PriorityDefaultJob.enqueue 1, priority: 4 }, + assert_enqueue( expected_args: [1], expected_priority: 4, expected_job_class: PriorityDefaultJob + ) { PriorityDefaultJob.enqueue(1, job_options: { priority: 4 }) } end it "should respect an inherited priority in a job class" do - assert_enqueue \ - -> { PrioritySubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_priority: 3, expected_job_class: PrioritySubclassJob + ) { PrioritySubclassJob.enqueue(1) } - assert_enqueue \ - -> { PrioritySubclassJob.enqueue 1, priority: 4 }, + assert_enqueue( expected_args: [1], expected_priority: 4, expected_job_class: PrioritySubclassJob + ) { PrioritySubclassJob.enqueue(1, job_options: { priority: 4 }) } end it "should respect an overridden priority in a job class" do begin PrioritySubclassJob.priority = 60 - assert_enqueue \ - -> { PrioritySubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_priority: 60, expected_job_class: PrioritySubclassJob + ) { PrioritySubclassJob.enqueue(1) } - assert_enqueue \ - -> { PrioritySubclassJob.enqueue 1, priority: 4 }, + assert_enqueue( expected_args: [1], expected_priority: 4, expected_job_class: PrioritySubclassJob + ) { PrioritySubclassJob.enqueue(1, job_options: { priority: 4 }) } ensure PrioritySubclassJob.remove_instance_variable(:@priority) end @@ -268,48 +256,48 @@ class QueueSubclassJob < QueueDefaultJob describe "run_at" do it "should respect a default run_at in a job class" do - assert_enqueue \ - -> { RunAtDefaultJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 30, expected_job_class: RunAtDefaultJob + ) { RunAtDefaultJob.enqueue(1) } - assert_enqueue \ - -> { RunAtDefaultJob.enqueue 1, run_at: Time.now + 60 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 60, expected_job_class: RunAtDefaultJob + ) { RunAtDefaultJob.enqueue(1, job_options: { run_at: Time.now + 60 }) } end it "should respect an inherited run_at in a job class" do - assert_enqueue \ - -> { RunAtSubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 30, expected_job_class: RunAtSubclassJob + ) { RunAtSubclassJob.enqueue(1) } - assert_enqueue \ - -> { RunAtSubclassJob.enqueue 1, run_at: Time.now + 60 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 60, expected_job_class: RunAtSubclassJob + ) { RunAtSubclassJob.enqueue(1, job_options: { run_at: Time.now + 60 }) } end it "should respect an overridden run_at in a job class" do begin RunAtSubclassJob.run_at = -> {Time.now + 90} - assert_enqueue \ - -> { RunAtSubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 90, expected_job_class: RunAtSubclassJob + ) { RunAtSubclassJob.enqueue(1) } - assert_enqueue \ - -> { RunAtSubclassJob.enqueue 1, run_at: Time.now + 60 }, + assert_enqueue( expected_args: [1], expected_run_at: Time.now + 60, expected_job_class: RunAtSubclassJob + ) { RunAtSubclassJob.enqueue(1, job_options: { run_at: Time.now + 60 }) } ensure RunAtSubclassJob.remove_instance_variable(:@run_at) end @@ -318,48 +306,48 @@ class QueueSubclassJob < QueueDefaultJob describe "queue" do it "should respect a default queue in a job class" do - assert_enqueue \ - -> { QueueDefaultJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_1', expected_job_class: QueueDefaultJob + ) { QueueDefaultJob.enqueue(1) } - assert_enqueue \ - -> { QueueDefaultJob.enqueue 1, queue: 'queue_3' }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_3', expected_job_class: QueueDefaultJob + ) { QueueDefaultJob.enqueue(1, job_options: { queue: 'queue_3' }) } end it "should respect an inherited queue in a job class" do - assert_enqueue \ - -> { QueueSubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_1', expected_job_class: QueueSubclassJob + ) { QueueSubclassJob.enqueue(1) } - assert_enqueue \ - -> { QueueSubclassJob.enqueue 1, queue: 'queue_3' }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_3', expected_job_class: QueueSubclassJob + ) { QueueSubclassJob.enqueue(1, job_options: { queue: 'queue_3' }) } end it "should respect an overridden queue in a job class" do begin QueueSubclassJob.queue = :queue_2 - assert_enqueue \ - -> { QueueSubclassJob.enqueue 1 }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_2', expected_job_class: QueueSubclassJob + ) { QueueSubclassJob.enqueue(1) } - assert_enqueue \ - -> { QueueSubclassJob.enqueue 1, queue: 'queue_3' }, + assert_enqueue( expected_args: [1], expected_queue: 'queue_3', expected_job_class: QueueSubclassJob + ) { QueueSubclassJob.enqueue(1, job_options: { queue: 'queue_3' }) } ensure QueueSubclassJob.remove_instance_variable(:@queue) end diff --git a/lib/que/job.rb b/lib/que/job.rb index 501b6877..0b2e4a33 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -12,7 +12,7 @@ class Job SQL[:insert_job] = %{ INSERT INTO public.que_jobs - (queue, priority, run_at, job_class, args, data, job_schema_version) + (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) VALUES ( coalesce($1, 'default')::text, @@ -21,6 +21,7 @@ class Job $4::text, coalesce($5, '[]')::jsonb, coalesce($6, '{}')::jsonb, + coalesce($7, '{}')::jsonb, #{Que.job_schema_version} ) RETURNING * @@ -56,13 +57,10 @@ class << self :priority, :run_at - def enqueue( - *args, - job_options: {}, - **arg_opts - ) - arg_opts, job_options = _extract_job_options(arg_opts, job_options.dup) - args << arg_opts if arg_opts.any? + def enqueue(*args) + args, kwargs = Que.split_out_ruby2_keywords(args) + + job_options = kwargs.delete(:job_options) || {} if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT @@ -81,6 +79,7 @@ def enqueue( priority: job_options[:priority] || resolve_que_setting(:priority), run_at: job_options[:run_at] || resolve_que_setting(:run_at), args: Que.serialize_json(args), + kwargs: Que.serialize_json(kwargs), data: job_options[:tags] ? Que.serialize_json(tags: job_options[:tags]) : "{}", job_class: \ job_options[:job_class] || name || @@ -89,27 +88,31 @@ def enqueue( if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) attrs[:args] = Que.deserialize_json(attrs[:args]) + attrs[:kwargs] = Que.deserialize_json(attrs[:kwargs]) attrs[:data] = Que.deserialize_json(attrs[:data]) _run_attrs(attrs) else values = Que.execute( :insert_job, - attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :data), + attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data), ).first - new(values) end end + ruby2_keywords(:enqueue) if respond_to?(:ruby2_keywords, true) def run(*args) # Make sure things behave the same as they would have with a round-trip # to the DB. + args, kwargs = Que.split_out_ruby2_keywords(args) args = Que.deserialize_json(Que.serialize_json(args)) + kwargs = Que.deserialize_json(Que.serialize_json(kwargs)) # Should not fail if there's no DB connection. - _run_attrs(args: args) + _run_attrs(args: args, kwargs: kwargs) end + ruby2_keywords(:run) if respond_to?(:ruby2_keywords, true) def resolve_que_setting(setting, *args) value = send(setting) if respond_to?(setting) @@ -136,27 +139,6 @@ def _run_attrs(attrs) end end end - - def _extract_job_options(arg_opts, job_options) - deprecated_job_option_names = [] - - %i[queue priority run_at job_class tags].each do |option_name| - next unless arg_opts.key?(option_name) && job_options[option_name].nil? - - job_options[option_name] = arg_opts.delete(option_name) - deprecated_job_option_names << option_name - end - - _log_job_options_deprecation(deprecated_job_option_names) - - [arg_opts, job_options] - end - - def _log_job_options_deprecation(deprecated_job_option_names) - return unless deprecated_job_option_names.any? - - warn "Passing job options like (#{deprecated_job_option_names.join(', ')}) to `JobClass.enqueue` as top level keyword args has been deprecated and will be removed in version 2.0. Please wrap job options in an explicit `job_options` keyword arg instead." - end end # Set up some defaults. diff --git a/lib/que/job.run_synchronously.spec.rb b/lib/que/job.run_synchronously.spec.rb index ae707a89..deb39538 100644 --- a/lib/que/job.run_synchronously.spec.rb +++ b/lib/que/job.run_synchronously.spec.rb @@ -13,7 +13,7 @@ end it "should ignore jobs that are scheduled for a future date" do - assert_instance_of ArgsJob, ArgsJob.enqueue(1, 2, 3, run_at: Time.now + 3) + assert_instance_of ArgsJob, ArgsJob.enqueue(1, 2, 3, job_options: { run_at: Time.now + 3 }) assert_nil $passed_args end end @@ -31,7 +31,7 @@ end it "should ignore jobs that are scheduled for a future date" do - assert_instance_of ArgsJob, ArgsJob.enqueue(1, 2, 3, run_at: Time.now + 3) + assert_instance_of ArgsJob, ArgsJob.enqueue(1, 2, 3, job_options: { run_at: Time.now + 3 }) assert_nil $passed_args end end diff --git a/lib/que/job.spec.rb b/lib/que/job.spec.rb index 208159a3..8d5705de 100644 --- a/lib/que/job.spec.rb +++ b/lib/que/job.spec.rb @@ -9,8 +9,9 @@ Que.error_notifier = proc { |e| notified_errors << e } class TestJobClass < Que::Job - def run(*args) + def run(*args, **kwargs) $args = args + $kwargs = kwargs end end end @@ -18,6 +19,7 @@ def run(*args) after do Object.send :remove_const, :TestJobClass $args = nil + $kwargs = nil end module ActsLikeAJob @@ -28,43 +30,85 @@ def expected_job_count end it "should pass its arguments to the run method" do - execute(1, 2) - assert_equal [1, 2], $args + enqueue_method.call(1, 2) + execute + assert_equal([1, 2], $args) + end + + it "should pass its keyword arguments to the run method" do + enqueue_method.call(a: 1, b: 2) + execute + assert_equal({ a: 1, b: 2 }, $kwargs) end it "should deep-freeze its arguments" do - execute(array: [], hash: {}, string: 'blah'.dup) + enqueue_method.call([], {}, 'blah'.dup) + execute - assert_equal([{array: [], hash: {}, string: 'blah'.dup}], $args) + assert_equal([[], {}, 'blah'.dup], $args) - hash = $args.first - assert hash.frozen? + array = $args + assert array[0].frozen? + assert array[1].frozen? + assert array[2].frozen? + end + + it "should deep-freeze its keyword arguments" do + enqueue_method.call(array: [], hash: {}, string: 'blah'.dup) + execute + + assert_equal({array: [], hash: {}, string: 'blah'.dup}, $kwargs) + + hash = $kwargs assert hash[:array].frozen? assert hash[:hash].frozen? assert hash[:string].frozen? end - it "should symbolize argument hashes" do - execute(a: 1, b: 2) + it "treats the last hash literal as a positional argument" do + enqueue_method.call({a: 1, b: 2}) + execute + assert_equal([{a: 1, b: 2}], $args) + end + + it "should symbolize hash argument keys" do + enqueue_method.call({a: 1, b: 2}, c: 3, d: 4) + execute assert_equal([{a: 1, b: 2}], $args) end - it "should symbolize argument hashes even if they were originally passed as strings" do + it "should symbolize hash argument keys even if they were originally passed as strings" do # The run() helper should convert these to symbols, just as if they'd # been passed through the DB. - execute('a' => 1, 'b' => 2) + enqueue_method.call({'a' => 1, 'b' => 2}, c: 3, d: 4) + execute assert_equal([{a: 1, b: 2}], $args) end + it "should symbolize keyword argument keys" do + enqueue_method.call(a: 1, b: 2) + execute + assert_equal({a: 1, b: 2}, $kwargs) + end + + it "should symbolize keyword argument keys even if they were originally passed as strings" do + # The run() helper should convert these to symbols, just as if they'd + # been passed through the DB. + enqueue_method.call('a' => 1, 'b' => 2) + execute + assert_equal({a: 1, b: 2}, $kwargs) + end + it "should handle keyword arguments just fine" do TestJobClass.class_eval do def run(a:, b: 4, c: 3) - $args = [a, b, c] + $kwargs = [a, b, c] end end - execute(a: 1, b: 2) - assert_equal [1, 2, 3], $args + enqueue_method.call(a: 1, b: 2) + execute + assert_equal [1, 2, 3], $kwargs end it "should handle keyword arguments even if they were originally passed as strings" do @@ -76,7 +120,8 @@ def run(a:, b: 4, c: 3) # The run() helper should convert these to symbols, just as if they'd # been passed through the DB. - execute('a' => 1, 'b' => 2) + enqueue_method.call('a' => 1, 'b' => 2) + execute assert_equal [1, 2, 3], $args end @@ -87,6 +132,7 @@ def run end end + enqueue_method.call execute assert_equal 0, $error_count end @@ -98,6 +144,7 @@ def run end end + enqueue_method.call execute assert_empty jobs_dataset end @@ -109,6 +156,7 @@ def run end end + enqueue_method.call execute if should_persist_job @@ -126,6 +174,7 @@ def run end end + enqueue_method.call execute if should_persist_job @@ -146,6 +195,7 @@ def default_resolve_action end end + enqueue_method.call execute if should_persist_job @@ -175,7 +225,8 @@ def default_resolve_action } ) - execute(5, 6) + enqueue_method.call(5, 6) + execute if defined?(ApplicationJob) assert_instance_of ActiveJob::QueueAdapters::QueAdapter::JobWrapper, passed_1 @@ -195,6 +246,7 @@ def run end end + enqueue_method.call job = execute assert_equal expected_job_count, active_jobs_dataset.count @@ -219,7 +271,10 @@ def run end assert_empty jobs_dataset - assert_raises(error_class) { execute } + assert_raises(error_class) do + enqueue_method.call + execute + end assert_equal expected_job_count, jobs_dataset.count end @@ -239,7 +294,10 @@ def handle_error(error) end end - error = assert_raises(RuntimeError) { execute } + error = assert_raises(RuntimeError) do + enqueue_method.call + execute + end assert_equal "Uh-oh!", error.message count, error_2 = $args @@ -256,7 +314,10 @@ def handle_error(error) end end - error = assert_raises(RuntimeError) { execute } + error = assert_raises(RuntimeError) do + enqueue_method.call + execute + end assert_equal "Uh-oh!", error.message assert_equal "Uh-oh!", notified_errors.first.message @@ -269,7 +330,10 @@ def handle_error(error) end end - assert_raises(RuntimeError) { execute } + assert_raises(RuntimeError) do + enqueue_method.call + execute + end assert_empty notified_errors end @@ -281,7 +345,10 @@ def handle_error(error) end end - error = assert_raises(RuntimeError) { execute } + error = assert_raises(RuntimeError) do + enqueue_method.call + execute + end assert_equal "Uh-oh!", error.message assert_equal expected_job_count, jobs_dataset.count @@ -298,7 +365,8 @@ def handle_error(error) job.update(finished_at: Time.now) gid = job.to_global_id(app: :test) - execute(job_object: gid.to_s) + enqueue_method.call(job_object: gid.to_s) + execute assert_equal( [{job_object: job}], @@ -330,7 +398,8 @@ def run(*args) end end - execute(3, 4) + enqueue_method.call(3, 4) + execute assert_equal [3, 4], $args Que.execute "ROLLBACK" @@ -345,9 +414,9 @@ def run(*args) include ActsLikeASynchronousJob let(:should_persist_job) { false } + let(:enqueue_method) { TestJobClass.method(:run) } - def execute(*args) - TestJobClass.run(*args) + def execute end end @@ -356,10 +425,13 @@ def execute(*args) include ActsLikeASynchronousJob let(:should_persist_job) { false } + let(:enqueue_method) { TestJobClass.method(:enqueue) } - def execute(*args) + before do TestJobClass.run_synchronously = true - TestJobClass.enqueue(*args) + end + + def execute end end @@ -367,23 +439,26 @@ def execute(*args) include ActsLikeAJob let(:should_persist_job) { true } + let(:enqueue_method) { TestJobClass.method(:enqueue) } - def execute(*args) + before do worker # Make sure worker is initialized. + end - job = TestJobClass.enqueue(*args) - attrs = job.que_attrs + def execute + assert_equal 1, jobs_dataset.count + attrs = jobs_dataset.first! job_buffer.push(Que::Metajob.new(attrs)) sleep_until_equal([attrs[:id]]) { results(message_type: :job_finished).map{|m| m.fetch(:metajob).id} } - if m = jobs_dataset.where(id: job.que_attrs[:id]).get(:last_error_message) + if m = jobs_dataset.where(id: attrs[:id]).get(:last_error_message) klass, message = m.split(": ", 2) raise Que.constantize(klass), message end - job + TestJobClass.new(attrs) end it "should handle subclassed jobs" do @@ -404,6 +479,7 @@ def run }) $args = [] + enqueue_method.call execute assert_equal [1, 2, 3], $args @@ -415,6 +491,7 @@ def run include ActsLikeAJob let(:should_persist_job) { true } + let(:enqueue_method) { TestJobClass.method(:perform_later) } before do Object.send :remove_const, :TestJobClass @@ -424,21 +501,20 @@ class ApplicationJob < ActiveJob::Base end class TestJobClass < ApplicationJob - def run(*args) + def run(*args, **kwargs) $args = args + $kwargs = kwargs end end + + worker # Make sure worker is initialized. end after do Object.send :remove_const, :ApplicationJob end - def execute(*args) - worker # Make sure worker is initialized. - - TestJobClass.perform_later(*args) - + def execute assert_equal 1, jobs_dataset.count attrs = jobs_dataset.first! @@ -464,7 +540,8 @@ def perform(*args) end end - execute("arg1" => 1, "arg2" => 2) + enqueue_method.call("arg1" => 1, "arg2" => 2) + execute assert_equal([{'arg1' => 1, 'arg2' => 2}], $args) end @@ -473,7 +550,10 @@ def perform(*args) class TestJobClass < ApplicationJob; end - error = assert_raises(Que::Error) { execute(1, 2) } + error = assert_raises(Que::Error) do + enqueue_method.call(1, 2) + execute + end assert_equal "Job class TestJobClass didn't define a run() method!", error.message assert_equal expected_job_count, jobs_dataset.count end diff --git a/lib/que/job_methods.rb b/lib/que/job_methods.rb index 55e2696b..0e07b2c4 100644 --- a/lib/que/job_methods.rb +++ b/lib/que/job_methods.rb @@ -39,12 +39,16 @@ module JobMethods # Run the job with error handling and cleanup logic. Optionally support # overriding the args, because it's necessary when jobs are invoked from # ActiveJob. - def _run(args: nil, reraise_errors: false) + def _run(args: nil, kwargs: nil, reraise_errors: false) if args.nil? && que_target args = que_target.que_attrs.fetch(:args) end - run(*args) + if kwargs.nil? && que_target + kwargs = que_target.que_attrs.fetch(:kwargs) + end + + run(*args, **kwargs) default_resolve_action if que_target && !que_target.que_resolved rescue => error raise error unless que_target diff --git a/lib/que/listener.spec.rb b/lib/que/listener.spec.rb index 9b7fc5cc..ddfe37b0 100644 --- a/lib/que/listener.spec.rb +++ b/lib/que/listener.spec.rb @@ -74,7 +74,7 @@ def notify_multiple(notifications) end it "should return frozen messages" do - notify(message_type: 'type_1', value: 4) + notify({ message_type: 'type_1', value: 4 }) result = listener.wait_for_grouped_messages(10)[:type_1].first assert_equal({value: 4}, result) @@ -117,7 +117,7 @@ def notify_multiple(notifications) q = Queue.new Que.error_notifier = proc { |e| q.push(e) } - notify(message_type: 'job_available', priority: 2, id: 4) + notify({ message_type: 'job_available', priority: 2, id: 4 }) assert_equal({}, listener.wait_for_grouped_messages(10)) error = q.pop @@ -188,11 +188,11 @@ def assert_ignored_notification(payload) describe "unlisten" do it "should stop listening for new messages" do - notify(message_type: 'type_1') + notify({ message_type: 'type_1' }) connection.drain_notifications listener.unlisten - notify(message_type: 'type_1') + notify({ message_type: 'type_1' }) # Execute a new query to fetch any new notifications. connection.execute "SELECT 1" @@ -200,7 +200,7 @@ def assert_ignored_notification(payload) end it "when unlistening should not leave any residual messages" do - 5.times { notify(message_type: 'type_1') } + 5.times { notify({ message_type: 'type_1' }) } listener.unlisten assert_nil connection.next_notification diff --git a/lib/que/locker.spec.rb b/lib/que/locker.spec.rb index f2335b91..af174c2c 100644 --- a/lib/que/locker.spec.rb +++ b/lib/que/locker.spec.rb @@ -188,7 +188,7 @@ def assert_que_locker_insertion( describe "on startup" do it "should do batch polls for jobs in its specified queue" do job1, job2 = BlockJob.enqueue, BlockJob.enqueue - job3 = Que::Job.enqueue(queue: 'my_special_queue') + job3 = Que::Job.enqueue(job_options: { queue: 'my_special_queue' }) locker_settings[:poll] = true locker @@ -205,9 +205,9 @@ def assert_que_locker_insertion( end it "should do batch polls for jobs in its specified queues" do - job1 = BlockJob.enqueue(queue: 'queue1') - job2 = BlockJob.enqueue(queue: 'queue2') - job3 = Que::Job.enqueue(queue: 'my_special_queue') + job1 = BlockJob.enqueue(job_options: { queue: 'queue1' }) + job2 = BlockJob.enqueue(job_options: { queue: 'queue2' }) + job3 = Que::Job.enqueue(job_options: { queue: 'my_special_queue' }) locker_settings[:queues] = ['queue1', 'queue2'] locker @@ -226,8 +226,8 @@ def assert_que_locker_insertion( it "should request only enough jobs to fill the buffer" do # Three BlockJobs will tie up the low-priority workers. - ids = 3.times.map { BlockJob.enqueue(priority: 100).que_attrs[:id] } - ids += 9.times.map { Que::Job.enqueue(priority: 101).que_attrs[:id] } + ids = 3.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] } + ids += 9.times.map { Que::Job.enqueue(job_options: { priority: 101 }).que_attrs[:id] } locker 3.times { $q1.pop } @@ -308,8 +308,8 @@ def run it "should request as many as necessary to reach the maximum_buffer_size" do # Three BlockJobs to tie up the low-priority workers. - ids = 3.times.map { BlockJob.enqueue(priority: 100).que_attrs[:id] } - ids += [Que::Job.enqueue(priority: 101).que_attrs[:id]] + ids = 3.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] } + ids += [Que::Job.enqueue(job_options: { priority: 101 }).que_attrs[:id]] locker_settings.clear locker_settings[:poll_interval] = 0.01 @@ -324,7 +324,7 @@ def run ids += Que.transaction do 8.times.map do - Que::Job.enqueue(priority: 101).que_attrs[:id] + Que::Job.enqueue(job_options: { priority: 101 }).que_attrs[:id] end end @@ -388,7 +388,7 @@ def run end it "should trigger a new poll when the buffer drops to the minimum size" do - ids = 12.times.map { BlockJob.enqueue(priority: 100).que_attrs[:id] } + ids = 12.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] } locker_settings[:poll] = true locker_settings[:poll_interval] = 0.01 @@ -473,8 +473,8 @@ def run sleep_until_equal(1) { DB[:que_lockers].count } assert_equal ['queue_1', 'queue_2'], DB[:que_lockers].get(:queues) - BlockJob.enqueue queue: 'queue_1' - BlockJob.enqueue queue: 'queue_2' + BlockJob.enqueue(job_options: { queue: 'queue_1' }) + BlockJob.enqueue(job_options: { queue: 'queue_2' }) $q1.pop; $q1.pop @@ -559,12 +559,12 @@ def run sleep_until_equal(1) { DB[:que_lockers].count } - BlockJob.enqueue(priority: 5) + BlockJob.enqueue(job_options: { priority: 5 }) $q1.pop - ids = 3.times.map { Que::Job.enqueue(priority: 5).que_attrs[:id] } + ids = 3.times.map { Que::Job.enqueue(job_options: { priority: 5 }).que_attrs[:id] } sleep_until_equal(ids) { ids_in_local_queue } - id = Que::Job.enqueue(priority: 10).que_attrs[:id] + id = Que::Job.enqueue(job_options: { priority: 10 }).que_attrs[:id] sleep 0.05 # Hacky. refute_includes ids_in_local_queue, id @@ -579,13 +579,13 @@ def run sleep_until_equal(1) { DB[:que_lockers].count } - block_job_id = BlockJob.enqueue(priority: 5).que_attrs[:id] + block_job_id = BlockJob.enqueue(job_options: { priority: 5 }).que_attrs[:id] $q1.pop - ids = 3.times.map { Que::Job.enqueue(priority: 5).que_attrs[:id] } + ids = 3.times.map { Que::Job.enqueue(job_options: { priority: 5 }).que_attrs[:id] } sleep_until_equal(ids) { ids_in_local_queue } - id = Que::Job.enqueue(priority: 2).que_attrs[:id] + id = Que::Job.enqueue(job_options: { priority: 2 }).que_attrs[:id] sleep_until { ids_in_local_queue == [id] + ids[0..1] } sleep_until { locked_ids == (ids_in_local_queue + [block_job_id]).sort } @@ -730,7 +730,7 @@ def run(runs:, index:) if runs < 10 delay = rand > 0.5 ? 1 : 0 - conn.execute(%(INSERT INTO que_jobs (job_class, args, run_at, job_schema_version) VALUES ('QueSpec::RunOnceTestJob', '[{"runs":#{runs + 1},"index":#{index}}]', now() + '#{delay} microseconds', #{Que.job_schema_version}))) + conn.execute(%(INSERT INTO que_jobs (job_class, kwargs, run_at, job_schema_version) VALUES ('QueSpec::RunOnceTestJob', '{"runs":#{runs + 1},"index":#{index}}', now() + '#{delay} microseconds', #{Que.job_schema_version}))) end finish @@ -739,7 +739,7 @@ def run(runs:, index:) end end - lockers = 4.times.map { Que::Locker.new(locker_settings) } + lockers = 4.times.map { Que::Locker.new(**locker_settings) } 5.times { |i| QueSpec::RunOnceTestJob.enqueue(runs: 1, index: i) } @@ -756,7 +756,7 @@ def run(runs:, index:) job_ids: jobs_dataset.select_order_map(:id), }, { - index_runs: jobs_dataset.exclude(finished_at: nil).select_map(:args).map{|a| [a.first[:index], a.first[:runs]]}.sort, + index_runs: jobs_dataset.exclude(finished_at: nil).select_map(:kwargs).map{ |a| [a[:index], a[:runs]]}.sort, job_ids: DB[:test_data].select_order_map(:job_id), } ) diff --git a/lib/que/migrations.current_schema.spec.rb b/lib/que/migrations.current_schema.spec.rb index 6b4f80aa..e4d21384 100644 --- a/lib/que/migrations.current_schema.spec.rb +++ b/lib/que/migrations.current_schema.spec.rb @@ -19,7 +19,7 @@ def assert_constraint_error(name, &block) ].each do |args| assert_constraint_error 'valid_args' do DB[:que_jobs]. - insert(job_class: 'Que::Job', args: JSON.generate(args, quirks_mode: true)) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, args: JSON.generate(args, quirks_mode: true)) end end end @@ -27,7 +27,7 @@ def assert_constraint_error(name, &block) it "should make sure that a job has valid tags" do assert_constraint_error 'valid_data' do DB[:que_jobs]. - insert(job_class: 'Que::Job', data: JSON.dump(tags: {})) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, data: JSON.dump(tags: {})) end [ @@ -40,7 +40,7 @@ def assert_constraint_error(name, &block) ].each do |tags| assert_constraint_error 'valid_data' do DB[:que_jobs]. - insert(job_class: 'Que::Job', data: JSON.dump(tags: tags)) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, data: JSON.dump(tags: tags)) end end @@ -54,12 +54,12 @@ def assert_constraint_error(name, &block) ].each do |tags| assert_constraint_error 'valid_data' do DB[:que_jobs]. - insert(job_class: 'Que::Job', data: JSON.dump(tags: tags)) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, data: JSON.dump(tags: tags)) end assert_constraint_error 'valid_data' do DB[:que_jobs]. - insert(job_class: 'Que::Job', data: JSON.dump(tags: (tags << "valid_tag").shuffle)) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, data: JSON.dump(tags: (tags << "valid_tag").shuffle)) end end end @@ -67,7 +67,7 @@ def assert_constraint_error(name, &block) it "should make sure that a job_class does not exceed 200 characters" do assert_constraint_error 'job_class_length' do DB[:que_jobs]. - insert(job_class: 'a' * 201) + insert(job_class: 'a' * 201, job_schema_version: Que.job_schema_version) end # Make sure the check constraint also handles wrapped ActiveJob jobs. @@ -75,7 +75,8 @@ def assert_constraint_error(name, &block) DB[:que_jobs]. insert( job_class: 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper', - args: JSON.dump([{job_class: '2' * 501}]) + args: JSON.dump([{job_class: '2' * 501}]), + job_schema_version: Que.job_schema_version, ) end @@ -87,28 +88,29 @@ def assert_constraint_error(name, &block) data: JSON.dump( args: [], tags: [], - ) + ), + job_schema_version: Que.job_schema_version, ) end it "should make sure that a queue does not exceed 100 characters" do assert_constraint_error 'queue_length' do DB[:que_jobs]. - insert(job_class: 'Que::Job', queue: 'a' * 101) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, queue: 'a' * 101) end end it "should make sure that a job error message does not exceed 500 characters" do assert_constraint_error 'error_length' do DB[:que_jobs]. - insert(job_class: 'Que::Job', last_error_message: 'a' * 501) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, last_error_message: 'a' * 501) end end it "should make sure that a job error backtrace does not exceed 10000 characters" do assert_constraint_error 'error_length' do DB[:que_jobs]. - insert(job_class: 'Que::Job', last_error_backtrace: 'a' * 10001) + insert(job_class: 'Que::Job', job_schema_version: Que.job_schema_version, last_error_backtrace: 'a' * 10001) end end end diff --git a/lib/que/migrations.rb b/lib/que/migrations.rb index da74ec1b..80b775df 100644 --- a/lib/que/migrations.rb +++ b/lib/que/migrations.rb @@ -4,7 +4,7 @@ module Que module Migrations # In order to ship a schema change, add the relevant up and down sql files # to the migrations directory, and bump the version here. - CURRENT_VERSION = 5 + CURRENT_VERSION = 6 class << self def migrate!(version:) diff --git a/lib/que/migrations.spec.rb b/lib/que/migrations.spec.rb index 29609724..dcb60b3b 100644 --- a/lib/que/migrations.spec.rb +++ b/lib/que/migrations.spec.rb @@ -109,7 +109,7 @@ end after do - # DB[:que_jobs].delete + DB[:que_jobs].delete Que::Migrations.migrate!(version: Que::Migrations::CURRENT_VERSION) end @@ -227,9 +227,8 @@ def assert_up_migration(v3: {}, v4: {}) end it "when migrating down should remove finished and expired jobs so that they aren't run repeatedly" do - a, b, c = 3.times.map do - DB[:que_jobs].returning(:id).insert(job_class: 'Que::Job').first[:id] - end + Que::Migrations.migrate!(version: Que::Migrations::CURRENT_VERSION) + a, b, c = 3.times.map { Que::Job.enqueue.que_attrs[:id] } jobs_dataset.where(id: a).update(finished_at: Time.now) jobs_dataset.where(id: b).update(finished_at: Time.now) @@ -239,6 +238,8 @@ def assert_up_migration(v3: {}, v4: {}) end assert_equal [c], jobs_dataset.select_map(:id) + + Que::Migrations.migrate!(version: Que::Migrations::CURRENT_VERSION) end end end diff --git a/lib/que/migrations.state_trigger_spec.rb b/lib/que/migrations.state_trigger_spec.rb index 25acddec..66e54521 100644 --- a/lib/que/migrations.state_trigger_spec.rb +++ b/lib/que/migrations.state_trigger_spec.rb @@ -77,6 +77,7 @@ def assert_notification( job_class: "CustomJobClass", queue: "custom_queue", data: JSON.dump(tags: ["tag_1", "tag_2"]), + job_schema_version: Que.job_schema_version, ) end end @@ -91,6 +92,7 @@ def assert_notification( DB[:que_jobs].insert( job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper", args: JSON.dump([{job_class: "WrappedJobClass"}]), + job_schema_version: Que.job_schema_version, ) end end @@ -112,6 +114,7 @@ def assert_notification( DB[:que_jobs].insert( job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper", args: JSON.dump(args), + job_schema_version: Que.job_schema_version, ) end end @@ -127,7 +130,7 @@ def assert_notification( previous_state: "nonexistent", current_state: "ready", ) do - DB[:que_jobs].insert(job_class: "MyJobClass") + DB[:que_jobs].insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version) end end @@ -139,14 +142,14 @@ def assert_notification( current_state: "scheduled", run_at: future, ) do - DB[:que_jobs].insert(job_class: "MyJobClass", run_at: future) + DB[:que_jobs].insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version, run_at: future) end end end describe "when updating a job" do it "and marking it as finished should issue a notification containing the job's class, error count, etc." do - record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass").first + record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version).first assert get_message assert_notification( @@ -160,7 +163,7 @@ def assert_notification( end it "and marking it as errored" do - record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass").first + record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version).first assert get_message assert_notification( @@ -174,7 +177,7 @@ def assert_notification( end it "and marking it as scheduled for the future" do - record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass").first + record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version).first assert get_message future = Time.now + 36000 @@ -190,7 +193,7 @@ def assert_notification( end it "and marking it as expired" do - record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass").first + record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version).first assert get_message assert_notification( @@ -204,7 +207,7 @@ def assert_notification( end it "and not changing the state should not emit a message" do - id = DB[:que_jobs].insert(job_class: "MyJobClass", run_at: Time.now + 36000) + id = DB[:que_jobs].insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version, run_at: Time.now + 36000) assert get_message @@ -216,7 +219,7 @@ def assert_notification( describe "when deleting a job" do it "should issue a notification containing the job's class, queue, etc." do - record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass").first + record = DB[:que_jobs].returning(:id, :run_at).insert(job_class: "MyJobClass", job_schema_version: Que.job_schema_version).first assert get_message assert_notification( diff --git a/lib/que/migrations.work_job_trigger_spec.rb b/lib/que/migrations.work_job_trigger_spec.rb index bf336183..ac89c25b 100644 --- a/lib/que/migrations.work_job_trigger_spec.rb +++ b/lib/que/migrations.work_job_trigger_spec.rb @@ -67,7 +67,7 @@ def listen_connection conn.async_exec "LISTEN que_listener_1" - Que::Job.enqueue run_at: Time.now + 60 + Que::Job.enqueue(job_options: { run_at: Time.now + 60 }) assert_nil conn.wait_for_notify(0.01) end diff --git a/lib/que/migrations/6/down.sql b/lib/que/migrations/6/down.sql new file mode 100644 index 00000000..7d7aaff7 --- /dev/null +++ b/lib/que/migrations/6/down.sql @@ -0,0 +1,8 @@ +DROP INDEX que_jobs_kwargs_gin_idx; +ALTER TABLE que_jobs DROP COLUMN kwargs; + +ALTER INDEX que_poll_idx RENAME TO que_poll_idx_with_job_schema_version; +CREATE INDEX que_poll_idx ON que_jobs (queue, priority, run_at, id) WHERE (finished_at IS NULL AND expired_at IS NULL); + +ALTER TABLE que_jobs ALTER COLUMN job_schema_version SET DEFAULT 1; +ALTER TABLE que_jobs ALTER COLUMN job_schema_version DROP NOT NULL; diff --git a/lib/que/migrations/6/up.sql b/lib/que/migrations/6/up.sql new file mode 100644 index 00000000..22582915 --- /dev/null +++ b/lib/que/migrations/6/up.sql @@ -0,0 +1,8 @@ +ALTER TABLE que_jobs ADD COLUMN kwargs JSONB NOT NULL DEFAULT '{}'; +CREATE INDEX que_jobs_kwargs_gin_idx ON que_jobs USING gin (kwargs jsonb_path_ops); + +DROP INDEX que_poll_idx; +ALTER INDEX que_poll_idx_with_job_schema_version RENAME TO que_poll_idx; + +ALTER TABLE que_jobs ALTER COLUMN job_schema_version DROP DEFAULT; +ALTER TABLE que_jobs ALTER COLUMN job_schema_version SET NOT NULL; diff --git a/lib/que/poller.spec.rb b/lib/que/poller.spec.rb index c1d87507..57d39bd8 100644 --- a/lib/que/poller.spec.rb +++ b/lib/que/poller.spec.rb @@ -87,8 +87,8 @@ def poll( end it "should skip jobs in the wrong queue" do - one = Que::Job.enqueue(queue: 'one').que_attrs[:id] - two = Que::Job.enqueue(queue: 'two').que_attrs[:id] + one = Que::Job.enqueue(job_options: { queue: 'one' }).que_attrs[:id] + two = Que::Job.enqueue(job_options: { queue: 'two' }).que_attrs[:id] assert_equal [one], poll(queue_name: 'one') end @@ -112,8 +112,8 @@ def poll( end it "should skip jobs that don't meet the priority requirements" do - one = Que::Job.enqueue(priority: 7).que_attrs[:id] - two = Que::Job.enqueue(priority: 8).que_attrs[:id] + one = Que::Job.enqueue(job_options: { priority: 7 }).que_attrs[:id] + two = Que::Job.enqueue(job_options: { priority: 8 }).que_attrs[:id] assert_equal [one], poll(priorities: {7 => 5}) end @@ -152,9 +152,9 @@ def assert_poll(priorities:, locked:) end it "should only work a job whose scheduled time to run has passed" do - future1 = Que::Job.enqueue(run_at: Time.now + 30).que_attrs[:id] - past = Que::Job.enqueue(run_at: Time.now - 30).que_attrs[:id] - future2 = Que::Job.enqueue(run_at: Time.now + 30).que_attrs[:id] + future1 = Que::Job.enqueue(job_options: { run_at: Time.now + 30 }).que_attrs[:id] + past = Que::Job.enqueue(job_options: { run_at: Time.now - 30 }).que_attrs[:id] + future2 = Que::Job.enqueue(job_options: { run_at: Time.now + 30 }).que_attrs[:id] assert_equal [past], poll end @@ -170,9 +170,9 @@ def assert_poll(priorities:, locked:) end it "should prefer a job that was scheduled to run longer ago" do - id1 = Que::Job.enqueue(run_at: Time.now - 30).que_attrs[:id] - id2 = Que::Job.enqueue(run_at: Time.now - 60).que_attrs[:id] - id3 = Que::Job.enqueue(run_at: Time.now - 30).que_attrs[:id] + id1 = Que::Job.enqueue(job_options: { run_at: Time.now - 30 }).que_attrs[:id] + id2 = Que::Job.enqueue(job_options: { run_at: Time.now - 60 }).que_attrs[:id] + id3 = Que::Job.enqueue(job_options: { run_at: Time.now - 30 }).que_attrs[:id] assert_equal [id2], poll(priorities: {200 => 1}) end @@ -180,7 +180,7 @@ def assert_poll(priorities:, locked:) it "should prefer a job that was queued earlier" do run_at = Time.now - 30 - a, b, c = 3.times.map { Que::Job.enqueue(run_at: run_at).que_attrs[:id] } + a, b, c = 3.times.map { Que::Job.enqueue(job_options: { run_at: run_at }).que_attrs[:id] } assert_equal [a, b], poll(priorities: {200 => 2}) end diff --git a/lib/que/sequel/model.rb b/lib/que/sequel/model.rb index 52b8469b..9effd5be 100644 --- a/lib/que/sequel/model.rb +++ b/lib/que/sequel/model.rb @@ -40,8 +40,11 @@ def by_tag(tag) where(QUALIFIED_TABLE[:data].pg_jsonb.contains(JSON.dump(tags: [tag]))) end - def by_args(*args) - where(QUALIFIED_TABLE[:args].pg_jsonb.contains(JSON.dump(args))) + def by_args(*args, **kwargs) + where( + QUALIFIED_TABLE[:args].pg_jsonb.contains(JSON.dump(args)) & + QUALIFIED_TABLE[:kwargs].pg_jsonb.contains(JSON.dump(kwargs)) + ) end end end diff --git a/lib/que/sequel/model.spec.rb b/lib/que/sequel/model.spec.rb index 822c8c8e..224c9207 100644 --- a/lib/que/sequel/model.spec.rb +++ b/lib/que/sequel/model.spec.rb @@ -7,7 +7,7 @@ require "que/sequel/model" end - def enqueue_job(*args) + ruby2_keywords def enqueue_job(*args) Que::Job.enqueue(*args).que_attrs[:id] end @@ -101,8 +101,8 @@ def assert_ids(*expected) describe "by_job_class" do it "should return a dataset of jobs with that job class" do - a = enqueue_job(job_class: "CustomJobClass") - b = enqueue_job(job_class: "BlockJob") + a = enqueue_job(job_options: { job_class: "CustomJobClass" }) + b = enqueue_job(job_options: { job_class: "BlockJob" }) c = enqueue_job assert_ids(a) { |ds| ds.by_job_class("CustomJobClass") } @@ -113,8 +113,8 @@ def assert_ids(*expected) end it "should be compatible with ActiveModel job classes" do - a = enqueue_job({job_class: "WrappedJobClass"}, {job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper"}) - b = enqueue_job({job_class: "OtherWrappedJobClass"}, {job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper"}) + a = enqueue_job({job_class: "WrappedJobClass"}, job_options: { job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper" }) + b = enqueue_job({job_class: "OtherWrappedJobClass"}, job_options: { job_class: "ActiveJob::QueueAdapters::QueAdapter::JobWrapper" }) c = enqueue_job assert_ids(a) { |ds| ds.by_job_class("WrappedJobClass") } @@ -126,7 +126,7 @@ def assert_ids(*expected) describe "by_queue" do it "should return a dataset of jobs in that queue" do a = enqueue_job - b = enqueue_job(queue: "other_queue") + b = enqueue_job(job_options: { queue: "other_queue" }) assert_ids(a) { |ds| ds.by_queue("default") } assert_ids(b) { |ds| ds.by_queue("other_queue") } @@ -136,8 +136,8 @@ def assert_ids(*expected) describe "by_tag" do it "should return a dataset of jobs with the given tag" do - a = enqueue_job(tags: ["tag_1"]) - b = enqueue_job(tags: ["tag_2"]) + a = enqueue_job(job_options: { tags: ["tag_1"] }) + b = enqueue_job(job_options: { tags: ["tag_2"] }) assert_ids(a) { |ds| ds.by_tag("tag_1") } assert_ids(b) { |ds| ds.by_tag("tag_2") } @@ -151,6 +151,7 @@ def assert_ids(*expected) b = enqueue_job arg: "arg_string" c = enqueue_job arg_hash: {arg: "arg_string"} d = enqueue_job + e = enqueue_job 'an argument', a_keyword: 'another_argument' assert_ids(a) { |ds| ds.by_args("arg_string") } assert_ids { |ds| ds.by_args("nonexistent_arg_string") } @@ -158,6 +159,9 @@ def assert_ids(*expected) assert_ids { |ds| ds.by_args(arg: "nonexistent_arg_string") } assert_ids(c) { |ds| ds.by_args(arg_hash: {arg: "arg_string"}) } assert_ids { |ds| ds.by_args(arg_hash: {arg: "nonexistent_arg_string"}) } + assert_ids(e) { |ds| ds.by_args('an argument', a_keyword: 'another_argument') } + assert_ids { |ds| ds.by_args('an argument', a_keyword: 'blah') } + assert_ids { |ds| ds.by_args('a very heated argument', a_keyword: 'another_argument') } end end end diff --git a/lib/que/utils/introspection.spec.rb b/lib/que/utils/introspection.spec.rb index bf72bc2e..c3711fa1 100644 --- a/lib/que/utils/introspection.spec.rb +++ b/lib/que/utils/introspection.spec.rb @@ -49,7 +49,7 @@ describe 'job_states' do it "should return a list of the jobs currently being run" do - BlockJob.enqueue priority: 2 + BlockJob.enqueue(job_options: { priority: 2 }) # Ensure that the portion of the SQL query that accounts for bigint # job_ids functions correctly. @@ -67,7 +67,7 @@ state = states.first assert_equal \ %i(priority run_at id job_class error_count last_error_message queue - last_error_backtrace finished_at expired_at args data job_schema_version ruby_hostname ruby_pid), + last_error_backtrace finished_at expired_at args data job_schema_version kwargs ruby_hostname ruby_pid), state.keys assert_equal 2, state[:priority] diff --git a/lib/que/utils/queue_management.spec.rb b/lib/que/utils/queue_management.spec.rb index 521011c1..b8b0cf96 100644 --- a/lib/que/utils/queue_management.spec.rb +++ b/lib/que/utils/queue_management.spec.rb @@ -5,7 +5,7 @@ describe Que::Utils::QueueManagement do describe "clear!" do it "should clear all jobs from the queue" do - jobs_dataset.insert job_class: "Que::Job" + jobs_dataset.insert(job_class: "Que::Job", job_schema_version: Que.job_schema_version) assert_equal 1, jobs_dataset.count Que.clear! assert_equal 0, jobs_dataset.count diff --git a/lib/que/utils/ruby2_keywords.rb b/lib/que/utils/ruby2_keywords.rb new file mode 100644 index 00000000..ec54c5b6 --- /dev/null +++ b/lib/que/utils/ruby2_keywords.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +# Temporary module allowing ruby2 keyword args to be extracted from an *args splat +# Allows us to ensure consistent behaviour when running on ruby 2 vs ruby 3 +# We can remove this if/when we drop support for ruby 2 + +require 'json' + +module Que + module Utils + module Ruby2Keywords + def split_out_ruby2_keywords(args) + return [args, {}] unless args.last&.is_a?(Hash) && Hash.ruby2_keywords_hash?(args.last) + + [args[0..-2], args.last] + end + end + end +end diff --git a/lib/que/utils/ruby2_keywords.spec.rb b/lib/que/utils/ruby2_keywords.spec.rb new file mode 100644 index 00000000..84b5e413 --- /dev/null +++ b/lib/que/utils/ruby2_keywords.spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Que::Utils::Ruby2Keywords do + describe "split_out_ruby2_keywords" do + describe "when last argument is not a hash" do + let(:args_splat) { ["string"] } + + it "does not split arguments" do + args, kwargs = Que.split_out_ruby2_keywords(args_splat) + assert_equal(["string"], args) + assert_equal({}, kwargs) + end + end + + describe "when last argument a hash literal" do + let(:args_splat) { ["string", { a: 1, b: 2}] } + + it "does not split arguments" do + args, kwargs = Que.split_out_ruby2_keywords(args_splat) + assert_equal(["string", {a: 1, b: 2}], args) + assert_equal({}, kwargs) + end + end + + describe "when last argument is flagged as a ruby2 keywords hash" do + let(:args_splat) { ["string", Hash.ruby2_keywords_hash({ a: 1, b: 2})] } + + it "splits keywords out of arguments" do + args, kwargs = Que.split_out_ruby2_keywords(args_splat) + assert_equal(["string"], args) + assert_equal({ a: 1, b: 2}, kwargs) + end + end + end +end diff --git a/lib/que/utils/transactions.spec.rb b/lib/que/utils/transactions.spec.rb index 2dc2673a..79ab2e4a 100644 --- a/lib/que/utils/transactions.spec.rb +++ b/lib/que/utils/transactions.spec.rb @@ -7,7 +7,7 @@ refute Que.in_transaction? Que.transaction do assert Que.in_transaction? - Que.execute "INSERT INTO que_jobs (job_class) VALUES ('MyJobClass');" + Que.execute "INSERT INTO que_jobs (job_class, job_schema_version) VALUES ('MyJobClass', #{Que.job_schema_version});" assert Que.in_transaction? end refute Que.in_transaction? diff --git a/lib/que/version.rb b/lib/que/version.rb index db283159..669ea7a3 100644 --- a/lib/que/version.rb +++ b/lib/que/version.rb @@ -1,9 +1,9 @@ # frozen_string_literal: true module Que - VERSION = '1.4.0' + VERSION = '2.0.0.beta1' def self.job_schema_version - 1 + 2 end end diff --git a/lib/que/worker.rb b/lib/que/worker.rb index 656f0b7d..91e7d02a 100644 --- a/lib/que/worker.rb +++ b/lib/que/worker.rb @@ -125,7 +125,7 @@ def work_job(metajob) log_message[:event] = :job_worked end - Que.log(log_message) + Que.log(**log_message) end instance diff --git a/lib/que/worker.spec.rb b/lib/que/worker.spec.rb index eeaace8c..435e213f 100644 --- a/lib/que/worker.spec.rb +++ b/lib/que/worker.spec.rb @@ -65,7 +65,7 @@ def finished_job_ids end end - [1, 2, 3].each { |i| WorkerJob.enqueue i, priority: i } + [1, 2, 3].each { |i| WorkerJob.enqueue(i, job_options: { priority: i }) } job_ids = jobs_dataset.order_by(:priority).select_map(:id) run_jobs @@ -155,7 +155,7 @@ def run(*args) it "should only take jobs that meet it priority requirement" do jobs = (1..20).map do |i| - Que::Job.enqueue(i, priority: i).que_attrs + Que::Job.enqueue(i, job_options: { priority: i }).que_attrs end job_ids = jobs.map { |j| j[:id] } @@ -183,7 +183,7 @@ def assert_retry_cadence( expected_error_message: "RuntimeError: Error!", expected_backtrace: /\A#{__FILE__}/ ) - jobs_dataset.insert(job_class: job_class) + jobs_dataset.insert(job_class: job_class, job_schema_version: Que.job_schema_version) error_count = 0 delays.each do |delay| @@ -216,8 +216,8 @@ def assert_retry_cadence( it "should record/report the error and not crash the worker" do # First job should error, second job should still be worked. job_ids = [ - WorkerJob.enqueue(priority: 1), - Que::Job.enqueue(priority: 2), + WorkerJob.enqueue(job_options: { priority: 1 }), + Que::Job.enqueue(job_options: { priority: 2 }), ].map{|j| j.que_attrs[:id]} run_jobs @@ -369,7 +369,7 @@ def run(*args) end it "when it reaches a maximum should mark the job as expired" do - job = Que.enqueue(job_class: "NonexistentJobClass") + job = Que.enqueue(job_options: { job_class: "NonexistentJobClass" }) ds = jobs_dataset.where(id: job.que_attrs[:id]) assert_equal 1, ds.update(error_count: 15) @@ -394,7 +394,7 @@ def run(*args) 4, 19, 84, 259, job_class: "J", expected_error_message: /undefined method/, - expected_backtrace: /que\/worker\.rb/ + expected_backtrace: false assert_instance_of NoMethodError, notified_errors.first[:error] end diff --git a/que.gemspec b/que.gemspec index e0599713..3a44291d 100644 --- a/que.gemspec +++ b/que.gemspec @@ -13,6 +13,8 @@ Gem::Specification.new do |spec| spec.homepage = 'https://github.com/que-rb/que' spec.license = 'MIT' + spec.required_ruby_version = '>= 2.7.0' + files_to_exclude = [ /\A\.circleci/, /\AGemfile/, diff --git a/spec/gemfiles/Gemfile.6.0 b/spec/gemfiles/Gemfile-rails-6.0 similarity index 100% rename from spec/gemfiles/Gemfile.6.0 rename to spec/gemfiles/Gemfile-rails-6.0 diff --git a/spec/gemfiles/Gemfile.5.2 b/spec/gemfiles/Gemfile-rails-6.1 similarity index 83% rename from spec/gemfiles/Gemfile.5.2 rename to spec/gemfiles/Gemfile-rails-6.1 index 9770380c..9bc0f95d 100644 --- a/spec/gemfiles/Gemfile.5.2 +++ b/spec/gemfiles/Gemfile-rails-6.1 @@ -5,8 +5,8 @@ gem 'que', path: '../..' group :development, :test do gem 'rake' - gem 'activerecord', '~> 5.2', require: nil - gem 'activejob', '~> 5.2', require: nil + gem 'activerecord', '~> 6.1', require: nil + gem 'activejob', '~> 6.1', require: nil gem 'sequel', require: nil gem 'connection_pool', require: nil gem 'pond', require: nil diff --git a/spec/gemfiles/Gemfile.4.2 b/spec/gemfiles/Gemfile.4.2 deleted file mode 100644 index 4d5ccc96..00000000 --- a/spec/gemfiles/Gemfile.4.2 +++ /dev/null @@ -1,23 +0,0 @@ -source 'https://rubygems.org' - -gem 'que', path: '../..' - -group :development, :test do - gem 'rake' - - gem 'activerecord', '~> 4.2.0', require: nil - gem 'activejob', '~> 4.2.0', require: nil - gem 'sequel', require: nil - gem 'connection_pool', require: nil - gem 'pond', require: nil - gem 'pg', '~> 0.20.0', require: nil, platform: :ruby - gem 'pg_jruby', require: nil, platform: :jruby -end - -group :test do - gem 'minitest', '~> 5.10.1' - gem 'minitest-profile', '0.0.2' - gem 'minitest-hooks', '1.4.0' - gem 'pry' - gem 'pg_examiner', '~> 0.5.2' -end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 208f8df3..dc775c82 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -158,7 +158,7 @@ class QueSpec < Minitest::Spec end let :locker do - Que::Locker.new(locker_settings) + Que::Locker.new(**locker_settings) end let :job_buffer do @@ -192,8 +192,8 @@ def sleep_until_equal(expected, timeout: SLEEP_UNTIL_TIMEOUT) end || raise("sleep_until_equal: expected #{expected.inspect}, got #{actual.inspect}") end - def sleep_until(*args, &block) - sleep_until?(*args, &block) || raise("sleep_until timeout reached") + def sleep_until(**args, &block) + sleep_until?(**args, &block) || raise("sleep_until timeout reached") end def sleep_until?(timeout: SLEEP_UNTIL_TIMEOUT)