samsondav / rihanna
Rihanna is a high performance postgres-backed job queue for Elixir
License: MIT License
To reproduce:
1. Schedule a job with Rihanna.schedule(MyJob, [attrs], at: due_at), with due_at a datetime as per the docs.
2. From that job, return {:reenqueue, due_at}, with due_at a datetime as per the docs.
The job is now removed from the db.
The expected behavior is that the new due_at is set on the job's row in the table.
At some point in time, a job that was scheduled is no longer relevant, or is even wrong. As of today, I have the following options:
If I could delete the job using the data relevant to my domain, that is, the module and the function, I could delete the outdated jobs without needing to save the id.
I'm preparing a PR for that.
Hi there ~
I have tried Rihanna in my Elixir project, following the config in README.md, but the job is not executed in the ExUnit sandbox. Is there something I am missing?
There is no "work, work .." in the console. How can I make it run in the test env?
I am also not sure whether #59 is facing the same problem.
OS: mac 10.13.1
Rihanna version: latest
Elixir version: latest(1.9.0)
Phoenix version: latest
Hello and thanks for this project. I am evaluating whether we should use rihanna for some job scheduling and the FAQ entry about job duration stood out to me:
One thing to be aware of is that if you restart your application (e.g. because you deployed) then all running jobs on that node will be exited. For this reason it is probably sensible not to make your jobs take an extremely long time.
Can you elaborate a little on this? What does exit mean here..? Is the job considered "done", put into some error state or requeued?
For some context: We will have jobs that can potentially take quite a long time to finish (30mins+) and use CD extensively. So any job that gets interrupted due to a deployment needs to be finished at some point (restarted).
Thanks again and have a nice day!
The telemetry library aims to be a unified metrics reporting system for the BEAM ecosystem. It's currently used by Ecto and Phoenix
It could be good for Rihanna to support it so that people can hook into it for their monitoring of Rihanna.
https://github.com/beam-telemetry/telemetry
Possible events:
[:rihanna, :job, :done]
We could also add some pollable metrics https://github.com/beam-telemetry/telemetry_poller
[:rihanna, :jobs_queued_count]
[:rihanna, :dead_job_count]
[:rihanna, :jobs_running_count]
It would be great if when the application is shutting down a period of time is given for the workers to finish.
If the workers finish within the grace time the application continues shutting down, if they do not they are killed and then shut down continues.
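A rough sketch of the grace-period idea in plain Elixir, not tied to Rihanna's internals: monitor the worker processes, wait until either all of them have exited or the grace time has elapsed, then kill whatever is left. The module name GracefulDrain and the function drain/2 are made up for illustration.

```elixir
defmodule GracefulDrain do
  # Waits up to `grace_ms` for `pids` to exit, then kills the stragglers.
  # Returns {:ok, finished_count, killed_count}.
  def drain(pids, grace_ms) do
    refs = Map.new(pids, fn pid -> {Process.monitor(pid), pid} end)
    deadline = System.monotonic_time(:millisecond) + grace_ms
    remaining = await(refs, deadline)

    # Anything still running after the deadline is killed.
    Enum.each(remaining, fn {ref, pid} ->
      Process.demonitor(ref, [:flush])
      Process.exit(pid, :kill)
    end)

    {:ok, map_size(refs) - map_size(remaining), map_size(remaining)}
  end

  defp await(refs, _deadline) when map_size(refs) == 0, do: refs

  defp await(refs, deadline) do
    timeout = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {:DOWN, ref, :process, _pid, _reason} when is_map_key(refs, ref) ->
        await(Map.delete(refs, ref), deadline)
    after
      timeout -> refs
    end
  end
end
```

A supervisor's terminate path could call something like GracefulDrain.drain(worker_pids, 5_000) before the rest of shutdown continues; where exactly that hook would live in Rihanna is left open here.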
Here's an example of how I implemented this in Exq: https://github.com/akira/exq/pull/271/files
I may be up for implementing this at a later date :)
Hi there! I'm considering using Rihanna but it's not the best fit for my use case until it supports defining multiple queues, each with a configurable # of workers. I understand from your FAQ that this is a planned feature. Do you have any guess when it will become a priority? eg. next month / next year / next 5 years?
Thanks - and awesome project!
Hello!
Ecto has a nice sandbox that allows us to run database hitting tests concurrently when using Ecto's connection pool.
It'd be nice to be able to use this with Rihanna. Currently the library is hard-coded to use a dedicated Postgrex connection, however if the user was able to pass a connection the user could pass an Ecto repo and thus make use of the sandbox.
This would also remove the need for the Rihanna.Job.Postgrex process in many cases, halving the number of database connections that Rihanna uses. Configuration could be added to prevent this process from being started if desired.
In addition it would enable the user to enqueue more jobs and perform business logic within the same SQL transaction, as the connection will be shared across both automatically with Ecto, enabling a little more safety.
Here's some prior art on adding Ecto integration to a SQL executing lib without coupling tightly lpil/yesql@e692828
I will implement this feature if given the thumbs up.
Cheers,
Louis
All jobs are not created equal. Some need to be scheduled earlier than others if there aren't enough workers to work on all of the enqueued jobs. Allow defining a priority against all jobs to make sure that higher priority jobs are scheduled.
You are running a subscription business and have two background jobs defined:
In this example you want to send the email to the customer to make sure they have feedback on the receipt of their subscription. This would take a higher priority over notifying a Slack channel.
Create a column priority (smallint/integer) that allows defining an integer priority. This can conform to the niceness values used in Linux: lower values are more important. When trying to poll for jobs, order the result by priority.
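To make the proposed ordering concrete, here is a small in-memory sketch of "lower value wins, ties broken by enqueue time". It only models the sort; in the database this would presumably be an ORDER BY on the new column. The job names, the integer timestamps and the module PrioritySketch are all made up for illustration.

```elixir
defmodule PrioritySketch do
  # Lower priority number = more important (niceness-style).
  # Ties are broken by enqueue time, then by id.
  # Timestamps are plain integers here to keep the comparison simple.
  def next_jobs(jobs, limit) do
    jobs
    |> Enum.sort_by(fn job -> {job.priority, job.enqueued_at, job.id} end)
    |> Enum.take(limit)
  end
end

jobs = [
  %{id: 1, name: :notify_slack, priority: 10, enqueued_at: 100},
  %{id: 2, name: :send_receipt_email, priority: 0, enqueued_at: 105}
]

# With a single free worker, the email job is picked first even though it
# was enqueued later.
IO.inspect(PrioritySketch.next_jobs(jobs, 1))
```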
Is there a max worker concurrency? Can I configure it?
The second way is to implement the Rihanna.Job behaviour and define the perform/1 function. Implementing this behaviour allows you to define retry strategies, show custom error messages on failure etc. See the docs for more details.
Could you please describe how to handle retries using the Rihanna.Job behaviour? Do you schedule another job with the same params within the current job? If I wanted to have a global retry, is there any reason why I couldn't use a process to monitor the jobs table for failed jobs and retry them using Rihanna.Job.retry/1? Thank you for your help and this library.
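For what it's worth, a retry strategy on a behaviour module could look roughly like the sketch below. It assumes the callback has the shape retry_at(error, args, attempts) and returns either {:ok, datetime} or :noop; please check that against the Rihanna.Job docs before relying on it. The module name, the attempt limit and the base delay are invented for the example.

```elixir
defmodule BackoffSketch do
  @max_attempts 3
  @base_seconds 10

  # Assumed callback shape: return {:ok, DateTime.t()} to retry at that
  # time, or :noop to give up and leave the job failed.
  def retry_at(_error, _args, attempts) when attempts < @max_attempts do
    # Exponential backoff: 20s after the first attempt, 40s after the second.
    delay = @base_seconds * Integer.pow(2, attempts)
    {:ok, DateTime.add(DateTime.utc_now(), delay, :second)}
  end

  def retry_at(_error, _args, _attempts), do: :noop
end
```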
Is it possible to run multiple rihanna (supervisors) each pointing to a different configured table? Not sure how enqueue would work. This would let us have N "queues", effectively.
Windows 10 WSL (Ubuntu 16)
Elixir 1.8, Phoenix 1.4
Postgres 9.6 (Google)
Rihanna 1.3.1
Rihanna has been working fine in development; however, I am about to move to production, so I created a Postgres server at Google compute cloud, and installed cloud_sql_proxy on my laptop.
Note: I am not yet doing a release build; rather, I am just compiling and running MIX_ENV=prod on my laptop while connecting to the remote Postgres at Google using cloud_sql_proxy.
Cloud_sql_proxy works fine for almost everything:
MIX_ENV=prod mix ecto.create
MIX_ENV=prod mix ecto.migrate # works fine
However:
MIX_ENV=prod mix phx.server
Reveals unhandled errors:
18:51:43.557 [info] Running GjwappWeb.Endpoint with cowboy 2.6.3 at 0.0.0.0:4000 (http)
18:51:43.677 [info] Access GjwappWeb.Endpoint at http://localhost:4000
18:51:43.824 [error] Postgrex.Protocol (#PID<0.517.0>) failed to connect: ** (DBConnection.ConnectionError) tcp connect (localhost:5432): connection refused - :econnrefused
18:51:44.928 [error] Postgrex.Protocol (#PID<0.517.0>) failed to connect: ** (DBConnection.ConnectionError) tcp connect (localhost:5432): connection refused - :econnrefused
18:51:47.930 [error] Postgrex.Protocol (#PID<0.517.0>) failed to connect: ** (DBConnection.ConnectionError) tcp connect (localhost:5432): connection refused - :econnrefused
18:51:51.119 request_id=FaW9x6SqlTDl1QQAAAIh [info] GET /
18:51:51.339 request_id=FaW9x6SqlTDl1QQAAAIh [info] Sent 200 in 220ms
18:51:51.979 [error] GenServer Rihanna.JobDispatcher terminating
** (CaseClauseError) no case clause matching: {:error, %DBConnection.ConnectionError{message: "connection not available and request was dropped from queue after 2935ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information"}}
(rihanna) lib/rihanna/migration.ex:173: Rihanna.Migration.check_table!/1
(rihanna) lib/rihanna/job_dispatcher.ex:110: Rihanna.JobDispatcher.check_database!/1
(rihanna) lib/rihanna/job_dispatcher.ex:31: Rihanna.JobDispatcher.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :initialise
So, is Rihanna not handling it properly or am I not setting it up properly?
How would you like me to proceed. I am happy to fork this and work on it with a bit of direction.
Also interesting beam toughness: Despite these error messages pouring out, I can run the app fine and do CRUD -- I just cannot get a Rihanna job processed.
Thanks.
David
20:24:40.039 [info] execute "ALTER TABLE rihanna_jobs DROP COLUMN due_at;\nALTER TABLE rihanna_jobs DROP COLUMN rihanna_internal_meta;\n"
** (Postgrex.Error) ERROR 42601 (syntax_error): cannot insert multiple commands into a prepared statement
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
20:24:40.070 [error] GenServer #PID<0.238.0> terminating
** (Postgrex.Error) ERROR 42601 (syntax_error): cannot insert multiple commands into a prepared statement
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
Last message: {:EXIT, #PID<0.74.0>, {%Postgrex.Error{connection_id: 8840, message: nil, postgres: %{code: :syntax_error, file: "postgres.c", line: "1280", message: "cannot insert multiple commands into a prepared statement", pg_code: "42601", routine: "exec_parse_message", severity: "ERROR", unknown: "ERROR"}}, [{Ecto.Adapters.SQL, :query!, 5, [file: 'lib/ecto/adapters/sql.ex', line: 200]}, {Ecto.Adapters.Postgres, :"-execute_ddl/3-fun-0-", 4, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Adapters.Postgres, :execute_ddl, 3, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Ecto.Migration.Runner, :"-flush/0-fun-1-", 2, [file: 'lib/ecto/migration/runner.ex', line: 104]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Migration.Runner, :flush, 0, [file: 'lib/ecto/migration/runner.ex', line: 102]}, {:timer, :tc, 2, [file: 'timer.erl', line: 181]}]}}
State: %DynamicSupervisor{args: {{:temporary, 5000}, []}, children: %{}, extra_arguments: [], max_children: :infinity, max_restarts: 3, max_seconds: 5, mod: Task.Supervisor, name: {#PID<0.238.0>, Task.Supervisor}, restarts: [], strategy: :one_for_one}
20:24:40.076 [error] GenServer #PID<0.247.0> terminating
** (Postgrex.Error) ERROR 42601 (syntax_error): cannot insert multiple commands into a prepared statement
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
Last message: {:EXIT, #PID<0.74.0>, {%Postgrex.Error{connection_id: 8840, message: nil, postgres: %{code: :syntax_error, file: "postgres.c", line: "1280", message: "cannot insert multiple commands into a prepared statement", pg_code: "42601", routine: "exec_parse_message", severity: "ERROR", unknown: "ERROR"}}, [{Ecto.Adapters.SQL, :query!, 5, [file: 'lib/ecto/adapters/sql.ex', line: 200]}, {Ecto.Adapters.Postgres, :"-execute_ddl/3-fun-0-", 4, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Adapters.Postgres, :execute_ddl, 3, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Ecto.Migration.Runner, :"-flush/0-fun-1-", 2, [file: 'lib/ecto/migration/runner.ex', line: 104]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Migration.Runner, :flush, 0, [file: 'lib/ecto/migration/runner.ex', line: 102]}, {:timer, :tc, 2, [file: 'timer.erl', line: 181]}]}}
State: %DynamicSupervisor{args: {{:temporary, 5000}, []}, children: %{}, extra_arguments: [], max_children: :infinity, max_restarts: 3, max_seconds: 5, mod: Task.Supervisor, name: {#PID<0.247.0>, Task.Supervisor}, restarts: [], strategy: :one_for_one}
20:24:40.082 [error] GenServer Broker.Repo terminating
** (Postgrex.Error) ERROR 42601 (syntax_error): cannot insert multiple commands into a prepared statement
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
Last message: {:EXIT, #PID<0.74.0>, {%Postgrex.Error{connection_id: 8840, message: nil, postgres: %{code: :syntax_error, file: "postgres.c", line: "1280", message: "cannot insert multiple commands into a prepared statement", pg_code: "42601", routine: "exec_parse_message", severity: "ERROR", unknown: "ERROR"}}, [{Ecto.Adapters.SQL, :query!, 5, [file: 'lib/ecto/adapters/sql.ex', line: 200]}, {Ecto.Adapters.Postgres, :"-execute_ddl/3-fun-0-", 4, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Adapters.Postgres, :execute_ddl, 3, [file: 'lib/ecto/adapters/postgres.ex', line: 96]}, {Ecto.Migration.Runner, :"-flush/0-fun-1-", 2, [file: 'lib/ecto/migration/runner.ex', line: 104]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1899]}, {Ecto.Migration.Runner, :flush, 0, [file: 'lib/ecto/migration/runner.ex', line: 102]}, {:timer, :tc, 2, [file: 'timer.erl', line: 181]}]}}
State: {:state, {:local, Broker.Repo}, :one_for_one, [{:child, #PID<0.628.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, Broker.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: Ecto.Adapters.Postgres.TypeModule, port: 5432, name: Broker.Repo.Pool, otp_app: :broker, repo: Broker.Repo, timeout: 15000, pool_timeout: 5000, adapter: Ecto.Adapters.Postgres, username: "postgres", password: "postgres", database: "broker_dev", hostname: "localhost", pool_size: 1, pool: DBConnection.Poolboy]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {Broker.Repo, :broker, Ecto.Adapters.Postgres, [pool_size: 1]}}
I think the error is caused by this line. Here we have two statements on a single execute.
Because of this line I can't pass a valid ssl_opts keyword list to Rihanna, which makes it impossible for me to use it with SSL.
IMHO the fix would simply be to add :ssl_opts here, since you already take the :ssl option. This would work for MY scenario, but it is important to note that there are several other options that Postgrex accepts for starting a connection. So, the proper fix here might be to add all of them.
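As a sketch of the small fix, assuming the connection options are whitelisted with something like Keyword.take/2 (the module name ConnOptsSketch and the exact list of keys are my guess, not Rihanna's actual code):

```elixir
defmodule ConnOptsSketch do
  # Hypothetical whitelist; :ssl_opts is the key this issue asks to add.
  @allowed [:hostname, :port, :username, :password, :database, :ssl, :ssl_opts]

  # Keeps only the connection options that are passed on to Postgrex.
  def filter(opts), do: Keyword.take(opts, @allowed)
end

repo_config = [
  hostname: "localhost",
  database: "my_db",
  ssl: true,
  ssl_opts: [verify: :verify_peer],
  pool_size: 10
]

# :ssl_opts now survives the filter, while unrelated keys are still dropped.
IO.inspect(ConnOptsSketch.filter(repo_config))
```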
Without this I can't use Rihanna. Since we are not actually testing Rihanna with all possible connection setups, I can open a PR only adding the key if desired. Though it would be better if a maintainer could do it because, if possible, I'd need that on a PATCH or MINOR release.
Thanks for your work!
I stepped through the Rihanna versions and found that this error is produced from v1.3.3 onwards.
[error] GenServer Rihanna.JobDispatcher terminating
** (MatchError) no match of right hand side value: %Postgrex.Result{columns: ["pg_advisory_unlock"], command: :select, connection_id: 21507, messages: [%{code: "01000", file: "lock.c", line: "1901", message: "you don't own a lock of type ExclusiveLock", routine: "LockRelease", severity: "WARNING", unknown: "WARNING"}], num_rows: 1, rows: [[false]]}
(rihanna) lib/rihanna/job.ex:342: Rihanna.Job.release_lock/2
(rihanna) lib/rihanna/job.ex:302: Rihanna.Job.mark_successful/2
(rihanna) lib/rihanna/job_dispatcher.ex:67: Rihanna.JobDispatcher.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Please ask if you need more details, but this is all on a very standard setup; Postgres v11.3 and Ecto, etc all installed at their latest versions from yesterday.
Should I even be installing Rihanna above v1.3.0? I just saw the advice to use 0.0.0 in the README and assumed it was an out of date reference, so I looked through the commits to find the latest version.
Such as here: https://github.com/samphilipd/rihanna/blob/master/lib/rihanna/job.ex#L398
When a job is retried and set to due_at, should that value be considered at all? Should that be considered over the enqueued_at?
Currently we have a Phoenix based UI application. This is handy, but forces users to deploy more than one application.
If we supplied a UI in the form of a Plug that could be mounted on another Plug app more people would be able to use the UI.
I need this for my app so I'll build this later.
I've been running an Elixir app on Heroku, once I introduced Rihanna it began crashing on startup. Upgrading to Elixir 1.6 remedies the problem but the docs state 1.5 should work so I'm reporting the stack traces here.
Adding the supervisor as described...
supervisor(Rihanna.Supervisor, [name: Rihanna.Supervisor, postgrex: My.Repo.config()])
Results in...
Application archie exited: Archie.Application.start(:normal, []) returned an error: shutdown: failed to start child: Rihanna.Supervisor
** (EXIT) an exception was raised:
** (FunctionClauseError) no function clause matching in Keyword.pop_first/3
(elixir) lib/keyword.ex:959: Keyword.pop_first({:name, Rihanna.Supervisor}, :postgrex, [])
(rihanna) lib/rihanna/supervisor.ex:32: Rihanna.Supervisor.start_link/2
(stdlib) supervisor.erl:365: :supervisor.do_start_child/2
(stdlib) supervisor.erl:348: :supervisor.start_children/3
(stdlib) supervisor.erl:314: :supervisor.init_children/2
(stdlib) gen_server.erl:328: :gen_server.init_it/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
{"Kernel pid terminated",application_controller,"{application_start_failure,archie,{{shutdown,{failed_to_start_child,'Elixir.Rihanna.Supervisor',{'EXIT',{function_clause,[{'Elixir.Keyword',pop_first,[{name,'Elixir.Rihanna.Supervisor'},postgrex,[]],[{file,\"lib/keyword.ex\"},{line,959}]},{'Elixir.Rihanna.Supervisor',start_link,2,[{file,\"lib/rihanna/supervisor.ex\"},{line,32}]},{supervisor,do_start_child,2,[{file,\"supervisor.erl\"},{line,365}]},{supervisor,start_children,3,[{file,\"supervisor.erl\"},{line,348}]},{supervisor,init_children,2,[{file,\"supervisor.erl\"},{line,314}]},{gen_server,init_it,6,[{file,\"gen_server.erl\"},{line,328}]},{proc_lib,init_p_do_apply,3,[{file,\"proc_lib.erl\"},{line,247}]}]}}}},{'Elixir.Archie.Application',start,[normal,[]]}}}"}
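One plausible reading of the trace (a guess from the stack trace, not confirmed by the maintainers): the second argument to supervisor/2 is the list of arguments for start_link, so a flat keyword list is spread into two tuple arguments rather than passed as one keyword list. That would match Keyword.pop_first/3 receiving {:name, Rihanna.Supervisor} and the call landing in start_link/2. The stand-in module ArgsDemo below shows the difference using apply/3.

```elixir
defmodule ArgsDemo do
  # Stand-in for a start_link that expects a keyword list as its first argument.
  def start_link(config, opts \\ []), do: {config, opts}
end

# A flat keyword list is a list of two arguments, each a 2-tuple...
flat = apply(ArgsDemo, :start_link, [name: :sup, postgrex: :cfg])

# ...while wrapping it once passes the whole keyword list as one argument.
wrapped = apply(ArgsDemo, :start_link, [[name: :sup, postgrex: :cfg]])

IO.inspect(flat)
IO.inspect(wrapped)
```

If that reading is right, wrapping the options in an extra list in the supervisor(...) call would be worth trying on Elixir 1.5.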
See #46 for discussion.
This would bring enhanced performance when run on a single Erlang cluster.
First of all, thank you for this awesome library!
I'm currently running my Phoenix app on Heroku and looking into using rihanna. I'd love to get some feedback on your experience/insights in running on Heroku.
For example: What happens when scaling to multiple dynos? And can the task processing be moved to a "worker" instead of a "web" dyno?
Thank you so much in advance!
👋
I've added a simple test like this and it seems to be failing:
describe "enqueue" do
  test "valid job" do
    # tracing shows that `:poll`s are being made
    # Rexbug.start(["Rihanna.JobDispatcher :: return", "Rihanna.Config :: return"],
    #   msgs: 1000,
    #   time: 60000
    # )
    assert {:ok, %Rihanna.Job{}} = Rihanna.enqueue({Kernel, :send, [self(), {:work, :done}]})
    assert_receive {:work, :done}, 60000 # never arrives
  end
end
The repo that rihanna uses runs in an ecto sandbox.
I've also noted that rihanna_jobs table doesn't seem to be rolled back after the test exits. Does rihanna run job polling queries outside of the sandbox transaction? Then it would explain why the job created in the test is never executed.
Hi,
I am using the library in one application inside an umbrella with many other apps.
Now I was trying to use it in another app. Each of the apps has its own repo and database.
Encountered 2 issues:
Rihanna.Supervisor is already started by one of the apps, so it cannot be started by the other one as well.
Would there be a way to make it work in an umbrella app?
Thanks
PS: isn't that funny? having trouble running rihanna under the umbrella
Wanted to make sure that dropping table is all that is needed to rollback.
Would give potential users more confidence in the project, and makes it easier to review code from contributors :)
Currently, it is possible to re-enqueue a job (the args are generally available), but when doing so we lose any metadata (specifically attempts count).
I'd like to introduce a non-failure response that simply sets a new due_at, but otherwise retains the job as-is (id, metadata, etc).
Que v1.x (currently in beta) has had significant design changes over the v0.x design. The changes are summarized in the changelog
The most relevant portion is this:
- Que's implementation has been changed from one in which worker threads hold their own PG connections and lock their own jobs to one in which a single thread (and PG connection) locks jobs through LISTEN/NOTIFY and batch polling, and passes jobs along to worker threads. This has many benefits, including:
- Jobs queued for immediate processing can be actively distributed to workers with LISTEN/NOTIFY, which is more efficient than having workers repeatedly poll for new jobs.
- When polling is necessary (to pick up jobs that are scheduled for the future or that need to be retried due to errors), jobs can be locked and fetched in batches, rather than one at a time.
- Individual workers no longer need to monopolize their own (usually idle) connections while working jobs, so Ruby processes will require fewer Postgres connections.
- PgBouncer or another external connection pool can be used for workers' connections (though not for the connection used to lock and listen for jobs).
There are some major benefits to these design changes (thanks @chanks 👏), in particular the removal of the requirement to open a dedicated connection and hold it for the entire duration of working a job. The v4 schema migration also illustrates the underlying changes to the Postgres schema that enable these upgrades.
Have you looked into this or thought about what might be involved in migrating Rihanna to this design?
Hello!
On my local machine, I'm running into issues where Rihanna jobs are sporadically not appearing in logs but are being executed.
The flow is this:
External Twilio API to Phoenix Endpoint
⬇️
Phoenix Controller enqueues a Rihanna job
⬇️
Rihanna job performs DB work, sends a notification email, and updates GraphQL subscriptions
As I'm running my localhost Phoenix server, I'm (??) seemingly-randomly getting two different outputs in my logs:
Sometimes I see [related DB logging omitted]...
[debug] Processing with TwilioController.message_create/2
Pipelines: [:logging, :api]
[info] Received POST /twilio/message
[info] Sent 201 response in 2.00ms
[debug] [Rihanna] Starting job 4179
[debug] Processed begin in 0ms
[debug] [Rihanna] Finished job 4179
...and other times I see...
[debug] Processing with TwilioController.message_create/2
Pipelines: [:logging, :api]
[info] Received POST /twilio/message
[info] Sent 201 response in 2.26ms
...with no corresponding Rihanna logging. However, I know the job is executing because the emails are being delivered as expected and the database records are being updated as expected by the job.
The punchline for me is that in the no-Rihanna-logging scenarios, my GraphQL subscriptions aren't being pushed through, which is a big bad in the case of my app.
This is where my lack of OTP knowledge is going to lead me to ask an ignorantly-phrased question: Where are the un-logged job executions happening in these seemingly-random instances? How do I... get them to execute in the same place?
(Gosh this feels like an inarticulate question. And for that, I'm sorry)
We should have Rihanna be able to execute an after_execution callback on the job e.g. if the user wants to take some action on failure such as sending an email.
cc @sadir
If a module-based job returns :error or {:error, reason}, retry_at is called on the module to determine whether to retry. However, if the job raises or exits because of an error, retry_at is not called on the module so the job is never retried.
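Until that is handled in the library, one possible workaround is to wrap the body of the job so that raises, exits and throws are turned into an {:error, reason} return value, which is the path that does reach retry_at. This is only a sketch; SafePerform and run/1 are names I made up.

```elixir
defmodule SafePerform do
  # Runs `fun` and converts raises, exits and throws into {:error, reason}
  # so the caller always sees an ordinary return value.
  def run(fun) when is_function(fun, 0) do
    fun.()
  rescue
    exception -> {:error, Exception.message(exception)}
  catch
    :exit, reason -> {:error, {:exit, reason}}
    :throw, value -> {:error, {:throw, value}}
  end
end
```

Inside a job module, perform/1 would then delegate to SafePerform.run(fn -> do_work(args) end), where do_work/1 stands for the real job body.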
I'm happy to take a look at a PR for this!
Module and function name should be columns to enable fast searching
Hi there - we have a table with more than a million jobs, and the WITH RECURSIVE jobs AS... query used by Rihanna.Job.lock/3 is timing out right away. This query takes more than 45 seconds to return if we relax the dbconnection timeout. Has anyone here encountered this situation, and if yes, how are you handling it? Can that query be optimized? Many thanks!
And here is the psql plan, in case it will be useful for troubleshooting: https://explain.depesz.com/s/s8vc
Jobs are not guaranteed to be commutative. That means you could imagine a scenario like this:
3 (non commutative) jobs fail, and appear on the failed queue. Imagine for example, switching a boolean value.
That means on the retry list we have:
1. A job which wants to go from true -> false
2. A job which wants to go from false -> true
3. A job which wants to go from true -> false
Now imagine a dev retries the jobs. As far as I can see, there's nothing guaranteeing the jobs run in the correct order, so 3 could finish, then 1 could finish, then 2 could finish.
That would leave the system in an incorrect state.
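The scenario can be replayed with a tiny in-memory model, where each job simply writes its target value and the last write wins. The module OrderSketch and the {id, value} job shape are invented for this illustration and say nothing about how Rihanna stores jobs.

```elixir
defmodule OrderSketch do
  # Applies jobs in the order given; each job overwrites the flag.
  def run(jobs, initial) do
    Enum.reduce(jobs, initial, fn {_id, value}, _flag -> value end)
  end

  # Retrying in id order restores the originally intended sequence.
  def run_in_id_order(jobs, initial) do
    jobs
    |> Enum.sort_by(fn {id, _value} -> id end)
    |> run(initial)
  end
end

# Jobs 1..3 from the list above, finishing in the order 3, 1, 2.
shuffled = [{3, false}, {1, false}, {2, true}]

IO.inspect(OrderSketch.run(shuffled, true))             # ends on job 2's value
IO.inspect(OrderSketch.run_in_id_order(shuffled, true)) # ends on job 3's value
```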
This error is returned when attempting to dispatch jobs with a custom jobs_table_name config:
[error] GenServer Rihanna.JobDispatcher terminating
** (ArgumentError) The Rihanna jobs table must be upgraded.
The easiest way to upgrade the database is with Ecto.
Run `mix ecto.gen.migration upgrade_rihanna_jobs` and make your migration look
like this:
defmodule MyApp.UpgradeRihannaJobs do
use Rihanna.Migration.Upgrade
end
Now you can run `mix ecto.migrate`.
(rihanna) lib/rihanna/migration.ex:267: Rihanna.Migration.raise_upgrade_required!/0
(rihanna) lib/rihanna/job_dispatcher.ex:31: Rihanna.JobDispatcher.handle_info/2
The check here: https://github.com/samphilipd/rihanna/blob/master/lib/rihanna/migration.ex#L234 is looking for the wrong primary key index. For example, with:
config :rihanna, jobs_table_name: "background_jobs",
The generated primary key index name is background_jobs_pkey vs. rihanna_jobs_pkey, resulting in the error above.
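A possible direction for the fix is to derive the expected index name from the configured table name instead of hard-coding it, relying on Postgres naming a primary key index as the table name plus _pkey by default. The helper below is hypothetical and not part of Rihanna.

```elixir
defmodule PkeySketch do
  # Default Postgres name for a table's primary key index.
  def index_name(table_name) when is_binary(table_name) do
    table_name <> "_pkey"
  end
end

IO.inspect(PkeySketch.index_name("background_jobs"))
```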
Many queues have a limit on how long it can take to perform an item of work.
It looks like rihanna doesn't have a limit (yay!). Let's document this in the FAQ
Would it make sense to add the functionality to use Rihanna and expose the enqueue and schedule methods?
My personal use case is that I am running an umbrella application and one of those apps is tasked with managing the workers. I was thinking of isolating the Rihanna dependency exclusively to that app and exposing it through a module (that would use Rihanna) to the other apps, like so:
defmodule My.Worker do
use Rihanna
end
So then I could do something like: My.Worker.enqueue(...)
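To sketch what such a macro might do, here is a self-contained version that injects enqueue/1 and schedule/3 into the using module and forwards them to a backend. QueueFacade, the :backend option and FakeBackend are all made up so the example runs without Rihanna; in the real thing the backend would be Rihanna itself.

```elixir
defmodule QueueFacade do
  # `use QueueFacade, backend: Mod` defines thin wrappers around Mod.
  defmacro __using__(opts) do
    backend = Keyword.fetch!(opts, :backend)

    quote do
      def enqueue(term), do: unquote(backend).enqueue(term)

      def schedule(module, args, opts),
        do: unquote(backend).schedule(module, args, opts)
    end
  end
end

defmodule FakeBackend do
  # Stand-in for the real queue so the sketch has no dependencies.
  def enqueue(term), do: {:ok, {:enqueued, term}}
  def schedule(module, args, opts), do: {:ok, {:scheduled, module, args, opts}}
end

defmodule My.Worker do
  use QueueFacade, backend: FakeBackend
end

IO.inspect(My.Worker.enqueue({Kernel, :send, []}))
```

The other umbrella apps would then depend only on the app that defines My.Worker, not on the queue library directly.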
Thanks 🙇
There's an issue in the Rihanna.Jobs docs having to do with missing code block delimiters.
We're currently using Rihanna in production and ran into a major issue where Job.lock was taking nearly 3 minutes to execute on ~160k records. The vast majority of the performance hit can be blamed on these comparison and sort operations, which, conveniently, can be addressed by a single index.
CREATE INDEX CONCURRENTLY rihanna_jobs_enqueued_at_id ON rihanna_jobs (enqueued_at ASC, id ASC);
Job.lock now runs on the same dataset in < 2ms.
I've forked and fleshed out the benchmark with a test for 100k records and the above index, but ultimately would like to include the index by default on table create/update.
Creating this issue to track progress and discussion.
One of the hardest things to manage when dealing with background jobs is rate limiting requests.
Here's the free OSS solution I use currently in ruby: https://github.com/enova/sidekiq-rate-limiter
Here is the page that describes the problem and solution (with Sidekiq enterprise):
https://github.com/mperham/sidekiq/wiki/Ent-Rate-Limiting
Thank you for your contributions to OSS!!!
I am trying to run the tests on Ubuntu 18.04 (Bionic) with
# elixir -v
Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1]
Elixir 1.7.1 (compiled with Erlang/OTP 21)
The tests run successfully (it seems) but I am getting a lot of errors, output below:
# mix test
.......
10:47:58.614 [error] Task #PID<0.255.0> started from #PID<0.253.0> terminating
** (RuntimeError) Kaboom!
(rihanna) test/support/mocks.ex:75: Rihanna.Mocks.BadMFAMock.perform/1
(rihanna) lib/rihanna/job_dispatcher.ex:134: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
....
10:47:59.056 [error] Task #PID<0.271.0> started from #PID<0.269.0> terminating
** (UndefinedFunctionError) function Nope.broken/1 is undefined (module Nope is not available)
Nope.broken(:kaboom!)
(rihanna) lib/rihanna/job_dispatcher.ex:134: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
.
10:47:59.164 [error] Task #PID<0.276.0> started from #PID<0.274.0> terminating
** (UndefinedFunctionError) function ErrorBehaviourMockWithNoErrorCallback.perform/1 is undefined (module ErrorBehaviourMockWithNoErrorCallback is not available)
ErrorBehaviourMockWithNoErrorCallback.perform([#PID<0.272.0>, #Reference<0.2314301152.1576534022.148955>])
(rihanna) lib/rihanna/job_dispatcher.ex:141: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
...
10:47:59.264 [error] Task #PID<0.284.0> started from #PID<0.282.0> terminating
** (RuntimeError) [Rihanna] Cannot execute MFA job because Rihanna was configured with the `behaviour_only` config option set to true.
(rihanna) lib/rihanna/job_dispatcher.ex:132: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
..
10:47:59.508 [error] Task #PID<0.295.0> started from #PID<0.293.0> terminating
** (RuntimeError) Kaboom!
(rihanna) test/support/mocks.ex:75: Rihanna.Mocks.BadMFAMock.perform/1
(rihanna) lib/rihanna/job_dispatcher.ex:141: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
.
10:47:59.617 [error] Task #PID<0.300.0> started from #PID<0.298.0> terminating
** (RuntimeError) Kaboom!
(rihanna) test/support/mocks.ex:75: Rihanna.Mocks.BadMFAMock.perform/1
(rihanna) lib/rihanna/job_dispatcher.ex:134: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
.....
10:47:59.949 [error] Task #PID<0.322.0> started from #PID<0.320.0> terminating
** (UndefinedFunctionError) function Nope.broken/1 is undefined (module Nope is not available)
Nope.broken(:kaboom!)
(rihanna) lib/rihanna/job_dispatcher.ex:134: anonymous fn/1 in Rihanna.JobDispatcher.spawn_supervised_task/1
(elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: #Function<2.72278995/0 in Rihanna.JobDispatcher.spawn_supervised_task/1>
Args: []
..............................................
Finished in 3.7 seconds
69 tests, 0 failures
Randomized with seed 151891
Any idea what could be causing this?
Thank you for this package, great work.
After upgrading to 2.1.0 I got this error repeatedly for about 3 minutes. I'm not sure it's related to the 2.1.0 upgrade, but I thought I'd share it here in case it is more clear to anyone else.
ERROR 25006 (read_only_sql_transaction) cannot execute SELECT FOR UPDATE in a read-only transaction
lib/postgrex.ex:233 Postgrex.query!/4
lib/rihanna/job.ex:417 Rihanna.Job.lock/3
lib/rihanna/job_dispatcher.ex:37 Rihanna.JobDispatcher.handle_info/2
gen_server.erl:637 :gen_server.try_dispatch/4
gen_server.erl:711 :gen_server.handle_msg/6
proc_lib.erl:249 :proc_lib.init_p_do_apply/3
Testing behaviour and outcomes is an important part of developing software for many teams. Making it easier to test Rihanna job behaviour in ExUnit tests would be helpful to this aim.
Ideally we would have the following functions available:
# clears all jobs in the job queue
Rihanna.Test.purge()
# checks that a job module with the given arguments has been enqueued.
Rihanna.Test.queues_up(job, arguments)
There may be more that would be useful.
I'd imagine the first of these functions to be used as part of the initialisation of a test. The second of these could be used to check that some event triggers a job being enqueued.
defmodule APITest do
  use ExUnit.Case

  setup do
    :ok = Rihanna.Test.purge()
  end

  test "new user creation sends email verification" do
    # Example values; any username and email would do.
    username = "alice"
    email = "alice@example.com"
    Some.Namespace.Test.HTTP.Module.fires_post_request_creating_new_user(username, email)
    assert Rihanna.Test.queues_up(Some.Namespace.Jobs.SendVerificationEmail, %{username: username, email: email})
  end
end
This isn't a new idea. Most of this I'm stealing from rspec-que.
I've been using Rihanna in production for a while now and would like to thank the maintainers for the great work and a very useful, slim system.
We're subscribing to the Telemetry events emitted by Rihanna and exporting them to a dashboard. Our system schedules different types of jobs, and it would be very useful to differentiate between these types in the dashboard. For this to work, I see three options:
job_id, see below). IMHO this option is suboptimal in terms of separation of concerns: it requires the subscriber (concern: instrumentation) to have Rihanna as a hard dependency (leaking concern: how does Rihanna store jobs?).
Rihanna already provides support for telemetry for the following events:
enqueued
deleted
locked (count only)
succeeded
failed
reenqueued
retried
released
Most of the events report a count of 1 and the numeric job ID, except:
deleted, which sometimes reports a count > 1 and hence no id, and
locked, which only reports the count.
I would like to help implement this, and would highly appreciate any input from the maintainer team.
It looks like the migration is hard-coded to use an integer primary key. Is it possible to use a binary, such as PG's built-in support for UUID primary keys?
Rihanna requires 1 + N database connections per node, where 1 connection is used for the external API of enqueuing/retrying jobs and N is the number of dispatchers.
In the default configuration of one dispatcher per node, Rihanna will use 2 database connections per node.
This seems to imply that the number of dispatchers can be configured, but Rihanna is hard-coded to start only 1 dispatcher and 1 Postgrex connection, so there are always 2 connections.
Coupled with the fact that both processes have a hard-coded name, it's not possible to have more instances of the tree, as the name is already taken.
It seems that the job dispatcher name is never actually used and can be removed: https://github.com/samphilipd/rihanna/search?utf8=%E2%9C%93&q=Rihanna.JobDispatcher&type=
Rihanna could skip starting the Postgrex connection itself (see #38), and then the user could add as many Rihanna subtrees to their application as needed :)
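To illustrate the point about names: a supervisor can hold several children of the same module as long as each child spec has a distinct id and the processes are not all registered under one fixed name. A minimal sketch, using a hypothetical `ToyDispatcher` in place of `Rihanna.JobDispatcher` (neither `ToyDispatcher` nor `ToyTree` exists in Rihanna, and this is not how Rihanna builds its tree today):

```elixir
defmodule ToyDispatcher do
  # Hypothetical stand-in for a dispatcher process; it only stores its opts.
  use GenServer

  # No hard-coded `name:` is passed, so several instances can coexist.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts), do: {:ok, opts}
end

defmodule ToyTree do
  # Builds child specs for `n` dispatchers, each with a unique id so the
  # supervisor accepts more than one child of the same module.
  def child_specs(n) when is_integer(n) and n > 0 do
    for i <- 1..n do
      Supervisor.child_spec({ToyDispatcher, [index: i]}, id: {ToyDispatcher, i})
    end
  end

  # Starts a supervisor holding `n` unnamed dispatchers.
  def start_link(n) do
    Supervisor.start_link(child_specs(n), strategy: :one_for_one)
  end
end
```

With something like this, `ToyTree.start_link(3)` would start three dispatchers under one supervisor; whether each should own its own database connection is a separate design question.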
This error is showing up quite frequently on Sentry.IO for my application:
UndefinedFunctionError: function nil.id/0 is undefined
?, in nil.id/0
File "lib/rihanna/job_dispatcher.ex", line 67, in Rihanna.JobDispatcher.handle_info/2
File "gen_server.erl", line 637, in :gen_server.try_dispatch/4
File "gen_server.erl", line 711, in :gen_server.handle_msg/6
File "proc_lib.erl", line 249, in :proc_lib.init_p_do_apply/3
From what I can gather, it's trying to mark a job as successful, but the job that has been popped from the map is nil.
def handle_info({ref, result}, state = %{pg: pg, working: working}) do
  # Flush guarantees that any DOWN messages will be received before
  # demonitoring. This is probably unnecessary but it can't hurt to be sure.
  Process.demonitor(ref, [:flush])
  {job, working} = Map.pop(working, ref) # ==> JOB GETS POPPED HERE
  case result do
    {:error, _} ->
      job_failure(job, result, pg)
    :error ->
      job_failure(job, result, pg)
    _ ->
      Rihanna.Job.mark_successful(pg, job.id) # ==> JOB IS NIL AT THIS POINT ?!?
  end
  state = Map.put(state, :working, working)
  {:noreply, state}
end
I'm using Rihanna version 2.0.0 on Elixir 1.8.1 with Erlang/OTP 21.
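`Map.pop/2` returns `nil` as the value when the key is absent, so one possible explanation (an assumption, not a confirmed diagnosis) is that a reply arrives for a ref that is no longer in `working`. A minimal sketch of a defensive variant follows; `DispatcherSketch` and `handle_reply/2` are hypothetical, and the database calls are replaced by plain return values so the logic can be run in isolation:

```elixir
defmodule DispatcherSketch do
  # Hypothetical stand-in for the dispatcher's reply handling. It does not
  # touch the database and is not Rihanna's actual implementation.

  # Handles a task reply `{ref, result}` against the `working` map of
  # ref => job. Returns the updated map and a description of what was done.
  def handle_reply({ref, result}, working) when is_map(working) do
    case Map.pop(working, ref) do
      {nil, working} ->
        # Unknown ref (already handled or never tracked): skip it instead
        # of crashing on `nil.id`.
        {working, :ignored}

      {job, working} ->
        case result do
          {:error, _} -> {working, {:failed, job.id}}
          :error -> {working, {:failed, job.id}}
          _ -> {working, {:succeeded, job.id}}
        end
    end
  end
end
```

Ignoring the unknown ref avoids the crash but would also hide whatever removed the job from `working` in the first place, so logging that branch is probably worthwhile.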
I'm running the rollback of the Rihanna migration, and it appears that the SQL that drops the table has a syntax error:
-DROP TABLE(rihanna_jobs);\n
+DROP TABLE rihanna_jobs;\n
Versions:
0.6.0
1.6.6
20.3
The error:
13:32:42.321 [info] execute "DROP TABLE(rihanna_jobs);\n"
** (Postgrex.Error) ERROR 42601 (syntax_error): syntax error at or near "("
(ecto) lib/ecto/adapters/sql.ex:200: Ecto.Adapters.SQL.query!/5
(ecto) lib/ecto/adapters/postgres.ex:96: anonymous fn/4 in Ecto.Adapters.Postgres.execute_ddl/3
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/adapters/postgres.ex:96: Ecto.Adapters.Postgres.execute_ddl/3
(ecto) lib/ecto/migration/runner.ex:104: anonymous fn/2 in Ecto.Migration.Runner.flush/0
(elixir) lib/enum.ex:1899: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto) lib/ecto/migration/runner.ex:102: Ecto.Migration.Runner.flush/0
(stdlib) timer.erl:181: :timer.tc/2
(ecto) lib/ecto/migration/runner.ex:26: Ecto.Migration.Runner.run/6
(ecto) lib/ecto/migrator.ex:128: Ecto.Migrator.attempt/6
(ecto) lib/ecto/migrator.ex:105: anonymous fn/4 in Ecto.Migrator.do_down/4
(ecto) lib/ecto/adapters/sql.ex:576: anonymous fn/3 in Ecto.Adapters.SQL.do_transaction/3
(db_connection) lib/db_connection.ex:1283: DBConnection.transaction_run/4
(db_connection) lib/db_connection.ex:1207: DBConnection.run_begin/3
(db_connection) lib/db_connection.ex:798: DBConnection.transaction/3
(ecto) lib/ecto/migrator.ex:262: anonymous fn/4 in Ecto.Migrator.migrate/4
(elixir) lib/enum.ex:1294: Enum."-map/2-lists^map/1-0-"/2
(elixir) lib/enum.ex:1294: Enum."-map/2-lists^map/1-0-"/2
(ecto) lib/mix/tasks/ecto.rollback.ex:79: anonymous fn/4 in Mix.Tasks.Ecto.Rollback.run/2
(elixir) lib/enum.ex:737: Enum."-each/2-lists^foreach/1-0-"/2
Migration file:
defmodule MyApp.Repo.Migrations.CreateRihannaJobs do
use Rihanna.Migration
end
I would be glad to open a PR fixing this error if it was confirmed to be true.
The current error message is worse than the default FunctionClauseError in Elixir because it doesn't tell you which argument is the problem.
We can either improve the message or just let Elixir handle it with a FunctionClauseError.
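For reference, when no function clause matches, Elixir raises `FunctionClauseError`, which records the module, function and arity involved. A minimal sketch of the "let Elixir handle it" option, using a hypothetical `ClauseSketch.schedule/3` that is not part of Rihanna:

```elixir
defmodule ClauseSketch do
  # Hypothetical function, not Rihanna's API: it only accepts a DateTime
  # under the `:at` key and deliberately has no catch-all clause.
  def schedule(mod, args, [at: %DateTime{} = due_at]) when is_atom(mod) and is_list(args) do
    {:ok, {mod, args, due_at}}
  end

  # Calls schedule/3 and, if no clause matches, reports what the
  # FunctionClauseError recorded about the failed call.
  def try_schedule(due_at) do
    try do
      schedule(SomeJob, [], at: due_at)
    rescue
      e in FunctionClauseError -> {:no_clause, e.module, e.function, e.arity}
    end
  end
end
```

Here `ClauseSketch.try_schedule("tomorrow")` returns `{:no_clause, ClauseSketch, :schedule, 3}`, while passing a `DateTime` returns an `{:ok, ...}` tuple.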
I'm interested in running jobs periodically, i.e. once a day.
I may be up for implementing this. I see it is planned in the README, do you have opinions on how this should work?
Cheers,
Louis
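One possible shape for this, sketched under assumptions: the reports earlier on this page describe a job returning `{:reenqueue, due_at}` with a datetime, so a daily job could compute its own next run time. `DailyJobSketch` is hypothetical, and the `@behaviour Rihanna.Job` declaration is left out so the sketch runs without Rihanna installed:

```elixir
defmodule DailyJobSketch do
  # Hypothetical job module; in a real app it would presumably declare
  # `@behaviour Rihanna.Job`, omitted here so the sketch runs standalone.
  @one_day_in_seconds 24 * 60 * 60

  # Does the work, then asks to be run again one day after `now`.
  # `now` is a parameter only so the behaviour can be checked deterministically.
  def perform(_arg, now \\ DateTime.utc_now()) do
    # ... the actual daily work would go here ...
    {:reenqueue, next_run(now)}
  end

  # Returns the datetime exactly one day after `now`.
  def next_run(%DateTime{} = now) do
    DateTime.add(now, @one_day_in_seconds, :second)
  end
end
```

A fixed 24-hour offset drifts with the job's own run time and ignores local clock changes; a cron-style schedule would be a different design.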