
faktory-rs's Introduction

faktory-rs


API bindings for Faktory workers and job producers.

This crate provides API bindings for the language-agnostic Faktory work server. For a more detailed system overview of the work server, what jobs are, and how they are scheduled, see the Faktory docs.

System overview

At a high level, Faktory has two primary concepts: jobs and workers. Jobs are pieces of work that clients want to have executed, and workers are the things that eventually execute those jobs. A client enqueues a job, Faktory sends the job to an available worker (and waits if they're all busy), the worker executes the job, and eventually reports back to Faktory that the job has completed.

Jobs are self-contained, and consist of a job type (a string), arguments for the job, and bits and pieces of metadata. When a job is scheduled for execution, the worker is given this information, and uses the job type to figure out how to execute the job. You can think of job execution as a remote function call (or RPC) where the job type is the name of the function, and the job arguments are, perhaps unsurprisingly, the arguments to the function.
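For example, a job that conceptually performs the call send_email("user@example.com") would carry "send_email" as its job type and the address as its single argument. A minimal sketch using this crate's Job type (the job type and argument here are made up for illustration):

use faktory::Job;

// "send_email" plays the role of the remote function's name;
// the vector holds that function's arguments.
let job = Job::new("send_email", vec!["user@example.com"]);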

In this crate, you will find bindings both for submitting jobs (clients that produce jobs) and for executing jobs (workers that consume jobs). The former can be done by making a Producer, whereas the latter is done with a Consumer. See the documentation for each for more details on how to use them.

Encrypted connections (TLS)

To connect to a Faktory server hosted over TLS, add the tls feature, and see the documentation for TlsStream, which can be supplied to Producer::connect_with and Consumer::connect_with.
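A minimal sketch of such a connection, assuming the tls feature is enabled. The exact TlsStream constructor below is an assumption (check its documentation); connect_with is the documented way to hand a custom stream to a producer or consumer.

use faktory::{Job, Producer, TlsStream};

// TlsStream::connect(None) is assumed to behave like Producer::connect(None),
// i.e. fall back to the FAKTORY_URL environment variable.
let tls = TlsStream::connect(None).unwrap();
let mut p = Producer::connect_with(tls, None).unwrap();
p.enqueue(Job::new("foobar", vec!["z"])).unwrap();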

Examples

If you want to submit jobs to Faktory, use Producer.

use faktory::{Producer, Job};
let mut p = Producer::connect(None).unwrap();
p.enqueue(Job::new("foobar", vec!["z"])).unwrap();

If you want to accept jobs from Faktory, use Consumer.

use faktory::ConsumerBuilder;
use std::io;
let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
    println!("{:?}", job);
    Ok(())
});
let mut c = c.connect(None).unwrap();
if let Err(e) = c.run(&["default"]) {
    println!("worker failed: {}", e);
}

Run test suite locally

First, ensure the Faktory service is running and accepting connections on your machine. To launch a Faktory container with Docker, run:

docker run --rm -it -v faktory-data:/var/lib/faktory -p 127.0.0.1:7419:7419 -p 127.0.0.1:7420:7420 contribsys/faktory:latest /faktory -b :7419 -w :7420

After that, run the tests:

FAKTORY_URL=tcp://127.0.0.1:7419 cargo test --all-features --locked --all-targets

Please note that setting the FAKTORY_URL environment variable is required; otherwise the end-to-end (e2e) tests will be skipped.

faktory-rs's People

Contributors

icefoxen, jonhoo, jssblck, kwest-ng, rustworthy, syfaro, ttdonovan, wezm


faktory-rs's Issues

Measurement and threading with core affinity

Hi Jon,

Thank you for creating and maintaining the crate; it comes in handy for quite a lot of people. However, I do have a few specific questions that I can't find answers to in the docs.

Q1: Is it possible to measure the time it takes to execute each job? I know that register can return something other than () wrapped within a Result, but I don't think that is what c.run returns. Is this kind of measurement possible at all? I am doing logging right now, but ideally the results could be stored and pulled together at some point.

c.register("foobar", |job| -> io::Result<()> {
    println!("{:?}", job);
    Ok(())
});
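One way to get per-job timings with the current API (a sketch, not a built-in crate feature) is to time the handler body yourself with std::time::Instant and log or store the elapsed duration:

use faktory::ConsumerBuilder;
use std::io;
use std::time::Instant;

let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
    let start = Instant::now();                 // start the clock before the work
    println!("{:?}", job);                      // ... actual job logic goes here ...
    println!("job took {:?}", start.elapsed()); // per-job wall-clock time
    Ok(())
});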

Q2: I read the docs and see that this crate comes with a workers option where we can specify more than one thread. Is it possible to pin those threads to different cores to actually take advantage of all the cores I have? If not, I guess we will need to use threads plus core affinity with more than one consumer, right?

Best,

Context/State for Job Workers?

To preface, I'm a fairly amateur rust developer and what I'm suggesting is no doubt a breaking change for the worker API, and I'm probably not skilled enough to implement a completely sound library change with this sort of scope.

I'm attempting to use workers written in Rust, but I quickly ran into issues with managing state that exists outside of the worker itself. While reading over previously closed issues (particularly #13 and the PR that closed it), I'm wondering if there is a roadmap or desire for something akin to Rocket's request guards. My primary use case for this is managing DB connections with diesel. Something utilizing https://docs.rs/state/0.4.1/state/ might be possible?

I suppose with the current API I could write a closure that wraps a Fn and grabs a connection, but that seems a bit awkward, and I could easily see the function that strings the workers together becoming harder to reason about.
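As a rough sketch of that workaround (the state type and job name are made up for illustration), shared state can be put behind an Arc and moved into each registered closure, which keeps the handler 'static while every invocation sees the same underlying value:

use faktory::ConsumerBuilder;
use std::io;
use std::sync::Arc;

// Stand-in for whatever shared state is needed (e.g. an r2d2/diesel pool).
struct AppState {
    db_url: String,
}

let state = Arc::new(AppState { db_url: "postgres://localhost/app".into() });

let mut c = ConsumerBuilder::default();
let st = Arc::clone(&state);
c.register("do_work", move |job| -> io::Result<()> {
    // The closure owns its Arc clone, so it satisfies the 'static bound.
    println!("handling {:?} using db at {}", job, st.db_url);
    Ok(())
});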

This isn't an absolutely critical or show-stopping goal, since I'm writing my workers in python in the interim but it would be a "nice to have" so that my workers and main codebase can share the exact same model structs and logic without duplication.

Forced termination does not actually terminate anything

Currently, the code will exit once the next job finishes (or immediately if there are no current jobs), but that's not sufficient. A sketch of the right approach is given as a TODO inside Consumer:

// this is not sufficient. in this case, we should FAIL the current job,
// send END, shutdown the socket, and then exit from `run`. however, that's
// pretty tricky given the current code layout. we'd probably need to "invert"
// it so that the worker is in a thread and the heartbeat handling isn't. we'd
// also then need to share the "current" job id between the two (probably using
// AtomicOption). the hardest part is likely going to be ensuring that
// self.last_job_result is correctly updated (so that we can resume), but this
// might be made easier by having the main thread update it by subscribing to
// messages from the worker thread, and then simply not updating it when
// TERMINATE is given (which is likely what we'd want anyway).

Open to replacing `failure` with `thiserror`?

Hey there! I'm starting a new project using this library.

Now that failure is deprecated, and the ecosystem is moving towards a system where libraries report implementations of Error, would you be open to a PR replacing usage of failure with proper Error implementations?
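For reference, a sketch of what a dedicated error type could look like with thiserror (the variant names are illustrative, not the crate's actual errors):

use thiserror::Error;

#[derive(Debug, Error)]
pub enum FaktoryError {
    #[error("I/O error talking to the server")]
    Io(#[from] std::io::Error),
    #[error("server responded with an unexpected message: {0}")]
    Protocol(String),
}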

Use isize instead of usize for retry Job property

Hello,

As per the Faktory docs, we need to send a retry value of -1 to not retry a job but keep it in the dead queue for later inspection. As of now, the use of the usize type for the retry property prevents this.

Can you change it to isize so we are able to send negative numbers to the Faktory server?
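For context, what this would allow (assuming retry stays a public field, as it is today) is roughly:

use faktory::Job;

let mut job = Job::new("foobar", vec!["z"]);
// Only representable once `retry` is a signed type; the current usize rejects it.
job.retry = -1;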

Thanks,

Support spawning a worker pool with `run`

Currently, run (and by extension, run_to_completion) only spawns a single worker thread. While users are free to manage threads themselves, this is still unfortunate as it duplicates heartbeats, and makes error handling more complex (e.g., run_to_completion returning on one thread would kill the others without allowing them to fail jobs).

This should be fixed by instead spinning up a worker pool of configurable size to handle jobs.
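Until a built-in pool lands, the manual workaround mentioned above looks roughly like this (a sketch; each thread builds and connects its own consumer, so each one also maintains its own heartbeat):

use faktory::ConsumerBuilder;
use std::io;
use std::thread;

let handles: Vec<_> = (0..4)
    .map(|_| {
        thread::spawn(|| {
            let mut b = ConsumerBuilder::default();
            b.register("foobar", |job| -> io::Result<()> {
                println!("{:?}", job);
                Ok(())
            });
            let mut c = b.connect(None).unwrap();
            if let Err(e) = c.run(&["default"]) {
                println!("worker failed: {}", e);
            }
        })
    })
    .collect();

for h in handles {
    h.join().unwrap();
}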

No TLS support

Adding support for TLS should be fairly straightforward:

Given that users may also want to be able to specify options, we might need some intermediate builder that exposes TlsConnectorBuilder. I'm not entirely sure what the most ergonomic way to do this is, but one suggestion is to change StreamConnector to:

pub trait StreamConnector {
    type Addr: FromUrl;
    type Stream: Read + Write + Sized + 'static;
    fn connect(self, addr: Self::Addr) -> io::Result<Self::Stream>;
}

And the following impl block for the connection methods on Client to:

impl<C: StreamConnector> Client<C::Stream> {

This will require adding a little more code to the impl for TcpStream, but shouldn't be too bad.

Is there a way to implement async handlers?

Is it possible to register async handlers? I want to do some work in a handler that ultimately invokes a function that calls .await. Currently my code looks like this:

let mut c = ConsumerBuilder::default();
c.register("job_type", &async_handler);

...

async fn async_handler(job: Job) -> io::Result<()> {
	// here I would do stuff and .await
	Ok(())
}

produces:

error[E0271]: expected `&fn(Job) -> impl Future<Output = Result<(), Error>> {async_handler}` to be a reference that returns `Result<(), Error>`, but it returns `impl Future<Output = Result<(), Error>>`
   --> src/main.rs:48:22
    |
48  |     c.register("job_type", &async_handler);
    |       --------          ^^^^^^^^^^^^ expected `Result<(), Error>`, found future
    |       |
    |       required by a bound introduced by this call
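The handler is expected to return a Result directly rather than a Future, so one possible workaround (a sketch, not an officially supported pattern; it assumes the futures crate is available) is to drive the future to completion inside a synchronous wrapper:

use faktory::{ConsumerBuilder, Job};
use std::io;

async fn async_handler(job: Job) -> io::Result<()> {
    // ... do async work and .await here ...
    println!("{:?}", job);
    Ok(())
}

let mut c = ConsumerBuilder::default();
c.register("job_type", |job| -> io::Result<()> {
    // Block this worker thread until the async work finishes.
    futures::executor::block_on(async_handler(job))
});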

Consumer - how do I convert back to the original data?

I've got a small test running. I can send data through Faktory, and the consumer receives it fine.

If I enqueue a job with args:

vec! [1, 2, 3, 4]

I receive:

[Number(1), Number(2), Number(3), Number(4)]

That shows up with a type of &[Value]. How do I get it back to a Vec? I've been going through serde_json docs trying to figure this out.
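One way to do the conversion (a sketch using serde_json; it assumes the args accessor implied above, which yields &[Value], and that the args really are plain numbers):

use serde_json::Value;

// Turn the borrowed slice into an owned Value::Array and let serde_json
// deserialize it back into the concrete type that was enqueued.
let args: Vec<i32> = serde_json::from_value(Value::Array(job.args().to_vec()))
    .expect("args were not a list of numbers");
assert_eq!(args, vec![1, 2, 3, 4]);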

Thanks.

Consumer::run_to_completion() has an unused type parameter?

Consumer::run_to_completion() has the following signature:

fn run_to_completion<Q, U>(self, queues: &[Q]) -> !

The U parameter appears to be used for nothing, and defining it to be anything when I call the function seems to change nothing; without an explicit definition for it the call does not compile.

Shorten heartbeat window to enable faster reaction times

Currently, the heartbeat thread sends a heartbeat every 5 seconds, as suggested by the Faktory documentation. However, the heartbeat handling code also performs additional operations, such as watching for the worker thread terminating. This means that there will be a ~2.5s delay between changes to the worker state and run returning, which is unfortunate.

The fix to this is to have the heartbeat thread sleep for shorter amounts of time, but only write once 5s has elapsed.
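The shape of that fix, sketched with illustrative timings (send_heartbeat is a hypothetical stand-in for the real write):

use std::thread;
use std::time::{Duration, Instant};

let mut last_beat = Instant::now();
loop {
    // Wake up frequently so worker-state changes are noticed quickly...
    thread::sleep(Duration::from_millis(100));

    // ...check for worker termination / status changes here...

    // ...but only write an actual BEAT once the 5s window has elapsed.
    if last_beat.elapsed() >= Duration::from_secs(5) {
        // send_heartbeat()?;  // hypothetical
        last_beat = Instant::now();
    }
}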

Use error type instead of using catch all failure::Error in Producer::con

Per the failure guidance, the failure::Error type is typically only recommended when prototyping or for "applications" that won't handle the error. It would be better to define a custom error type and derive Fail for it.

I see there are some parts of the code using a custom error type but others are falling back on Error.

If this sounds reasonable I'm happy to make this change.
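For illustration, a custom error type with failure's derive might look like this (variant names are made up, not the crate's real errors):

use failure::Fail;

#[derive(Debug, Fail)]
pub enum ConnectError {
    #[fail(display = "could not reach the Faktory server: {}", _0)]
    Io(String),
    #[fail(display = "unexpected response during handshake: {}", _0)]
    Handshake(String),
}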

Handle signals

We should add a dependency on ctrlc so that the worker can be told to cleanly exit when run_to_completion is called. The implementation will likely be fairly straightforward, essentially just storing STATUS_TERMINATING and letting the heartbeat thread do the cleanup (including failing the currently executing job). This may require some minor changes to make the heartbeat thread recognize that this is indeed a forced exit.

We may also want to allow the user to opt-in to signal handling for run, although that's less clear (we might instead want to return some kind of handle that the caller can use to signal to exit or wait for termination).
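A rough sketch of the wiring (the shared flag here is a stand-in for STATUS_TERMINATING; only ctrlc::set_handler is the real crate API):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

let terminating = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&terminating);

ctrlc::set_handler(move || {
    // Flip the flag; the heartbeat thread would notice it and do the
    // cleanup (fail the in-flight job, send END, shut down the socket).
    flag.store(true, Ordering::SeqCst);
})
.expect("failed to install signal handler");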

Confused about labels

Hi, the documentation mentions a "labels" field on Job; however, as far as I can tell there is no such field. Is the documentation incorrect, or is there another detail I'm missing?

Provide ergonomic access to Pro features

Faktory now has a Pro edition, and that comes with (at the time of writing) two new API-facing features. Specifically, it adds unique jobs and expiring jobs. It'd be good to have a convenient interface to these without having to set fields in job.custom manually in the right format. These additional convenient methods should probably be under a non-default feature called "pro" so that non-pro users don't see methods they can't use.
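Until then, the manual route looks roughly like this (the unique_for and expires_at keys are taken from the Faktory Pro docs; double-check them before relying on this sketch):

use faktory::Job;
use serde_json::json;

let mut job = Job::new("foobar", vec!["z"]);
// Unique job: suppress duplicate enqueues for 10 minutes.
job.custom.insert("unique_for".to_owned(), json!(600));
// Expiring job: discard the job if it has not run by this RFC 3339 timestamp.
job.custom.insert("expires_at".to_owned(), json!("2024-01-01T00:00:00Z"));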

Making consumer jobs closures is super inconvenient.

Sad but true. I want to have this:

    /// Same as `run()` but gets jobs from a faktory server.
impl Foo {
    fn run_faktory() -> Result<()> {
        let faktory_server = "tcp://localhost:7419";
        let mut c: faktory::ConsumerBuilder<&(Fn(faktory::Job) -> Result<()> + Send + Sync + 'static)> = faktory::ConsumerBuilder::default();

        c.register("pin", |job: faktory::Job| -> Result<()> {
            self.do_stuff()?;
            Ok(())
        });

        c.register("unpin", |job: faktory::Job| -> Result<()> {
            self.do_other_stuff()?;
            Ok(())
        });
        
        let mut consumer = c.connect(Some(faktory_server)).unwrap();
        consumer.run_to_completion::<_, ()>(&["default"]);
        if let Err(e) = consumer.run(&["default"]) {
            println!("Worker failed: {}", e);
        }
        Ok(())
    }
}

First problem: c gets closures of two different types. This is an error 'cause closure types are never equivalent. Okay, so we take references to them. This works, but only because of HECKIN BLACK MAGIC; if you do this:

        c.register("pin", &|job: faktory::Job| -> Result<()> {
            self.do_stuff()?;
            Ok(())
        });

it works, but if you do:

        let closure = |job: faktory::Job| -> Result<()> {
            self.do_stuff()?;
            Ok(())
        };
        c.register("pin", &closure);

it can't figure out that the lifetime for the closure is valid.

Okay, this is inconvenient but not impossible. But self.do_stuff() borrows self, which means the closure is not 'static and Can't Work. Okay, so we factor self out into another structure, which is initialized before c.register() and moved into the closure:

        let mock_self = String::from("foo");
        c.register("pin", &move |job: faktory::Job| -> Result<()> {
            println!("Foo is: {}", mock_self);
            Ok(())
        });

But for some reason the move annihilates rustc's magic ability to figure out the closure's lifetime is valid. And on stable, boxed closures do not implement Fn.

It gets even worse from there because the real mock_self I want to use is not Send but I might be able to engineer around that. But the lifetimes still Don't Work.

Summary: Rust closures suck ass. Sad but true. The only good way around it is to make the method take a trait object instead:

trait Runnable {
    fn run(job: faktory::Job) -> Result<()>;
}
impl Runnable for Box<T> where T: Runnable ...

impl ConnectionBuilder {
    pub fn register<K, F>(&mut self, kind: K, handler: F) -> &mut Self where
        K: Into<String>, F: Runnable { ... }
}

or something like that. Then your "mock-self" object can just implement Runnable, the object has a specific type and lifetime and all that jazz, and you don't have to fight the closure lifetime rules at all.

Maybe I'm wrong and this works fine and I can't figure out how to make all the bits fit together. But every time I use non-trivial closures in Rust they're utterly awful. If you have any suggestions on how to make them better, I'm all ears.

Fix race between job completion and heartbeats

Currently, there exists a race between the thread that performs heartbeats and the worker thread that completes jobs. Specifically, when the worker waits for the OK following an ACK or a FAIL, it might receive the reply intended for the heartbeat, and vice-versa.

This is normally not an issue, since both heartbeats and workers generally receive OK responses. However, it can misbehave when a non-OK response is given (e.g., an error or a QUIET). There is also a write race here, where technically an outgoing message could be large enough that it ends up being flushed across multiple system calls, at which point a BEAT and an ACK/FAIL could be mixed.

Fixing this will require a bit of engineering. The easiest is probably to spin up a third RPC thread that is responsible for sending messages and waiting for replies in a single atomic step.

Use builder for creating Job

Currently setting properties on Job outside of kind and args requires manually building a Job or building a Job with new, then mutating its properties. I'd suggest using a builder to create jobs and make the fields of Job private. My suggested approach would probably be derive_builder.

If this sounds reasonable I'm happy to make this change.
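A rough sketch of the derive_builder approach (a hypothetical, trimmed-down stand-in for Job, just to show the shape such an API could take):

use derive_builder::Builder;
use serde_json::Value;

#[derive(Builder, Debug)]
#[builder(setter(into))]
pub struct Job {
    kind: String,
    args: Vec<Value>,
    #[builder(default = "25")]
    retry: usize,
}

// Usage:
// let job = JobBuilder::default()
//     .kind("foobar")
//     .args(vec![Value::from("z")])
//     .build()
//     .unwrap();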
