
nsqphp's Introduction

NSQPHP

PHP client for NSQ.

Notice

I cannot maintain this anymore - apologies for ignored PRs. If someone wants to take over I am happy to transfer ownership. Send me a message via @davegardnerisme.

NSQ basics

You can read all about NSQ via the readme on GitHub, or via the Bitly blog post describing it. More details on nsqd and nsqlookupd are provided within each one's folder in the project.

Here are some things I have learned:

  • Clustering is provided only in the sense that nsqlookupd will discover which machines host messages for a particular topic. To consume from a cluster, you simply ask an instance of nsqlookupd where to find messages and then connect to every nsqd it tells you to (this is one of the things that makes nsq good).
  • HA (for pub) is easy because each nsqd instance is isolated; you can simply connect to any one of them and publish your message (I have built this into the client).
  • Resilience is provided by simply writing to more than one nsqd and then de-duplicating on subscribe (I have built this into the client).
  • nsq is not designed as a work queue (for long running tasks) out of the box. The default setting of msg-timeout is 60,000ms (60 seconds). This is the time before nsq will automatically consider a message to have failed and hence requeue it. Our "work" should take much less time than this. Additionally, PHP is a blocking language, and although we are using a non-blocking IO event loop, any work you do to process a message will block the client from being able to reply to any heartbeats etc.

Installation

nsqphp is available to add to your project via Composer. Simply add the following to the require section of your composer.json:

{
    "require": {
        "davegardnerisme/nsqphp": "dev-master"
    }
}

You can also simply clone it into your project:

git clone https://github.com/davegardnerisme/nsqphp.git
cd nsqphp
git submodule update --init --recursive

To use nsqphp in your projects, just include the bootstrap.php file, or set up autoloading via Composer. The design lends itself to a dependency injection container (all dependencies are constructor-injected), although you can just set up the dependencies manually when you use it.

Testing it out

Follow the getting started guide to install nsq on localhost.

Publish some events:

php cruft/test-pub.php 10

Fire up a subscriber in one shell:

php cruft/test-sub.php mychannel > /tmp/processed-messages

Then tail the redirected STDOUT in another shell, so you can see the messages received and processed:

tail -f /tmp/processed-messages

Note

In these tests I'm publishing first since I haven't yet got the client to automatically rediscover which nodes have messages for a given topic; hence if you sub first, there are no nodes found with messages for the topic.

Other tests

Multiple channels

The blog post describes a channel:

| Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.

So each message in a topic will be delivered to each channel.
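To make the fan-out concrete, here is a tiny in-memory model. It is a hypothetical sketch, not nsqphp code and not how nsqd stores messages: publishing to the topic appends a copy of the payload to every channel, and draining one channel has no effect on the others.

```php
<?php
// Hypothetical in-memory model of NSQ's topic -> channel fan-out.
// Not part of nsqphp; it only illustrates the delivery semantics.
final class InMemoryTopic
{
    /** @var array<string, string[]> channel name => queued payloads */
    private $channels = [];

    public function addChannel(string $name): void
    {
        if (!isset($this->channels[$name])) {
            $this->channels[$name] = [];
        }
    }

    // Publishing appends a copy of the payload to every channel.
    public function publish(string $payload): void
    {
        foreach (array_keys($this->channels) as $name) {
            $this->channels[$name][] = $payload;
        }
    }

    // Drain and return everything queued for one channel.
    public function consume(string $channel): array
    {
        if (!isset($this->channels[$channel])) {
            return [];
        }
        $messages = $this->channels[$channel];
        $this->channels[$channel] = [];
        return $messages;
    }
}

$topic = new InMemoryTopic();
$topic->addChannel('mychannel');
$topic->addChannel('otherchannel');
for ($i = 1; $i <= 10; $i++) {
    $topic->publish("message $i");
}
```

Both `consume('mychannel')` and `consume('otherchannel')` return all ten payloads, which is the behaviour the two-subscriber test below exercises against a real nsqd.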

Fire up two subscribers with different channels (one in each shell):

php cruft/test-sub.php mychannel
php cruft/test-sub.php otherchannel

Publish some messages:

php cruft/test-pub.php 10

Each message will be delivered to each channel. It's also worth noting that the API allows you to subscribe to multiple topics/channels within the same process.

Multiple nsqds

Set up some servers running nsqd and nsqlookupd with hostnames nsq1, nsq2 ... Now publish 10 messages to each of nsq1 and nsq2:

php cruft/test-pub.php 10 nsq1
php cruft/test-pub.php 10 nsq2

Now subscribe:

php cruft/test-sub.php mychannel > /tmp/processed-messages

You will receive 20 messages.

Resilient delivery

Same test as before, but this time we deliver the same message to two nsqd instances and then de-duplicate on subscribe.

php cruft/test-pub.php 10 nsq1,nsq2
php cruft/test-sub.php mychannel > /tmp/processed-messages

This time you should receive only 10 messages.

To do

  • Requeue failed messages using a back-off strategy (currently there is only a simple fixed-delay requeue strategy)
  • Continuously re-evaluate which nodes contain messages for each subscribed topic and establish new connections to those nodes (via an event loop timer)

The PHP client interface

Messages

Messages are encapsulated by the nsqphp\Message\Message class and are referred to by interface within the code (so you could implement your own).

Interface:

public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();

Publishing

The client supports publishing to N nsqd servers, which must be specified explicitly by hostname. Unlike with subscription, there is no facility to look up the hostnames via nsqlookupd (and we probably wouldn't want that anyway, for speed).

Minimal approach:

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo('localhost')
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

It's up to you to decide if/how to encode your payload (eg: JSON).
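As one possible choice, here is a sketch of JSON encoding on publish and decoding in the subscriber. The helper names are hypothetical; nsqphp only ever sees the resulting string as the message payload.

```php
<?php
// Sketch only: encodeEvent()/decodeEvent() are hypothetical helpers, not part
// of nsqphp. nsqphp just carries the resulting string as the message payload.
function encodeEvent(array $event): string
{
    $json = json_encode($event);
    if ($json === false) {
        throw new RuntimeException('Could not encode event: ' . json_last_error_msg());
    }
    return $json;
}

// Decoding failures throw, so inside a subscribe callback they follow the
// same catch/requeue path as any other exception thrown there.
function decodeEvent(string $payload): array
{
    $event = json_decode($payload, true);
    if (!is_array($event)) {
        throw new RuntimeException('Payload is not a JSON object or array: ' . json_last_error_msg());
    }
    return $event;
}

// Publishing side, using the API shown above:
//   $nsq->publishTo('localhost')
//       ->publish('mytopic', new nsqphp\Message\Message(encodeEvent(['user' => 42])));
// Subscribing side, inside the callback:
//   $event = decodeEvent($msg->getPayload());
```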

HA publishing:

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), nsqphp\nsqphp::PUB_QUORUM)
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

We will require a quorum of the publishTo nsqd daemons to respond before considering this operation a success (currently the publishes happen in series). This assumes I have 3 nsqds running on three hosts which are reachable as nsq1, nsq2 and nsq3.

This technique will write each message to more than one nsqd, which will require de-duplication on subscribe.
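The quorum rule can be sketched as follows. This assumes "quorum" means a strict majority (floor(n/2) + 1) and that publishing stops as soon as the quorum is reached; both are assumptions about nsqphp's internals, and the `$send` callable is a hypothetical stand-in for a real nsqd connection.

```php
<?php
// Strict majority of $hostCount hosts, e.g. 2 of 3 (assumed definition).
function quorumSize(int $hostCount): int
{
    return intdiv($hostCount, 2) + 1;
}

/**
 * Try hosts in series until a quorum has accepted the message.
 * $send is a hypothetical callable(string $host, string $payload): bool.
 * Returns the hosts that accepted the message; throws if quorum is missed.
 */
function publishWithQuorum(array $hosts, string $payload, callable $send): array
{
    $needed = quorumSize(count($hosts));
    $accepted = [];
    foreach ($hosts as $host) {
        try {
            if ($send($host, $payload)) {
                $accepted[] = $host;
            }
        } catch (Exception $e) {
            // Treat a connection error like a failed write and try the next host.
        }
        if (count($accepted) >= $needed) {
            return $accepted;
        }
    }
    throw new RuntimeException(sprintf(
        'Quorum not reached: %d of %d required hosts accepted the message',
        count($accepted),
        $needed
    ));
}
```

With three healthy hosts the quorum is two, so each message lands on two nsqds, which is why the subscriber needs de-duplication.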

Subscribing

The client supports subscribing from N nsqd servers, each of which will be auto-discovered from one or more nsqlookupd servers. This works because nsqlookupd can provide a list of auto-discovered nodes hosting messages for a given topic. This feature decouples our clients from having to know where to find messages.

So when subscribing, the first thing we need to do is initialise our lookup service object:

    $lookup = new nsqphp\Lookup\Nsqlookupd;

Or alternatively:

    $lookup = new nsqphp\Lookup\Nsqlookupd('nsq1,nsq2');

We can then use this to subscribe:

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', function($msg) {
        echo $msg->getId() . "\n";
    })->run();

Warning: if our callback were to throw any Exceptions, the messages would not be retried using these settings - read on to find out more.

Or a bit more in the style of PHP (?):

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        echo $msg->getId() . "\n";
    }

We can also subscribe to more than one topic/channel:

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->subscribe('othertopic', 'somechannel', 'msgCallback')
        ->run();

Retrying failed messages

The PHP client will catch any Exception thrown within the callback and then either (a) retry or (b) discard the message. Without a requeue strategy (as in the examples above) it discards, and usually you won't want to discard messages.

To fix this, we need a requeue strategy - this is in the form of any object that implements nsqphp\RequeueStrategy\RequeueStrategyInterface:

    public function shouldRequeue(MessageInterface $msg);

The client currently ships with one, a fixed-delay strategy:

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, NULL, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }
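A requeue strategy only has to answer one question per failed message. The sketch below assumes `shouldRequeue()` returns a delay in milliseconds, or NULL to give up, and uses hypothetical local stand-ins for the interfaces so it runs on its own. The fixed-delay class mirrors the idea of the shipped strategy (its defaults here are illustrative, not nsqphp's); the exponential back-off class is the To do item, which nsqphp does not ship.

```php
<?php
// Hypothetical local stand-ins for nsqphp's MessageInterface and
// RequeueStrategyInterface, so this sketch runs without the library.
interface AttemptCounted
{
    public function getAttempts();
}

interface RequeueStrategy
{
    /** Assumed contract: delay in ms before requeueing, or NULL to drop. */
    public function shouldRequeue(AttemptCounted $msg);
}

// Same delay every time, until $maxAttempts is reached.
final class FixedDelaySketch implements RequeueStrategy
{
    private $maxAttempts;
    private $delayMs;

    public function __construct(int $maxAttempts = 10, int $delayMs = 50)
    {
        $this->maxAttempts = $maxAttempts;
        $this->delayMs = $delayMs;
    }

    public function shouldRequeue(AttemptCounted $msg)
    {
        return $msg->getAttempts() < $this->maxAttempts ? $this->delayMs : null;
    }
}

// Exponential back-off: base * 2^(attempts - 1), capped at $maxDelayMs.
final class ExponentialBackoffSketch implements RequeueStrategy
{
    private $maxAttempts;
    private $baseDelayMs;
    private $maxDelayMs;

    public function __construct(int $maxAttempts = 10, int $baseDelayMs = 50, int $maxDelayMs = 60000)
    {
        $this->maxAttempts = $maxAttempts;
        $this->baseDelayMs = $baseDelayMs;
        $this->maxDelayMs = $maxDelayMs;
    }

    public function shouldRequeue(AttemptCounted $msg)
    {
        $attempts = $msg->getAttempts();
        if ($attempts >= $this->maxAttempts) {
            return null;
        }
        $delay = $this->baseDelayMs * (2 ** max(0, $attempts - 1));
        return (int) min($delay, $this->maxDelayMs);
    }
}
```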

De-duplication on subscribe

Recall that to achieve HA we simply duplicate on publish into two different nsqd servers. To perform de-duplication we simply need to supply an object that implements nsqphp\Dedupe\DedupeInterface.

    public function containsAndAdd($topic, $channel, MessageInterface $msg);

The PHP client ships with two mechanisms for de-duplicating messages on subscribe. Both are based around the opposite of a Bloom filter. One maintains a hash map as a PHP array (and is hence bound to a single process); the other calls out to Memcached and hence can share the data structure between many processes.
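The in-process variant can be sketched in a few lines: a fixed number of slots, each remembering a fingerprint of the last key hashed into it. A match means "definitely seen"; a collision just overwrites the slot, so duplicates can slip through but new messages are not dropped. This is a hypothetical sketch, not nsqphp's class. It keys on the payload rather than the message ID, on the assumption that nsqd assigns IDs itself, so the two copies of an HA-published message would normally carry different IDs; the md5/crc32 hashing is also an assumed choice.

```php
<?php
// Hypothetical single-process "opposite of a Bloom filter".
final class OppositeOfBloomFilterSketch
{
    /** @var array<int, string> slot index => md5 of the last key stored there */
    private $slots = [];
    private $size;

    public function __construct(int $size = 1000000)
    {
        $this->size = $size;
    }

    /**
     * True if this payload was already recorded for this topic/channel;
     * otherwise record it and return false. A collision overwrites the slot,
     * so old entries can be forgotten (missed duplicates), but barring an md5
     * collision a new payload is never reported as a duplicate.
     */
    public function containsAndAdd(string $topic, string $channel, string $payload): bool
    {
        $fingerprint = md5($topic . ':' . $channel . ':' . $payload);
        $slot = crc32($fingerprint) % $this->size;
        if (isset($this->slots[$slot]) && $this->slots[$slot] === $fingerprint) {
            return true;
        }
        $this->slots[$slot] = $fingerprint;
        return false;
    }
}
```

Shrinking `$size` saves memory at the cost of more evictions, which is the memory versus de-duplication trade-off mentioned below.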

We can use this thus:

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

You can read more about de-duplication on my blog; however, it's worth keeping the following in mind:

  • With Memcached de-duplication we can then happily launch N processes to subscribe to the same topic and channel, and only process the messages once.
  • De-duplication is not guaranteed (in fact far from it): the implementations shipped are based on a lossy hash map, and hence are probabilistic in how they perform. For duplicates that arrive at around the same time they will usually perform acceptably (and they can be tuned to trade off memory usage against de-duplication ability).
  • nsq is designed around the idea of idempotent subscribers - eg: your subscriber must be able to cope with processing a duplicated message (writing into Cassandra is an example of a system that copes well with executing something twice).

Logging

The final optional dependency is a logger, in the form of some object that implements nsqphp\Logger\LoggerInterface (there is no standard logger interface shipped with PHP to the best of my knowledge):

    public function error($msg);
    public function warn($msg);
    public function info($msg);
    public function debug($msg);

The PHP client ships with a logger that dumps all logging information to STDERR. Putting all of this together we'd have something similar to the test-sub.php file:

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $logger = new nsqphp\Logger\Stderr;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, $logger);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

Design log

  • main client based on event loop (powered by React PHP) to allow us to handle multiple connections to multiple nsqd instances

nsqphp's People

Contributors

angry-elf, fwang2002, noisebynorthwest, stephensearles, wojons


nsqphp's Issues

Support for multi-byte characters

Hi Dave

I have a problem reading multi-byte characters (UTF-8 Chinese characters), because the Reader skips negative values (master, nsqphp/src/nsqphp/Wire/Reader.php, line 195).

This lib is awesome and should handle every case well. But in my opinion, negative values don't break anything. Why do you skip them while reading, and for what purpose?

Hope I'm not bothering you :p
Best wishes!
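To see why skipping negative values breaks multi-byte text: PHP's `unpack()` format `c` reads signed bytes, and every byte of a UTF-8 multi-byte sequence is 0x80 or above, so each one comes back negative; format `C` reads the same bytes as unsigned values. This is a self-contained illustration and does not reproduce the Reader's actual logic.

```php
<?php
// The UTF-8 encoding of the character 中 (U+4E2D) is the three bytes E4 B8 AD.
$utf8 = "\xE4\xB8\xAD";

// Format 'c' reads signed chars: every byte >= 0x80 comes back negative.
$signed = array_values(unpack('c*', $utf8));   // [-28, -72, -83]

// Format 'C' reads unsigned chars: the same bytes as values 0..255.
$unsigned = array_values(unpack('C*', $utf8)); // [228, 184, 173]

// Discarding the negative values would discard every byte of the character.
$kept = array_filter($signed, function ($b) { return $b >= 0; });

// Re-packing the unsigned values restores the original bytes exactly.
$roundTrip = pack('C*', ...$unsigned);
```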

Socket not closed after publish?

In the source code, getSocket() in Connection.php uses fsockopen() when reading/writing a message.
publish() in nsqphp.php loops over every connection doing a read/write and, on success, returns directly. Nothing ever closes the socket.

When connecting to 2 servers in one request, the number of file descriptors grows beyond 1024, leading to an error.

Automatically discover new hosts and attempt reconnection

nsqphp subscription works by discovering N nodes that host messages for a given topic and then establishing a connection to each of these hosts. There are two issues with this:

  1. In the event of a connection error, an Exception is thrown and this cannot be recovered from
  2. If a new nsqd suddenly starts having messages for a given topic, a running subscriber will never find out about these new nodes automatically

We can fix these together by adjusting how subscription works. Specifically we can:

  • attach a timer to the event loop which will, every X minutes, go and check the lookup again to gather a list of hosts that contain messages of a given topic, adding these hosts to some internal list
  • adjust socket failure cases such that a failure simply causes a connection to be closed, destroyed, and removed from the event loop as a read stream (with logging)
  • attach a timer to the event loop which will check if there are any hosts that the client is not connected to and then establish a connection to these

With all three of these we get:

  • automatic reconnection to an nsqd if it goes down and then comes back
  • ability to continue subscribing from X-1 nodes in the event that a single node experiences failure (currently an Exception is thrown that will kill the whole program)
  • automatic discovery of any new nodes that host messages for a given topic
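The core of the timer-based plan above is bookkeeping: remember every host the lookup has ever reported, forget a connection (but not the host) when its socket fails, and on each tick connect to whatever is known but not connected. A minimal sketch of just that bookkeeping, using a hypothetical `HostTracker` class with no event loop or sockets:

```php
<?php
// Hypothetical bookkeeping for the reconnect/rediscovery plan; no sockets
// or event loop here, just the sets the timers would consult.
final class HostTracker
{
    /** @var array<string, bool> every "host:port" the lookup has reported */
    private $known = [];
    /** @var array<string, bool> hosts that currently have a live connection */
    private $connected = [];

    // Lookup timer: merge in whatever nsqlookupd reports now.
    public function addDiscovered(array $hosts): void
    {
        foreach ($hosts as $host) {
            $this->known[$host] = true;
        }
    }

    public function markConnected(string $host): void
    {
        $this->connected[$host] = true;
    }

    // Socket error path: drop the connection but keep the host known,
    // so the next reconnect pass tries it again.
    public function markFailed(string $host): void
    {
        unset($this->connected[$host]);
    }

    // Reconnect timer: hosts we know about but are not connected to.
    public function hostsToConnect(): array
    {
        return array_keys(array_diff_key($this->known, $this->connected));
    }
}
```

A failed host simply reappears in `hostsToConnect()`, so the subscriber keeps consuming from the remaining nodes while it retries, and newly discovered hosts show up there too.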

packString in Write.php is time-consuming

I don't understand why packString is used to publish strings, or why the loop uses substr() instead of $str[$i].

$str = "abcdefg";
$len = strlen($str);
$a = pack("a*", $str);
$b = "";
for($i = 0; $i < $len; $i++){
    $b .= pack("c", ord(substr($str, $i, 1))); 
}
if ($a === $b)
    echo 'ok';

Actually, $a and $b are exactly the same.
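Since a PHP string is already a byte sequence, a writer can frame a publish command with a single `pack('N', ...)` for the length prefix and append the body unchanged. The layout below follows the PUB command in the NSQ TCP protocol spec (`PUB <topic>\n`, a 4-byte big-endian body size, then the body); the function name is hypothetical and this is not nsqphp's Writer.

```php
<?php
// Build an NSQ PUB command: "PUB <topic>\n", a 4-byte big-endian body
// length, then the raw body bytes. No per-byte pack() loop is needed,
// and multi-byte UTF-8 bodies pass through untouched.
function buildPubCommand(string $topic, string $body): string
{
    return 'PUB ' . $topic . "\n" . pack('N', strlen($body)) . $body;
}
```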

PHP Fatal error: Uncaught Error: Class 'SplClassLoader'

PHP Fatal error: Uncaught Error: Class 'SplClassLoader' not found in //vendor/davegardnerisme/nsqphp/bootstrap.php:7
The davegardnerisme/nsqphp/dependencies/spl-class-loader and davegardnerisme/nsqphp/dependencies/react-php folders are empty when installing with Composer.

Ability to stop loop

Since it's impossible to call $nsq->loop->stop() (loop is a private property), I added this code to allow stopping the nsq loop:

366a367,375

/**
 * Stop subscribe event loop
 */
public function stop()
{
    $this->loop->stop();
}

I ran into two bugs using nsqphp

  1. Warning: unpack(): Type c: not enough input, need 1, have 0 in nsqphp/src/nsqphp/Wire/Reader.php on line 192
  2. PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 1479356726 bytes) in nsqphp/src/nsqphp/Connection/Connection.php on line 169

So, how can I solve them?

Frames out of order?

We were getting a notice in our logs:

PHP Notice:  Undefined index: error in vendor/davegardnerisme/nsqphp/src/nsqphp/nsqphp.php on line 288 

After inspecting, we saw that the frame read was:

[26-Nov-2014 13:37:35 America/Los_Angeles] Array ( 
  [type] => 0 
  [size] => 15 
  [response] => _heartbeat_ 
)

If it helps, we're only using the library for publishing, from long-lived local code, with a connection to a remote lookupd service.

Are we doing something wrong? Thanks
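For reference, the NSQ TCP protocol sends every frame as a 4-byte big-endian size, a 4-byte frame type (0 = response, 1 = error, 2 = message) and then the data, where the size counts the type field plus the data. A heartbeat is simply a response frame whose data is `_heartbeat_`, which the client is expected to answer with `NOP`. The frame dumped above (type 0, size 15 = 4 + strlen('_heartbeat_')) fits that description, which is consistent with code receiving a heartbeat where it expected something else. A minimal decoder sketch with hypothetical names; the `$maxSize` guard is an illustrative limit, not an nsqd value, meant to reject absurd sizes like the 1.4 GB allocation reported in the previous issue.

```php
<?php
const FRAME_TYPE_RESPONSE = 0;
const FRAME_TYPE_ERROR = 1;
const FRAME_TYPE_MESSAGE = 2;

/**
 * Decode one frame from the start of $buffer.
 * Returns null if the buffer does not yet hold a complete frame, otherwise
 * ['type' => int, 'data' => string, 'length' => bytes consumed]; the caller
 * then drops 'length' bytes from its buffer.
 */
function decodeFrame(string $buffer, int $maxSize = 1048576): ?array
{
    if (strlen($buffer) < 4) {
        return null;
    }
    $size = unpack('N', substr($buffer, 0, 4))[1];
    if ($size < 4 || $size > $maxSize) {
        throw new UnexpectedValueException("Implausible frame size: $size");
    }
    if (strlen($buffer) < 4 + $size) {
        return null; // wait for more bytes
    }
    return [
        'type'   => unpack('N', substr($buffer, 4, 4))[1],
        'data'   => (string) substr($buffer, 8, $size - 4),
        'length' => 4 + $size,
    ];
}

// On a heartbeat, the client should write "NOP\n" back to nsqd.
function isHeartbeat(array $frame): bool
{
    return $frame['type'] === FRAME_TYPE_RESPONSE && $frame['data'] === '_heartbeat_';
}
```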

PHP version 5.3.3

PHP Catchable fatal error: Argument 1 passed to nsqphp\nsqphp::tryFunc() must be an instance of nsqphp\Callable, instance of Closure given, called in /home/nsqphp/src/nsqphp/nsqphp.php on line 297 and defined in /home/nsqphp/src/nsqphp/nsqphp.php on line 315
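The `callable` type declaration was only introduced in PHP 5.4. On PHP 5.3 a parameter hinted as `callable` inside the `nsqphp` namespace is read as a class named `nsqphp\Callable`, which matches the error above. A pattern that also parses on 5.3 drops the declaration and checks at runtime with `is_callable()`; the function below is a hypothetical illustration, not the library's tryFunc().

```php
<?php
// No "callable" type declaration, so this also parses on PHP 5.3;
// the check moves to runtime instead.
function invokeCallback($callback, $arg)
{
    if (!is_callable($callback)) {
        throw new InvalidArgumentException('Callback is not callable');
    }
    return call_user_func($callback, $arg);
}
```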

Is the library still maintained?

Looking at the latest GitHub activity, I feel quite certain that the library has been abandoned. September 2014 was the last time @davegardnerisme replied to an issue report, and no pull requests have been merged or even reviewed.

Using the PSR3 Logger interface

Hi Dave,

You wrote "there is no standard logger interface shipped with PHP to the best of my knowledge". You could reuse the LoggerInterface from the PSR-3 "standard" (https://github.com/php-fig/log), with the benefit that we could inject a more robust logging framework such as Monolog.
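An adapter in the direction this issue suggests is small: implement nsqphp's four logging methods and forward each one to a PSR-3 logger, mapping `warn()` to PSR-3's `warning()`. So the sketch runs without Composer, it uses a hypothetical local copy of the four-method interface and leaves the wrapped logger untyped; in a real project you would type-hint `Psr\Log\LoggerInterface` from the psr/log package.

```php
<?php
// Hypothetical local copy of nsqphp's four-method logger interface.
interface NsqLoggerInterface
{
    public function error($msg);
    public function warn($msg);
    public function info($msg);
    public function debug($msg);
}

// Forwards nsqphp-style calls to a PSR-3 style logger (e.g. Monolog).
final class PsrLoggerAdapter implements NsqLoggerInterface
{
    private $psr;

    public function __construct($psrLogger)
    {
        $this->psr = $psrLogger;
    }

    public function error($msg)
    {
        $this->psr->error($msg);
    }

    // PSR-3 names this level warning(), not warn().
    public function warn($msg)
    {
        $this->psr->warning($msg);
    }

    public function info($msg)
    {
        $this->psr->info($msg);
    }

    public function debug($msg)
    {
        $this->psr->debug($msg);
    }
}
```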

SUB does not react well to failure

Test case A (works acceptably):

  • start up cruft/test-sub.php, pointing to nsqd running on localhost
  • kill nsqd

The sub script exits with an uncaught Exception (eg: we are informed of the broken SUB connection).

Test case B (works unacceptably):

  • start up cruft/test-sub.php, pointing to nsqd running on localhost
  • run cruft/test-pub.php to push some messages through the system
  • kill nsqd

The sub script hangs in an IO loop, consuming CPU.

Notice: Undefined index: address in socloz/nsqphp/src/nsqphp/Lookup/Nsqlookupd.php line 102

After running app/console socloz:nsq:topic:consume viglink I see an error:

Notice: Undefined index: address in /var/www/vhosts/trim/vendor/socloz/nsqphp/src/nsqphp/Lookup/Nsqlookupd.php line 102

This is the dump of $producers:

array(3) {
  ["status_code"]=>
  int(200)
  ["status_txt"]=>
  string(2) "OK"
  ["data"]=>
  array(2) {
    ["channels"]=>
    array(0) {
    }
    ["producers"]=>
    array(1) {
      [0]=>
      array(6) {
        ["remote_address"]=>
        string(15) "127.0.0.1:39969"
        ["hostname"]=>
        string(21) "localhost.localdomain"
        ["broadcast_address"]=>
        string(21) "localhost.localdomain"
        ["tcp_port"]=>
        int(4150)
        ["http_port"]=>
        int(4151)
        ["version"]=>
        string(5) "0.3.5"
      }
    }
  }
}

Code stops executing at the publish line.

I have this simple code:

echo "publishing";
$nsq = new nsqphp;
$nsq->publishTo('127.0.0.1:4151')->publish('test', new Message('some message payload'));
echo "finished";

I expect to see the strings "publishing" and "finished", but only the first one shows, so I guess some issue makes my code stop at the publish line.
I can publish to my local nsqd using curl:

$ curl -d 'hello world 7' 'http://127.0.0.1:4151/put?topic=test'
OK

Nothing shows up in the error log.
Can someone help me debug this issue?
Thank you so much!
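For comparison, the curl call can be expressed in plain PHP. Note the ports: by default nsqd serves HTTP on 4151 and its TCP protocol on 4150 (the same pair appears as http_port and tcp_port in the lookup dump in an earlier issue), and nsqphp opens raw sockets with fsockopen() to speak the TCP protocol, so pointing publishTo() at 4151 may be worth ruling out. This sketch uses nsqd's `/pub` HTTP endpoint (the curl example used `/put`); the helper names are hypothetical and nothing here is nsqphp API.

```php
<?php
// Build the nsqd HTTP publish URL. 4151 is nsqd's default HTTP port;
// its TCP protocol port defaults to 4150.
function nsqdPubUrl(string $host, string $topic, int $httpPort = 4151): string
{
    return sprintf('http://%s:%d/pub?%s', $host, $httpPort, http_build_query(['topic' => $topic]));
}

// POST a message body over HTTP using only PHP's stream functions.
// Returns nsqd's response body ("OK" on success) or throws on failure.
function nsqdHttpPublish(string $host, string $topic, string $body): string
{
    $context = stream_context_create(['http' => [
        'method'  => 'POST',
        'header'  => "Content-Type: application/octet-stream\r\n",
        'content' => $body,
        'timeout' => 5,
    ]]);
    $response = @file_get_contents(nsqdPubUrl($host, $topic), false, $context);
    if ($response === false) {
        throw new RuntimeException("HTTP publish to $host failed");
    }
    return $response;
}
```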

nsqd v0.2.29-alpha responds with hostname not address in Lookup/Nsqlookupd

Got 'address' index not found error when trying to use Nsqlookupd->lookupHosts();

I don't know if they've changed the API in recent versions or if my version is behind, but the JSON returned by my version is this:

{"status_code":200,"status_txt":"OK","data":{"channels":["test"],"producers":[{"remote_address":"127.0.0.1:52939","hostname":"vilya","broadcast_address":"vilya","tcp_port":4150,"http_port":4151,"version":"0.2.29-alpha"},{"remote_address":"127.0.0.1:52950","hostname":"vilya","broadcast_address":"vilya","tcp_port":4050,"http_port":4051,"version":"0.2.29-alpha"}]}}

If it used to return address or v0.2.30 returns address you might want to add a check for either 'hostname' key or 'address' key. If you'd like I'll happily look into this further and submit a pull request to fix this small issue.

a problem with 'Nsqlookupd'

file: Nsqlookupd.php
line:101

$producers = isset($r['data'], $r['data']['producers']) ? $r['data']['producers'] : array();

NSQ API: /lookup?topic=$topic

Response: {"channels":[...],"producers":[...]}

Maybe change it to:

$producers = isset($r['producers']) ? $r['producers'] : array();
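The last three reports point at the same fragility: the shape of the /lookup response varies between nsqlookupd versions (producers nested under "data" or at the top level, and "address" versus "broadcast_address"/"hostname"). A defensive parser might look like this; the function name is hypothetical, and the shapes it accepts are taken from the responses quoted in these issues rather than checked against every release.

```php
<?php
/**
 * Extract "host:port" TCP addresses from an nsqlookupd /lookup response.
 * Accepts producers nested under "data" or at the top level, and prefers
 * broadcast_address, falling back to "address" and then "hostname".
 */
function lookupTcpHosts(string $json): array
{
    $r = json_decode($json, true);
    if (!is_array($r)) {
        return [];
    }
    $producers = $r['data']['producers'] ?? $r['producers'] ?? [];
    if (!is_array($producers)) {
        return [];
    }
    $hosts = [];
    foreach ($producers as $p) {
        $address = $p['broadcast_address'] ?? $p['address'] ?? $p['hostname'] ?? null;
        if ($address !== null && isset($p['tcp_port'])) {
            $hosts[] = $address . ':' . $p['tcp_port'];
        }
    }
    return $hosts;
}
```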

Delayed message support

I have recently implemented it in an upper layer on top of your library, but I think it should be a core feature.

However, implementing it at the nsqphp level means breaking compatibility, as the message envelope must be altered.

Are you willing to introduce this feature ?

(string)$connection type cast causes an exception

Hi,

I checked the code and found an exception when we used it.

/src/nsqphp/nsqphp.php

if ($this->logger) { $this->logger->warn(sprintf('Error processing [%s] "%s": %s', (string)$connection, $msg->getId(), $e->getMessage())); }

$connection is an array.
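A defensive formatter for that log line could avoid the cast entirely. This is a hypothetical helper: it does not know what the array holds inside nsqphp, so it only reports its size, and it uses `__toString()` when the value is an object that has one.

```php
<?php
// Render a connection for log messages without assuming it is an object
// with __toString(). Casting an array to string yields "Array" plus a
// notice/warning, so arrays are summarised instead.
function describeConnection($connection): string
{
    if (is_object($connection) && method_exists($connection, '__toString')) {
        return (string) $connection;
    }
    if (is_array($connection)) {
        return 'array(' . count($connection) . ')';
    }
    return gettype($connection);
}
```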
