work_queue's Introduction

WorkQueue

A simple implementation of the hungry consumer work scheduling model.

Given a queue of work to be processed, we create a pool of workers. Each worker requests the next item to process from the queue. When it finishes processing, it reports the result back and then requests the next item of work.

The intent is that we do not need a central control point that preassigns work. Instead, the workers pull work dynamically as they become free.

This has a couple of advantages. First, it locally maximizes CPU utilization, as workers are always busy as long as there is work in the queue.

Second, it offers a degree of resilience, both against CPU-hogging work items and against worker failure.
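
To make the model concrete, here is a minimal sketch of the hungry consumer pattern using only the standard library. It is not how WorkQueue itself is implemented: the module name HungryConsumerSketch, the use of an Agent as the queue, and the one-argument worker function are all assumptions made for illustration.

```elixir
defmodule HungryConsumerSketch do
  # Hypothetical module for illustration only; not part of WorkQueue.

  # Runs worker_fn over items using worker_count concurrent workers.
  # Returns a list of {item, result} tuples in no guaranteed order.
  def process(worker_fn, items, worker_count) do
    # The queue is an Agent holding the list of remaining items.
    {:ok, queue} = Agent.start_link(fn -> items end)

    results =
      1..worker_count
      |> Enum.map(fn _ -> Task.async(fn -> work_loop(queue, worker_fn, []) end) end)
      |> Enum.flat_map(&Task.await(&1, :infinity))

    Agent.stop(queue)
    results
  end

  # Each worker keeps asking the queue for the next item until none remain.
  defp work_loop(queue, worker_fn, acc) do
    case Agent.get_and_update(queue, &pop/1) do
      :done -> acc
      {:ok, item} -> work_loop(queue, worker_fn, [{item, worker_fn.(item)} | acc])
    end
  end

  # Returns {reply, new_queue_state} as Agent.get_and_update/2 expects.
  defp pop([]), do: {:done, []}
  defp pop([head | tail]), do: {{:ok, head}, tail}
end

results = HungryConsumerSketch.process(fn x -> x * 2 end, [1, 2, 3, 4, 5], 3)
IO.inspect(Enum.sort(results))
```

No worker is assigned items up front. A slow item only delays the worker that picked it up, while the others keep draining the queue.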

Simple Example

results = WorkQueue.process(
  fn (val,_) -> { :ok, val*2 } end,   # worker function
  [ 1, 2, 3 ]                         # work items to process
)

assert length(results) == 3
for {input, output} <- results, do: assert(output == input * 2)

This code will allocate a number of workers (the default is 2/3rds of the available processing units in the node). Each worker then runs the given function for as long as there are items to process. The results are collected (in the order the workers return them) and returned.
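
As a rough illustration of how a worker count could be derived from the number of processing units (the 2/3 default here, and the worker_count forms listed under The API), consider the sketch below. The module WorkerCountSketch, the use of System.schedulers_online/0 as the processing unit count, and the rounding and minimum-of-one behaviour are assumptions, not the library's actual code.

```elixir
defmodule WorkerCountSketch do
  # Hypothetical helper for illustration only; not part of WorkQueue.
  # Assumption: "processing units" is approximated by System.schedulers_online/0.
  def resolve(spec, units \\ System.schedulers_online())

  # An integer is used as the worker count directly.
  def resolve(count, _units) when is_integer(count), do: count

  # A float scales the number of processing units.
  # Assumption: round to the nearest integer and never go below one worker.
  def resolve(factor, units) when is_float(factor), do: max(1, round(factor * units))

  # One worker per processing unit.
  def resolve(:cpu_bound, units), do: units

  # Ten workers per processing unit.
  def resolve(:io_bound, units), do: units * 10
end

IO.inspect(WorkerCountSketch.resolve(2 / 3, 6))
IO.inspect(WorkerCountSketch.resolve(:io_bound, 4))
```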

The API

results = WorkQueue.process(work_processor, item_source, options \\ [])

  • work_processor is a function that transforms an item from the work queue. It is called with two arguments: the item and a second value that the examples here ignore. It returns either {:ok, result} or {:error, reason}.

  • item_source is the source of the items to be processed. In the simplest case, this is just an enumerable. Successive items are taken from it and fed to the workers.

    In other situations, you may need to perform additional processing in order to generate the items. In particular, the item source may be unbounded. In this case, you need to provide a get_next_item function through the options (see below). This function receives the item_source as its initial state.

  • options is an optional keyword list:

    • worker_count: count

      If count is an integer, start that number of workers. If it is a float, it becomes a factor by which we multiply the number of processing units on the node (so specifying 0.5 will start workers for one-half the number of available processing units). You can also pass the atoms :cpu_bound and :io_bound. The former creates one worker per processing unit; the latter creates 10 workers per processing unit.

    • get_next_item: func

      The function that fetches the next item to be given to a worker. It initially receives item_source as its parameter. It returns a three-element tuple. The first element is :ok if an item has been returned, or :done otherwise. The second element is the item to be returned, and the third is the updated item source state.

      The default value of get_next_item for list values of the item source is

       defp traverse_list([]),    do: {:done, nil, []}
       defp traverse_list([h|t]), do: {:ok,   h,   t}

    • report_each_result_to: func

      Invoke func as each result becomes available. The function receives a tuple containing the original work item and the result of running the calculation on that item. Its return value is ignored.

       test "notifications of results" do
         WorkQueue.process(
           &double/2,
           [ 1, 2, 3 ],
           report_each_result_to:
             fn {input, output} -> assert(output == input*2) end
         )
       end

    • report_progress_to: func, report_progress_interval: ms

      Invoke func to report progress. It is passed

      • {:starting} when work is about to start
      • {:progress, count} every ms milliseconds, indicating the total number of items processed so far
      • {:finished, results} when processing finishes

      Progress reporting is disabled when report_progress_interval is false (the default).
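
The get_next_item contract described above can be sketched with an item source that is generated on demand rather than held in a list. The CounterSource module, its {next, limit} state, and the drain helper are hypothetical names used only to show the three-element return tuple.

```elixir
defmodule CounterSource do
  # Hypothetical item source for illustration only; not part of WorkQueue.
  # The state is {next, limit}: the next integer to hand out and the last one allowed.

  # No more items: return :done, a nil item, and the unchanged state.
  def next_item({next, limit}) when next > limit, do: {:done, nil, {next, limit}}

  # An item is available: return it together with the advanced state.
  def next_item({next, limit}), do: {:ok, next, {next + 1, limit}}

  # Helper that repeatedly calls next_item/1 and collects the items in order.
  def drain(state, acc \\ []) do
    case next_item(state) do
      {:done, _item, _state} -> Enum.reverse(acc)
      {:ok, item, new_state} -> drain(new_state, [item | acc])
    end
  end
end

IO.inspect(CounterSource.drain({1, 5}))
```

Going by the option description above, such a function would presumably be supplied as get_next_item: &CounterSource.next_item/1, with {1, 5} passed as the item_source.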

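A progress callback has to handle the three message shapes listed above. The following sketch shows one way to pattern match on them; the ProgressSketch module and its wording are illustrative assumptions.

```elixir
defmodule ProgressSketch do
  # Hypothetical module for illustration only; not part of WorkQueue.
  # Turns each documented progress message into a human-readable line.
  def describe({:starting}), do: "starting"
  def describe({:progress, count}), do: "processed #{count} items so far"
  def describe({:finished, results}), do: "finished with #{length(results)} results"
end

# A function of this shape could be passed as report_progress_to.
report = fn message -> IO.puts(ProgressSketch.describe(message)) end

report.({:starting})
report.({:progress, 2})
report.({:finished, [{1, 2}, {2, 4}, {3, 6}]})
```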

work_queue's Issues

test failed when passing anonymous function as 1st argument

Environment: Windows, Elixir 1.0.1.
By default, mix test passes all 18 cases.
However, if I use fn val -> {:ok, val * 2} end (as in the README) instead of &double/2 as the first argument to WorkQueue.process/2 in the "basic queue" test case, the following error is raised:

09:05:56.328 [error] Task #PID<0.129.0> started from #PID<0.127.0> terminating
Function: &WorkQueue.Worker.do_process/3
    Args: [%{item_source: [2, 3], opts: %{get_next_item: #Function<3.45664607/1 in WorkQueue.Options.setup_get_next_item/2>, item_source: [], report_each_result_to: #Function<0.45664607/1 in WorkQueue.Options.default_options/0>, report_progress_interval: false, report_progress_to: #Function<1.45664607/1 in WorkQueue.Options.default_options/0>, worker_args: [], worker_count: 3}, results: [], supervisor_pid: #PID<0.128.0>, worker_fn: #Function<4.44435900/1 in WorkQueueTest.test basic queue/1>}, #PID<0.127.0>, 1]
** (exit) an exception was raised:
    ** (BadArityError) #Function<4.44435900/1 in WorkQueueTest.test basic queue/1> with arity 1 called with 2 arguments (1, [])
        (work_queue) lib/work_queue/worker.ex:20: WorkQueue.Worker.do_process/3
        (elixir) lib/task/supervised.ex:74: Task.Supervised.do_apply/2
        (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

I am now investigating this.
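
Judging from the trace alone, the worker function is invoked with two arguments, (1, []), so a one-argument anonymous function cannot match. The standalone sketch below reproduces that mismatch without WorkQueue; the variable names are illustrative, and the meaning of the second argument is assumed from the worker_args: [] entry in the trace.

```elixir
# A one-argument function, as used in the failing test.
one_arg = fn val -> {:ok, val * 2} end

# A two-argument function that ignores its second argument.
two_arg = fn val, _worker_args -> {:ok, val * 2} end

# Calling the two-argument version the way the trace shows works.
IO.inspect(two_arg.(1, []))

# Calling the one-argument version with two arguments raises BadArityError.
outcome =
  try do
    apply(one_arg, [1, []])
  rescue
    BadArityError -> :bad_arity
  end

IO.inspect(outcome)
```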
