Giter VIP home page Giter VIP logo

river's People

Contributors

andrewmbenton avatar bgentry avatar brandur avatar cga1123 avatar dependabot[bot] avatar gguinea avatar kamikazechaser avatar pmenglund avatar tc5027 avatar timwmillard avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

river's Issues

Web UI

Hi,

in the docs close to the bottom there is a really nice UI which shows the state of all jobs.

Is the code for that UI public as well?
If yes, how I access it?

Picture of the UI:

web-dashboard-preview 461dc852

Happy holidays!

Start doing GitHub releases

If you start doing Github releases (instead of just pushing tags), any new release that you do will show up on Github feed of anyone that has starred your repository.

For me personally that would be very useful - I plan to use river in a project of mine soon, and it would be easier this way to keep up with new development :)

Separate worker and insertion of jobs.

Hello! Have been playing around with River and wanted to ask if it's possible to have a client without workers just for insertion of jobs so that we are able to have workers on a separate machine from the main server.

Thanks!

Flaky Client stop test

From this run:

--- FAIL: Test_Client (0.00s)
    --- FAIL: Test_Client/Stopped ([10](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:11).14s)
        logger.go:225: time=2024-01-16T15:42:32.755Z level=INFO msg="River client successfully started"
        logger.go:225: time=2024-01-16T15:42:32.757Z level=INFO msg="Election change received" is_leader=false
        logger.go:225: time=2024-01-16T15:42:32.757Z level=INFO msg="producer: Producer started" queue=default
        logger.go:225: time=2024-01-16T15:42:32.757Z level=INFO msg="producer: Run loop started"
        logger.go:225: time=2024-01-16T15:42:32.859Z level=INFO msg="Client[github.com/jackc/pgx/v5.Tx]: Hard stop started; cancelling all work"
        logger.go:225: time=2024-01-16T15:42:32.876Z level=INFO msg="producer: Run loop stopped"
        logger.go:225: time=2024-01-16T15:42:32.876Z level=INFO msg="producer: Producer stopped" queue=default num_completed_jobs=1
        client_test.go:466: context deadline exceeded
        logger.go:225: time=2024-01-16T15:42:37.860Z level=INFO msg="Client[github.com/jackc/pgx/v5.Tx]: Stop started"
        client_test.go:156: 
            	Error Trace:	/home/runner/work/river/river/client_test.go:156
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/testing/testing.go:[11](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:12)69
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/testing/testing.go:[13](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:14)47
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/testing/testing.go:[15](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:16)89
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/runtime/panic.go:541
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/testing/testing.go:999
            	            				/opt/hostedtoolcache/go/1.21.5/x64/src/testing/testing.go:1076
            	            				/home/runner/work/river/river/client_test.go:466
            	Error:      	Received unexpected error:
            	            	context deadline exceeded
            	Test:       	Test_Client/Stopped
FAIL
FAIL	github.com/riverqueue/river	[20](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:21).060s

cc @brandur

run validateQueueName on both insert and worker startup?

Hi there! Just started trying out river and really like it so far. However, I ran into this situation which seems like it could be improved upon:

  1. I created a job with args with this method func (MyJobArgs) InsertOpts { return river.InsertOpts{Queue: "my-queue"} }
  2. I deployed my application and successfully inserted a job with that queue value, but realized I'd forgotten to add that queue to my worker config (i did the insert using an insert-only client)
  3. I deployed again with an entry in the worker config like "my-queue": {}
  4. The application failed to start with the error queue name is invalid, see documentation: "my-queue"

Personally I like hyphens better than underscores but that's not important 😄 it is surprising though that the Insert API doesn't enforce the same validation rules around queue names as the NewClient API. Shouldn't the library validate the queue name in both situations using the same logic? I see that there's a validateJobArgs that only checks job kinds against registered workers, and only does that if workers are registered (which makes sense), but it feels like it should be fine to also have it always check that the job's configured queue could have a workers attached to it.

