
graphile / worker


High performance Node.js/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)

Home Page: http://worker.graphile.org/

License: MIT License

PLpgSQL 17.68% TypeScript 62.20% JavaScript 9.82% Dockerfile 0.14% Shell 0.12% CSS 4.16% Python 0.13% MDX 5.75%
postgresql node worker job-queue graphile nodejs postgres queue

worker's Introduction

graphile-worker


Job queue for PostgreSQL running on Node.js - allows you to run jobs (e.g. sending emails, performing calculations, generating PDFs, etc) "in the background" so that your HTTP response/application code is not held up. Can be used with any PostgreSQL-backed application. Pairs beautifully with PostGraphile or PostgREST.

Crowd-funded open-source software

To help us develop this software sustainably, we ask all individuals and businesses that use it to help support its ongoing maintenance and development via sponsorship.

And please give some love to our featured sponsors 🤩:

The Guild *
Dovetail *
Stellate *
Steelhead *
LatchBio *
Trigger.dev

* Sponsors the entire Graphile suite

worker's People

Contributors

archlemon, benjie, blimmer, bmhaskar, christophemacabiau, commanderroot, countcain, danieldiekmeier, dependabot[bot], evolkmann, gaetan-craft, garcianavalon, geekuillaume, gregplaysguitar, hgl, jcapcik, jemgillam, joshbwlng, keepitsimple, madflow, moustacheful, nmummau, nodesocket, psteinroe, rattrayalex, singingwolfboy, spiffytech, thhareau, timelf123, winetgh

worker's Issues

inconsistent job table columns

installing via:

npx graphile-worker -c $DB_CONNECTION_STRING --schema-only

there are no locked_at or locked_by columns

image

but after adding a job, they appear

image

this causes an issue for me when querying the exposed schema via GraphQL

Cancel/modify delayed jobs

I'm exploring a case where I need to be able to somehow cancel or modify jobs in the graphile worker queue which have been delayed using the run_at argument. Looking at the source, it seems this would be trivial to achieve by deleting from jobs by id, but because the id is a serial, I would need to store it somewhere. If the id could be pre-generated (as a uuid4 or something similar) and passed to the add_job function, then I could generate the same id again in order to delete or update the job. So the changes would be

  • use a uuid for the jobs primary key instead of bigserial
  • default it to gen_random_uuid but allow the id to be provided as an optional argument to add_job

Is there some reason I haven't considered why this would be a bad idea? I'm assuming insert performance would be better with the sequence removed, but I'm unsure how it would impact reads from the jobs table.

Concurrency by job_key?

From the docs

Note: If a job is updated using add_job once it is already running or completed, the second job will be scheduled separately, meaning both will run. Likewise, calling remove_job for a running or completed job is a no-op.

I have a job which I can run concurrently only if certain parameters are different, so I use these parameters as part of the job_key. So if a job is already running then it schedules a second job. I want the job to be queued until all the jobs with the same job_key are completed, which could be seen as a concurrency limit per job_key. Is there any way to achieve this currently using worker? I'm not really familiar with Postgres so have only been looking at the Node API.
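Roughly what this looks like in code, as a sketch only: it uses graphile-worker's documented quickAddJob helper and the jobKey option; the task name and key format here are illustrative.

const { quickAddJob } = require("graphile-worker");

// The key encodes the parameters that are allowed to run concurrently;
// re-adding with the same key debounces the job rather than queueing a second run.
async function scheduleSync(connectionString, accountId, region) {
  const jobKey = `sync:${accountId}:${region}`;
  return quickAddJob({ connectionString }, "sync", { accountId, region }, { jobKey });
}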

addJob - helper function

I'm using this as a lightweight event bus. There are a bunch of places in my platform where events need to be produced from random code blocks. The intent of this library appears to be queuing messages from db triggers, but in my case there is no corresponding db write, and synthesizing one seems hamfisted.

Currently, the addJob function is attached to the runner instance. It'd be cool if the addJob function were exported separately and either generated the query and call params, or took a db connection and ran the insert. The reason for a connection and not a pool is that cloud functions don't do well with pools, and people might want to queue messages from webhooks or something.

The main argument for exporting this and having it travel with this library is that if the db schema changes or something, it would also get updated.

If you are open to it, I'll do the work. Would you accept a PR that exports the addJob function standalone?

If not, this is no big deal. It is easy to queue with the sproc. Thank you for making this lib. :)
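In case it helps anyone else, here's the shape of the sproc approach as a standalone sketch: not library code, just a plain pg client calling the shipped graphile_worker.add_job function; everything else is illustrative.

const { Client } = require("pg");

// Queue a job without a Runner or pool by calling the SQL function directly.
async function addJobViaSql(connectionString, identifier, payload) {
  const client = new Client({ connectionString });
  await client.connect();
  try {
    const {
      rows: [job],
    } = await client.query(
      "select * from graphile_worker.add_job($1, $2::json)",
      [identifier, JSON.stringify(payload)]
    );
    return job;
  } finally {
    await client.end();
  }
}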

PgPools don't have error handlers

worker/src/runner.ts

Lines 39 to 43 in a8070ae

pgPool = new Pool({ connectionString: options.connectionString });
releasers.push(() => pgPool.end());
} else if (process.env.DATABASE_URL) {
pgPool = new Pool({ connectionString: process.env.DATABASE_URL });
releasers.push(() => pgPool.end());

Should have .on('error', ...) handlers to prevent node crashing when PostgreSQL restarts.
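For example (illustrative, mirroring what node-postgres documents for idle-client errors):

const { Pool } = require("pg");

const pgPool = new Pool({ connectionString: process.env.DATABASE_URL });

// Without a handler, an error emitted on an idle client (e.g. when PostgreSQL
// restarts) is raised on the pool and crashes the Node process.
pgPool.on("error", (err) => {
  console.error("PostgreSQL idle client error:", err.message);
});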

Run once alias option not working

$ /tmp/aaa/node_modules/.bin/graphile-worker --help
Options:
  --help            Show help                                         [boolean]
  --version         Show version number                               [boolean]
  --connection, -c  Database connection string, defaults to the 'DATABASE_URL'
                    envvar                                              [string]
  --once, -1        Run until there are no runnable jobs left, then exit
                                                     [boolean] [default: false]
  --watch, -w       [EXPERIMENTAL] Watch task files for changes, automatically
                    reloading the task code without restarting worker
                                                     [boolean] [default: false]
  --jobs, -j        number of jobs to run concurrently             [default: 1]
  --poll-interval   how long to wait between polling for jobs in milliseconds
                    (for jobs scheduled in the future/retries)
                                                        [number] [default: 2000]
  • yarn run graphile-worker -c invalid => Alias is working
  • yarn run graphile-worker -1 => Alias is not working
  • yarn run graphile-worker --1 => Alias is working

queueName is not used

From the code it seems like the queueName is not actually used in any way.

What is it for?

Thanks!

Is task de-duplication supported?

Hi,

I really like the idea of a postgresql based job queue and graphile worker in particular.

My only concern is whether there is a way to prevent the same task from being added to the queue twice (by multiple parallel processes somehow wanting to schedule the very same task).

In Google Cloud Tasks de-duplication is supported by allowing the task creator to specify an ID for the task.

Can a similar behaviour somehow be achieved in graphile worker?

One solution I could see is adding a unique index on the job payload like this:

CREATE UNIQUE INDEX unique_payload ON graphile_worker.jobs (payload);

Then the task creator would be responsible for including a UUID or some other data that makes each task payload unique, and the unique index would make a job addition fail if the same payload was added twice.

Is this solution feasible or is there some better way to do this?

Worker Test Example

Was looking through the code for how to test, as it's not really clear from the readme. Here is something I hacked up.

Would appreciate any feedback on it. Once it's decent I would be happy to make a PR. This example is for jest with @babel/preset-env.

Thanks for the great lib!

// tasks/hello.js
export default async (payload, helpers) => {
  const { name } = payload;
  helpers.logger.info(`Hello, ${name}`);
};
// test/tasks/hello.test.js
import hello from '../../src/tasks/hello';
import config from 'config'; // 
import { runOnce, quickAddJob } from 'graphile-worker';

const connectionString = config.get('pg_db_url'); // i.e. "postgres://user:pass@host:port/db?ssl=true"
const payloadName = 'Bobby Tables';

// based on https://github.com/graphile/worker#quickstart-library
describe('hello task', () => {
  it('does things', async (done) => {
    const spy = jest.spyOn(global.console, 'info');
    // Or add a job to be executed:
    const job = await quickAddJob(
      // makeWorkerUtils options
      { connectionString },

      // Task identifier
      'hello',

      // Payload
      { name: payloadName }
    );

    console.log(job);
    expect(job).toBeTruthy();

    // Run a worker to execute jobs:
    // eslint-disable-next-line
    const runner = await runOnce({
      connectionString,
      concurrency: 5,
      // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
      noHandleSignals: false,
      pollInterval: 1000,
      // you can set the taskList or taskDirectory but not both
      taskList: {
        hello,
      },
      // or:
      //   taskDirectory: `${__dirname}/tasks`,
    });

    // until https://github.com/graphile/worker/issues/28 is resolved with events
    // or i write my own event emitter. we'll just test the logger
    //
    // this also is ok https://github.com/bpedersen/jest-mock-console
    // but i don't want to swallow logs this early in the dev cycle
    //
    // https://stackoverflow.com/a/56677834/511710
    // https://jest-bot.github.io/jest/docs/expect.html#expectstringcontainingstring

    /**
     * graphile-worker's built-in logger passes console calls like this
     * 
     * 1: "[%s%s] %s: %s", "job", "(worker-e03827077c435f77f5: hello{15})", "INFO", "Hello, Bobby Tables"
     * 2: "[%s%s] %s: %s", "worker", "(worker-e03827077c435f77f5)", "INFO", "Completed task 15 (hello) with success (20.03ms)"
     * 
     */
     expect(spy).toHaveBeenCalledWith(
      expect.anything(),
      expect.anything(),
      expect.anything(),
      expect.anything(),
      expect.stringContaining(payloadName)
    );

    done();
  });
});

Same Postgres configuration API as PostGraphile

The PostGraphile API supports any configuration object compatible with the pg library. That includes the standard PG* environment variables used in many systems' configuration.

It would be a nice feature to support these environment variables, for compatibility with environments that are not using a connection string. Furthermore, having the same API as PostGraphile makes the two easier to use together in the same project.
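As a stopgap, the pg-style config can already be routed through by constructing the Pool yourself and handing it to the runner via the pgPool option; a sketch, where the config values and task directory are illustrative:

const { Pool } = require("pg");
const { run } = require("graphile-worker");

// The same pg-compatible config object used to configure PostGraphile.
const pgConfig = {
  user: process.env.PGUSER,
  host: process.env.PGHOST,
  database: process.env.PGDATABASE,
  password: process.env.PGPASSWORD,
  port: Number(process.env.PGPORT) || 5432,
};

async function main() {
  const pgPool = new Pool(pgConfig);
  const runner = await run({
    pgPool, // instead of connectionString
    concurrency: 1,
    taskDirectory: `${__dirname}/tasks`,
  });
  await runner.promise;
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});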

Unable to run perfTest with docker-compose

I'm trying to run the perfTest according to the README.

I'm running it on macOS with Docker 19.03.4, Docker Compose 1.25.2

I ran

docker-compose up

Then in another terminal:

docker-compose exec app ./perfTest/run.sh

I always get this error:

> docker-compose exec app ./perfTest/run.sh
Recreating database graphile_worker_test
error: database "graphile_worker_test" already exists
    at Connection.parseE (/app/node_modules/pg/lib/connection.js:604:11)
    at Connection.parseMessage (/app/node_modules/pg/lib/connection.js:401:19)
    at Socket.<anonymous> (/app/node_modules/pg/lib/connection.js:121:22)
    at Socket.emit (events.js:223:5)
    at addChunk (_stream_readable.js:309:12)
    at readableAddChunk (_stream_readable.js:290:11)
    at Socket.Readable.push (_stream_readable.js:224:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:181:23) {
  name: 'error',
  length: 103,
  severity: 'ERROR',
  code: '42P04',
  detail: undefined,
  hint: undefined,
  position: undefined,
  internalPosition: undefined,
  internalQuery: undefined,
  where: undefined,
  schema: undefined,
  table: undefined,
  column: undefined,
  dataType: undefined,
  constraint: undefined,
  file: 'dbcommands.c',
  line: '480',
  routine: 'createdb'
}

This is the output from docker-compose up:

Starting graphile-worker_db_1  ... done
Starting graphile-worker_app_1 ... done
Attaching to graphile-worker_db_1, graphile-worker_app_1
db_1   | The files belonging to this database system will be owned by user "postgres".
db_1   | This user must also own the server process.
db_1   | 
db_1   | The database cluster will be initialized with locale "en_US.utf8".
db_1   | The default database encoding has accordingly been set to "UTF8".
db_1   | The default text search configuration will be set to "english".
db_1   | 
db_1   | Data page checksums are disabled.
db_1   | 
db_1   | fixing permissions on existing directory /var/lib/postgresql/data ... ok
db_1   | creating subdirectories ... ok
db_1   | selecting default max_connections ... 100
db_1   | selecting default shared_buffers ... 128MB
db_1   | selecting default timezone ... UTC
db_1   | selecting dynamic shared memory implementation ... posix
db_1   | creating configuration files ... ok
db_1   | running bootstrap script ... ok
app_1  | yarn run v1.21.1
app_1  | $ mkdir -p dist && touch dist/cli.js && chmod +x dist/cli.js && tsc --watch
db_1   | performing post-bootstrap initialization ... sh: locale: not found
db_1   | 2020-01-28 19:13:21.998 UTC [26] WARNING:  no usable system locales were found
app_1  | [7:13:22 PM] Starting compilation in watch mode...
app_1  | 
db_1   | ok
db_1   | syncing data to disk ... ok
db_1   | 
db_1   | Success. You can now start the database server using:
db_1   | 
db_1   |     pg_ctl -D /var/lib/postgresql/data -l logfile start
db_1   | 
db_1   | 
db_1   | WARNING: enabling "trust" authentication for local connections
db_1   | You can change this by editing pg_hba.conf or using the option -A, or
db_1   | --auth-local and --auth-host, the next time you run initdb.
db_1   | ****************************************************
db_1   | WARNING: No password has been set for the database.
db_1   |          This will allow anyone with access to the
db_1   |          Postgres port to access your database. In
db_1   |          Docker's default configuration, this is
db_1   |          effectively any other container on the same
db_1   |          system.
db_1   | 
db_1   |          Use "-e POSTGRES_PASSWORD=password" to set
db_1   |          it in "docker run".
db_1   | ****************************************************
db_1   | waiting for server to start....2020-01-28 19:13:22.412 UTC [31] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
db_1   | 2020-01-28 19:13:22.425 UTC [32] LOG:  database system was shut down at 2020-01-28 19:13:22 UTC
db_1   | 2020-01-28 19:13:22.428 UTC [31] LOG:  database system is ready to accept connections
db_1   |  done
db_1   | server started
db_1   | CREATE DATABASE
db_1   | 
db_1   | 
db_1   | /usr/local/bin/docker-entrypoint.sh: ignoring /docker-entrypoint-initdb.d/*
db_1   | 
db_1   | 2020-01-28 19:13:22.636 UTC [31] LOG:  received fast shutdown request
db_1   | 2020-01-28 19:13:22.636 UTC [31] LOG:  aborting any active transactions
db_1   | waiting for server to shut down....2020-01-28 19:13:22.638 UTC [31] LOG:  background worker "logical replication launcher" (PID 38) exited with exit code 1
db_1   | 2020-01-28 19:13:22.639 UTC [33] LOG:  shutting down
db_1   | 2020-01-28 19:13:22.647 UTC [31] LOG:  database system is shut down
db_1   |  done
db_1   | server stopped
db_1   | 
db_1   | PostgreSQL init process complete; ready for start up.
db_1   | 
db_1   | 2020-01-28 19:13:22.746 UTC [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
db_1   | 2020-01-28 19:13:22.746 UTC [1] LOG:  listening on IPv6 address "::", port 5432
db_1   | 2020-01-28 19:13:22.750 UTC [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
db_1   | 2020-01-28 19:13:22.761 UTC [42] LOG:  database system was shut down at 2020-01-28 19:13:22 UTC
db_1   | 2020-01-28 19:13:22.764 UTC [1] LOG:  database system is ready to accept connections
app_1  | [7:13:29 PM] Found 0 errors. Watching for file changes.
app_1  | 
db_1   | 2020-01-28 19:14:11.406 UTC [49] ERROR:  database "graphile_worker_test" already exists
db_1   | 2020-01-28 19:14:11.406 UTC [49] STATEMENT:  create database "graphile_worker_test";
db_1   | 2020-01-28 19:14:11.417 UTC [49] LOG:  could not receive data from client: Connection reset by peer

Am I doing something wrong?

Why this library depends on uuid-ossp?

I only found a use of the gen_random_uuid() function, but that comes from pgcrypto, not from uuid-ossp; maybe the uuid-ossp requirement is a leftover? Also, uuid-ossp does not provide the uuid type, because that is already built in.

How to solve issue when extensions aren't in public

To fix the issue where your pgcrypto extension is installed in a schema other than public, you can create an alias function, replacing YOUR_SCHEMA with the schema you've installed the extension into.

create function public.gen_random_uuid() returns uuid as $$
  select YOUR_SCHEMA.gen_random_uuid();
$$ language sql;

Feature: Add an API for giving workers more helpers

Usecase

Give users of graphile-worker the ability to pass in custom helpers. This could include custom database connection objects, metadata, application-specific helpers, etc...

For example:

My specific usecase is to pass a Slonik database connection object.

Implementation

// TODO: add an API for giving workers more helpers

It looks like if this TODO were done, this usecase could be satisfied.

I'm not sure what the ultimate implementation would look like. Would there be a specific file like helpers.ts that would be loaded and named exports would automatically be included?

Pitfalls/Things to watch out for

  • Naming conflicts with existing helpers -- Is it okay to override?

[Discussion] Let TS users overload `addJob`

I see two different options for doing this, and there are tradeoffs for each. Wanted to lay those out here before implementing (none entail much work, and I don't think any would be breaking changes):

NOTE: For any of these to work (assuming they are in interfaces.ts), the interface being augmented needs to be exported by name from the top level.

Option 1: Change AddJobFunction to an interface with a call signature

Changes:

export interface AddJobFunction {
  (identifier: string, payload?: any, options?: TaskOptions): Promise<Job>;
}

Adding an overload:

import { Job, TaskOptions } from 'graphile-worker'

declare module 'graphile-worker' {
 interface AddJobFunction {
    (identifier: 'taskName', payload: Payload, options?: TaskOptions): Promise<Job>
  }
}

Special TS rules bump this overload to the top of the list due to the non-union literal.

Pros:

  • Users can't screw it up at call sites

Cons:

  • Verbose
  • Have to import extra types

Option 2: Generic overload with a type map

Changes:

export interface TaskPayload {}
export interface AddJobFunction {
  <T extends keyof TaskPayload>(identifier: T, payload: TaskPayload[T], options?: TaskOptions): Promise<Job>
  (identifier: string, payload?: any, options?: TaskOptions): Promise<Job>;
}

Adding an overload:

declare module 'graphile-worker' {
 interface TaskPayload {
    taskName: Payload
  }
}

Pros:

  • Easier to add an "overload"

Cons:

  • Users can pass the type param explicitly or pass something other than a string literal as the task name; this could cause bogus inputs to type check

I had another idea that actually won't work because the payload is required in (at least most) overload signatures and not in the default signature.

I started writing this favoring option 2, but at this point I feel like correctness trumps brevity. They're both easy to implement, so whatever is decided here can be turned around quickly.

Using different clients inside a task handler

Currently, the different helper methods available in the task handlers (query, addJob, withPgClient) all use the same client object. AFAIK, this is intended by design. This is a limitation when trying to use commands that block, for example pg-cursor.

In our use-case, we have a table of elements. Every time a new element is added, we need to do some somewhat expensive calculations in all pair combinations of this element and the already existing rows. To perform this operation, we have two jobs:

  • create_batches: reads the elements table and creates batches of 100 and delegates them for processing
  • process_batch: takes a batch and performs the calculations, which involve calling to external APIs which impose batch size limits (and might fail)

The initial implementation of the create_batches job looked like this: (simplified for this issue)

const { promisify } = require('util')
const Cursor = require('pg-cursor')
Cursor.prototype.readAsync = promisify(Cursor.prototype.read)

module.exports = async (payload, { query, addJob }) => {
  const sql = '...'
  const values = [...]

  const cursor = await query(new Cursor(sql, values))

  let rows = await cursor.readAsync(100)

  while (rows.length) {
    const batch = {...}

    await addJob('process_batch', batch)

    rows = await cursor.readAsync(100)
  }
}

This doesn't work because the open cursor will block the addJob command in the client. We ended up solving our case by using quickAddJob, but this is not optimal as we have to create and destroy many pgPools, which is expensive and inefficient.

Other options involve using WorkerUtils, but because the documentation recommends using WorkerUtils as a singleton, it is not trivial to require it from the task handler.
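One sketch of that singleton workaround, for discussion (the module layout and names are illustrative; only makeWorkerUtils and workerUtils.addJob are library API):

// worker-utils.js - a shared singleton, created lazily once per process
const { makeWorkerUtils } = require("graphile-worker");

let workerUtilsPromise = null;

function getWorkerUtils() {
  if (!workerUtilsPromise) {
    workerUtilsPromise = makeWorkerUtils({
      connectionString: process.env.DATABASE_URL,
    });
  }
  return workerUtilsPromise;
}

module.exports = { getWorkerUtils };

// In tasks/create_batches.js, add jobs via the singleton instead of helpers.addJob,
// so the long-lived cursor on the task's own client doesn't block the insert:
//
//   const { getWorkerUtils } = require("../worker-utils");
//   const workerUtils = await getWorkerUtils();
//   await workerUtils.addJob("process_batch", batch);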

I can think of several ways of improving Graphile Worker to support multiple clients:

  • providing some API to allow addJob to fetch a new client from the pool
  • Adding WorkerUtils or a runner reference to the helpers to access/make other addJob methods

Add multiple leaky bucket rate limiting

I have a send-message job that sends a text message. We have a global 3,000 messages / minute rate limit, a limit of 6 messages / second per sending phone number, and have many different clients that send messages. Each client has hundreds of phone numbers.

Our goals are:
a) To stay compliant with our global rate limit
b) To stay compliant with our per-phone number rate limit
c) To prevent any client from clogging the queue for all other clients, e.g. one client sending 6,000 messages in a minute delaying all other clients' messages by 2 minutes. Something like 1,000 / minute would probably be sensible here, given that not all clients are going to send their maximum volume at once.

One way to do this is a Postgres-friendly simplification of the leaky bucket algorithm, where you would have buckets:

create table buckets (
    bucket_type text,
    interval text, -- 'second' or 'minute'
    capacity integer
);

And bucket_intervals:

create table bucket_intervals (
    bucket_type text,
    bucket_name text,
    bucket_time timestamp, -- truncated to the second or minute based on the bucket interval
    tokens_used integer, -- number of jobs in this bucket
    primary key (bucket_time, bucket_type, bucket_name)
);

Whenever run_at is computed, if the job has specified buckets (via a buckets text[] column / parameter), run_at would be set to the max of the user specified run_at (or now) and the next available slot which doesn't overflow any bucket interval.

For our use case, our buckets table would be:

insert into buckets (bucket_type, interval, capacity)
values 
    ('send-message-global', 'minute', 3000),
    ('send-message-client', 'minute', 1000),
    ('send-message-phone', 'second', 6);

And each send-message job would be queued with three buckets:

  1. send-message-global (which would map to bucket_type = 'send-message-global', bucket_name = null)
  2. send-message-client|<client-id> (bucket_type = 'send-message-client', bucket_name = <client-id>)
  3. send-message-phone|<phone-number> (bucket_type = 'send-message-phone', bucket_name = <phone-number>)

I think this could be accomplished via:

  1. adding an optional buckets parameter to add_job
  2. triggers on the jobs table that only run when the job has buckets, and as a result have no negative performance impact for users who don't need this feature

To keep it performant, we would need to delete past buckets. This could either be done on job completion / failure, or we could just write a SQL function to do it and leave periodically calling that function up to the user.

Although I'd like to be able to use an approach similar to the one described here, in this case we have multiple different queues whose rate limits interact.

Although it's also possible to implement this in user land with a custom add_job function and by overwriting fail_job, the project it is part of is closed source, and other users may benefit from having a simple way to rate limit jobs that interact with rate limited external APIs.

Do you think this is a good solution / would you like a PR for it, or do you think this level of complexity is best kept outside of this project?

Allow to configure which database function is called by runner.addJob

Motivation

For some use cases, users might want to create their own internal add_job function that wraps graphile_worker.add_job to add extra logic. For example, to add entries to a historical table with all jobs for monitoring.

This approach is fine when jobs are created through database elements like triggers and functions, but it doesn't work when jobs are created through the JavaScript API runner.addJob, which always calls graphile_worker.add_job.

Proposed solution

Graphile worker could add a configuration parameter addJobFunctionName when creating the runner, which would allow changing the underlying function called by runner.addJob.

const runner = await run({
  pgPool,
  concurrency: 5,
  pollInterval: 1000,
  taskDirectory: './tasks',
  addJobFunctionName: 'my_schema.add_job' // <-- user function
})

addJobFunctionName should default to graphile_worker.add_job, making the parameter safe to add without breaking anyone's code.

RLS on worker tables

The current schema migration creates tables with RLS enabled and no policies. This means that only the table owner is able to enqueue jobs.

This means that I have to start the worker as the specific owner role that will enqueue the jobs later, and that role also has to be able to create a schema and tables.

But in an application environment where different database roles are crucial (PostGraphile, PostgREST), I would not be able to enqueue jobs with any user other than the table owner (except via a SECURITY DEFINER function).

Since there are no policies anyway, I would suggest dropping RLS from the tables.

Or... am I missing something here?

Make `process.kill` optional

We use graphile-worker as a lib, as part of a more complex module. Shutting it down gracefully requires shutting down several components in order.

With graphile-worker (0.4.0) we are no longer able to shut down our module gracefully, because registerSignalHandlers kills the current process.

Database grinding to a halt waiting on transaction ID

Hey there!
We are just experimenting with migrating to Graphile Worker as our main worker driver, but it seems to have brought us down in production today, and I'm trying to find out why.

  • Version: 0.6.0
  • Node: 12.x
  • I'm passing my existing pool from node-postgres when constructing workerUtils.
  • I'm creating 5 jobs per second in one queue. Each of the 5 jobs has a consistent jobKey.

The idea is that these 5 jobs will run every second, unless they take longer than a second to complete, then the job will be re-queued up to one second after the previous attempt completes.

These jobs trigger a select from a table of data, and an insert into a table in another database. This shouldn't matter to the problem at hand, but both the select and the insert use their own transactions.

Sometimes this source table that the jobs select from can build up to contain millions of rows, because the database they are going to be inserted into becomes unavailable and the jobs that process them time out. We have indexes on the table that should make it so that selecting from the table is still fast. But we've observed that when this table starts to build up, after a few hours the add_job graphile function starts to go insane waiting on transactionid:

(Snapshots from RDS's dashboard.)

What are the possibilities here? Am I doing something wrong with my transactions in the insertion jobs? Is this an indication that I'm leaking transaction IDs? The database got so saturated that the application ground to a halt. A reboot of the web servers didn't help. Only completely cancelling the publishing of new jobs to graphile helped. When I commented out the code that published new jobs, and restarted the servers, everything came instantly back up.

That does mean that the servers stopped processing the jobs, however, so maybe our jobs are taking way too long to fail when their target DB becomes unavailable, and they're using up all available transactions / connections?

Support for standard PG environment variables

Opened as continuation of #45 to propose an actual implementation.

Rationale

Postgres tooling, for example psql, uses the environment variables defined in libpq to connect to the database. AFAIK, this is the standard way to configure applications across different environments. In my particular case, I use these variables in my Postgraphile-powered backend to behave differently in local, dev or production environments.

The underlying library, pg-node, also supports these variables for connection. One convenience is that this library automatically detects when to use UNIX sockets based on the host string, and builds the connection accordingly. In my particular case, I make use of this feature with postgraphile: when running locally I use TCP sockets, when running in the cloud (GCP App Engine) I use UNIX sockets.

To configure graphile worker, I've had to add extra logic to build a full database connection url. Simplified version:

// this pgConfig is used to configure postgraphile also
const pgConfig = { 
  user: process.env.PGUSER || 'postgres',
  host: process.env.PGHOST || 'localhost',
  database: process.env.PGDATABASE || 'myuser',
  password: process.env.PGPASSWORD || 'mypassword',
  port: process.env.PGPORT || 5432
}
// this connection string is used to configure graphile worker
let connectionString
if (process.env.DB_USE_UNIX_SOCKETS) { // or logic based on host
  connectionString = `postgres+pg8000://${pgConfig.user}:${pgConfig.password}@/${pgConfig.database}?unix_sock=${pgConfig.host}/.s.PGSQL.${pgConfig.port}`
} else {
  connectionString = `postgres://${pgConfig.user}:${pgConfig.password}@${pgConfig.host}:${pgConfig.port}/${pgConfig.database}`
}

I feel this is redundant with pg_node features and should be handled by that library.

Current Behaviour

The current implementation of graphile worker fails to use these variables (it seems to only support the DATABASE_URL variable).

From CLI

Error: Please use `--connection` flag or set `DATABASE_URL` envvar to indicate the PostgreSQL connection string.
    at main (/Users/garcianavalon/Development/api-monolith/node_modules/graphile-worker/dist/cli.js:67:15)
    at Object.<anonymous> (/Users/garcianavalon/Development/api-monolith/node_modules/graphile-worker/dist/cli.js:98:1)
    at Module._compile (internal/modules/cjs/loader.js:799:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:810:10)
    at Module.load (internal/modules/cjs/loader.js:666:32)
    at tryModuleLoad (internal/modules/cjs/loader.js:606:12)
    at Function.Module._load (internal/modules/cjs/loader.js:598:3)
    at Function.Module.runMain (internal/modules/cjs/loader.js:862:12)
    at findNodeScript.then.existing (/usr/local/lib/node_modules/npm/node_modules/libnpx/index.js:268:14)

As library

Error: You must either specify `pgPool` or `connectionString`, or you must make the `DATABASE_URL` environmental variable available.
    at assertPool (/Users/garcianavalon/Development/api-monolith/node_modules/graphile-worker/dist/runner.js:41:15)
    at withReleasers (/Users/garcianavalon/Development/api-monolith/node_modules/graphile-worker/dist/runner.js:80:30)
(node:84111) UnhandledPromiseRejectionWarning: Error: Fatal error starting runner %s
    at module.exports.startRunner (/Users/garcianavalon/Development/api-monolith/src/runner.js:26:11)
(node:84111) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)

Proposed solution

My first proposed solution was to maintain the same API as postgraphile and accept the pgConfig object. This is not a viable solution (see #45).

The proposed solution would be to not fail if DATABASE_URL is not found, and instead delegate to pg_node, which parses the PG* env variables and builds the correct connection itself. This would be a DRY approach that avoids adding more logic (both in graphile and in our applications) while following standard pg configuration.
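Concretely, the fallback could be as small as constructing the Pool with no connection options at all and letting pg pick up the libpq variables; this is a sketch of the proposal, not current graphile-worker behaviour:

const { Pool } = require("pg");

// With no connection options, node-postgres falls back to the standard libpq
// environment variables (PGHOST, PGUSER, PGPASSWORD, PGDATABASE, PGPORT),
// including unix-socket style hosts such as /cloudsql/<instance>.
const pgPool = new Pool();

// graphile-worker could then use this pool whenever neither --connection nor
// DATABASE_URL is provided, instead of throwing.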

option to connect to existing db schema instead of creating new one

Feature Request

Allow the startup command to take an option to connect to an existing database schema.

For example, in https://github.com/graphile/examples/blob/master/db/100_jobs.sql it's part of the overall database initialization, and with name app_jobs.

So it will be nice to be able to do something like this:

npx graphile-worker -c "postgres://localhost/mydb" -s "app_jobs"

which skip the schema creation steps and just start polling.

Problem to Solve

Currently there is a dependency problem for PostGraphile users who want to make use of graphile-worker:

To add a job within the main database code, such as perform graphile_worker.add_job('send_mail', ...), the graphile_worker schema has to already exist at the moment the main database schema is created, otherwise the SQL will fail.

Therefore we always have to install graphile-worker before initializing the main db, even if we just want to add the job and leave it there.

And in my case, the database schema (pure sql) is handled by one team, and the job processing (pure javascript) is handled by another team. Such dependency is certainly going to generate a lot of unnecessary mess.

Proposal

  1. Skip the migration here if the -s flag is found
    // Migrate
  2. Pass the -s value to all functions where SQL query strings are constructed, and use the variable instead of the hardcoded graphile_worker
    https://github.com/graphile/worker/search?l=TypeScript&q=graphile_worker

Make worker schema configurable

It would be great if the default graphile_worker schema could be made configurable (perhaps in the existing src/config.ts). It's a small and unimportant request, but in large projects with many other schemas in the database, it's nice to give it a name that follows the project naming convention.

New 0.1.0-alpha release?

There are some new features since the last alpha release that I am using. I can use master but would you mind cutting a new alpha? Thanks!

Question: What happens to the task result?

I noticed that while the task is running, it is available in postgres, but I'm wondering what if the function returns a result, like a json or string object? Can that result be stored as a JOB_RESULT or something? Maybe I need to investigate under the hood a bit.

Question: Does this need to run in a separate process?

This is probably a dumb question but I'm wondering if I can use this inside an existing web app (express).

Let's say I'm adding tasks that'll take upwards of 1-2 minutes to complete; will the tasks run in a separate thread/process? Or will there be a negative impact on my server?

Handle SIGINT gracefully.

I have added graphile-worker to my project and have it running. When I kill the node app, I get the following message:

/var/local/fileserver/vcs/git/laminar-pgphile/laminar-pgphile/node_modules/graphile-worker/dist/main.js:37
            Promise.all(allWorkerPools.map(pool => pool.gracefulShutdown(`Forced worker shutdown due to ${signal}`))).finally(() => {
                                                                                                                             ^

TypeError: Promise.all(...).finally is not a function
    at process.handler (/var/local/fileserver/vcs/git/laminar-pgphile/laminar-pgphile/node_modules/graphile-worker/dist/main.js:37:126)
    at emitOne (events.js:116:13)
    at process.emit (events.js:211:7)
    at Signal.wrap.onsignal (internal/process.js:197:44)

Does a finally block need to be added to the Promise.all running the workers, or do I need to pass something in to avoid this message?

Questions

Hi,
I've just read the README and would like to ask for some clarifications/details:

  • What do run and makeWorkerUtils do that requires them to return a promise (instead of the plain runner/workerutils object)? How can they fail (i.e. how should I handle errors from them)?
  • "it's often sensible to split large jobs into smaller jobs, this also allows them to run in parallel resulting in faster execution. This is particularly important for tasks that are not idempotent (i.e. running them a second time will have extra side effects) - for example sending emails." - How exactly would that work? I imagine that when splitting most tasks, the jobs should run sequentially. Should they be put on the same job queue then? Or should one job trigger the next one upon completion?
  • When a job fails (as detailed in "What if something goes wrong?"), does that mean it blocks the queue that it is part of?
  • What exactly is the unique identifier (primary key) for a job? Is it just the job_key, or is it the task identifier plus the job_key?
  • How do I access the id or the queue name of a job I created, if I want to reference them elsewhere (and not roll my own sequence to pass in explicitly, but use the implicitly generated "random" values)? I see that addJob/quickAddJob return (a promise for) a Job, but that type is undocumented.
  • In a task executor, why should I use withPgClient instead of directly accessing my global singleton PgPool (that I also passed to run/makeWorkerUtils)? Is it really just for dependency injection?
  • When using PostGraphile, and scheduling a job from within one of my custom makeExtendSchemaPlugin mutations, should I use the javascript addJob method of a Runner/WorkerUtils instance? That would presumably open a separate db connection, am I right? To schedule the job within the Postgraphile mutation transaction, I would need to call the SQL add_job function through the context.pgClient?
  • Similarly, inside a task executor, do withPgClient and addJob somehow share their connection?

Allow TS users to overload `addJob` (take two)

It would be nice for TS users who enqueue jobs from within tasks (or via the addJob function returned by run) to have payload types checked at the call site.

NOTE: This has nothing to do with validating payloads. This feature would be only useful insofar as the payload types are correct, and would only provide type checking at addJob call sites.

In my current setup, I use runtypes to derive both payload validators and the static types that I would use in these overloads. In this way there is a single source of truth, and when the runtime type is updated, TS will trickle that down to all call sites. This feels pretty robust.

Adding this feature would give additional type checks and an improved editor experience.

The requirement for this to work is that you pass a string literal (or a value typed as a non-union string literal) as the first parameter. It's assumed that if you add an entry for a task that the payload will be a required argument. You can however make a payload type nullable if you wish.

Passing any value that isn't inferred as a string key of the TaskPayload should result in falling back to the original function type. The reason for the non-union key requirement is that in that case (with current implementation) you could satisfy the type checker with an incorrect payload type.

This is (almost) fully implemented in the playground link below:

Playground Link

Postgres does not exist error on init

The current try/catch logic ensures that Postgres will produce the following error output at least once, causing confusion for those using this library:

ERROR: relation "graphile_worker.migrations" does not exist at character 16
STATEMENT: select id from "graphile_worker".migrations order by id desc limit 1;

A simple change to the way the schema-existence check is performed should solve this.

`process_after` sql function hook

I'm proposing adding:

create function :GRAPHILE_WORKER_SCHEMA.add_job(
  identifier text,
  payload json = null,
  queue_name text = null,
  process_after text = null,
  run_at timestamptz = null,
  max_attempts int = null,
  job_key text = null,
  priority int = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$

process_after (the name should change to something clearer) is an optional string reference to a SQL function that is called after the Javascript execution and is allowed to perform some work in the database (maybe queueing another job, maybe writing the result to some tables, etc.).

Any such function would take one parameter - a JSON blob which is what the Javascript job returned – and either throw an error (failing the job) or have its result discarded.

This would make job re-use easier. For example, suppose in one part of my application I want to hit a geocoding API and record a customer's coordinates, and in another part I want to do the same for a store. With process_after, I could do:

select graphile_worker.add_job('geocode', json_build_object('address', user.address), null, 'my_app.record_user_coordinates');

and

select graphile_worker.add_job('geocode', json_build_object('address', store.address), null, 'my_app.record_store_coordinates');

while only having one graphile-worker / Javascript job. You could imagine this making very small jobs possible, like http-get, http-post, etc.

Smaller jobs that just did the part that Postgres shouldn't do would also make testing easier, since you could call your process_after functions with mocks.

Add a `query` helper

Just a shortcut for withPgClient(pgClient => pgClient.query(...)) to reduce boilerplate.
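In user land today the shortcut is a one-liner inside a task; a sketch, where the table name and payload field are illustrative and withPgClient / helpers.logger are the existing helpers:

// A local `query` built on the existing withPgClient helper.
module.exports = async (payload, helpers) => {
  const query = (sql, values) =>
    helpers.withPgClient((pgClient) => pgClient.query(sql, values));

  const { rows } = await query("select * from my_table where id = $1", [payload.id]);
  helpers.logger.info(`Fetched ${rows.length} row(s)`);
};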

Question about concurrency and queues

To simplify, let's assume I have 2 queues "Qslow" and "Qfast" and concurrency set to 2. Tasks in Qfast are really quick and tasks in Qslow take a long time to finish (2 hours).

My setup periodically runs a cloud function which checks if there are any jobs to be run (it would be cool to have a lib API for that); if there are some jobs, the cloud function spawns one (never more) worker instance. This instance uses the runOnce method and dies after it resolves (no more jobs to run).

My goal is to run one slow task after another (serially), however don't block the fast tasks while the slow are running (fast tasks don't have to wait until the slow tasks are finished).

I assumed that I can accomplish this by setting the concurrency to 2 but it does not seem to work. Jobs in Qfast are waiting until jobs in Qslow are done.

Is this a bug or I did not understand the docs?

Version: "graphile-worker": "0.5.0"

Thank you for this amazing library 👏

EDIT: Maybe it is important to point out that the fast jobs may be scheduled while the slow ones are already running.

Make idle delay (polling interval) configurable

The time between polling for new jobs is pinned to 2000 ms at the moment.

The doc for the IDLE_DELAY constant reads:

 * IDLE_DELAY: This is how long to wait between polling for jobs.

 * Note: this does NOT need to be short, because we use LISTEN/NOTIFY to be
 * notified when new jobs are added - this is just used in the case where
 * LISTEN/NOTIFY fails for whatever reason.

  • Question: Is polling also necessary for other use cases (picking up failed jobs, running delayed jobs, etc.), or is it really just a safety net?

  • Feature Request: Make this constant configurable. I actually would not need such short polling for my use case, and it spams the log files :). Maybe renaming it to Polling Interval (POLLING_INTERVAL) would also make it clearer what it is doing.

I would be happy to provide a PR, but I am hesitant to just start hacking away on this - since I'd guess there would be some refactoring and moving code around involved.

Add hooks/events

I am evaluating worker in conjunction with a table that holds a job "state" (created|retry|completed|failed|etc). For this it would be great to have global or queue events (or hooks?). At the moment I am not quite sure how to record a failed job without something like this. Possible events could be onError, onRetry, onFailed, onCompleted.

Maybe there are other use cases.

Something like:

async function main() {
  await run({
    connectionString: process.env.DATABASE_URL,
    concurrency: 1,
    pollInterval: 60000,
    taskList: tasks,
    events: {
      global: {
        onCompleted: async (job, result, helpers) => {} // Global events
      },
      queues : {
        myQueue: {
          onCompleted: async (job, result, helpers) => {} // Only for queue "myQueue"
        }
      }
    }
  });
}

Thoughts?

Counter for how many times a task has been debounced

Proposal: a counter on each job that is incremented every time it has been debounced through the job key.

My use-case is I'd like to debounce notifying the user about messages, so of course I'm using the job key for that, but I'd like to have the notification be "You received X new message(s)" - the ideal solution to this would just be to have a field on the job that is incremented on every debounce, and then I just pass that to my email template.

Debouncing jobs

We're currently using https://github.com/timgit/pg-boss in production, and it has been solid, but the API is a bit complex. I love how simple the API for "worker" is, and I'm inclined to give it a try.

One feature we use regularly in Pg-boss that doesn't seem to be present yet in Worker is debouncing a job by name.

For example, I have a cron in my app that schedules a background job every 30 seconds, but if that job is already running, or if there's already one queued, I don't want to add another.

Would adding something like this seem inline with your goals for this project @benjie?

function graphile_worker.add_job(unknown, unknown) is not unique

Our app uses graphile worker as a library and sets it up at startup time. When running e2e tests, we start multiple instances of our app and sometimes we get the error

function graphile_worker.add_job(unknown, unknown) is not unique

I think this is related to unlucky timing of the migrations run by graphile. This might be fixed by running the migrations in a transaction and running SELECT pg_advisory_xact_lock(SHARED_LOCK_KEY_HERE); first. It's important that the value stays the same across different builds/instances/etc: hard-coding it works just fine.
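A sketch of what that serialization could look like (the lock key value and the doMigrations callback are illustrative; pg_advisory_xact_lock is the Postgres built-in, and this is not how graphile-worker currently runs its migrations):

const { Pool } = require("pg");

// Arbitrary but stable key; it must be identical in every instance so that
// concurrent startups serialize on the same advisory lock.
const MIGRATION_LOCK_KEY = 42;

async function runMigrationsSerialized(pgPool, doMigrations) {
  const client = await pgPool.connect();
  try {
    await client.query("begin");
    // Blocks until any other instance holding the lock commits or rolls back.
    await client.query("select pg_advisory_xact_lock($1)", [MIGRATION_LOCK_KEY]);
    await doMigrations(client);
    await client.query("commit");
  } catch (err) {
    await client.query("rollback");
    throw err;
  } finally {
    client.release();
  }
}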

RFC: Allow deletion and update of scheduled jobs

Objective

  • allow jobs scheduled for the future via run_at to be cancelled or updated prior to being run
  • facilitate “singleton” jobs, where repeated scheduling of the same task results in only one eventual run
  • Maintain full backwards compatibility

Motivation

This change solves the issues raised in #58 and #59 - use cases:

Update a scheduled job (#58)

  • A “send push notification” task is scheduled for the start of an end user’s preferred window, e.g. 9am-5pm
  • Before the notification is sent, the user changes their preferred window settings
  • The previously scheduled notification should be updated to reflect the new preference

Cancel a scheduled job (#58)

  • A “send push notification” task is scheduled for a user as above
  • Before the notification is sent, it is invalidated by a separate system
  • The notification task should be deleted

Debounce a cron-based job (#59)

  • A cron process schedules a background job every 30 seconds
  • If the job is already running, or if there's already one queued, an additional job should not be queued

Design proposal

Functions

graphile_worker.add_job

An optional job_key argument will be added to this function, and it will be changed to use plpgsql to allow conditional logic. New function signature:

 create function graphile_worker.add_job(
   identifier text, 
   payload json = '{}', 
   queue_name text = public.gen_random_uuid()::text, 
   run_at timestamptz = now(), 
   max_attempts int = 25
+  ,job_key text = null
 ) returns graphile_worker.jobs;

If job_key is not provided

  • The add_job behaviour will remain unchanged as a simple insert into graphile_worker.jobs

If job_key is provided
  • The new job will overwrite all details of an existing job with the same key, using insert ... on conflict, only if the job is not locked for processing
  • If the previous job is locked for processing, or completed, a new job will be scheduled
  • Note there is an edge case here, where a job has already been retrieved for processing, but not completed. In this case there is no way to schedule a new job, as would happen if the previous one was complete. The on conflict ... where clause will prevent the job details from changing, and the function will return an empty set - the calling code can use this return value to handle the case as needed. This case will be documented. See #61 (comment)
  • jobs.attempts will be reset to zero and jobs.last_error to null, under the assumption that past failures should not impact the lifecycle of the newly updated job.
  • In a future update, a boolean argument could be added to determine whether existing jobs should be overwritten or not, i.e. on conflict do update vs do nothing

graphile_worker.remove_job

This new function will allow deletion of an existing job by job_key. Function signature:

create function remove_job(
  job_key text
) returns graphile_worker.jobs;
  • Only jobs not already locked may be deleted. This means
    • jobs which a worker has already retrieved for processing cannot be deleted
    • there is no need to "unlock" the job_queues table the way complete_job does
  • If the job_key doesn't exist, or the job is already in progress, the function will do nothing and return an empty set

Schema changes

  1. Add a new nullable unique key column to the graphile_worker.jobs table:

    alter table graphile_worker.jobs add column key text unique;

    Full vs partial indexing for this column will be decided after profiling each option.

    Other column names considered were

    • idempotency_id (misleading as job details may change if other arguments differ)
    • job_id (already used as a function argument, clashes with jobs.id)
    • task_id (clashes with jobs.task_identifier)
  2. Add locked_by and locked_at columns to the jobs table, as per #61 (comment)

Tests

New tests will prove that

  • passing the same job_key to add_job multiple times results in one scheduled job
  • future-scheduled jobs can be removed via remove_job and do not run
  • jobs scheduled for now are not deleted or updated

Performance implications

A write performance impact is expected from the additional column and unique index. However, this should only affect the opt-in new behaviour - if job_key is not specified, performance should stay the same as before.

Before/after performance characteristics will be determined and documented via the bundled perfTest command.

Alternatives considered

  • Change the jobs.id to text, and allow the user to specify it, instead of a separate identifier (job_key). Rejected due to unknown impact on read performance, and backwards incompatibility
  • Store jobs.id value as returned by add_job in a separate table, external to the library, and use that to update/remove jobs. Rejected due to additional complexity and statefulness required in the implementing code, and because the problem is general enough to be a valuable library addition

Questions / discussion

  • Is there any solution to the edge case noted when updating a job which is already processing?
  • Which additional tests, if any, should be written?
