quasiqueue's Introduction

QuasiQueue

QuasiQueue is a multiprocessing library for Python that makes it super easy to run long-running multiprocess jobs. QuasiQueue handles process creation and cleanup, signal management, cross-process communication, and all the other garbage that makes people hate dealing with multiprocessing.

QuasiQueue works by splitting the work into two components: the main process, whose job is to feed a Queue with work, and reader processes, which take work off of the Queue to run. All developers have to do is create two functions:

  • writer is called when the queue gets low. It should return an iterable (list, generator) that QuasiQueue uses to grow the multiprocess Queue.
  • reader is called once for each item in the Queue. It runs in a completely different process from the writer.
flowchart LR
  writer(writer)-->queue((queue))
  queue-->reader1(reader)
  queue-->reader2(reader)
  queue-->reader3(reader)
  queue-->reader4(reader)

These functions can be as simple or complex as you need.

import asyncio

from quasiqueue import QuasiQueue

async def writer(desired_items: int):
  """Feeds data to the Queue when it is low.
  """
  return range(0, desired_items)


async def reader(identifier: int|str):
  """Receives individual items from the queue.

  Args:
      identifier (int | str): Comes from the output of the Writer function
  """
  print(f"{identifier}")


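if __name__ == "__main__":
  # The guard keeps child processes from re-running this block when the
  # "spawn" start method (the default on macOS and Windows) re-imports the module.
  runner = QuasiQueue(
    "hello_world",
    reader=reader,
    writer=writer,
  )

  asyncio.run(runner.main())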
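To make the writer/reader split concrete, here is a single-process simulation of the control flow described above. It is only an illustration, not QuasiQueue's implementation: `simulate`, `low_water_mark`, and `counting_writer` are made-up names, and a `deque` stands in for the multiprocess Queue. The writer is asked for items only when the queue runs low, and the reader is called once per item.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable, Deque, Iterable, List, Optional


async def simulate(
    writer: Callable[[int], Awaitable[Optional[Iterable[Any]]]],
    reader: Callable[[Any], None],
    max_queue_size: int = 5,
    low_water_mark: int = 2,
    steps: int = 10,
) -> None:
    """Single-process simulation: a deque stands in for the multiprocess Queue."""
    queue: Deque[Any] = deque()
    for _ in range(steps):
        # Only call the writer when the queue is "low", asking for enough to fill it.
        if len(queue) <= low_water_mark:
            items = await writer(max_queue_size - len(queue))
            if items is not None:
                queue.extend(items)
        # One reader call per item (real readers run in separate processes).
        if queue:
            reader(queue.popleft())


# Demo: a writer that hands out consecutive integers and records each request.
requested: List[int] = []
seen: List[int] = []
_next_id = 0


async def counting_writer(desired_items: int) -> range:
    global _next_id
    requested.append(desired_items)
    start, _next_id = _next_id, _next_id + desired_items
    return range(start, _next_id)


asyncio.run(simulate(counting_writer, seen.append))
print(seen)       # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(requested)  # [5, 3, 3, 3]
```

With these defaults the writer is first asked for 5 items and afterwards for 3 at a time, each time the simulated queue drops to 2 items.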

Use Cases

There are a ton of use cases for QuasiQueue.

WebServer

QuasiQueue could be the basis for a web server. The writer function would feed sockets to the Queue, which would be picked up by the readers for handling.

flowchart LR

  subgraph Configuration
  http
  end

  subgraph Server
  http-->writer
  writer(writer)--socket-->queue((queue))
  queue--socket-->reader1(reader)
  queue--socket-->reader2(reader)
  queue--socket-->reader3(reader)
  queue--socket-->reader4(reader)
  end

Website Image Crawler

QuasiQueue could be used to crawl a website, or series of websites, to download data.

flowchart RL

  subgraph Crawler
  writer(writer)-->queue((queue))
  queue-->reader1(reader)
  end
  database(Links)--Stale or Unread Links-->writer
  reader1(reader)--Images-->FileSystem
  reader1(reader)--Found Links-->database

As new pages are found, they get added to a database. The writer pulls pages out of the database as the queue gets smaller, and the reader adds any new pages it finds to the database. The writer function can pull links that haven't been crawled at all, and once it runs out of those it can recrawl links based on their age.
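A writer for this crawler might look like the sketch below. The `links` dictionary is a hypothetical in-memory stand-in for the database, and `RECRAWL_AFTER_SECONDS` is an assumed staleness threshold; a real crawler would run equivalent queries against its own storage.

```python
import asyncio
import time
from typing import Dict, List, Optional

# Hypothetical in-memory stand-in for the "Links" database:
# maps each URL to the time it was last crawled, or None if never crawled.
links: Dict[str, Optional[float]] = {}

# Hypothetical staleness threshold: only recrawl pages older than this.
RECRAWL_AFTER_SECONDS = 24 * 60 * 60


async def writer(desired_items: int) -> Optional[List[str]]:
    # Never-crawled links always come first.
    unread = [url for url, crawled_at in links.items() if crawled_at is None]
    if len(unread) >= desired_items:
        return unread[:desired_items]

    # Top up the batch with stale links, oldest first.
    cutoff = time.time() - RECRAWL_AFTER_SECONDS
    stale = sorted(
        (url for url, crawled_at in links.items()
         if crawled_at is not None and crawled_at < cutoff),
        key=lambda url: links[url],
    )
    batch = unread + stale[: desired_items - len(unread)]
    # Returning None tells QuasiQueue there is nothing to do right now.
    return batch or None
```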

Image Processor

QuasiQueue can be used to run large one-off jobs as well, such as processing a list of images. If someone has several thousand images to process, they can have the writer function feed the list into the Queue, and reader processes can take the files from the queue and run the processing needed.

flowchart LR

  subgraph Configuration
  filelist
  end

  subgraph ImageProcessor
  filelist-->writer
  writer(writer)-->queue((queue))
  queue-->reader1(reader)
  end
  reader1(reader)-->ProcessedFiles
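One way to feed a fixed file list is to hand it out in slices of at most `desired_items`. In this sketch `FILES` is hypothetical job input and the reader body is a placeholder for the actual image work.

```python
import asyncio
from typing import List, Optional

# Hypothetical job input: in practice this might come from a config file
# or a directory listing.
FILES: List[str] = [f"img_{i:04d}.png" for i in range(7)]

# Index of the next file to hand out. The writer runs only in the main
# process, so plain module state is enough here.
_position = 0


async def writer(desired_items: int) -> Optional[List[str]]:
    global _position
    if _position >= len(FILES):
        return None  # Every file has been handed out.
    batch = FILES[_position:_position + desired_items]
    _position += len(batch)
    return batch


def reader(path: str) -> None:
    # Placeholder for the real work (resizing, format conversion, ...).
    print(f"processing {path}")
```

Returning None here only makes QuasiQueue back off and poll again; the sketch does not cover how the job should exit once every file has been processed.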

Installation

pip install quasiqueue

Arguments

Name

The first argument when initializing QuasiQueue is the name of the queue. This is used when naming new processes, which makes logging and ps commands a lot more useful.

Reader

The reader function is called once per item in the queue.

async def reader(identifier: int|str):
  """Receives individual items from the queue.

  Args:
      identifier (int | str): Comes from the output of the Writer function
  """
  print(f"{identifier}")

The reader can be extremely simple, as this one-liner shows, or it can be extremely complex.

The reader can be asynchronous or synchronous. Since each reader runs in its own process, there is no performance benefit to using async, but it does make it easier for projects that use a lot of async code to reuse their existing async libraries inside of the reader.

Writer

The writer function is called whenever the Queue is low. It has to return an iterable of items that can be pickled (strings, integers, or sockets are common examples) that will be fed to the reader. Generators are a great option to reduce memory usage, but even simple lists can be returned. The writer function has to be asynchronous.

The writer function only has one argument: the desired number of items that QuasiQueue would like to retrieve and add to the Queue. This number is meant to allow for optimization on behalf of the developers. It can be completely ignored, but QuasiQueue will run more efficiently if you keep the number of returned items as close to desired_items as possible.

async def writer(desired_items: int):
  """Feeds data to the Queue when it is low.
  """
  return range(0, desired_items)

In the event that there are no items available to put in the Queue, the writer function should return None. This signals to QuasiQueue that there is nothing for it to do, and it will add a slight (configurable) delay before attempting to retrieve more items.
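A writer that respects both rules (never return more than `desired_items`, return None when idle) might look like this sketch. The `backlog` deque is a hypothetical source of pending work, for example filled by another part of the application.

```python
import asyncio
from collections import deque
from typing import Deque, List, Optional

# Hypothetical backlog of pending work.
backlog: Deque[str] = deque()


async def writer(desired_items: int) -> Optional[List[str]]:
    if not backlog:
        # Nothing to do: QuasiQueue waits empty_queue_sleep_time before asking again.
        return None
    # Take at most desired_items so the Queue is not overfilled.
    count = min(desired_items, len(backlog))
    return [backlog.popleft() for _ in range(count)]
```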

QuasiQueue will prevent items that were recently placed in the Queue from being requeued within a configurable time frame. This is meant to make the writer function more lenient: if it happens to return duplicates between calls, QuasiQueue will just discard them.
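The idea behind this deduplication can be sketched as a time-windowed filter. This is an illustration of the concept only, not QuasiQueue's internal code; `RecentlyQueued` is a hypothetical class whose `window` plays the role of the prevent_requeuing_time setting, and the injectable clock exists only to make the behaviour easy to test.

```python
import time
from typing import Callable, Dict, Hashable, Iterable, List


class RecentlyQueued:
    """Drops items that were already queued within the last `window` seconds."""

    def __init__(self, window: float, clock: Callable[[], float] = time.monotonic):
        self.window = window
        self.clock = clock
        self._seen: Dict[Hashable, float] = {}

    def filter(self, items: Iterable[Hashable]) -> List[Hashable]:
        now = self.clock()
        # Forget entries older than the window so they may be queued again.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.window}
        fresh: List[Hashable] = []
        for item in items:
            if item not in self._seen:
                self._seen[item] = now
                fresh.append(item)
        return fresh
```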

Context

The context function is completely optional. It runs once, and only once, when a new reader process is launched. It is used to initialize resources such as database pools so they can be reused between reader calls.

If the function is provided, it should return a dictionary. The reader function will then need a context argument (ctx in the example below), which receives the result of the context function. The context function can be asynchronous or synchronous.

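from typing import Any, Dict

from quasiqueue import QuasiQueue

def context():
  # Runs once per reader process. get_http_connection_pool() and
  # get_db_engine_pool() are placeholders for your own setup code.
  ctx = {}
  ctx['http'] = get_http_connection_pool()
  ctx['dbengine'] = get_db_engine_pool()
  return ctx

def reader(identifier: int|str, ctx: Dict[str, Any]):
  """Receives individual items from the queue.

  Args:
      identifier (int | str): Comes from the output of the Writer function
      ctx (Dict[str, Any]): Comes from the output of the Context function
  """
  # Reuse the pooled resources instead of opening new connections per item.
  ctx['dbengine'].execute("get item")
  ctx['http'].get("url")
  print(f"{identifier}")


runner = QuasiQueue(
  "hello_world",
  reader=reader,
  writer=writer,  # the async writer from the earlier examples
  context=context
)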

Although this function is not required, it can have a large effect on performance. Pooling database and HTTP connections can save a remarkable amount of resources on SSL handshakes alone.

Settings

QuasiQueue has a variety of optimization settings that can be tweaked depending on usage.

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| empty_queue_sleep_time | float | The time in seconds that QuasiQueue will sleep the writer process when it returns no results. | 1.0 |
| full_queue_sleep_time | float | The time in seconds that QuasiQueue will sleep the writer process if the queue is completely full. | 5.0 |
| graceful_shutdown_timeout | integer | The time in seconds that QuasiQueue will wait for readers to finish when it is asked to shut down gracefully. | 30 |
| lookup_block_size | integer | The default desired_items passed to the writer function. This will be adjusted lower depending on queue dynamics. | 10 |
| max_jobs_per_process | integer | The number of jobs a reader process will run before it is replaced by a new process. | 200 |
| max_queue_size | integer | The maximum allowed size of the queue. | 300 |
| num_processes | integer | The number of reader processes to run. | 2 |
| prevent_requeuing_time | integer | The time in seconds that an item will be prevented from being re-added to the queue. | 300 |
| queue_interaction_timeout | float | The time QuasiQueue will wait for the Queue to be unlocked before throwing an error. | 0.01 |

Settings can be configured programmatically, via environment variables, or both.

Environment Variables

All settings can be configured via environment variables. The variables should start with the QuasiQueue name and an underscore. For example, if you named your QuasiQueue Acme, then ACME_NUM_PROCESSES would be used to set the number of processes.
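The naming rule can be written down as a small helper. `env_var_name` is hypothetical and not part of QuasiQueue; it assumes the variable name is simply the queue name and the setting name joined by an underscore and upper-cased, which matches the Acme example.

```python
import os


def env_var_name(queue_name: str, setting: str) -> str:
    """Sketch of the NAME_SETTING convention (assumes plain upper-casing)."""
    return f"{queue_name}_{setting}".upper()


# Example: request 8 reader processes for a queue named "Acme".
# Set this before the QuasiQueue is created (assumption: settings are read at construction).
os.environ[env_var_name("Acme", "num_processes")] = "8"
```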

Programmatic

There are two methods to programmatically define the settings.

The first is to initialize a Settings object and override specific values.

from quasiqueue import Settings, QuasiQueue

QuasiQueue(
  "MyQueue",
  reader=reader,
  writer=writer,
  settings=Settings(lookup_block_size=50)
)

This method is simple, but the downside is that you lose the environment variable prefix, so you have to set NUM_PROCESSES rather than MYQUEUE_NUM_PROCESSES. The workaround is to extend the Settings object and give it your desired prefix.

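from pydantic_settings import SettingsConfigDict
from quasiqueue import Settings, QuasiQueue

class MySettings(Settings):
  lookup_block_size: int = 50

  # Assumes Settings is a pydantic-settings BaseSettings model (pydantic v2),
  # where the environment variable prefix is set through env_prefix.
  model_config = SettingsConfigDict(env_prefix="MY_QUEUE_")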

QuasiQueue(
  "MyQueue",
  reader=reader,
  writer=writer,
  settings=MySettings()
)

quasiqueue's People

Contributors

dependabot[bot], tedivm


quasiqueue's Issues

RuntimeError: Type not yet supported: str | None

When building from source and then running quasiqueue --help, I ran into this error. Could it be typer-related?

+ quasiqueue --help
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/bin/quasiqueue:11 in <module>                      │
│                                                                              │
│    8                                                                         │
│    9 if __name__ == '__main__':                                              │
│   10 │   sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])   │
│ ❱ 11 │   sys.exit(app())                                                     │
│   12                                                                         │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ app = <typer.main.Typer object at 0x7f622884b5c0>                        │ │
│ │  re = <module 're' from                                                  │ │
│ │       '/home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207… │ │
│ │ sys = <module 'sys' (built-in)>                                          │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:328 in  │
│ __call__                                                                     │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:311 in  │
│ __call__                                                                     │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:364 in  │
│ get_command                                                                  │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:577 in  │
│ get_command_from_info                                                        │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:553 in  │
│ get_params_convertors_ctx_param_name_from_function                           │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:850 in  │
│ get_click_param                                                              │
│                                                                              │
│ /home/conda/staged-recipes/build_artifacts/quasiqueue_1704731207243/_test_en │
│ v_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac │
│ ehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_ │
│ placehold_placehold_place/lib/python3.12/site-packages/typer/main.py:779 in  │
│ get_click_type                                                               │
╰──────────────────────────────────────────────────────────────────────────────╯
RuntimeError: Type not yet supported: str | None

Here's the environment:

The following NEW packages will be INSTALLED:
    _libgcc_mutex:     0.1-conda_forge           conda-forge
    _openmp_mutex:     4.5-2_gnu                 conda-forge
    annotated-types:   0.6.0-pyhd8ed1ab_0        conda-forge
    bzip2:             1.0.8-hd590300_5          conda-forge
    ca-certificates:   2023.11.17-hbcca054_0     conda-forge
    click:             8.1.7-unix_pyh707e725_0   conda-forge
    colorama:          0.4.6-pyhd8ed1ab_0        conda-forge
    ld_impl_linux-64:  2.40-h41732ed_0           conda-forge
    libexpat:          2.5.0-hcb278e6_1          conda-forge
    libffi:            3.4.2-h7f98852_5          conda-forge
    libgcc-ng:         13.2.0-h807b86a_3         conda-forge
    libgomp:           13.2.0-h807b86a_3         conda-forge
    libnsl:            2.0.1-hd590300_0          conda-forge
    libsqlite:         3.44.2-h2797004_0         conda-forge
    libuuid:           2.38.1-h0b41bf4_0         conda-forge
    libxcrypt:         4.4.36-hd590300_1         conda-forge
    libzlib:           1.2.13-hd590300_5         conda-forge
    markdown-it-py:    3.0.0-pyhd8ed1ab_0        conda-forge
    mdurl:             0.1.2-pyhd8ed1ab_0        conda-forge
    ncurses:           6.4-h59595ed_2            conda-forge
    openssl:           3.2.0-hd590300_1          conda-forge
    pip:               23.3.2-pyhd8ed1ab_0       conda-forge
    psutil:            5.9.7-py312h98912ed_0     conda-forge
    pydantic:          2.5.3-pyhd8ed1ab_0        conda-forge
    pydantic-core:     2.14.6-py312h4b3b743_1    conda-forge
    pydantic-settings: 2.1.0-pyhd8ed1ab_1        conda-forge
    pygments:          2.17.2-pyhd8ed1ab_0       conda-forge
    python:            3.12.1-hab00c5b_1_cpython conda-forge
    python-dotenv:     1.0.0-pyhd8ed1ab_1        conda-forge
    python_abi:        3.12-4_cp312              conda-forge
    quasiqueue:        0.3.1-pyhd8ed1ab_0        local      
    readline:          8.2-h8228510_1            conda-forge
    rich:              13.7.0-pyhd8ed1ab_0       conda-forge
    setuptools:        69.0.3-pyhd8ed1ab_0       conda-forge
    shellingham:       1.5.4-pyhd8ed1ab_0        conda-forge
    tk:                8.6.13-noxft_h4845f30_101 conda-forge
    typer:             0.9.0-pyhd8ed1ab_0        conda-forge
    typing-extensions: 4.9.0-hd8ed1ab_0          conda-forge
    typing_extensions: 4.9.0-pyha770c72_0        conda-forge
    tzdata:            2023d-h0c530f3_0          conda-forge
    wheel:             0.42.0-pyhd8ed1ab_0       conda-forge
    xz:                5.2.6-h166bdaf_0          conda-forge

