Giter VIP home page Giter VIP logo

Comments (15)

tobi-tobsen avatar tobi-tobsen commented on August 19, 2024 4

If I remember correctly, the subscriber should connect, the publisher should bind. I'll try to find it in the zmq guide but it is pretty vague on that.
Strange that it "kinda works" the way @somdoron showed.
I assume the original zmq might be having the same issue.

from netmq.

somdoron avatar somdoron commented on August 19, 2024

try this:

    [Test]
    public void BindSubscriber()
    {
        using (NetMQContext contex = NetMQContext.Create())
        {
            var sub = contex.CreateSubscriberSocket();              
            sub.Bind("tcp://127.0.0.1:5002");
            sub.Subscribe("");

            Task.Factory.StartNew(() =>
                                    {

                                        using (var pub = contex.CreatePublisherSocket())
                                        {
                                            pub.Connect("tcp://127.0.0.1:5002");

                                            for (int i = 0; i < 5; i++)
                                            {


                                                                        // let the subscrbier connect to the publisher before sending a message
                                                                        pub.Send("Hello");

                                                                        Thread.Sleep(100);
                                            }                                               
                                        }
                                    });

            bool more;

            string m = sub.ReceiveString(out more);

            Assert.AreEqual("Hello", m);
            Assert.False(more);


            sub.Dispose();
        }
    }

from netmq.

reiroldan avatar reiroldan commented on August 19, 2024

This sort of works, but always looses the first message.

from netmq.

almazik avatar almazik commented on August 19, 2024

@reiroldan I think the first message is lost because publisher sends the first message right after it has connected, so subscriber has no time to subscribe. To fix that you can move Thread.Sleep(100) before pub.Send("Hello");

from netmq.

tobi-tobsen avatar tobi-tobsen commented on August 19, 2024

And that very sleep is what I really hate about zmq :(

Lets say "Connecting Publisher" is a valid way to connect pub / sub sockets (and I can't find anything opposing that right now):
In this case, shouldn't subscribe(...) block until it is really subscribed? Then messages should not be lost when a connected publisher sends after connect

from netmq.

almazik avatar almazik commented on August 19, 2024

Actually Sleep should be added only in proof-of-concept code, in production you should either assume that subscriber always connects to a continuous stream of events somewhere in the middle (weather update model), or you should introduce additional layer to catch up all the events you have missed (e.g. using Getting an Out-of-Band Snapshot).

In base ZMQ protocol publisher never knows whether those 2 subscribers that just subscribed (even if we watch for them) are all the subscribers that want to track its updates.

And concerning blocking subscribe... subscribers are usually running on different threads or (more likely) different processes or servers. So even if subscriber blocks until is has actually subscribed (that is, blocking until publisher is actually up, running, and "confirmed" the reception of subscription, which goes against the asynchronous concept), most likely it has already missed all the events published during the time (maybe even months) that publisher was working without knowing about new subscriber.

from netmq.

tobi-tobsen avatar tobi-tobsen commented on August 19, 2024

@almazik I know and understand everything you just wrote but my point was that:

  • if it is legal to bind sub-sockets
  • sending publishers message should not be lost: the messages were sent after the call to connect and that call to connect happened after the subscriber socket had been bound.

If it worked that way, we wouldn't need sleeps in the examples, tests etc.
I know the patterns around those sleeps (and you mentioned some of them) but it is really a pity that zmq beginners will fall into this "subscriber won't receive all messages"-pit, read sample code with loads of sleeps in it only to realize that they are "doing it all wrong" and should be using the proper patterns.

But maybe this issue#94 isn't the right place to discuss this topic ;) Sorry if I distracted from the issue at hand...

from netmq.

reiroldan avatar reiroldan commented on August 19, 2024

@tobi-tobsen Yes it is, all socket types allow for bind/connect semantics. Its not a problem of who binds or who connects, the current implementation is not correct. What I've implemented is just a copy of something I have working in php/c++/python/clrzmq. I've done the same thing in the 4 languages to run a benchmark. There is only an issue with the netmq implementation.

@almazik Negative on that one, its not a timing issue. You can sleep it for a day, and still loose the first message.

I'll start doing some serious digging to see if I can catch whats wrong.

from netmq.

almazik avatar almazik commented on August 19, 2024

@reiroldan I just tested the code that @somdoron submitted, and none of the messages were lost if Thread.Sleep stands before pub.Send... To make sure we're on the same page here is the complete test that passes successfully on my environment (basically the original test with extra using and Thread.Sleep before pub.Send):

[Test]
public void BindSubscriber()
{
    using (NetMQContext context = NetMQContext.Create())
    using (var sub = context.CreateSubscriberSocket())
    {
        sub.Bind("tcp://127.0.0.1:5002");
        sub.Subscribe("");

        Task.Factory.StartNew(() =>
        {
            // ReSharper disable once AccessToDisposedClosure
            using (var pub = context.CreatePublisherSocket())
            {
                pub.Connect("tcp://127.0.0.1:5002");

                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(100); // let the subscriber connect to the publisher before sending a message
                    pub.Send("Hello " + i);
                }
            }
        });

        for (int i = 0; i < 5; i++)
        {
            bool more;
            string m = sub.ReceiveString(out more);
            Assert.AreEqual("Hello " + i, m);
            Assert.False(more);
        }
    }
}

from netmq.

reiroldan avatar reiroldan commented on August 19, 2024

@somdoron @almazik Yes this does work, in fact you can change it to

pub.Connect("tcp://127.0.0.1:5002");
Thread.Sleep(100); // let the subscriber connect to the publisher before sending a message
for (int i = 0; i < 5; i++) {
pub.Send("Hello " + i);
}

and it will work, no need to sleep on each iteration. I think the issue I ran into was due to having the publisher and subscriber run on the same thread. I'll keep looking into what is going on, as I still have the issue on 2 separate processes.

from netmq.

Shamar avatar Shamar commented on August 19, 2024

Any news on this bug? I have to Connect a Publisher and adding such Thread.Sleep is quite annoying.

from netmq.

reiroldan avatar reiroldan commented on August 19, 2024

@Shamar There isn't a work around for this situation. The same happens with the c zmqlib. You have to give it a little time for it to connect.

from netmq.

somdoron avatar somdoron commented on August 19, 2024

can I close this?

from netmq.

Shamar avatar Shamar commented on August 19, 2024

Yes, if that's "by 0mq specification", we can't fix it here. :-(

from netmq.

bajaj689 avatar bajaj689 commented on August 19, 2024

In this code the Subscriber socket binds , and publisher socket connects .
Both the codes give zmq error when run.This is CPP code. Why?
publisher.cpp

#include
#include <zmq.hpp>
#include <zhelpers.hpp>

using namespace std;
int main () {

zmq::context_t context (1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("tcp://*:5556");

while (1) {

    zmq::message_t request (12);
    memcpy (request.data (), "Pub-1 Data", 12);
	sleep(1);
    publisher.send (request);

}

return 0;

}


subcriber.cpp

#include
#include <zmq.hpp>

int main (int argc, char *argv[])
{
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.bind("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);//subscribe to all messages

//  Process 10 updates
int update_nbr;
for (update_nbr = 0; update_nbr < 10 ; update_nbr++) {
	
    zmq::message_t update;
    subscriber.recv (&update);
    std::string updt = std::string(static_cast<char*>(update.data()), update.size());
    std::cout << "Received Update/Messages/TaskList " << update_nbr <<" : "<< updt << std::endl;

}


return 0;

}

from netmq.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.