Giter VIP home page Giter VIP logo

observable-persistent-queue's Introduction

OPQ (Observable Persistent Queue)

This project was born of the necessity to create a simple persistent message queue that would be usable by NPM libraries, so that no database would need to be installed; this is especially useful during NPM postinstall routines. NPM install commands should not happen in parallel, so we need a queue of some variety. Not only that, the queue needs to be persistent and shared, such that if npm install commands happen in parallel and are requested via multiple processes, that this information can be shared in one place.

  • Observable => Using RxJS, we observe queue for changes and react accordingly; RxJS allows for creating highly composable and flexible APIs.
  • Persistent => The queue is on the filesystem, and can be read + written to via multiple processes
  • Queue => standard FIFO queue, in this case, lines in a text file, separated by newline chars; however, you can create priority requests and pull items off the queue in any order you wish based on custom searches/queries.

Installation

npm install -S observable-persistent-queue

Disclaimer

Because the lock/unlock cycle takes about 3 ms, the queue will grow unboundedly if items are added to the queue faster than every 15 ms. Unfortunately adding more readers in different processes probably will not help drain the queue, because locking/unlocking is still a bottleneck, and the more lock/unlock requestors the slower it is for everyone to read from the queue. This library will delay adding new items to the queue if the number of requests waiting for a lock is greater than a certain number. You can use the {controlled:true} option with q.enq() to limit the rate at which items are added to the queue.

Limitations

You cannot store unicode characters in the text file queue; also for the moment colon characters ':' are not permitted, although that restriction may be removed later.

Design

This library uses live-mutex for locking to control read/write access to the queue and prevent corruption. This is much more robust than file locking (with the NPM lockfile library or similar). As stated, this library uses RxJS, specifically RxJS5.

Usage / API

initializing a queue

If the file already exists on the filesystem - no problem, that is expected, and we will use that as the queue. If the file does not exist, we will create the new empty file (or with initial data). OPQ will also manage port conflicts, if there is already a live-mutex broker listening on port 8888, then we will use that broker. You will need to pass in a different port if you don't want to use the existing broker.

const path = require('path');
const OPQ = require('observable-persistent-queue');

const q = new OPQ({
     port: 8888,
     priority: 5,  // 5 different priority levels, 1,2,3,4,5
     initialData: [],
     filePath: path.resolve(process.env.HOME, 'queue.txt')
});

// enqueue a message to be placed onto the queue
q.enq('some message to put on the queue');
 // dequeue 1 (or zero) items from the queue
q.deq();

// but wait, this is node.js, where is the callback? Here we go:
q.enq('some message').subscribe(function next(result){});
q.deq().subscribe(function next(result){}); 

// for the enq() and deq() methods, you do not need to call subscribe to initiate the action
// but you will need to call subscribe to see/use the results.

advanced calls

Note that unless a fairly fatal error happens, errors will be passed to the next callback like so:

q.deq().subscribe(function next(result){
    if(result.error){
        
    }
}); 

This is because using error and the error handler will break subscripton flows. We will only break that flow when it's appropriate, which it is mostly not.

// enqueue multiple items in one call, you can only enqueue stringified data, otherwise an error will be thrown
q.enq(['a','b','c','d'], {
    priority: 3
});

// batch items so that they are all processed together
q.enq(['a','b','c','d'], {
    priority: 3,
    batch: true
});

// the above is the same as:
q.enqBatch(['a','b','c','d'], {
    priority: 3
});

// dequeue 5 items from the queue, if only 3 exist, then 3 will be returned, if only 2 exist, then an error will be 
// immediately passed back

q.deq({
    min: 3,
    count: 5
});

// most likely, you will want to get the resulting data from dequeue calls; use subscribe
q.deq({
    min: 3,
    count: 5
})
.subscribe(function next(data){
    
});

// wait until at least 3 items are available, an error will *not* be raised if 3 items are not available immediately
q.deq({
    wait: true,
    min: 3,  // if min is not supplied, min will be 5
    count: 5
});

// dequeue items that match a pattern
q.deq({
    pattern: /abc/,
    min: 3,
    count: 5
});

// dequeue items that are older or younger than a date
q.deq({
    youngerThan: new Date(),
    olderThan: Date.parse(),//,
    min: 3,
    count: 5
});

cleaning up when you're done

TBD

observable-persistent-queue's People

Contributors

oresoftware avatar

Stargazers

 avatar

Watchers

 avatar  avatar  avatar

observable-persistent-queue's Issues

obs - creating method on Observable prototype

const Rx = require('rxjs');

const proto = Rx.Observable.prototype;


proto.whenCompleted = function () {

    const source = this;

    return Rx.Observable.create(sub => {

        const ret = [];

        return source.subscribe(
            function (v) {
                ret.push(v);
            },
            function (e) {
                sub.error(e);
            },
            function () {
                sub.error(ret);
            }
        )

    });

};


module.exports = Rx;

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.