(Tangential issue: while the error says "see documentation" I didn't really find anything relevant in the docs; I found the validation rules instead by searching for the error in this repo.)

Happy to submit a PR if you agree but don't have the bandwidth ... or maybe there's a reason to enforce in one situation but not the other?

ugly log lines due to generic client

This flaky test run had some logs with a bug ugly prefix due to the generic client type:

logger.go:225: time=[20](https://github.com/riverqueue/river/actions/runs/7544000573/job/20536298471#step:8:21)24-01-16T15:42:37.860Z level=INFO msg="Client[github.com/jackc/pgx/v5.Tx]: Stop started"

It's probably best if this just says Client. PR incoming.

Leadership re-election

Hi, I was reading through the following SQL query for leadership re-election:

-- name: LeadershipAttemptReelect :execrows
INSERT INTO river_leader(name, leader_id, elected_at, expires_at)
  VALUES (@name::text, @leader_id::text, now(), now() + @ttl::interval)
ON CONFLICT (name)
  DO UPDATE SET
    expires_at = now() + @ttl::interval
  WHERE
    EXCLUDED.leader_id = @leader_id::text;

Please correct me if I'm understanding incorrectly, but does the WHERE clause always evaluate to true, which could cause non-leaders to extend the expiry of the current leader?

Should the intent be to compare against the latest leader_id value instead as follows?

...
WHERE
    river_leader.id = EXCLUDED.leader_id;

Docs improvement: when to use river vs temporal

I think it would be helpful to add a section to the docs on when to use River vs other popular services like Temporal.

My use case is the following:

  1. User uploads 1-100 documents.
  2. All documents need to get processed in three steps. All three steps depend on external services. Each step not take more than 5 seconds.
  3. Inform user about progress updates, e.g. how many files were processed already & how many failed

Would it make more sense to use River for such a use case or is Temporal a better choice?

MPL2.0 license?

This project looks great, congratulations on the release. I’ve come into the Go ecosystem from Python and always appreciated the simplicity of Celery with Django.

I’d love to adopt this in some of my projects but am concerned about the LGPL license as Go would statically link the library and I’d need to license my application source code with LGPL. I saw this thread with some more info which described projects like zeromq having a static linking exception, but I couldn’t find any exception in the license files for this repo. There was also this post on LGPL for Go which I found a useful reference.

Would you consider licensing this project with MPL2.0?

Support for client middleware?

I'm integration river into a project and would love to be able to have some kind of a middleware pattern whereby I can inject metadata such as trace IDs, correlation IDs, etc...

I'm looking for something akin to the ClientMiddleware available in Sidekiq, which allows for access to the job object before persistence. The equivalent for ServerMiddleware is more straightforward, as wrapping the Worker interface is easily done!

I think for the time being I'll create a smaller Client interface for river that I propagate through my application which I can then use to wrap the Insert operations with custom logic that have access to the context in a uniform manner.

It might be nice for that to be something that can be configured directly on a *river.Client as a middleware stack?

Would be great to hear thoughts about how others are approaching this problem, and views around this from the maintainers as well! Thank you 😄

Enable "execute hooks" for worked jobs

Why?

  • Opening a span for tracing and stashing it on the ctx
  • Generic "starting job" logging

(Distinct from https://riverqueue.com/docs/subscriptions because this needs to be synchronous / needs direct access to ctx.)

User experience

Define an optional hooks interface

// WorkerWithHooks is a job worker that provides hooks.
type WorkerWithHooks[T JobArgs] interface {
	PreRun(ctx context.Context, job *Job[T]) (context.Context, error) // Naming inspired by cobra's PreRun
}

and then use a type assertion to check if this optional interface is satisfied by a river.Worker.

Implementation

E.g. in jobExecutor{}.execute() you could make it possible to run a hook like

ctx, err := e.WorkUnit.PreWork(ctx)

And you could invoke in wrapperWorkUnit{}.PreWork() similar to Work()

// noOpPreRun is a `PreRun()` function that does nothing.
func noOpPreRun[T JobArgs](ctx context.Context, _ *Job[T]) (context.Context, error) {
	return ctx, nil
}

func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
	preRun := noOpPreRun[T]
	// To make this type assertion cheaper, it should probably happen when
	// `workUnitFactoryWrapper{}` is created (not when `MakeUnit()` is invoked)
	wh, ok := w.worker.(WorkerWithHooks[T])
	if ok && wh.PreRun != nil {
		preRun = wh.PreRun
	}

	return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker, preRun: preRun}
}

// wrapperWorkUnit implements workUnit for a job and Worker.
type wrapperWorkUnit[T JobArgs] struct {
	job    *Job[T] // not set until after UnmarshalJob is invoked
	jobRow *rivertype.JobRow
	worker Worker[T]
	preRun func(ctx context.Context, job *Job[T]) (context.Context, error)
}

Client's `Schema` config does nothing

Unless I'm mistaken, the Schema option that can be sent to a client doesn't do anything.

I think all we need to do is to make sure to run SET SCHEMA 'xxx' on every new connection?

Also, even if this feature did work, the migration tool doesn't currently support an alternate schema, so that needs to be corrected too.

cc @bgentry

Passing a `riverClient` to a worker?

I am trying to trigger a job from within another job (think: send a notification when a long-running job is complete). This requires that I have access to the riverClient in the worker context.

I could pass the client to the worker, like

river.AddWorker[action.SomeArgs](workers, &action.SomeWorker{DbPool: dbPool, RiverClient: riverClient})

but that creates a chicken-egg problem because I need an initialized Workers to create the riverClient.

Adding the worker after initialization does not work either because NewClient does not accept an empty Workers:

workers := river.NewWorkers()

riverClient, err := river.NewClient[pgx.Tx](riverpgxv5.New(dbPool.Pool), &river.Config{
    Workers: workers,
})
if err != nil {
    panic(err)
}

river.AddWorker[action.SomeArgs](workers, &action.SomeWorker{DbPool: dbPool, RiverClient: riverClient})

// panic: at least one Worker must be added to the Workers bundle

Adding some worker that does not depend on the client does actually work, but is kind of awkward:

workers := river.NewWorkers()
river.AddWorker[action.DummyArgs](workers, &action.DummyWorker{})

riverClient, err := river.NewClient[pgx.Tx](riverpgxv5.New(dbPool.Pool), &river.Config{
    Workers: workers,
})
if err != nil {
    panic(err)
}

river.AddWorker[action.SomeArgs](workers, &action.SomeWorker{DbPool: dbPool, RiverClient: riverClient})

// -> both workers work as expected

Is there any other "pattern" to achieve this? If not, I think the status quo should at least be documented somewhere.

Usage of dbutil WithTxV inside standard adapter

Hi maintainers excellent job on this library!

I have two questions, so not an urgent issue, I tried to add a label to this issue but I guess it's not supported if you are not a maintainer.

In dbutil you have this function:

func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(ctx context.Context, tx pgx.Tx) (T, error)) (T, error) {

When I look at the standard adapter, I can see that sometimes it is used:

return dbutil.WithTxV(ctx, a.executor, func(ctx context.Context, tx pgx.Tx) (*dbsqlc.RiverJob, error) {

And sometimes, the transaction is started manually, instead:

tx, err := a.executor.Begin(ctx)

Q1: I can't figure out the reason for this inconsistent use, is it because of the default value for an array being nil if not explicitly given?
Meaning if we modify WithTxV like so, it would avoid the code duplication?

func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(ctx context.Context, tx pgx.Tx) (T, error)) (T, error) {
	var defaultRes T

	tx, err := txBeginner.Begin(ctx)

	// Use reflection to determine if T is a slice, map, or channel and initialize it appropriately.
	tType := reflect.TypeOf(defaultRes)
	switch tType.Kind() {
	case reflect.Slice:
		defaultRes = reflect.MakeSlice(tType, 0, 0).Interface().(T)
	case reflect.Map:
		defaultRes = reflect.MakeMap(tType).Interface().(T)
}
...//rest

Q2:
I probably am missing something somewhere, but is JobSetStateIfRunning not running in a transaction? And if yes, why?
It seems like it calls directly the queries, which in their turn just uses QueryRow via the executor?
I traced the function's use inside the executorer and completer, but it still doesn't grab a transaction.

Thank you very much for your work on this project. This is me trying to understand it a bit better, and not questioning you.

Job uniqueness semantic

After reading the documentation multiple times, it's still unclear wether unique jobs can run simultaneously. I will explain my scenario. I want a system where I have unique jobs which must run at most once concurrently (the job can retry, but they cannot run in parallel). I also want to be able to enqueue the same unique Job at most once. In other words, I'd like to enqueue a unique job, and if it's currently running, enqueue another one. If there's already one running and one queued with the same key, do nothing.

It seems that this could be achieved by using UniqueOpts with a caller defined key and a custom ByState: .... But my challenge is understanding how workers can still pickup the newly enqueued job if there's already a worker executing the currently running job with the same key.

If I understand correctly, the uniqueness is only checked at insertion, thus with the previous scenario, I could still have 2 concurrent workers on the same unique job.

River Schema Migration Improvement

I realize that everyone has their favorite coolaid but applying the migrations with the current pattern makes it a bit difficult to implement. I was wondering if there was a way to run the river CLI tool to generate a SQL patch of the given changes?

Most people have already chosen a tool they like and are actively using it, so having yet another way to modify the SQL schema is a bit distasteful. Now we have two different tools that have write access to a DB that need to be both installed, managed and executed as part of a given deployment.

Given the wide range of options in the ecosystem. (See below), I was wondering if we could just create a SQL

What I'd like to see is something like.

river migrate-up --dry ## or pick your favorite syntax to achieve the goal.

SQL:

-- migrate-up
ALTER TABLE add COLUMN.... 

--migrate-down
ALTER TABLE DROP COLUMN....

How you choose to integrate the SQL into your migration tool is up to you, but at least give me the SQL that will be executed.

Add `river validate` subcommand

Reading https://riverqueue.com/docs/migrations I'm struck by the fact that it may be hard (over time) to figure out if the version of the schema I'm running in my database matches the schema expected by the version river that I've got in my go.mod / go.sum.

I also think having river validate would make it much easier for teams like mine to integrate this into our existing migrations workflow. I.e. we could run river validate in CI and during development and detect if our existing migrations sequence needs to be updated.

(I don't think it'd be the end of the world to add river migrate-up during deploy right next to our migrate up command. However, having the two work together during development is preferable to having two "migrations managers" running during our deploys.)

flaky test: goroutine leak

Ran into this failure in this run:

goleak: Errors on successful test run: found unexpected goroutines:
[Goroutine 1638 in state select, with github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop on top of the stack:
github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop(0xc0002119f8, {0xd08708, 0xc00008e370})
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:64 +0x275
created by github.com/riverqueue/river/internal/util/timeutil.NewTickerWithInitialTick in goroutine 1637
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:39 +0x1a5
 Goroutine 1669 in state select, with github.com/riverqueue/river/internal/maintenance.(*JobCleaner).Start.func1 on top of the stack:
github.com/riverqueue/river/internal/maintenance.(*JobCleaner).Start.func1()
	/home/runner/work/river/river/internal/maintenance/job_cleaner.go:119 +0x3c9
created by github.com/riverqueue/river/internal/maintenance.(*JobCleaner).Start in goroutine 606
	/home/runner/work/river/river/internal/maintenance/job_cleaner.go:109 +0x167
 Goroutine 1637 in state select, with github.com/riverqueue/river/internal/maintenance.(*Rescuer).Start.func1 on top of the stack:
github.com/riverqueue/river/internal/maintenance.(*Rescuer).Start.func1()
	/home/runner/work/river/river/internal/maintenance/rescuer.go:124 +0x3c9
created by github.com/riverqueue/river/internal/maintenance.(*Rescuer).Start in goroutine 606
	/home/runner/work/river/river/internal/maintenance/rescuer.go:114 +0x167
 Goroutine 606 in state select, with github.com/riverqueue/river/internal/baseservice.(*BaseService).CancellableSleep on top of the stack:
github.com/riverqueue/river/internal/baseservice.(*BaseService).CancellableSleep(0xc00007e500, {0xd08708, 0xc00078c6e0}, 0xab3700?)
	/home/runner/work/river/river/internal/baseservice/base_service.go:78 +0x106
github.com/riverqueue/river/internal/baseservice.(*BaseService).CancellableSleepRandomBetween(0xc00007e500, {0xd08708, 0xc00078c6e0}, 0x0, 0x3b9aca00)
	/home/runner/work/river/river/internal/baseservice/base_service.go:90 +0x8a
github.com/riverqueue/river/internal/maintenance.(*PeriodicJobEnqueuer).Start(0xc00007e500, {0xd08708, 0xc00008e320})
	/home/runner/work/river/river/internal/maintenance/periodic_job_enqueuer.go:97 +0x76
github.com/riverqueue/river/internal/maintenance.(*QueueMaintainer).Start(0xc0002213e0, {0xd082a8, 0x10dea40})
	/home/runner/work/river/river/internal/maintenance/queue_maintainer.go:66 +0x124
github.com/riverqueue/river.(*Client[...]).handleLeadershipChange(0xc000282ed4, {0xd082a8?, 0x10dea40}, 0xc00009e0a0)
	/home/runner/work/river/river/client.go:890 +0x229
github.com/riverqueue/river.(*Client[...]).Start.func3()
	/home/runner/work/river/river/client.go:614 +0x165
created by github.com/riverqueue/river.(*Client[...]).Start in goroutine 51
	/home/runner/work/river/river/client.go:601 +0x810
 Goroutine 1670 in state select, with github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop on top of the stack:
github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop(0xc0007223c0, {0xd08708, 0xc00078c5a0})
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:64 +0x275
created by github.com/riverqueue/river/internal/util/timeutil.NewTickerWithInitialTick in goroutine 1669
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:39 +0x1a5
 Goroutine 1667 in state select, with github.com/riverqueue/river/internal/maintenance.(*Scheduler).Start.func1 on top of the stack:
github.com/riverqueue/river/internal/maintenance.(*Scheduler).Start.func1()
	/home/runner/work/river/river/internal/maintenance/scheduler.go:107 +0x3c9
created by github.com/riverqueue/river/internal/maintenance.(*Scheduler).Start in goroutine 606
	/home/runner/work/river/river/internal/maintenance/scheduler.go:97 +0x167
 Goroutine 1668 in state select, with github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop on top of the stack:
github.com/riverqueue/river/internal/util/timeutil.(*TickerWithInitialTick).runLoop(0xc000722318, {0xd08708, 0xc00078c0f0})
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:64 +0x275
created by github.com/riverqueue/river/internal/util/timeutil.NewTickerWithInitialTick in goroutine 1667
	/home/runner/work/river/river/internal/util/timeutil/time_util.go:39 +0x1a5
]
FAIL	github.com/riverqueue/river/rivertest	1.7[49](https://github.com/riverqueue/river/actions/runs/7503208803/job/20427490169#step:8:50)s

Immediately prior, we have these logs from all maintenance services about a closed pool:

time=2024-01-12T14:07:56.515Z level=ERROR msg="Rescuer: Error rescuing jobs" error="error fetching stuck jobs: closed pool"
PASS
time=2024-01-12T14:07:56.631Z level=ERROR msg="Scheduler: Error scheduling jobs" error="error deleting completed jobs: closed pool"
time=2024-01-12T14:07:57.001Z level=ERROR msg="JobCleaner: Error cleaning jobs" error="error deleting completed jobs: closed pool"

And an indication that whichever test was running passed successfully. Nothing in the logs to indicate which test spawned these goroutines afaict, so it could be any of them from github.com/riverqueue/river/rivertest. I skimmed through these, and it doesn't look like there are any issues with a defer where there should be a t.Cleanup() or any other mixup that could cause such an issue.

I dug a bit through the BaseStartStop and our usage of it, but haven't been able to pin down a potential cause for this 🤔 cc @brandur

InsertOpts() Metadata

It seems that Metadata is ignored when provided as a part JobArgsWithInsertOpts.InsertOpts().
When specified as a part of third param to Client.Insert they are applied just fine.

The problem I see is that insertParamsFromArgsAndOptions neglects JobArgsWithInsertOpts meta and always takes what was passed as a function arg. Actually I do agree that the function arg should have higher priority and override what was specified in JobArgsWithInsertOpts.InsertOpts(), but when the arg is empty I'd expect job's meta to take effect.

Important pieces below:

if insertOpts == nil {
	insertOpts = &InsertOpts{}
}

var jobInsertOpts InsertOpts
if argsWithOpts, ok := args.(JobArgsWithInsertOpts); ok {
	jobInsertOpts = argsWithOpts.InsertOpts()
}
metadata := insertOpts.Metadata
if len(metadata) == 0 {
	metadata = []byte("{}")
}

Replacing a jobs args and schedule

I want to insert a job and then override it with newer args and an updated schedule.

In the specific use case, I'd like to trigger a long-running remote process with some data. As soon as the data comes to my service, I trigger the job to start in 45 seconds, but if more data comes in, I want to update the arguments and only have the most recent queued item handled.

Does River have that capability? It looks like the first one wins and duplicate inserts are discarded.

Document Auto-vacuum Starvation

I think we implemented this after your time at Heroku. We finally realized that sometimes the 2 PG auto vacuum processes would both end up working on tables that were taking hours to complete. That kept them from vacuuming the que_jobs table, which would lead to poor performance. We ended up adding a clock process task to manually vacuum our high churn tables at regular intervals, and we haven't seen poor job selection performance since.

It might be worthwhile to add something similar to what que has to your docs to mitigate this potential issue for river users.

flaky StopAndCancel test in CI

GitHub Actions continues to surprise us with timing issues @brandur:

client_test.go:546: Waiting on job to be done
        client_test.go:559: 
            	Error Trace:	/home/runner/work/river/river/client_test.go:559
            	Error:      	Max difference between 2023-11-23 18:02:54.632264664 +0000 UTC m=+0.706377072 and 2023-11-23 18:02:54.531149022 +0000 UTC m=+0.6052614[30](https://github.com/riverqueue/river/actions/runs/6973035679/job/18976333126#step:8:31) allowed is 100ms, but difference was 101.115642ms
            	Test:       	Test_Client_StopAndCancel/jobs_in_progress,_only_completing_when_context_is_canceled

Intermittent test failure: `TestPeriodicJobEnqueuer/EnqueuesPeriodicJobs`

Another intermittent failure. This time in the periodic job enqueuer.

Sample failure run:

https://github.com/riverqueue/river/actions/runs/7956225078/job/21716534964?pr=214

--- FAIL: TestPeriodicJobEnqueuer (0.00s)
    --- FAIL: TestPeriodicJobEnqueuer/EnqueuesPeriodicJobs (1.57s)
        logger.go:225: time=2024-02-19T07:53:42.969Z level=INFO msg="PeriodicJobEnqueuer: Run loop started"
        periodic_job_enqueuer_test.go:112: 
            	Error Trace:	/home/runner/work/river/river/internal/maintenance/periodic_job_enqueuer_test.go:82
            	            				/home/runner/work/river/river/internal/maintenance/periodic_job_enqueuer_test.go:112
            	Error:      	"[%!s(*dbsqlc.RiverJob=&{45 [1[23](https://github.com/riverqueue/river/actions/runs/7956225078/job/21716534964?pr=214#step:8:24) 125] 0 <nil> [] {482531000 63843926023 0x107e100} [] <nil> periodic_job_500ms 25 [123 125] 1 default available {482709000 63843926023 0x107e100} []}) %!s(*dbsqlc.RiverJob=&{46 [123 125] 0 <nil> [] {983499000 63843926023 0x107e100} [] <nil> periodic_job_500ms 25 [123 125] 1 default available {983650000 63843926023 0x107e100} []})]" should have 3 item(s), but has 2
            	Test:       	TestPeriodicJobEnqueuer/EnqueuesPeriodicJobs
            	Messages:   	Expected to find exactly 3 job(s) of kind: periodic_job_500ms, but found 2
        logger.go:225: time=20[24](https://github.com/riverqueue/river/actions/runs/7956225078/job/21716534964?pr=214#step:8:25)-02-19T07:53:44.472Z level=INFO msg="PeriodicJobEnqueuer: Run loop stopped"
FAIL
FAIL	github.com/riverqueue/river/internal/maintenance	1.761s

Enqueue problem (PostgreSQL 22P02)

Have been playing around with River and I get an error when adding a job to the queue. I define 3 string arg as job args.

The error I received is as follows: error getting unique job: ERROR: invalid input syntax for type json (SQLSTATE 22P02)

type ExampleJobArgs struct {
	Email  string `json:"email"`
	UserID string `json:"user_id"`
}

func (ExampleJobArgs) Kind() string {
	return "activation_email"
}

func (ExampleJobArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		UniqueOpts: river.UniqueOpts{
			ByArgs:   true,
			ByPeriod: 4 * time.Hour,
		},
	}
}

// ...
type api struct { queue *river.Client[pgx.Tx] }

queue, err := river.NewClient(riverpgxv5.New(postgres), &river.Config{})
if err != nil {
	panic(err)
}

// ...

func (a *api) enqueueHandler(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	if _, err = a.queue.Insert(ctx, worker.ExampleJobArgs{
		Email:    "[email protected]",
		UserID:  "1",
	}, nil); err != nil {
		// ...
	}

	// ...
}

My periodic jobs works correctly, but I only get this error when enqueueing from API.

Updating job metadata while running

I use jobs to perform unique short-to-medium-running tasks. The tasks are not just some background stuff, but something a user can observe.
The nature of a task lets me roughly estimate its progress (I paginate through an API), so I'm considering writing that progress to job metadata column once in a while. The reason is showing a rough progress estimation is much better user experience than just displaying a spinner for indefinite amount of time.

Yes, I am aware river API does not let me do that, but I have no problem with doing that myself. Just wanted to ask if that's a good idea and what are the chances of me breaking river job processing logic if I do so.

Gorm integration

Hi, great project!

I'm looking at using this to replace Asynq in my application and remove the Redis dependency. I'm wondering how I can integrate this with Gorm and insert jobs as part of a Gorm transaction - they are both using the PGX driver so it should be possible but Gorm is wrapping transactions in layers of abstraction. A dirty workaround might be to get Gorm to execute the insert query here: https://github.com/riverqueue/river/blob/master/internal/dbsqlc/river_job.sql.go#L450 but that doesn't seem ideal.

Gorm is a popular ORM so I expect easy integration would improve adoption of River...

v0.0.8 migration fails when there are existing jobs with `NULL` tags

I've got a database with a bunch of jobs still around in completed state, which causes the migration to fail:

failed: error applying version 003 [UP]: ERROR: column "tags" of relation "river_job" contains null values (SQLSTATE 23502)

Seems like the migration could update all existing rows to have [], assuming there's a way to do that without race conditions. I guess you'd have to lock the entire table?

In my case I think I'll just TRUNCATE or DELETE FROM the full river_job table prior to running the migration.

Intermittent test failure: `Test_Client_InsertTriggersImmediateWork`

Notced this one in #212, but since it's not reproducible, and I've definitely seen it many times before from time to time, I don't believe it's related to my change. Even doing local run counts of 5000 iterations with -race, I'm unable to repro.

Sample failure:

https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212

Run go test -p 1 -race ./...
--- FAIL: Test_Client_InsertTriggersImmediateWork (5.14s)
    logger.go:225: time=2024-02-19T06:52:27.013Z level=INFO msg="River client successfully started" client_id=01HQ026AZWE2GNHS5GK2[8](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:9)383[9](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:10)P
    logger.go:225: time=2024-02-19T06:52:27.013Z level=INFO msg="Election change received" is_leader=false
    logger.go:225: time=2024-02-19T06:52:27.013Z level=INFO msg="producer: Producer started" queue=default
    logger.go:225: time=2024-02-19T06:52:27.013Z level=INFO msg="producer: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.[10](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:11)6Z level=INFO msg="Election change received" is_leader=true
    logger.go:225: time=2024-02-19T06:52:27.106Z level=INFO msg="PeriodicJobEnqueuer: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.106Z level=INFO msg="Reindexer: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.106Z level=INFO msg="Scheduler: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.106Z level=INFO msg="JobCleaner: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.106Z level=INFO msg="Rescuer: Run loop started"
    logger.go:225: time=2024-02-19T06:52:27.109Z level=INFO msg="Rescuer: Ran successfully" num_jobs_discarded=0 num_jobs_retry_scheduled=0
    logger.go:225: time=2024-02-19T06:52:27.[11](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:12)1Z level=INFO msg="JobCleaner: Ran successfully" num_jobs_deleted=0
    logger.go:225: time=2024-02-19T06:52:27.[13](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:14)4Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:27.611Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:28.110Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:28.609Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:29.110Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:29.610Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:30.110Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:30.611Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:31.109Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:31.609Z level=INFO msg="Scheduler: Ran successfully" num_jobs_scheduled=0
    logger.go:225: time=2024-02-19T06:52:32.0[14](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:15)Z level=INFO msg="Client: Job stats (since last stats line)" num_jobs_run=1 average_complete_duration=83.[15](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:16)9736ms average_queue_wait_duration=67.223225ms average_run_duration=58.96µs
    logger.go:225: time=2024-02-19T06:52:32.015Z level=INFO msg="producer: Heartbeat" num_completed_jobs=1 num_jobs_running=0 queue=default
    client_test.go:2462: timed out waiting for 2nd job to start
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="Client: Stop started"
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="producer: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="Election change received" is_leader=false
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="producer: Producer stopped" queue=default num_completed_jobs=1
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="PeriodicJobEnqueuer: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.098Z level=INFO msg="Rescuer: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.099Z level=INFO msg="Scheduler: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.099Z level=INFO msg="Reindexer: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.099Z level=INFO msg="JobCleaner: Run loop stopped"
    logger.go:225: time=2024-02-19T06:52:32.100Z level=INFO msg="Client: All services stopped"
    logger.go:225: time=2024-02-19T06:52:32.100Z level=INFO msg="Client: Stop complete"
FAIL
FAIL	github.com/riverqueue/river	[18](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:19).0[35](https://github.com/riverqueue/river/actions/runs/7955682330/job/21715052284?pr=212#step:8:36)s

Rate limit queue

Hi, wondering if there is a way to rate limit a queue such as 10 jobs per minute, etc? Otherwise, is there a suggested alternative? Thanks!

Make SQL Migrations accessible as a method

Thank you for publishing such a great library. The library fits exactly my needs for private project I would to like to open source once its reaches stability.

Currently I am doing database migrations with https://github.com/pressly/goose on the startup of the app. This makes it easy to deploy on hosters like heroku or fly.io.

Would it be possible to make the functionality of 'river migrate-up' accessible as method? I know this could be a conflict for the leader election if every instance tries to do the migrations on startup but it should be no problem on a single instance.

If you are open to it, I could try to create a pull request for this changes.

Worker Coordinator fails with PgBouncer due to unsupported statement_timeout

Thanks for the extensive documentation on using river with PgBouncer!

I’m still having a small issue with river and PgBouncer - when the worker tries to connect, it fails (wrapped for better readability):

river client start: error making initial connection to database:
failed to connect to `host=... user=... database=...`:
server error (FATAL: unsupported startup parameter: statement_timeout (SQLSTATE 08P01))

PgBouncer doesn’t seem to support the statement_timeout parameter. Disabling the statement_timeout in the Notifier by removing the line makes the worker run. The comment above this line indicates that the timeout is necessary, though:

// Rely on an overall statement timeout instead of setting identical context timeouts on every query:

Side Note: PgBouncer does seem to have an option to ignore certain startup parameters, but I’m a bit wary of doing so, given that ignoring the statement timeout might have hard-to-notice side effects for other applications. I’d rather have the applications fail while connecting and adapt them however necessary.

My questions:

  1. Do you expect river to be negatively impacted by removing the statement timeout? I interpret the comment mentioned above as the statement timeout being required for reliable operation, although I'm wondering whether context-based timeouts would be necessary anyway to properly deal with network issues that would prevent river from receiving a response from Postgres. I found at least one context-based timeout in the Notifier, I'm not sure that's enough to cover all cases, though.
  2. If river is able to work reliably without the statement timeout, would you be open to making the timeout optional in some way or replace it with context-based timeouts where necessary?

I’m happy to submit a PR if you think a change in river makes sense.

invalid go version '1.21.4'

Attempting to run go install github.com/riverqueue/river/cmd/river@latest yields the error invalid go version '1.21.4': must match format 1.23

Periodic JobArgs lost on subsequent invocations

Hi River devs, I'm trying out the periodic job queue. I was hoping to be able to insert a periodic job and have it run at regular intervals, using the PeriodicJobArgs until it's no longer needed. I see the first run of the job with the correct JobArgs values, but subsequent runs no longer have access to the JobArgs. Apologies if this is an obvious omission on my part. I suppose I could get around this by using a single-shot job and re-inserting a new job before exiting the Work() function, but that defeats the object of the periodic job!

Following taken from the periodic job example, with updates to get around no access to internal imports, and adding a transaction to insert a job. Any suggestions welcomed 😄

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	//"github.com/riverqueue/river/internal/riverinternaltest"
	//"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type PeriodicJobArgs struct {
	JobVal string
}

// Kind is the unique string name for this job.
func (PeriodicJobArgs) Kind() string { return "periodic" }

// PeriodicJobWorker is a job worker for sorting strings.
type PeriodicJobWorker struct {
	river.WorkerDefaults[PeriodicJobArgs]
}

func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error {
	fmt.Printf("This job will run once immediately with JobVal:%s then approximately once every 30 secs\n", job.Args.JobVal)
	return nil
}

var job1Args PeriodicJobArgs = PeriodicJobArgs{
	JobVal: "11112222",
}

// Example_periodicJob demonstrates the use of a periodic job.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// // Required for the purpose of this test, but not necessary in real usage.
	// if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
	// 	panic(err)
	// }

	workers := river.NewWorkers()
	river.AddWorker(workers, &PeriodicJobWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		//Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		PeriodicJobs: []*river.PeriodicJob{
			river.NewPeriodicJob(
				river.PeriodicInterval(30*time.Second),
				func() (river.JobArgs, *river.InsertOpts) {
					return PeriodicJobArgs{}, nil
				},
				&river.PeriodicJobOpts{RunOnStart: true},
			),
		},
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 2},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	// Insert a transaction to run periodically
	_, err = riverClient.InsertTx(ctx, tx, job1Args, nil)
	if err != nil {
		// handle error
		fmt.Printf("Error inserting transaction for jobArg:%s, error:%s\n", job1Args.JobVal, err.Error())
	}
	tx.Commit(ctx)

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	// There's no need to explicitly insert a periodic job. One will be inserted
	// (and worked soon after) as the client starts up.
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 10)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) {
	var (
		timeout  = 10 * time.Hour
		deadline = time.Now().Add(timeout)
		events   = make([]*river.Event, 0, numJobs)
	)

	for {
		select {
		case event := <-subscribeChan:
			events = append(events, event)

			if len(events) >= numJobs {
				fmt.Printf("events:%d numJobs:%d\n", len(events), numJobs)

				return
			}

		case <-time.After(time.Until(deadline)):
			panic(fmt.Sprintf("WaitOrTimeout timed out after waiting %s (received %d job(s), wanted %d)",
				timeout, len(events), numJobs))
		}
	}
}

Output is:

This job will run once immediately with JobVal:11112222 then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs

Is elector namespace unnecessary?

I came across this section while working on something else today:

river/client.go

Lines 446 to 449 in 6771fd3

// TODO: for now we only support a single instance per database/schema.
// If we want to provide isolation within a single database/schema,
// we'll need to add a config for this.
instanceName := "default"

This instanceName variable is only used in the subsequent initialization of the leader election routine. I think the original intention here was that if we had a config for which Postgres schema to use, that the leader election would need to be scoped to that particular schema. But given how our support for multiple schemas evolved as available by default accidentally (merely due to them being configurable in the pgx config) I'm not sure if this is actually needed at all anymore. Won't the elector inherit the schema from the pgx pool config, and thus automatically be acting on the correct schema with no additional configuration?

Should we just remove the variable altogether @brandur?

Crdb support?

I expect cockroachdb would probably work fine, because it uses pgx, but I wanted to ask if you had any thoughts about potential issues with using it with river. My main concern is that crdb requires transaction retry and so there are wrappers used with pgx for commit, see https://pkg.go.dev/github.com/cockroachdb/cockroach-go/crdb/crdbpgx and their ExecuteTx method. I would guess it should be fine, I can call ExecuteTx from within the job maybe, but I worry I'm missing obvious conflicts with how river may use transactions under the hood. I think there is good pairing between rivers goals and crdb, though, and so I'm hoping any potential conflicts are minor.

(pgx v5 version of that wrapper library https://pkg.go.dev/github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5 )

Support abandoning jobs that take too long to cancel

Context

Hiya, thanks for making River! Really digging it so far.

I have an app that queues jobs that I don't really need to wait to complete before exiting, even after cancelling. I just want to guarantee fast deploys and let jobs just get tossed around like a hot potato during a rolling deploy.

Before I continue: this isn't blocking me, and it's a nuanced topic on an edge case within an edge case, so no rush.

StopAndCancel is almost what I want. Cancelled jobs end up in a retryable state and they either get picked back up elsewhere or recovered when the instance comes back. The one case it doesn't handle is when jobs take too long to stop (typically by not respecting ctx.Done(), likely a bug).

I tried to pass a timeout ctx to StopAndCancel, but it doesn't currently respect it:

stopCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
defer stop()
if err := db.workerClient.StopAndCancel(stopCtx); err != nil {
	slog.Error("failed to stop and cancel workers", "error", err)
}
if stopCtx.Err() != nil {
	slog.Warn("timed out cleaning up jobs")
}

With a job that does time.Sleep(time.Minute) the StopAndCancel will just wait the full 1 minute instead of being interrupted after 10 seconds. This seems like a simple bug, happy to submit a PR.

Even if it did respect the timeout, there's a secondary problem: if we bail out at that point, the worker's jobs will be left in a running state. At this point the job is stuck, and if you have uniqueness rules like I do that'll prevent future attempts too. As far as I can tell the only recourse at this point is to delete the job from the database or set its state to retryable.

Proposal

This might be problematic for other use cases, but at least for mine, this would be a big help:

  1. StopAndCancel respects ctx cancellation
  2. When StopAndCancel times out it marks all running jobs on the local worker as retryable

Alternatively, having some way to do (2) independently would be fine. For example I could do a db.workerClient.Abandon(stopCtx) in the if stopCtx.Err() != nil { branch above.

This would allow graceful shutdown in most cases, prevent hanging in the "truly stuck" case, and allow jobs to be retried in all cases except kill -9.

Alternative

The root of the problem here is really the stuck running jobs. I would also be OK with leaving everything as-is and instead just enforcing a stop timeout at the infra layer, letting the app get kill -9ed, and having River automatically detect when a running job is abandoned so things aren't stuck forever.

Unique periodic jobs

The documentation states it is possible to combine unique and periodic jobs:

Fortunately, many of these concerns can be addressed by combining periodic jobs with unique jobs and the RunOnStart option. For example, a job which is configured to be unique at the hourly level will only enqueue once in that hour no matter how many times it's attempted.

Unfortunately I was not able to make it work. After some source code studying I found out that periodic jobs are inserted using JobInsertMany. That explains why uniqueness does not take effect, as the docs say:

...which is why job uniqueness is not supported on batch insertion.

Is it something that needs to be fixed, or am I just missing something?

Skip the `jobExecutor: Job errored` log line if `ErrorHandler.HandleError()` is implemented

[This may be considered more a question than a feature request so if we have to move to discussions that feels OK!]

I am implementing a struct to satisfy river.ErrorHandler because I want a catch-all to send off (some) failures to Sentry for error reporting.

In the handler I also added in logging (it felt like a good place to add logging) and it resulted in double logging:

time=2023-12-13T15:54:37.472-06:00 level=ERROR msg="jobExecutor: Job errored" error="sadly, I always fail; 0" job_id=30
{"level":"error","error":"sadly, I always fail; 0","job_id":30,"job":{"ID":30,"Attempt":1,"AttemptedAt":"2023-12-13T15:54:37.469843-06:00","AttemptedBy":["01HHJJSBWNF3EQAGPDSJ0304WA"],"CreatedAt":"2023-12-13T15:54:37.466116-06:00","EncodedArgs":"eyJ2YWx1ZSI6IDB9","Errors":[],"FinalizedAt":null,"Kind":"square","MaxAttempts":25,"Priority":1,"Queue":"default","ScheduledAt":"2023-12-13T21:54:37.466116Z","State":"running","Tags":[]},"time":"2023-12-13T15:54:37-06:00","message":"river job errored"}
time=2023-12-13T15:54:38.600-06:00 level=ERROR msg="jobExecutor: Job errored" error="sadly, I always fail; 0" job_id=30
{"level":"error","error":"sadly, I always fail; 0","job_id":30,"job":{"ID":30,"Attempt":2,"AttemptedAt":"2023-12-13T15:54:38.594818-06:00","AttemptedBy":["01HHJJSBWNF3EQAGPDSJ0304WA","01HHJJSBWNF3EQAGPDSJ0304WA"],"CreatedAt":"2023-12-13T15:54:37.466116-06:00","EncodedArgs":"eyJ2YWx1ZSI6IDB9","Errors":[{"at":"2023-12-13T21:54:37.47285Z","attempt":1,"error":"sadly, I always fail; 0","trace":""}],"FinalizedAt":null,"Kind":"square","MaxAttempts":25,"Priority":1,"Queue":"default","ScheduledAt":"2023-12-13T21:54:38.546433Z","State":"running","Tags":[]},"time":"2023-12-13T15:54:38-06:00","message":"river job errored"}
time=2023-12-13T15:54:42.612-06:00 level=ERROR msg="jobExecutor: Job errored" error="sadly, I always fail; 9" job_id=31
{"level":"error","error":"sadly, I always fail; 9","job_id":31,"job":{"ID":31,"Attempt":1,"AttemptedAt":"2023-12-13T15:54:42.59942-06:00","AttemptedBy":["01HHJJSBWNF3EQAGPDSJ0304WA"],"CreatedAt":"2023-12-13T15:54:42.467549-06:00","EncodedArgs":"eyJ2YWx1ZSI6IDN9","Errors":[],"FinalizedAt":null,"Kind":"square","MaxAttempts":25,"Priority":1,"Queue":"default","ScheduledAt":"2023-12-13T21:54:42.467549Z","State":"running","Tags":[]},"time":"2023-12-13T15:54:42-06:00","message":"river job errored"}
time=2023-12-13T15:54:43.608-06:00 level=ERROR msg="jobExecutor: Job errored" error="sadly, I always fail; 9" job_id=31
{"level":"error","error":"sadly, I always fail; 9","job_id":31,"job":{"ID":31,"Attempt":2,"AttemptedAt":"2023-12-13T15:54:43.600428-06:00","AttemptedBy":["01HHJJSBWNF3EQAGPDSJ0304WA","01HHJJSBWNF3EQAGPDSJ0304WA"],"CreatedAt":"2023-12-13T15:54:42.467549-06:00","EncodedArgs":"eyJ2YWx1ZSI6IDN9","Errors":[{"at":"2023-12-13T21:54:42.612759Z","attempt":1,"error":"sadly, I always fail; 9","trace":""}],"FinalizedAt":null,"Kind":"square","MaxAttempts":25,"Priority":1,"Queue":"default","ScheduledAt":"2023-12-13T21:54:43.521708Z","State":"running","Tags":[]},"time":"2023-12-13T15:54:43-06:00","message":"river job errored"}
...

Is the recommended thing to do here just to avoid any logging in my HandleError() / HandlePanic() and add my own custom logger in river.Config{}.Logger?

(😆 I have added a customer Logger yet because I haven't found or implemented a suitable slog-compatible wrapper for zerolog.)

Is it possible to have the worker run in a separate process or is this a non-goal?

River works great but if I try enqueue a job that has a worker running in a separate process (and thus using a different River client) I get a:

job kind is not registered in the client's Workers bundle: <job kind>

I dug into the code a little and indeed it checks for workers that were registered on that same client:

if _, ok := c.config.Workers.workersMap[args.Kind()]; !ok {
     return &UnknownJobKindError{Kind: args.Kind()}
}

I would like to run some workers in a separate process because some of the jobs require a lot of memory and these processes are more likely to get OOM killed (or potentially have CPU starvation). I don't want these jobs to disrupt the response time or reliability of the main process which serves HTTP requests.

Is this something that will be supported in the future or a non-goal for the project at the moment?

API naming inconsistency

I realized while writing up the changelog entry for #117 that I had named that API JobList(), while I named the API from #141 Cancel() with no Job prefix. We have other job-related query APIs to add like Get, RetryImmediately, etc. These should probably all either have a prefix or not have one, but without inconsistency.

Thoughts @brandur? Should be an easy fix as we are still prerelease on the new API.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.