
node-resque's Introduction

node-resque: The best background jobs in node.

Distributed delayed jobs in Node.js. Resque is a background job system backed by Redis (version 2.6.0 and up required). It includes priority queues, plugins, locking, delayed jobs, and more! This project is very opinionated, but remains API-compatible with Resque and Sidekiq (with caveats). We also implement some of the popular Resque plugins, including resque-scheduler and resque-retry.

The full API documentation for this package is automatically generated from the main branch via TypeDoc and published to https://node-resque.actionherojs.com/


The Resque Factory (How It Works)

Overview

Resque is a queue-based task-processing system that can be thought of as a "Kanban"-style factory. Workers in this factory can each work only one Job at a time. They pull Jobs from Queues and work them to completion (or failure). Each Job has two parts: instructions on how to complete it (the perform function), and any inputs necessary to complete it.

Queues

In our factory example, Queues are analogous to conveyor belts. Jobs are placed on the belts (Queues) and are held in order, waiting for a Worker to pick them up. There are three types of Queues: regular work Queues, Delayed Job Queues, and the Failed Job Queue. The Delayed Job Queues contain Job definitions that are intended to be worked at, or after, a specified time. The Failed Job Queue is where Workers place any Jobs that have failed during execution.

Workers

Our Workers are the heart of the factory. Each Worker is assigned one or more Queues to check for work. After taking a Job from a Queue, the Worker attempts to complete it. If successful, the Worker goes back to check out more work from the Queues. If there is a failure, however, the Worker records the Job and its inputs in the Failed Job Queue before going back for more work.

Scheduler

The Scheduler can be thought of as a specialized type of Worker. Unlike other Workers, the Scheduler does not execute any Jobs; instead, it manages the Delayed Job Queue. As Job definitions are added to the Delayed Job Queue, they must specify when they become available for execution. The Scheduler constantly checks whether any Delayed Jobs are ready to execute. When a Delayed Job becomes ready, the Scheduler places a new instance of that Job in its defined Queue.

API Docs

You can read the API docs for Node Resque @ node-resque.actionherojs.com. These are generated automatically from the main branch via TypeDoc.

Version Notes

  • The version of Redis required is >= 2.6.0, as we use Lua scripting to create custom atomic operations
  • ‼️ Version 6+ of Node Resque uses TypeScript. We will still include JavaScript transpiled code in NPM releases, but they will be generated from the TypeScript source. Functionality between node-resque v5 and v6 should be the same.
  • ‼️ Version 5+ of Node Resque uses async/await. There is no upgrade path from previous versions. Node v8.0.0+ is required.

Usage

I learn best by examples:

import { Worker, Plugins, Scheduler, Queue } from "node-resque";

async function boot() {
  // ////////////////////////
  // SET UP THE CONNECTION //
  // ////////////////////////

  const connectionDetails = {
    pkg: "ioredis",
    host: "127.0.0.1",
    password: null,
    port: 6379,
    database: 0,
    // namespace: 'resque',
    // looping: true,
    // options: {password: 'abc'},
  };

  // ///////////////////////////
  // DEFINE YOUR WORKER TASKS //
  // ///////////////////////////

  let jobsToComplete = 0;

  const jobs = {
    add: {
      plugins: [Plugins.JobLock],
      pluginOptions: {
        JobLock: { reEnqueue: true },
      },
      perform: async (a, b) => {
        await new Promise((resolve) => {
          setTimeout(resolve, 1000);
        });
        jobsToComplete--;
        tryShutdown();

        const answer = a + b;
        return answer;
      },
    },
    subtract: {
      perform: (a, b) => {
        jobsToComplete--;
        tryShutdown();

        const answer = a - b;
        return answer;
      },
    },
  };

  // just a helper for this demo
  async function tryShutdown() {
    if (jobsToComplete === 0) {
      await new Promise((resolve) => {
        setTimeout(resolve, 500);
      });
      await scheduler.end();
      await worker.end();
      process.exit();
    }
  }

  // /////////////////
  // START A WORKER //
  // /////////////////

  const worker = new Worker(
    { connection: connectionDetails, queues: ["math", "otherQueue"] },
    jobs,
  );
  await worker.connect();
  worker.start();

  // ////////////////////
  // START A SCHEDULER //
  // ////////////////////

  const scheduler = new Scheduler({ connection: connectionDetails });
  await scheduler.connect();
  scheduler.start();

  // //////////////////////
  // REGISTER FOR EVENTS //
  // //////////////////////

  worker.on("start", () => {
    console.log("worker started");
  });
  worker.on("end", () => {
    console.log("worker ended");
  });
  worker.on("cleaning_worker", (worker, pid) => {
    console.log(`cleaning old worker ${worker}`);
  });
  worker.on("poll", (queue) => {
    console.log(`worker polling ${queue}`);
  });
  worker.on("ping", (time) => {
    console.log(`worker check in @ ${time}`);
  });
  worker.on("job", (queue, job) => {
    console.log(`working job ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("reEnqueue", (queue, job, plugin) => {
    console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("success", (queue, job, result, duration) => {
    console.log(
      `job success ${queue} ${JSON.stringify(
        job,
      )} >> ${result} (${duration}ms)`,
    );
  });
  worker.on("failure", (queue, job, failure, duration) => {
    console.log(
      `job failure ${queue} ${JSON.stringify(
        job,
      )} >> ${failure} (${duration}ms)`,
    );
  });
  worker.on("error", (error, queue, job) => {
    console.log(`error ${queue} ${JSON.stringify(job)}  >> ${error}`);
  });
  worker.on("pause", () => {
    console.log("worker paused");
  });

  scheduler.on("start", () => {
    console.log("scheduler started");
  });
  scheduler.on("end", () => {
    console.log("scheduler ended");
  });
  scheduler.on("poll", () => {
    console.log("scheduler polling");
  });
  scheduler.on("leader", () => {
    console.log("scheduler became leader");
  });
  scheduler.on("error", (error) => {
    console.log(`scheduler error >> ${error}`);
  });
  scheduler.on("cleanStuckWorker", (workerName, errorPayload, delta) => {
    console.log(
      `failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`,
    );
  });
  scheduler.on("workingTimestamp", (timestamp) => {
    console.log(`scheduler working timestamp ${timestamp}`);
  });
  scheduler.on("transferredJob", (timestamp, job) => {
    console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`);
  });

  // //////////////////////
  // CONNECT TO A QUEUE //
  // //////////////////////

  const queue = new Queue({ connection: connectionDetails }, jobs);
  queue.on("error", function (error) {
    console.log(error);
  });
  await queue.connect();
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [2, 3]);
  await queue.enqueueIn(3000, "math", "subtract", [2, 1]);
  jobsToComplete = 4;
}

boot();

// and when you are done
// await queue.end()
// await scheduler.end()
// await worker.end()

Node Resque Interfaces: Queue, Worker, and Scheduler

There are 3 main classes in node-resque: Queue, Worker, and Scheduler

  • Queue: This is the interface your program uses to interact with resque's queues - to insert jobs, check on the performance of things, and generally administer your background jobs.
  • Worker: This interface is how jobs get processed. Workers are started and then they check for jobs enqueued into various queues and complete them. If there's an error, they write to the error queue.
    • There's a special class called multiWorker in Node Resque which will run many workers at once for you (see below).
  • Scheduler: The scheduler can be thought of as the coordinator for Node Resque. It is primarily in charge of checking when jobs told to run later (with queue.enqueueIn or queue.enqueueAt) should be processed, but it performs some other jobs like checking for 'stuck' workers and general cluster cleanup.
    • You can (and should) run many instances of the scheduler class at once, but only one will be elected to be the 'leader', and actually do work.
    • The 'delay' defined on a scheduled job does not specify when the job should be run, but rather when the job should be enqueued. This means that node-resque cannot guarantee when a job is going to be executed, only when it will become available for execution (added to a Queue); the sketch below illustrates the two delayed-enqueue methods.
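
For illustration, a hedged sketch of both variants, assuming the queue instance from the Usage example above (and assuming enqueueAt takes a millisecond timestamp):

await queue.enqueueIn(5 * 60 * 1000, "math", "add", [1, 2]); // available in ~5 minutes
await queue.enqueueAt(Date.now() + 5 * 60 * 1000, "math", "add", [1, 2]); // same moment, as an absolute timestamp
// In both cases the scheduler only moves the job onto the "math" queue at that
// time; a running worker still has to pick it up afterwards.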

Configuration Options:

  • new queue requires only the "queue" variable to be set. If you intend to run plugins with beforeEnqueue or afterEnqueue hooks, you should also pass the jobs object to it.
  • new worker has some additional options:
options = {
  looping: true,
  timeout: 5000,
  queues: "*",
  name: os.hostname() + ":" + process.pid,
};

Note that when using "*" queue:

  • there's a minor performance impact for checking the queues
  • queues are processed in an undefined order

The configuration hash passed to new NodeResque.Worker, new NodeResque.Scheduler or new NodeResque.Queue can also take a connection option.

const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
  port: 6379,
  database: 0,
  namespace: "resque", // an array of strings is also allowed
};

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs,
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
// await worker.end()

You can also pass a redis client directly.

// assume you have already initialized a redis client
// the "redis" key can be an IORedis.Redis or IORedis.Cluster instance

const redisClient = new Redis();
const connectionDetails = { redis: redisClient };

// or

const redisCluster = new Cluster();
const connectionDetails = { redis: redisCluster };

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs,
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
await worker.end();

Notes

  • Be sure to call await worker.end(), await queue.end() and await scheduler.end() before shutting down your application if you want to properly clear your worker status from resque.
  • When ending your application, be sure to allow your workers time to finish what they are working on.
  • This project implements the "scheduler" part of resque-scheduler (the daemon which can promote enqueued delayed jobs into the work queues when it is time), but not the CRON scheduler proxy. To learn more about how to use a CRON-like scheduler, read the Job Schedules section of this document.
  • "Namespace" is a string which is prepended to your keys in redis. Normally, it is "resque". This is helpful if you want to store multiple work queues in one redis database. Do not use keyPrefix if you are using the ioredis (default) redis driver in this project (see #245 for more information).
  • If you are using any plugins which affect beforeEnqueue or afterEnqueue, be sure to pass the jobs argument to the new NodeResque.Queue() constructor.
  • If a job fails, it will be added to a special failed queue. You can then inspect these jobs, write a plugin to manage them, move them back to the normal queues, etc. Failure behavior by default is just to enter the failed queue, but there are many options. Check out examples from the Ruby ecosystem for inspiration.
  • If you plan to run more than one worker per nodejs process, be sure to name them something distinct. Names must follow the pattern hostname:pid+unique_id. For example:

const name = os.hostname() + ":" + process.pid + "+" + counter;
const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math", name: name },
  jobs,
);

  • For the Retry plugin, a success message will be emitted from the worker on each attempt (even if the job fails) except the final retry. The final retry will emit a failure message instead.

If you want to learn more about running Node-Resque with docker, please view the examples here: https://github.com/actionhero/node-resque/tree/master/examples/docker

Worker#performInline

DO NOT USE THIS IN PRODUCTION. In tests or special cases, you may want to process/work a job in-line. To do so, you can use await worker.performInline(jobName, arguments). If you are planning on running a job via #performInline, this worker should not be started, nor should you use event emitters to monitor it. This method will not write to redis at all: it will not log errors, modify resque's stats, etc.
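
A minimal sketch, assuming the jobs object and connectionDetails from the Usage example above; note that the worker is never start()ed:

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: ["math"] },
  jobs,
);
await worker.connect(); // may be unnecessary in versions where performInline never touches redis
const result = await worker.performInline("add", [1, 2]); // runs `perform` directly
console.log(result); // 3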

Queue Management

const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
await queue.connect();

API documentation for the main methods you will be using to enqueue jobs to be worked can be found @ node-resque.actionherojs.com.
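For example, a hedged sketch of a few inspection helpers (the method names below are assumptions drawn from recent node-resque versions; consult the API docs above if yours differ):

const pending = await queue.length("math"); // number of jobs waiting in "math"
const delayed = await queue.allDelayed(); // map of timestamp -> delayed jobs
const stats = await queue.stats(); // global counters (processed, failed, ...)
console.log({ pending, delayed, stats });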

Failed Job Management

From time to time, your jobs/workers may fail. Resque workers will move failed jobs to a special failed queue which will store the original arguments of your job, the failing stack trace, and additional metadata.

error example

You can work with these failed jobs with the following methods:

let failedCount = await queue.failedCount()

  • failedCount is the number of jobs in the failed queue

let failedJobs = await queue.failed(start, stop)

  • failedJobs is an array listing the data of the failed jobs. Each element looks like: {"worker": "host:pid", "queue": "test_queue", "payload": {"class":"slowJob", "queue":"test_queue", "args":[null]}, "exception": "TypeError", "error": "MyImport is not a function", "backtrace": [' at Worker.perform (/path/to/worker:111:24)', ' at <anonymous>'], "failed_at": "Fri Dec 12 2014 14:01:16 GMT-0800 (PST)"}
  • To retrieve all failed jobs, use arguments: await queue.failed(0, -1)

Failing a Job

We use a try/catch pattern to catch errors in your jobs. If any job throws an uncaught exception, it will be caught, and the job's payload moved to the error queue for inspection. Do not use domain, process.on("exit"), or any other method of "catching" a process crash.
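
For example, a job like this hypothetical one will end up in the failed queue as soon as perform throws:

const jobs = {
  bustedJob: {
    perform: async () => {
      // any uncaught throw here is caught by the worker, and a payload like
      // the one shown below is written to the failed queue
      throw new Error("I broke");
    },
  },
};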

The error payload looks like:

{ worker: 'busted-worker-3',
  queue: 'busted-queue',
  payload: { class: 'busted_job', queue: 'busted-queue', args: [ 1, 2, 3 ] },
  exception: 'ERROR_NAME',
  error: 'I broke',
  failed_at: 'Sun Apr 26 2015 14:00:44 GMT+0100 (BST)' }

await queue.removeFailed(failedJob)

  • the input failedJob is an expanded node object representing the failed job, retrieved via queue.failed

await queue.retryAndRemoveFailed(failedJob)

  • the input failedJob is an expanded node object representing the failed job, retrieved via queue.failed
  • this method will instantly re-enqueue a failed job back to its original queue, and delete the failed entry for that job; the sketch below combines these methods
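
A hedged end-to-end sketch combining the methods above; the ETIMEDOUT check is a hypothetical retry policy, not part of node-resque:

const failedCount = await queue.failedCount();
const failedJobs = await queue.failed(0, failedCount - 1);
for (const failedJob of failedJobs) {
  if (failedJob.error.includes("ETIMEDOUT")) {
    await queue.retryAndRemoveFailed(failedJob); // transient failure: run it again
  } else {
    await queue.removeFailed(failedJob); // permanent failure: discard
  }
}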

Failed Worker Management

Automatically

By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we will assume the process crashed and remove the worker from redis. If that worker was working on a job, we will place the job in the failed queue for later inspection. Every worker runs a timer which updates its key in redis every timeout (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.

To modify the 60 minute check, change stuckWorkerTimeout when configuring your scheduler, ie:

const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: 1000 * 60 * 60, // 1 hour, in ms
  connection: connectionDetails,
});

Set your scheduler's stuckWorkerTimeout = false to disable this behavior.

const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
  connection: connectionDetails,
});

Manually

Sometimes a worker crashes in a severe way and doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PaaS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, await queue.cleanOldWorkers(age) is available.

Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long job or is dead. You are required to provide an "age" for how long a worker has been "working"; all workers older than that age will be removed, and the jobs they were working on moved to the error queue (where you can then use queue.retryAndRemoveFailed to re-enqueue them).

If you know the name of a worker that should be removed, you can also call await queue.forceCleanWorker(workerName) directly, and that will also remove the worker and move any job it was working on into the error queue. This method will still proceed for workers which are only partially in redis, indicating a previous connection failure. In that case, the job the worker was working on is irrecoverably lost.
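
A hedged sketch of both calls (the worker name is hypothetical):

const ONE_HOUR = 1000 * 60 * 60;
// remove every worker that has claimed to be "working" for over an hour,
// moving any jobs they held to the failed queue
await queue.cleanOldWorkers(ONE_HOUR);

// or, if you already know the zombie worker's name:
await queue.forceCleanWorker("app-server-1:1234+1");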

Job Schedules

You may want to use node-resque to schedule jobs every minute/hour/day, like a distributed CRON system. There are a number of excellent node packages to help you with this, like node-schedule and node-cron. Node-resque makes it possible for you to use the package of your choice to schedule jobs with.

Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduling the jobs. To help you with this, you can inspect which of the scheduler processes is currently acting as leader, and flag only the leader scheduler process to run the schedule. A full example can be found at /examples/scheduledJobs.ts, but the relevant section is:

const NodeResque = require("node-resque");
const schedule = require("node-schedule");
const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
const scheduler = new NodeResque.Scheduler({ connection: connectionDetails });
await scheduler.connect();
scheduler.start();

schedule.scheduleJob("10,20,30,40,50 * * * * *", async () => {
  // do this job every 10 seconds, CRON style
  // we want to ensure that only one instance of this job is scheduled in our environment at once,
  // no matter how many schedulers we have running
  if (scheduler.leader) {
    console.log(">>> enqueuing a job");
    await queue.enqueue("time", "ticktock", new Date().toString());
  }
});

Plugins

Just like Ruby's Resque, you can write worker plugins; they look like the example below. The 4 hooks you have are beforeEnqueue, afterEnqueue, beforePerform, and afterPerform. Plugins are classes which extend NodeResque.Plugin.

const { Plugin } = require("node-resque");

class MyPlugin extends Plugin {
  constructor(...args) {
    // @ts-ignore
    super(...args);
    this.name = "MyPlugin";
  }

  beforeEnqueue() {
    // console.log("** beforeEnqueue")
    return true; // should the job be enqueued?
  }

  afterEnqueue() {
    // console.log("** afterEnqueue")
  }

  beforePerform() {
    // console.log("** beforePerform")
    return true; // should the job be run?
  }

  afterPerform() {
    // console.log("** afterPerform")
  }
}

And then your plugin can be invoked within a job like this:

const jobs = {
  add: {
    plugins: [MyPlugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};

notes

  • You need to return true or false from the before hooks: true indicates that the action should continue, and false prevents it. This return value is called toRun.
  • If you are writing a plugin to deal with errors which may occur during your resque job, you can inspect and modify this.worker.error in your plugin. If this.worker.error is null, no error will be logged in the resque error queue. A sketch of such a plugin follows the next example.
  • There are a few included plugins, all in the src/plugins/* directory. You can write your own and include it like this:
const jobs = {
  add: {
    plugins: [require("./MyPlugin").MyPlugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};
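
And, as mentioned above, a hedged sketch of an error-handling plugin; it assumes the Plugin base class shown earlier, and that clearing this.worker.error in afterPerform keeps the failure out of the error queue (behavior may vary by version):

class SwallowErrors extends Plugin {
  constructor(...args) {
    super(...args);
    this.name = "SwallowErrors";
  }

  afterPerform() {
    if (this.worker.error) {
      console.log(`suppressing error: ${this.worker.error}`);
      this.worker.error = null; // a null error is not logged to the error queue
    }
  }
}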

The plugins which are included with this package are:

  • DelayQueueLock
    • If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again
  • JobLock
    • If a job with the same name, queue, and args is already running, put this job back in the queue and try later
  • QueueLock
    • If a job with the same name, queue, and args is already in the queue, do not enqueue it again
  • Retry
    • If a job fails, retry it N times before finally placing it into the failed queue (see the sketch below)
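
A hedged sketch of wiring up the bundled Retry plugin; the option names retryLimit and retryDelay are assumptions borrowed from resque-retry conventions (check src/plugins/Retry for the exact names in your version), and sendTheEmail is a hypothetical helper:

const { Plugins } = require("node-resque");

const jobs = {
  sendEmail: {
    plugins: [Plugins.Retry],
    pluginOptions: {
      Retry: { retryLimit: 3, retryDelay: 1000 * 10 }, // up to 3 attempts, ~10s apart
    },
    perform: async (address) => {
      await sendTheEmail(address); // may throw; Retry re-enqueues until the limit
    },
  },
};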

Multi Worker

node-resque provides a wrapper around the Worker class which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The MultiWorker handles this by spawning more and more node-resque workers and managing the pool.

const NodeResque = require("node-resque");

const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
};

const multiWorker = new NodeResque.MultiWorker(
  {
    connection: connectionDetails,
    queues: ["slowQueue"],
    minTaskProcessors: 1,
    maxTaskProcessors: 100,
    checkTimeout: 1000,
    maxEventLoopDelay: 10,
  },
  jobs,
);

// normal worker emitters
multiWorker.on("start", (workerId) => {
  console.log("worker[" + workerId + "] started");
});
multiWorker.on("end", (workerId) => {
  console.log("worker[" + workerId + "] ended");
});
multiWorker.on("cleaning_worker", (workerId, worker, pid) => {
  console.log("cleaning old worker " + worker);
});
multiWorker.on("poll", (workerId, queue) => {
  console.log("worker[" + workerId + "] polling " + queue);
});
multiWorker.on("ping", (workerId, time) => {
  console.log("worker[" + workerId + "] check in @ " + time);
});
multiWorker.on("job", (workerId, queue, job) => {
  console.log(
    "worker[" + workerId + "] working job " + queue + " " + JSON.stringify(job),
  );
});
multiWorker.on("reEnqueue", (workerId, queue, job, plugin) => {
  console.log(
    "worker[" +
      workerId +
      "] reEnqueue job (" +
      plugin +
      ") " +
      queue +
      " " +
      JSON.stringify(job),
  );
});
multiWorker.on("success", (workerId, queue, job, result) => {
  console.log(
    "worker[" +
      workerId +
      "] job success " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      result,
  );
});
multiWorker.on("failure", (workerId, queue, job, failure) => {
  console.log(
    "worker[" +
      workerId +
      "] job failure " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      failure,
  );
});
multiWorker.on("error", (workerId, queue, job, error) => {
  console.log(
    "worker[" +
      workerId +
      "] error " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      error,
  );
});
multiWorker.on("pause", (workerId) => {
  console.log("worker[" + workerId + "] paused");
});
multiWorker.on("multiWorkerAction", (verb, delay) => {
  console.log(
    "*** checked for worker status: " +
      verb +
      " (event loop delay: " +
      delay +
      "ms)",
  );
});

multiWorker.start();
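
And when you are done, a hedged shutdown sketch; multiWorker.end() is assumed to wait for in-flight jobs before resolving, as worker.end() does:

process.on("SIGTERM", async () => {
  await multiWorker.end(); // stops all child workers
  process.exit(0);
});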

MultiWorker Options

The options available for the multiWorker are:

  • connection: The redis configuration options (same as worker)
  • queues: Array of ordered queue names (or *) (same as worker)
  • minTaskProcessors: The minimum number of workers to spawn under this multiWorker, even if there is no work to do. You need at least one, or no work will ever be processed or checked
  • maxTaskProcessors: The maximum number of workers to spawn under this multiWorker, even if the queues are long and there is CPU available to this node process (i.e., the event loop isn't entirely blocked).
  • checkTimeout: How often to check whether the event loop is blocked (in ms), for adding or removing multiWorker children.
  • maxEventLoopDelay: How long the event loop has to be delayed before considering it blocked (in ms).

Presentation

This package was featured heavily in this presentation I gave about background jobs + node.js. It contains more examples!


node-resque's Issues

keep scheduler polling. can not exec mongoose query inside the job perform function

When executing the job, it cannot execute the mongoose query.

The previous steps are fine, but when the process reaches the perform block, it cannot execute the mongoose query... it just hangs, cannot even trigger the callback, and the scheduler keeps polling.

Here is part of the job code:

perform: function(topic_id, callback) {
  var mongoose = require('mongoose');
  mongoose.connect('mongodb://localhost/test');
  var Cat = mongoose.model('Cat', { name: String });
  var kitty = new Cat({ name: 'Zildjian' });
  kitty.save(function (err) {          // hangs here!!
    if (err) console.log('err', err);
    console.log('meow');
    callback(null, 'meow');
  });
}

Is this a resque issue or a mongoose issue? I've tested with mongoose separately, no problem.
I have worked on it for quite a long time with no idea, so please help me figure it out. Thanks so much!

forceCleanWorker should allow wildcards

If you have a multi worker that dies an untimely death, there may be multiple workers that need to be cleaned. For example, you have a server that has 11 jobs processing on a multiWorker. If that server dies because of EC2 networking issues, you need to clean up redis (this happened to us today). You can't simply pass the IP of the dead server into queue.forceCleanWorker; you need to pass IP+PID:<worker id>. It'd be nice if we could pass wildcards or a filter function into forceCleanWorker to determine if the worker should be force-cleaned.

Thoughts? Will you take a PR?

Redis connections are built up indefinitely

With every invocation of connect, a new connection to Redis is created. This can easily be verified with CLIENT LIST in Redis. Shouldn't connections be re-used? Imagine a scenario where jobs are queued on an iterative basis: more and more connections are created.

var queue = new NR.queue({connection: connectionDetails}, jobs);
queue.on('error', function(error){ console.log(error); });

queue.connect(function(){
  queue.enqueue('math', "add", [1,2]);
  queue.enqueue('math', "add", [1,2]);
  queue.enqueue('math', "add", [2,3]);
  queue.enqueueIn(3000, 'math', "subtract", [2,1]);
});

Why is the connection not closed after the connect block? Wouldn't that make more sense semantically?

Why use this over coffee-resque?

I'm looking around for a simple node-based queuing system, and this project seems to do very similar things to coffee-resque. Do you have a comparison between the two projects, i.e., why should I use this over coffee-resque?

async issue with queue.prototype.enqueue

Hi, I'm looping through some records and putting them in the queue. I enqueue them synchronously, and when queue.prototype.enqueue runs it receives the records in the order I put them in, but when self.encode(q, func, args) runs it always gets the last one.

Here is an example:

var queue = new NR.queue({connection: obj.connectionDetails}, jobs, function(){
  // Use connect method to connect to the Server
  var creator = function(queue_name, action, record){
    console.log("=========RECORD I AM GOING TO ENQUEUE=======");
    console.log(JSON.stringify(record));
    queue.enqueue(queue_name, action, record);
    console.log("=========I HAVE ENQUEUED=========");
  };
  var record = {};
  for(var x = 0; x < 4; x++){
    record.x = x;
    creator("myqueue","stuff_to_do",record);
  }
});

I put some log statements in queue.js to give some output:

queue.prototype.enqueue = function(q, func, args, callback){
  var self = this,
  encode;
  if(arguments.length === 3 && typeof args === 'function'){
    callback = args;
    args = [];
  }else if(arguments.length < 3){
    args = [];
  }
  args = arrayify(args);
  console.log("============IN queue.prototype.enqueue=========");
  console.log(JSON.stringify(args, null, ' '));
  console.log("============END queue.prototype.enqueue ========");
  var job = self.jobs[func];
  self.runPlugins('before_enqueue', func, q, job, args, function(err, toRun){
    if(toRun === false){
      if(typeof callback === 'function'){ callback(err, toRun); }
    }else{
      self.connection.ensureConnected(callback, function(){
        self.connection.redis.sadd(self.connection.key('queues'), q, function(){
          encode = self.encode(q, func, args);
          console.log("encode was " + encode);
          self.connection.redis.rpush(self.connection.key('queue', q), encode, function(){
            self.runPlugins('after_enqueue', func, q, job, args, function(){
              if(typeof callback === 'function'){ callback(err, toRun); }
            });
          });
        });
      });
    }
});
};

Which gives me the following output

=========RECORD I AM GOING TO ENQUEUE=======
{"x":0}
============IN queue.prototype.enqueue=========
[
{
"x": 0
}
]
============END queue.prototype.enqueue ========
=========I HAVE ENQUEUED=========
=========RECORD I AM GOING TO ENQUEUE=======
{"x":1}
============IN queue.prototype.enqueue=========
[
{
"x": 1
}
]
============END queue.prototype.enqueue ========
=========I HAVE ENQUEUED=========
=========RECORD I AM GOING TO ENQUEUE=======
{"x":2}
============IN queue.prototype.enqueue=========
[
{
"x": 2
}
]
============END queue.prototype.enqueue ========
=========I HAVE ENQUEUED=========
=========RECORD I AM GOING TO ENQUEUE=======
{"x":3}
============IN queue.prototype.enqueue=========
[
{
"x": 3
}
]
============END queue.prototype.enqueue ========
=========I HAVE ENQUEUED=========
encode was {"class":"stuff_to_do","queue":"myqueue","args":[{"x":3}]}
encode was {"class":"stuff_to_do","queue":"myqueue","args":[{"x":3}]}
encode was {"class":"stuff_to_do","queue":"myqueue","args":[{"x":3}]}
encode was {"class":"stuff_to_do","queue":"myqueue","args":[{"x":3}]}

So obviously there is async behavior going on. But I'm not sure what I should do in this case. I'm supposing this is a common enough use case that there is probably just some configuration I need to do; or is this a real issue I'm finding here?

why include the `perform:`?

Since we're in the happy world of node and first-class functions, why bother with the concept of a 'class'?

e.g for Ruby resque

class DoSomeThing
 def self.perform
   puts "hi"
 end
end

we could simply write var jobs = { DoSomeThing: function() { console.log("hi") } };

Multiworker cleanup error

My app hit an error last night in the multiWorker.workerCleanup function:

 TypeError: Object {"class":"jobClass","queue":"squeegee","args":["someId","abc123"]} has no method 'forEach'
        at /home/app/code/node_modules/node-resque/lib/worker.js:283:15
        at try_callback (/home/app/code/node_modules/node-resque/node_modules/redis/index.js:592:9)
        at RedisClient.return_reply (/home/app/code/node_modules/node-resque/node_modules/redis/index.js:685:13)
        at ReplyParser.<anonymous> (/home/app/code/node_modules/node-resque/node_modules/redis/index.js:321:14)
        at ReplyParser.emit (events.js:95:17)
        at ReplyParser.send_reply (/home/app/code/node_modules/node-resque/node_modules/redis/lib/parser/javascript.js:300:10)
        at ReplyParser.execute (/home/app/code/node_modules/node-resque/node_modules/redis/lib/parser/javascript.js:203:22)
        at RedisClient.on_data (/home/app/code/node_modules/node-resque/node_modules/redis/index.js:547:27)
        at Socket.<anonymous> (/home/app/code/node_modules/node-resque/node_modules/redis/index.js:102:14)
        at Socket.emit (events.js:95:17)
        at Socket.<anonymous> (_stream_readable.js:765:14)
        at Socket.emit (events.js:92:17)
        at emitReadable_ (_stream_readable.js:427:10)
        at emitReadable (_stream_readable.js:423:5)
        at readableAddChunk (_stream_readable.js:166:9)
        at Socket.Readable.push (_stream_readable.js:128:10)
        at TCP.onread (net.js:529:21)

I did some googling, and it looks like this may have been caused by something in node-redis. I can kind of reproduce this by throwing this block somewhere in the code

redis.monitor(function (err, res) { })

I say kind of because pretty much all redis command then fail. I also don't think this caused the problem I ran into because my app didn't enter monitor mode. I think it may have been some other node-redis error.

We should probably handle errors instead of ignoring them in workerCleanup.

self.getPids(function(err, pids){
  self.connection.redis.smembers(self.connection.key('workers'), function(err, workers){
    // TODO: err is ignored.
    workers.forEach(function(w){

queue_lock

in the declaration of a job, allow

queue_lock: function(a,b) {
// returns a lock key or null
}

if there is a job in any queue at that moment (not running), do not enqueue (no-op). This provides mutual exclusion at the queue level. This is only for non-delayed / normal queues.

Per queue scheduler

Hey,

I've got a few different instances of workers running against the same Resque database. For better robustness and self-containment I'd like to have schedulers run separately for each queue, too. How would one do that? Thanks!

"No job defined for class" emits error without job

Hey,

When Worker.prototype.perform throws Error: No job defined for class..., it does so before setting worker.job. That results in the error handler being called without the job reference and makes it impossible to retry/requeue that job.

Scheduler: possible EventEmitter memory leak

When running a scheduler process, if the redis connection is temporarily lost, I'll start getting these warnings:

warning: possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at RedisClient.addListener (events.js:239:17)
    at connection.connect (./node_modules/node-resque/lib/connection.js:87:14)
    at new queue (./node_modules/node-resque/lib/queue.js:19:19)
    at ./node_modules/node-resque/lib/scheduler.js:24:18
    at RedisClient.<anonymous> (./node_modules/node-resque/lib/connection.js:68:7)

Rather than attempting a new connection, the scheduler should probably wait for the connect event and resume polling, also ensuring the scheduler's callback is only invoked once.

Better inspection & deletion methods

I don't always know the parameters passed to a delayed task, but I'd still like to inspect how many are scheduled at a given time. A plausible workaround is having a unique queue per task, but a more flexible option makes sense to me.

Re-queue failed jobs

It would be nice to specify an option to re-queue failed jobs automatically instead of them disappearing.

Worker connect to all queues

I'd love to use dynamic queue names. Is there / could there be a way to use a wildcard for (or perhaps omit) the array of queues to work when creating the worker?

var worker = new NR.worker({connection: connectionDetails, queues: "all"} ...);

Correct workflow

Could someone point me to the correct workflow? I am not clear as to how the scheduler works or what it is supposed to be doing. For example, if you have a worker running (meaning you don't kill it when done) it polls the queue and fires the moment something is pushed in.

Is the scheduler supposed to be firing the workers? If so, does this apply to the multiworker pattern as well?

job_lock

in the declaration of a job, allow

job_lock: function(a,b) {
// returns a lock key or null
}

if there is a job running right now with the same lock key, it re-enqueues itself.
This provides mutual exclusion at the runtime job level.

Worker not registered if Redis down on startup

Hey,

The Redis library happily reconnects if the server is down for a second when createClient is called. NodeResque, however, doesn't seem to be too aware of that as the worker isn't registered to Redis in that case, even though jobs seem to run fine.

Features/Documentation Request

Hi I am planning to use this for some of my old php projects + upcoming ruby/node-js projects.

  • Is it possible to have documentation for the key-name format + redis functions I can follow in order to enqueue jobs from php/ruby?
  • How do I track the status of a job? (The scenario: multiple users will be putting items in the queue and want to check the status of the jobs they put in.)

Awesome project btw... works beautifully, and easy to implement.

Thank you

Delayed job state

We're running into a weird state, where there is a key delayed:<timestamp>, but that timestamp isn't part of the delayed_queue_schedule. This means the job never runs, but the delayQueueLock thinks the job is delayed and will run in the future. The timestamp: key also exists.

I'm wondering if there is some weird problem:

self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){
  self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){
    // Something happens and the zadd doesn't execute correctly?
    self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
      if(typeof callback == "function"){ callback(); }
    });
  });
});

I have a few thoughts...

  1. What's causing the zadd to not add this to the delayed set? I'm trying to come up with a test for this. Maybe it's not a problem with the zadd, and rather a problem w/ cleanup in scheduler.cleanupTimestamp:

self.connection.redis.del(key);
self.connection.redis.zrem(self.connection.key('delayed_queue_schedule'), timestamp);

  2. Should the delayQueueLock be smarter?
  3. Should worker.end() or scheduler.end() clean up jobs that get into this state?

What do you think, @evantahler?

Queue constructor callback runs twice

When supplying a redis client object as the connection details for a new queue, the callback defined as the third parameter of the queue is called twice.

var resque = require('node-resque');
var redis = require('redis').createClient(6379, '127.0.0.1');

var queue = new resque.queue({
  connection: { redis: redis },
}, {}, function () {
  console.log('CALLBACK');
});

// output:
//   CALLBACK
//   CALLBACK

Tested with:
[email protected] and [email protected]
[email protected]

null job in success callback

I'm looking into this to try and come up with a test case for it, but I'm hoping someone might see something that I'm overlooking. Last week we had a few issues where the job was null in the success callback.

i.e.

worker.on('success', function (queue, job, result) {
  // job was null
})

Worker processes not being cleaned up properly on heroku

I'm having problems with workers not being cleaned up properly on heroku. Since heroku uses different dynos per worker, it's not possible to use the built-in worker cleanup inside node-resque (workers will be 1 PID per dyno).

Here is the worker code I am using:

NR = require("node-resque")

worker = new NR.worker {connection: config.resque, queues: queues, timeout: 1000}, jobs, ->
  debug "Worker running on DYNO: #{process.env.DYNO}"
  worker.start()

worker.on 'error', (queue, job, error) ->
  debug("Worker error: " + queue + " " + JSON.stringify(job) + " >> " + error)

worker.on "end", ->
  debug "Worker has ended"

process.once "SIGTERM", ->
  debug "Worker issued SIGTERM, shutting down"
  debug "Worker is currently working" if worker.working is true
  worker.end ->
    debug "Worker has been untracked, quitting process"
    process.exit()

debug "Instance of worker loaded."

With heroku, they issue a SIGTERM to all processes when a dyno shuts down. This in our case will call worker.end and then process.exit(). If the process doesn't end within 10 seconds, heroku will issue a SIGKILL - we are not seeing it get this far in our logs.

Sometimes when a dyno is terminated, we see this in the logs (a healthy shutdown):
Note also that heroku logs are not deterministic in their ordering.

Aug 13 14:21:44 app/worker.2:  Wed, 13 Aug 2014 21:21:44 GMT Worker has ended 
Aug 13 14:21:44 app/worker.2:  Wed, 13 Aug 2014 21:21:44 GMT Worker issued SIGTERM, shutting down 
Aug 13 14:21:45 app/worker.2:  Wed, 13 Aug 2014 21:21:44 GMT Worker has been untracked, quitting process
Aug 13 14:21:45 heroku/worker.2:  Stopping all processes with SIGTERM 

Note the untracked log in the snippet above; this, in my mind, is a healthy shutdown.

However, sometimes it is terminated and we see this (resque thinks this worker is still working):

Aug 14 09:03:21 heroku/worker.4:  State changed from up to down 
Aug 14 09:03:25 app/worker.4:  Thu, 14 Aug 2014 16:03:25 GMT Worker issued SIGTERM, shutting down 
Aug 14 09:03:25 app/worker.4:  Thu, 14 Aug 2014 16:03:25 GMT Worker has ended 
Aug 14 09:03:25 app/worker.4:  Error waiting for process to terminate: No child processes 
Aug 14 09:03:26 heroku/worker.4:  Stopping all processes with SIGTERM 
Aug 14 09:03:26 heroku/worker.4:  Process exited with status 22 

I have also been following up with heroku support on this issue in case it's a problem with how they shut down instances, but it appears as though the problem is from within node-resque. They mentioned that sometimes node doesn't completely flush its logs before the process exits; this might be why the logs differ above, but it should still untrack the worker in redis.

Happy to provide more information and tests to help track this down; I appreciate anyone who can help look into it.

Worker job timeout

It looks like we're lacking the ability to time out a job if it hasn't completed for some specified period, or perhaps I'm overlooking that functionality.

I just noticed I had two workers that were stuck for 15+ minutes on something that should take < 1 second, which means something funky happened. I want to make sure that worker knows it can abort whatever is hung, and start polling again.

Thoughts?

workerCleanup untracks all workers on Windows

When this method is called, it removes all workers from the workers list, which makes it impossible to see the workers on resque-web.

This happens because of the check pids.indexOf(pid) < 0 on line worker.js#L278

Because of

worker.workerCleanup() only works for *nix operating systems (osx, unix, solaris, etc)

There should be a check for windows too, to return early.

edit: I've started to work on this, I will make a pull request soon.

Change worker callback API

The callback for a job's perform function doesn't really follow typical node conventions. The callback receives a single argument, and then there's a check in worker.js to see if it's an error, e.g.

cb.apply(null, [].slice.call(args).concat([function(result) {
  self.runPlugins('after_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(err, toRun){
    if (result instanceof Error) {
      self.completeJob(null, result, callback);
    } else {
      self.completeJob(result, null, callback);
    }
  });
}]));

And the perform looks like this:

perform: function (next) {
  next(new Error('Blue smoke'))
  // or next(myAnswer)
}

I think we should follow node conventions and have the perform callback receive an error as the first argument, and the result as the second. If there's no error, send null.

Would you accept a PR for this?

Resque-Scheduler w/CRON Schedule Example

Do you have an example of using node-resque with a CRON schedule? I didn't see any examples you had of this working, but you say you support resque-scheduler and I wasn't sure if that included the scheduling as well.

Another option would be to run node-cron alongside node-resque to accomplish what I want to do by having that add items to the queue for node-resque to process.

Custom plugins cannot be configured

The custom plugins don't support pluginOptions. This isn't the end of the world, since a custom plugin could implement the options internally, but it would be better if they supported pluginOptions.

Changelog.md or similar?

Would it be possible to provide a Changelog.md or History.md file etc. so one can check the actual changes happening for every release?

Much appreciated, thx

Any benefit of creating a new connection to Redis every time?

Hey,

I noticed all of the various classes Node-Resque uses create their own connection to Redis (and sometimes more than one). Is this necessary or beneficial in some way over sharing a single connection? Reading the Redis README hints that it does support multiple parallel requests...

concurrency

Haven't noticed any mention of concurrency. Since we're using node, lots of jobs that are mostly waiting on IO would be far more efficiently dealt with if we allow concurrency.

Multiworker: EventEmitter memory leak

Hello,
while using node-resque, what I do in app.js is declare the queue as a global variable:

global.Resque = new nR.queue({connection: connectionDetails}, "");
Resque.connect(resolve);

And whenever I need to enqueue a job, I just call Resque.enqueue from any part of the app, but sometimes it gives this error while enqueueing a job:

(node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Redis.addListener (events.js:239:17)
    at connection.connect (node-resque/lib/connection.js:78:14)
    at queue.connect (queue.js:24:19)
    at worker.connect (worker.js:52:20)
    at multiWorker.startWorker (multiWorker.js:62:10)
    at Immediate._onImmediate (multiWorker.js:148:12)
    at processImmediate [as _immediateCallback] (timers.js:374:17)

Ensure that multiWorker works with fakeredis

Currently it does not. The event loop block detection does not work with a local in-line redis. We need to implement timers (with a rolling average?) to check for delay in this case.
