
que's Introduction


This README and the rest of the docs on the master branch all refer to Que 2.x. For older versions, please refer to the docs on the respective branches: 1.x, or 0.x.

TL;DR: Que is a high-performance job queue that improves the reliability of your application by protecting your jobs with the same ACID guarantees as the rest of your data.

Que ("keɪ", or "kay") is a queue for Ruby and PostgreSQL that manages jobs using advisory locks, which gives it several advantages over other RDBMS-backed queues:

  • Concurrency - Workers don't block each other when trying to lock jobs, as often occurs with "SELECT FOR UPDATE"-style locking. This allows for very high throughput with a large number of workers.
  • Efficiency - Locks are held in memory, so locking a job doesn't incur a disk write. These first two points are what limit performance with other queues. Under heavy load, Que's bottleneck is CPU, not I/O.
  • Safety - If a Ruby process dies, the jobs it's working won't be lost, or left in a locked or ambiguous state - they immediately become available for any other worker to pick up.
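
For the curious, the primitive behind all three of these points is Postgres' advisory lock, which lives in shared memory and is released automatically when the session ends. The following is a minimal illustration of that primitive using the pg gem - it is not Que's actual locking query, and the lock key (12345) and database name are arbitrary:

require 'pg'

conn = PG.connect(dbname: 'my_app_development') # assumed database name

# Try to take an advisory lock on an arbitrary key without blocking other
# sessions. The lock is held in memory, so no row is written to disk.
locked = conn.exec_params('SELECT pg_try_advisory_lock($1)', [12345]).getvalue(0, 0) == 't'

if locked
  begin
    # ... do work that only one session should perform at a time ...
  ensure
    # Release explicitly; if the process dies instead, Postgres releases the
    # lock as soon as the session goes away.
    conn.exec_params('SELECT pg_advisory_unlock($1)', [12345])
  end
end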

Additionally, there are the general benefits of storing jobs in Postgres, alongside the rest of your data, rather than in Redis or a dedicated queue:

  • Transactional Control - Queue a job along with other changes to your database, and it'll commit or rollback with everything else. If you're using ActiveRecord or Sequel, Que can piggyback on their connections, so setup is simple and jobs are protected by the transactions you're already using.
  • Atomic Backups - Your jobs and data can be backed up together and restored as a snapshot. If your jobs relate to your data (and they usually do), there's no risk of jobs falling through the cracks during a recovery.
  • Fewer Dependencies - If you're already using Postgres (and you probably should be), a separate queue is another moving part that can break.
  • Security - Postgres' support for SSL connections keeps your data safe in transport, for added protection when you're running workers on cloud platforms that you can't completely control.

Que's primary goal is reliability. You should be able to leave your application running indefinitely without worrying about jobs being lost due to a lack of transactional support, or left in limbo due to a crashing process. Que does everything it can to ensure that jobs you queue are performed exactly once (though the occasional repetition of a job can be impossible to avoid - see the docs on how to write a reliable job).
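
One practical consequence: where possible, design jobs so that running them twice is harmless. A minimal sketch of that idea - a stripped-down variant of the ChargeCreditCard example under Usage below, assuming the User model has a charged_at column:

class ChargeCreditCard < Que::Job
  def run(credit_card_id, user_id:)
    user = User.find(user_id)

    # If a previous run already charged this user, a repeated run is a no-op.
    return if user.charged_at

    User.transaction do
      # ... charge the card ...
      user.update(charged_at: Time.now)
      destroy
    end
  end
end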

Que's secondary goal is performance. The worker process is multithreaded, so that a single process can run many jobs simultaneously.

Compatibility:

  • MRI Ruby 2.7+ (for Ruby 3, Que 2+ is required)
  • PostgreSQL 9.5+
  • Rails 6.0+ (optional)

Please note - Que's job table undergoes a lot of churn when it is under high load, and like any heavily-written table, is susceptible to bloat and slowness if Postgres isn't able to clean it up. The most common cause of this is long-running transactions, so it's recommended to try to keep all transactions against the database housing Que's job table as short as possible. This is good advice to remember for any high-activity database, but bears emphasizing when using tables that undergo a lot of writes.
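
If you suspect this is happening, one way to spot long-running transactions is to look at pg_stat_activity. A minimal sketch, run from a console (Que.execute is just a convenient way to issue SQL over Que's connection):

# List the longest-running open transactions against this database.
Que.execute(<<~SQL).each { |row| puts row }
  SELECT pid, now() - xact_start AS duration, state, query
  FROM pg_stat_activity
  WHERE xact_start IS NOT NULL
  ORDER BY duration DESC
  LIMIT 10
SQL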

Installation

Add this line to your application's Gemfile:

gem 'que'

And then execute:

bundle

Or install it yourself as:

gem install que

Usage

First, create the queue schema in a migration. For example:

class CreateQueSchema < ActiveRecord::Migration[6.0]
  def up
    # Whenever you use Que in a migration, always specify the version you're
    # migrating to. If you're unsure what the current version is, check the
    # changelog.
    Que.migrate!(version: 7)
  end

  def down
    # Migrate to version 0 to remove Que entirely.
    Que.migrate!(version: 0)
  end
end

Create a class for each type of job you want to run:

# app/jobs/charge_credit_card.rb
class ChargeCreditCard < Que::Job
  # Default settings for this job. These are optional - without them, jobs
  # will default to priority 100 and run immediately.
  self.run_at = proc { 1.minute.from_now }

  # We use the Linux priority scale - a lower number is more important.
  self.priority = 10

  def run(credit_card_id, user_id:)
    # Do stuff.
    user = User.find(user_id)
    card = CreditCard.find(credit_card_id)

    User.transaction do
      # Write any changes you'd like to the database.
      user.update charged_at: Time.now

      # It's best to destroy the job in the same transaction as any other
      # changes you make. Que will mark the job as destroyed for you after the
      # run method if you don't do it yourself, but if your job writes to the DB
      # but doesn't destroy the job in the same transaction, it's possible that
      # the job could be repeated in the event of a crash.
      destroy

      # If you'd rather leave the job record in the database to maintain a job
      # history, simply replace the `destroy` call with a `finish` call.
    end
  end
end

Queue your job. Again, it's best to do this in a transaction with other changes you're making. Also note that any arguments you pass will be serialized to JSON and back again, so stick to simple types (strings, integers, floats, hashes, and arrays).

CreditCard.transaction do
  # Persist credit card information
  card = CreditCard.create(params[:credit_card])
  ChargeCreditCard.enqueue(card.id, user_id: current_user.id)
end

You can also add options to run the job after a specific time, or with a specific priority:

ChargeCreditCard.enqueue(card.id, user_id: current_user.id, job_options: { run_at: 1.day.from_now, priority: 5 })

Learn more about job options.

Running the Que Worker

In order to process jobs, you must start a separate worker process outside of your main server.

bundle exec que

Try running que -h to get a list of runtime options:

$ que -h
usage: que [options] [file/to/require] ...
    -h, --help                       Show this help text.
    -i, --poll-interval [INTERVAL]   Set maximum interval between polls for available jobs, in seconds (default: 5)
    ...

You may need to pass que a file path to require so that it can load your app. Que will automatically load config/environment.rb if it exists, so you shouldn't need an argument if you're using Rails.

Additional Rails-specific Setup

If you're using ActiveRecord to dump your database's schema, please set your schema_format to :sql so that Que's table structure is managed correctly. This is a good idea regardless, as the :ruby schema format doesn't support many of PostgreSQL's advanced features.
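
For example, in config/application.rb:

# config/application.rb
config.active_record.schema_format = :sql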

Pre-1.0, the default queue name needed to be configured in order for Que to work out of the box with Rails. As of 1.0 the default queue name is now 'default', as Rails expects, but when Rails enqueues some types of jobs it may try to use another queue name that isn't worked by default. You can either:

  • Configure Rails to send all internal job types to the 'default' queue by adding the following to config/application.rb:

    config.action_mailer.deliver_later_queue_name = :default
    config.action_mailbox.queues.incineration = :default
    config.action_mailbox.queues.routing = :default
    config.active_storage.queues.analysis = :default
    config.active_storage.queues.purge = :default
  • Tell que to work all of these queues (less efficient because it requires polling all of them):

    que -q default -q mailers -q action_mailbox_incineration -q action_mailbox_routing -q active_storage_analysis -q active_storage_purge

Also, if you would like to integrate Que with Active Job, you can do so by setting the adapter in config/application.rb, or per environment by setting it in, for example, config/environments/production.rb:

config.active_job.queue_adapter = :que

Que will automatically use the database configuration of your Rails application, so there is no need to configure anything else.

You can then write your jobs as usual following the Active Job documentation. However, be aware that you'll lose the ability to finish the job in the same transaction as other database operations. That happens because Active Job is a generic background job framework that doesn't benefit from the database integration Que provides.
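
As a rough sketch (the mailer and job names here are just illustrative), an Active Job class enqueued through the :que adapter looks like any other Active Job class:

# app/jobs/send_welcome_email_job.rb
class SendWelcomeEmailJob < ApplicationJob
  queue_as :default

  def perform(user_id)
    UserMailer.welcome(User.find(user_id)).deliver_now
  end
end

# Enqueue it as usual; the :que adapter stores it in que_jobs behind the scenes.
SendWelcomeEmailJob.perform_later(current_user.id)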

If you later decide to switch a job from Active Job to Que to have transactional integrity you can easily change the corresponding job class to inherit from Que::Job and follow the usage guidelines in the previous section.
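
Continuing the sketch above, the conversion is mostly a matter of changing the parent class, renaming perform to run, and switching to Que's enqueue:

# app/jobs/send_welcome_email_job.rb
class SendWelcomeEmailJob < Que::Job
  def run(user_id)
    user = User.find(user_id)

    ActiveRecord::Base.transaction do
      UserMailer.welcome(user).deliver_now
      # Destroying the job in the same transaction restores the transactional
      # integrity described above.
      destroy
    end
  end
end

SendWelcomeEmailJob.enqueue(current_user.id)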

Testing

There are a couple of ways to test jobs. You may want to set Que::Job.run_synchronously = true, which will cause JobClass.enqueue to simply execute the job's logic synchronously, as if you'd run JobClass.run(*your_args). Or, you may want to leave it disabled so you can assert on the state of jobs once they're stored in the database.
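
A minimal sketch of both approaches, assuming a typical Minitest setup and the ChargeCreditCard job from above (card and user are records created in your test setup):

# Synchronous: enqueue runs the job's logic inline.
Que::Job.run_synchronously = true
ChargeCreditCard.enqueue(card.id, user_id: user.id)
assert user.reload.charged_at

# Asynchronous: leave the job in que_jobs and assert on it directly.
Que::Job.run_synchronously = false
ChargeCreditCard.enqueue(card.id, user_id: user.id)
jobs = Que.execute("SELECT job_class, args FROM que_jobs")
assert_equal 1, jobs.size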

Documentation

For full documentation, see here.

Related Projects

These projects are tested to be compatible with Que 1.x:

  • que-web is a Sinatra-based UI for inspecting your job queue.
  • que-view is a Rails engine-based UI for inspecting your job queue.
  • que-scheduler lets you schedule tasks using a cron style config file.
  • que-locks lets you lock around job execution so that only one job runs at a time for a given set of arguments.
  • que-unique provides fast in-memory enqueue deduping.
  • que-prometheus exposes Prometheus API endpoints for job, worker, and queue metrics.

If you have a project that uses or relates to Que, feel free to submit a PR adding it to the list!

Community and Contributing

  • For feature suggestions or bugs in the library, please feel free to open an issue.
  • For general discussion and questions/concerns that don't relate to obvious bugs, join our Discord Server.
  • For contributions, pull requests submitted via Github are welcome.

Regarding contributions, one of the project's priorities is to keep Que as simple, lightweight and dependency-free as possible, and pull requests that change too much or wouldn't be useful to the majority of Que's users have a good chance of being rejected. If you're thinking of submitting a pull request that adds a new feature, consider starting a discussion in an issue first about what it would do and how it would be implemented. If it's a sufficiently large feature, or if most of Que's users wouldn't find it useful, it may be best implemented as a standalone gem, like some of the related projects above.

Specs

A note on running specs - Que's worker system is multithreaded and therefore prone to race conditions. As such, if you've touched that code, a single passing spec run isn't a guarantee that your changes haven't introduced bugs. One thing I like to do before pushing changes is to rerun the specs many times, watching for hangs. You can do this from the command line with something like:

for i in {1..1000}; do SEED=$i bundle exec rake; done

This will iterate the specs one thousand times, each with a different ordering. If the specs hang, note what the seed number was on that iteration. For example, if the previous specs finished with a "Randomized with seed 328", you know that there's a hang with seed 329, and you can narrow it down to a specific spec with:

for i in {1..1000}; do LOG_SPEC=true SEED=329 bundle exec rake; done

Note that we iterate because there's no guarantee that the hang would reappear with a single additional run, so we need to rerun the specs until it reappears. The LOG_SPEC parameter will output the name and file location of each spec before it is run, so you can easily tell which spec is hanging, and you can continue narrowing things down from there.

Another helpful technique is to replace an it spec declaration with hit - this will run that particular spec 100 times during the run.
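
For example (the spec body here is hypothetical):

# Before: runs once per suite run.
it "works a job exactly once" do
  # ...
end

# After: runs the same spec 100 times per suite run, to help surface races.
hit "works a job exactly once" do
  # ...
end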

With Docker

We've provided a Dockerised environment so that you don't need to manually install Ruby, install the gem bundle, set up Postgres, and connect to the database.

To run the specs using this environment, run:

./auto/test

To get a shell in the environment, run:

./auto/dev

The Docker Compose config provides a convenient way to inject your local shell aliases into the Docker container. Simply create a file containing your alias definitions (or which sources them from other files) at ~/.docker-rc.d/.docker-bashrc, and they will be available inside the container.

Without Docker

You'll need to have Postgres running. Assuming you have it running on port 5697, with a que-test database, and a username & password of que, you can run:

DATABASE_URL=postgres://que:que@localhost:5697/que-test bundle exec rake

If you don't already have Postgres, you could use Docker Compose to run just the database:

docker compose up -d db

If you want to try a different version of Postgres, e.g. 12:

export POSTGRES_VERSION=12

Git pre-push hook

To help avoid breaking the build, we've created a Git pre-push hook that verifies everything is OK before pushing.

To set up the pre-push hook locally, run:

echo -e "#\!/bin/bash\n\$(dirname \$0)/../../auto/pre-push-hook" > .git/hooks/pre-push
chmod +x .git/hooks/pre-push

Release process

The process for releasing a new version of the gem is:

  • Merge PR(s)
  • Git pull locally
  • Update the version number, bundle install, and commit
  • Update CHANGELOG.md, and commit
  • Tag the commit with the version number, prefixed by v
  • Git push to master
  • Git push the tag
  • Publish the new version of the gem to RubyGems: gem build -o que.gem && gem push que.gem
  • Create a GitHub release - rather than describe anything there, link to the heading for the release in CHANGELOG.md
  • Post on the Que Discord in #announcements


que's Issues

Might want to mention potential transport security benefits in README

Some of the attractiveness of que comes from the fact that it leans on Postgres, which can use SSL for transport. This is a huge plus imho, as you can't get this at all with a Redis backed system. This is a regular frustration of mine, especially when running workers in The Cloud (:tm:).

Anyway, mentioning that in your README might be a good idea, as I'm not sure it is immediately obvious to most people.

Migrating the DB breaks when using Rails 4.1.6

Hi.

I found something strange.

When I upgraded from Rails 4.1.5 to 4.1.6 and recreated the DB with rake db:migrate:reset, the que_jobs schema came out wrong, like below.

create_table "que_jobs", primary_key: "queue", force: true do |t|
  t.integer  "priority",    limit: 2, default: 100,                                        null: false
  t.datetime "run_at",                default: "now()",                                    null: false
  t.integer  "job_id",      limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text     "job_class",                                                                  null: false
  t.json     "args",                  default: [],                                         null: false
  t.integer  "error_count",           default: 0,                                          null: false
  t.text     "last_error"
end

When I went back to Rails 4.1.5 and migrated again, the schema came out correctly, like below. Why? Is it something in my environment?

create_table "que_jobs", id: false, force: true do |t|
  t.integer  "priority",    limit: 2, default: 100,                                        null: false
  t.datetime "run_at",                default: "now()",                                    null: false
  t.integer  "job_id",      limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text     "job_class",                                                                  null: false
  t.json     "args",                  default: [],                                         null: false
  t.integer  "error_count",           default: 0,                                          null: false
  t.text     "last_error"
  t.text     "queue",                 default: "",                                         null: false
end

Que doesn't log failed jobs

I see

I, [2014-01-12T20:07:10.774322 #27845]  INFO -- : [Que] No jobs available...

in the output even when que is processing failing jobs. I think there should be some logs saying that the job failed?

I do see:

I, [2014-01-12T20:06:40.769155 #27845]  INFO -- : [Que] Worked job in 0.7 ms: #<MyJob:0xbac4f300 @attrs={"priority"=>"1", "run_at"=>"2014-01-12 20:06:37.653061-08", "job_id"=>"7", "job_class"=>"MyJob", "args"=>"[]", "error_count"=>"0", :args=>[]}, @destroyed=true>

on job success.

explain SQL queries

I'm trying to build the same job queue but in python. I see the queries in sql.rb but it's not very clear to me how they operate, given my moderate understanding of SQL.

Could you please explain step by step how the job_lock query works and what guarantees it provides?

Rubinius 2.2 support

Rubinius 2.2 split a bunch of the standard library out into gems, and I'm not sure what's necessary in order to support it alongside older versions.

Schema dump produces incorrect table format

Hey there. I'm not entirely sure if this is something that I'm not doing correctly, but it's a problem that I keep encountering during development of a Rails app that I'm working on at the moment.

Whenever my schema is dumped, this is the entry for the Que table:

create_table "que_jobs", primary_key: "queue", force: true do |t|
  t.integer  "priority",    limit: 2, default: 100,                                        null: false
  t.datetime "run_at",                default: "now()",                                    null: false
  t.integer  "job_id",      limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text     "job_class",                                                                  null: false
  t.json     "args",                  default: [],                                         null: false
  t.integer  "error_count",           default: 0,                                          null: false
  t.text     "last_error"
end

The problem is when the schema is loaded (like, say, during testing), then I start getting errors from Que regarding the SQL queries and mismatched column types:

PG::DatatypeMismatch: ERROR:  column "queue" is of type integer but expression is of type text
LINE 5:       (coalesce($1, '')::text, coalesce($2, 100)::smallint, ...
               ^
HINT:  You will need to rewrite or cast the expression.

The reason why I'm sure it's to do with the schema format is that I can mitigate the problem by dropping the test DB, and then recreating it via migrations, instead of loading it via rake db:schema:load.

I had a bit of a dig around the codebase, but I couldn't come up with any solid answers. Is it possible that Que sets something in the database which Rails cannot serialise to its Ruby schema format? Admittedly I'm just guessing here and grasping at straws, so any help would be appreciated! 😄

ActiveRecord::ConnectionNotEstablished on Heroku

Hi there,

I think Que is amazing. Nice work.

But I'm getting a ActiveRecord::ConnectionNotEstablished crash when using it on Heroku. Here's the backtrace:

$ heroku run 'rake db:migrate'
Running `rake db:migrate` attached to terminal... up, run.6525
rake aborted!
ActiveRecord::ConnectionNotEstablished: ActiveRecord::ConnectionNotEstablished
/app/vendor/bundle/ruby/2.1.0/gems/activerecord-4.1.3/lib/active_record/connection_handling.rb:109:in `connection_pool'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:5:in `checkout'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/job.rb:82:in `work'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:78:in `block in work_loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `work_loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:17:in `block in initialize'
(See full trace by running task with --trace)

My configuration:

    config.que.worker_count = 1
    config.que.wake_interval = 10.minutes
    config.que.mode = :async

It works fine if I change it to config.que.mode = :off. This is a single Heroku web dyno. Everything works fine locally.

Any ideas?

Support ruby keyword arguments

class Job < Que::Job
  def run(start_at:, end_at:)
    puts "start at: #{start_at}, end at: #{end_at}"
  end
end
Job.enqueue(start_at: 1.hour.ago, end_at: Time.now)

Right now, it makes start_at a hash containing {start_at: 1.hour.ago, end_at: Time.now}.

I took a stab at this, but it wasn't trivial. So I wanted to see what others thought first before spending more time on it.

rake que:work doesn't process jobs at v0.8.0 when Que.mode = :off

Hi there,

We use Que with Que.mode = :off so that we can process jobs in a dedicated worker process, via the rake que:work task. After upgrading to v0.8.0, this has stopped processing jobs for us.

I've reproduced this issue in a vanilla Rails 4.1 app. The steps I took were:

  1. rails new test_que

  2. Added gem "que" to the Gemfile and ran bundle

  3. Added config/initializers/que.rb with Que.mode = :off as the only content

  4. Added a tiny job class to test with, such as:

    class TestJob < Que::Job
      def run
        puts "I'm done!"
      end
    end
  5. Ran rake que:work

  6. Over in another terminal tab, opened rails console and entered TestJob.enqueue

  7. Back in the tab with the rake task, I watch the output and see nothing new about processing that TestJob.

If I do all of these steps with gem "que", "0.7.3" in my Gemfile, then everything works. I run TestJob.enqueue and output about processing the job immediately appears in the shell running the rake que:work task.

Looking over your changes in v0.8.0, it looks like this might be the cause of the issue. It looks like the code for the worker to actually process the jobs only ever gets activated if Que.mode = :async. This would be fine if you wanted Que in :async mode, but when you want it :off, to allow the dedicated worker process, it seems like nothing ever happens.

Am I understanding all of this right? Would a fix be to change that conditional to check for either :async or :off as the Que mode? Anyway, I hope this is enough information for you to look into the issue. Thank you!

explicit cast error on postgres 9.3 (Ubuntu 14.04)

error when running 'rake que:work'

("class":"PG::UndefinedFunction","message":"ERROR:  operator does not exist: integer = text\nLINE 7:           WHERE queue = $1::text\n                              ^\nHINT:  No operator matches the given name and argument type(s). You might need to add explicit type casts.\n")

I was able to resolve this by making the following change in /lib/que/sql.rb (line 10)

WHERE queue = $1::text

to

WHERE queue = $1::int4

No support for queues

I have a couple of different apps that listen to the same jobs table for jobs. For example, I have a process that's only responsible for syncing with Zendesk, and a process that's only responsible for sending emails, etc. The Zendesk process can't run any of the email jobs, and the email process can't run any of the Zendesk jobs.

I don't think it's possible now to have more than one application connecting to a que_job table -- all the worker processes will try to run all the jobs, whereas I want only the zendesk process to run the zendesk jobs, the email process to run the email jobs, etc.

Usually this is done via a queue column, I think.

Add que_jobs.created_at ?

Not sure if it would be helpful to have a timestamp that indicates when the job was created. Possibly when it was last run as well.

I suppose you could figure that out from the retry count and the retry interval though..

Thoughts?

Allow listening on multiple queues

I've been playing with Que and the ActiveJob wrapper in Rails 4.2, looks really useful so thanks to everyone who contributed so far.

It took me a while to realise that an app's Que workers will only listen to one queue at a time, defaulting to ''. Once I'd figured out that I could set the QUE_QUEUE environment variable I made some progress. However, I ran into trouble again when using the new deliver_later functionality in ActionMailer. The job it creates puts mails on the mailers queue. I've worked around this in my test app by putting all my jobs on the mailers queue, but it's not a great solution.

I can think of two solutions to this.

The simpler one would be to let QUE_QUEUE potentially be a list of queues separated by commas and tweak the :lock_job sql to use WHERE queue in ...

A more complicated (but more flexible) solution would be to somehow let the app override set_up_workers and create its own workers listening to whatever queues were required. This may already be possible by setting worker_count to 0 and doing Que::Worker.workers.push Que::Worker.new('foo') as required. I'll test this out after I've finished writing this ticket.

Would you object to a PR allowing something like the following?

# One worker for the mailer queue and two for the default queue
Que.worker_configuration = { 'mailers' => 1, 'default' => 2 }

Env → Unicorn / Rails4

Hi, I'm using this great gem - thank you.
This is not a bug, just a question.

I'm developing with below env.

  • Rails 4.0.4
  • Ruby 2.1.1

I have confirmed that Que works very well
(in WEBrick and Apache + Passenger environments).

But when I changed my environment to use the unicorn gem (4.8.2), Que creates the job records in the database, but the jobs are never worked.

How should I set up my environment?

config.que.mode = :async in environments triggers intermittent ActiveRecord::ConnectionNotEstablished

This appears to be a similar sort of race condition to #47. It seems the wrangler gets called when setting the mode, and because environments aren't in after_initialize (obviously) like how the railtie sets the mode, we get errors (but not always).

I don't think this needs a code fix, but a mention in the readme or docs might be helpful. Or at least someone can search issues and find this.

Thanks for this great gem!

Intermittent ActiveRecord::ConnectionNotEstablished error

I have Que 0.7.3 in a Rails 4.1.1 application on MRI running Thin and using all default settings except for config.que.worker_count = 1 in my config/environments/development.rb. My ActiveRecord connection pool is set to 5 connections.

When I run rails s, the app only starts up successfully about 1 time in 10. On my last attempt, for the other 9 tries I received the following output:

/Users/jamieenglish/projects/ee2e/.bundle/gems/activerecord-4.1.1/lib/active_record/connection_handling.rb:109:in `connection_pool': ActiveRecord::ConnectionNotEstablished (ActiveRecord::ConnectionNotEstablished)
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/adapters/active_record.rb:5:in `checkout'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/job.rb:82:in `work'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:76:in `block in work_loop'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:73:in `loop'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:73:in `work_loop'
        from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:17:in `block in initialize'

This seems like a race condition, as if I add sleep 2 to the top of Que::Worker#work_loop all is fine.

Happy to provide a fix if pointed in the right direction.

Compatibility with Unicorn (was: Single dyno?)

Hi there,

Just taking a look at your project - looks cool.

It can even do this in the background of your web process - if you're running on Heroku, for example, you won't need to run a separate worker dyno.

How exactly would you accomplish this? Would you have to spawn the thread in a unicorn before_fork or is there a much simpler method to load on rails boot?

Thanks!

Add configuration option for maximum job run time

Had a que job that hung forever, possibly because of a problem unrelated to que.

Got me to thinking, would it be useful to have an option for setting the maximum length a job could run? Obviously could be handled by client code easily, but could be a useful option to include in que.

Que.mode with puma

Thank you for this awesome gem!

I've a question. When I start my puma webserver with the following command:

bundle exec puma -e $RACK_ENV -p $PORT -C ./config/puma.rb

Que does not start in :async mode. I've added config.que.mode = :async to <Rails.env>.rb to enable async mode, but this also applies to the rails console. Where do I enable Que.mode = :async only for the webserver?

Cheers!

tests fail

Haven't dug into this yet, but it happens repeatably for me on the latest master, running on ubuntu 12.04 on a vm:

$ rake
/usr/local/stow/ruby-2.0.0-p247/bin/ruby -S rspec ./spec/adapters/active_record_spec.rb ./spec/adapters/connection_pool_spec.rb ./spec/adapters/pg_spec.rb ./spec/adapters/sequel_spec.rb ./spec/unit/connection_spec.rb ./spec/unit/customization_spec.rb ./spec/unit/enqueue_spec.rb ./spec/unit/helper_spec.rb ./spec/unit/logging_spec.rb ./spec/unit/migrations_spec.rb ./spec/unit/pool_spec.rb ./spec/unit/states_spec.rb ./spec/unit/stats_spec.rb ./spec/unit/work_spec.rb ./spec/unit/worker_spec.rb
.F.FF. [THEN HANGS FOREVER]

I see about three failures, then it hangs forever. Apparently there are a few other test Ruby processes that have been forked.

When I stop those other processes, I see the following:

Failures:

  1) Que using the ActiveRecord adapter should wake up a Worker after queueing a job in async mode, waiting for a transaction to commit if necessary
     Failure/Error: sleep_until { Que::Worker.workers.all?(&:sleeping?) && DB[:que_jobs].empty? }
     RuntimeError:
       Thing never happened!
     # ./spec/support/helpers.rb:7:in `block in sleep_until'
     # ./spec/support/helpers.rb:5:in `loop'
     # ./spec/support/helpers.rb:5:in `sleep_until'
     # ./spec/adapters/active_record_spec.rb:58:in `block (2 levels) in <top (required)>'

  2) Que using the ActiveRecord adapter should instantiate args as ActiveSupport::HashWithIndifferentAccess
     Failure/Error: $passed_args.first[:param].should == 2
     NoMethodError:
       undefined method `first' for nil:NilClass
     # ./spec/adapters/active_record_spec.rb:37:in `block (2 levels) in <top (required)>'

  3) Que using the ActiveRecord adapter should support Rails' special extensions for times
     Failure/Error: DB[:que_jobs].get(:run_at).should be_within(3).of Time.now - 60
       expected 2014-01-28 08:59:03 -0800 to be within 3 of 2014-01-28 16:59:03 -0800
     # ./spec/adapters/active_record_spec.rb:46:in `block (2 levels) in <top (required)>'

  4) Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter should allow multiple workers to complete jobs simultaneously
     Failure/Error: $q1.pop
     SignalException:
       SIGTERM
     Shared Example Group: "a multi-threaded Que adapter" called from ./spec/adapters/active_record_spec.rb:14
     # ./spec/support/shared_examples/multi_threaded_adapter.rb:31:in `block (2 levels) in <top (required)>'

  5) Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter behaves like a Que adapter should be able to queue and work a job
     Failure/Error: result[:event].should == :job_worked
       expected: :job_worked
            got: :job_race_condition (using ==)
       Diff:
       @@ -1,2 +1,2 @@
       -:job_worked
       +:job_race_condition

     Shared Example Group: "a Que adapter" called from ./spec/support/shared_examples/multi_threaded_adapter.rb:2
     # ./spec/support/shared_examples/adapter.rb:15:in `block (2 levels) in <top (required)>'

Finished in 10.05 seconds
116 examples, 5 failures

Failed examples:

rspec ./spec/adapters/active_record_spec.rb:52 # Que using the ActiveRecord adapter should wake up a Worker after queueing a job in async mode, waiting for a transaction to commit if necessary
rspec ./spec/adapters/active_record_spec.rb:34 # Que using the ActiveRecord adapter should instantiate args as ActiveSupport::HashWithIndifferentAccess
rspec ./spec/adapters/active_record_spec.rb:41 # Que using the ActiveRecord adapter should support Rails' special extensions for times
rspec ./spec/support/shared_examples/multi_threaded_adapter.rb:28 # Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter should allow multiple workers to complete jobs simultaneously
rspec ./spec/support/shared_examples/adapter.rb:12 # Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter behaves like a Que adapter should be able to queue and work a job

Support non-transactional jobs?

We sometimes need to be able to queue jobs that should run even if the current transaction rolls back.

Right now, we use dblink to open the new connection.

I'm not sure if that's something that should be added to que or not.

Call for comments and testimonials

The issues tracker is a weird place to bring this up, but anyway - I'm going to be giving a talk on Que (and the design and concepts behind using advisory locks for job queues in general, I suppose) at the NYC Postgres User Group meetup in a couple of weeks. I haven't sketched out exactly what I'll be talking about yet, but I'm open to input on:

  • Are there any open questions you have on Que's design that you'd like answered?
  • Is there anything that you especially like or dislike about using Que? Maybe in terms of ops or how it influences your app design, but anything would be helpful. I'm certainly not afraid to present cons of putting the job queue in the database.
  • Do you have any testimonials or other feedback based on your use of it?

I don't think the presentations at NYCPUG are usually recorded, but I'll be happy to upload my slides somewhere after it's done. Thanks!

DB connection error when Unicorn restarts on deploy?

Hi, I have question.

My Env is ...

  • Rails 4.1.5
  • Unicorn 4.8.3
  • Que 0.8.1

An error occurred like below.

I, [2014-09-10T11:50:33.594993 #18456]  INFO -- : forked child re-executing...
I, [2014-09-10T11:50:34.096243 #18456]  INFO -- : inherited addr=/app/appname/current/tmp/sockets/unicorn.sock fd=14
I, [2014-09-10T11:50:34.096457 #18456]  INFO -- : Refreshing Gem list
I, [2014-09-10T11:50:42.183461 #18456]  INFO -- : master process ready
I, [2014-09-10T11:50:42.186065 #18614]  INFO -- : worker=0 ready
I, [2014-09-10T11:50:42.190885 #18621]  INFO -- : worker=2 ready
I, [2014-09-10T11:50:42.193107 #18618]  INFO -- : worker=1 ready
I, [2014-09-10T11:50:42.197371 #18635]  INFO -- : worker=3 ready
E, [2014-09-10T11:50:42.546774 #18635] ERROR -- : listen loop error: "FATAL:  remaining connection slots are reserved for non-replication superuser connections\n" (PG::ConnectionBad)
E, [2014-09-10T11:50:42.546853 #18635] ERROR -- : /app/appname/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/postgresql_adapter.rb:888:in `initialize'

・・・

E, [2014-09-10T13:50:18.227689 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:445:in `checkout_new_connection'
E, [2014-09-10T13:50:18.227712 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:416:in `acquire_connection'
E, [2014-09-10T13:50:18.227735 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:351:in `block in checkout'
E, [2014-09-10T13:50:18.227770 #28140] ERROR -- : /usr/local/rbenv/versions/2.1.2/lib/ruby/2.1.0/monitor.rb:211:in `mon_synchronize'
E, [2014-09-10T13:50:18.227795 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:350:in `checkout'
E, [2014-09-10T13:50:18.227818 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:265:in `block in connection'
E, [2014-09-10T13:50:18.227842 #28140] ERROR -- : /usr/local/rbenv/versions/2.1.2/lib/ruby/2.1.0/monitor.rb:211:in `mon_synchronize'
E, [2014-09-10T13:50:18.227879 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:264:in `connection'
E, [2014-09-10T13:50:18.227901 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:294:in `with_connection'
E, [2014-09-10T13:50:18.227923 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
E, [2014-09-10T13:50:18.227945 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:5:in `checkout'
E, [2014-09-10T13:50:18.227967 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/job.rb:82:in `work'
E, [2014-09-10T13:50:18.227989 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:78:in `block in work_loop'
E, [2014-09-10T13:50:18.228011 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `loop'
E, [2014-09-10T13:50:18.228033 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `work_loop'
E, [2014-09-10T13:50:18.228055 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:17:in `block in initialize'

My unicorn.rb is like below.
When I change worker_processes to 4, the error occurs intermittently.
(When worker_processes is 2, the error doesn't occur.)

I found a Puma issue (Restarting process with USR2 doesn't always work).

Is that issue related, or is this a problem in Que?

thanks.

app_path = '/app/appname'
working_directory app_path + '/current'
pid               app_path + '/current/tmp/pids/unicorn.pid'
preload_app true
GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true
worker_processes 2

listen app_path + '/current/tmp/sockets/unicorn.sock', :backlog => 64

before_fork do |server, worker|
  old_pid = "#{ server.config[:pid] }.oldbin"
  if File.exists?(old_pid) && server.pid != old_pid
    begin
      sig = (worker.nr + 1) >= server.worker_processes ? :QUIT : :TTOU
      Process.kill(sig, File.read(old_pid).to_i)
    end
  end

  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.connection.disconnect!
end

after_fork do |server, worker|
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection

  Que.mode = :async
end

rails server log spam

Hello, I just discovered this project and have started to integrate it into our app. I really like its simplicity in setting up a new job, but I have one issue that I can't find any help on.

When I run 'rails server' it outputs every 5 seconds:

{"lib":"que","hostname":"louis.speedport.ip","pid":42723,"thread":2240071600,"event":"job_unavailable"}

However, at this point I haven't actually scheduled any Jobs and the queries:

Que.execute( "select * from que_jobs" )
Que.worker_states
Que.job_stats

return an empty Array.

When I do run a job it gets scheduled and executed correctly, but the constant spamming is somewhat annoying.

Our App runs currently on Rails 4.1.4 and uses Postgres version 9.3.5

Any help / hint to solve this would be appreciated.

Support multiple que-s in one Postgres DB

pg_advisory_lock supports passing two int4s instead of one int8, that way it's possible to "namespace" the locks, with either a user-supplied number or by e.g. using the oid of the que_jobs table.

This allows having more than one que running per Postgres instance or to use que while also having other code that uses Postgres advisory locks.

Thoughts?

Support forking jobs

I have some jobs that process a lot of data. Using a forking worker model means that each process can start with a clean slate. If the process balloons to 1GB, that data is automatically cleaned up at the end of the job.

`NOTIFY`/`LISTEN` support?

Hello, just found this neat project. I currently use queue_classic to process jobs, which doesn't use advisory locks but does use NOTIFY/LISTEN for efficiently picking up new jobs. It looks like que currently uses polling to detect new jobs.

I see there has been some consideration for NOTIFY/LISTEN in #8 and #22 and the "experimental" branch. Just wondering if that's still a feature being considered? Polling works fine for my use case, but I'm just curious. Thanks!

Optionally using stored procedures

The discussion in #77 derailed a bit, so I'm opening a new thread for this.

The idea is to use stored procedures for grabbing an item from the queue to avoid the extra round trip otherwise necessary to deal with the race condition. I ran some initial tests using a program I wrote just a while back, and the results suggest an improvement in performance:

Performance Test Results

(The program implements the same general algorithm, but it's not exactly the code Que uses.)

It's not clear that anyone needs queues this fast (we're already talking thousands of items per second), though, so I'm not sure if this is worth the effort in practice.

lower default wake_interval

5 seconds seems pretty high?

The polling query should be pretty quick. I have three worker processes running with the interval set to 0.01 and there's no noticeable PostgreSQL CPU or disk activity.

Maybe set it to 1 or 0.25 or 0.1?

Running a job once

Is there a more convenient way of running (not enqueueing with no delay) once than MyJob.new({ job_class: MyJob.to_s }).run(some_id, other, arg)?
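
As the Testing section of the README above notes, recent versions provide a class-level shortcut that runs a job's logic synchronously without enqueueing it, which covers this case:

MyJob.run(some_id, other, arg)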

Can't access global variable in Job class

I'd like to read a global variable in a job, as in the code below.

class CreateContainer < Que::Job
  def run(region, options)
    $storages[region].directories.create key: options[:container_name], public: true
  end
end

But I got this error.

{"class":"NoMethodError","message":"undefined method `directories' for nil:NilClass"},"job":{"queue":"","priority":100,"run_at":"2014-09-22T08:21:12.274+00:00","job_id":6,"job_class":"CreateContainer","args":["ord",{"container_name":"test_container"}],"error_count":5}}

Could you give me any suggestions?

JRuby support

Issue #2 kind of got side-tracked with rubinius and jruby so I am opening new issue instead.

The current jruby branch uses jdbc-postgres and activerecord-jdbcpostgresql-adapter but from what I can see it looks like it would be easier to use https://github.com/headius/jruby-pg

Which is aimed to be a "Drop-in" replacement for CRuby's pg driver.

Using that gem might heavily reduce the amount of code modifications required to support jruby. (It is currently marked as an rc level gem)

Puma with workers fails to exit

When I start puma with a configuration file like this:

threads 8,8
workers 3
preload_app!

It won't exit. It prints out:

Finishing Que's current jobs before exiting...
Finishing Que's current jobs before exiting...
Finishing Que's current jobs before exiting...

and hangs forever. I have to manually kill the ruby processes and when I kill the last of the 3 process it prints out:

Finishing Que's current jobs before exiting...
Que's jobs finished, exiting...

If I remove workers 3 line, puma exits almost directly.

Puma setup with forking not reinitializing the objects.

I can't easily get Que to work under puma with multiple workers (single mode is fine). Que is getting instantiated in the root process and copied over into the child processes without reinitialization.

The docs say I can just set Que.mode = :async to make it work, but I also have to reinitialize the connection and the logger.
The remaining problem is that the workers are not woken up periodically, since the wrangler is in the wrong process. I need this for jobs added by cron jobs.

Am I doing something wrong? My current work-around feels brittle.

The interesting lines of my current workaround:

    Que.connection= ::ActiveRecord # else we get a /que.rb:40:in `adapter': Que connection not established! (RuntimeError)
    Que.logger = Rails.logger # else we don't get any log entries
    Que.worker_count = que_workers # enable the que again
    # Que.wake_interval = 5.seconds # crashes with ThreadError
    Thread.new do
      loop do
        sleep 1
        Que.wake!
      end
    end         

my full configuration is:

require 'que'

puma_workers = Integer(ENV['PUMA_WORKERS'] || 2)
min_threads =  Integer(ENV['MIN_THREADS']  || 8)
max_threads =  Integer(ENV['MAX_THREADS']  || 8)
que_workers =  Integer(ENV['QUE_WORKERS']  || 1)

Que.mode = :off 

workers puma_workers
threads min_threads, max_threads
preload_app!
port        ENV['PORT']     || 3000
environment ENV['RACK_ENV'] || 'development'

on_worker_boot do
  ActiveRecord::Base.connection_pool.disconnect!

  ActiveSupport.on_load(:active_record) do
    config = ActiveRecord::Base.configurations[Rails.env] ||
                Rails.application.config.database_configuration[Rails.env]
    config['pool']              = max_threads + que_workers
    ActiveRecord::Base.establish_connection(config)

    Que.connection= ::ActiveRecord # else we get a /que.rb:40:in `adapter': Que connection not established! (RuntimeError)
    Que.logger = Rails.logger # else we don't get any log entries
    Que.worker_count = que_workers # enable the que again
    # Que.wake_interval = 5.seconds # crashes with ThreadError
    Thread.new do
      loop do
        sleep 1
        Que.wake!
      end
    end         
  end
end

Plus I have a Que.mode = :off in my application.rb to be safe.

@attrs[:run_at] error

Any time I try to use @attrs[:run_at], even in exactly the same manner as described in the docs for continuous jobs, I get an error:

"error":{"class":"TypeError","message":"no implicit conversion of ActiveSupport::Duration into String"}

Perhaps something has changed and the docs haven't caught up yet?

Possibly document a guide on web processes polling for job completion

Say a web process starts a job that can take a while, say we're generating a large file that gets pushed to s3.

  • web process starts job, gets back job id that's returned to the client
  • client refreshes periodically with that job id, either by ajax or http refresh
  • when job completes, the client gets the job payload (a s3 url or other data)

I think this is a pretty common thing, would be nice to have a somewhat idiomatic document that explains how to do it with que. I'll see if I can work on it.

Would be nice to be able to report back job status (like "generating file", "uploading", "finalizing", etc)
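
A rough sketch of one way to wire this up - the Report model, status column, and helper names here are purely illustrative and not part of Que:

# The job records its progress on a row the web process can read.
class GenerateReportJob < Que::Job
  def run(report_id)
    report = Report.find(report_id)

    report.update(status: "generating file")
    file = build_report_file(report)   # hypothetical helper

    report.update(status: "uploading")
    url = upload_to_s3(file)           # hypothetical helper that returns the S3 URL

    report.update(status: "done", url: url)
    destroy
  end
end

# The client polls an endpoint that just reads that row.
class ReportsController < ApplicationController
  def show
    report = Report.find(params[:id])
    render json: { status: report.status, url: report.url }
  end
end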
