Comments (15)
If I remember correctly, the subscriber should connect, the publisher should bind. I'll try to find it in the zmq guide but it is pretty vague on that.
Strange that it "kinda works" the way @somdoron showed.
I assume the original zmq might be having the same issue.
from netmq.
try this:
[Test]
public void BindSubscriber()
{
using (NetMQContext contex = NetMQContext.Create())
{
var sub = contex.CreateSubscriberSocket();
sub.Bind("tcp://127.0.0.1:5002");
sub.Subscribe("");
Task.Factory.StartNew(() =>
{
using (var pub = contex.CreatePublisherSocket())
{
pub.Connect("tcp://127.0.0.1:5002");
for (int i = 0; i < 5; i++)
{
// let the subscrbier connect to the publisher before sending a message
pub.Send("Hello");
Thread.Sleep(100);
}
}
});
bool more;
string m = sub.ReceiveString(out more);
Assert.AreEqual("Hello", m);
Assert.False(more);
sub.Dispose();
}
}
from netmq.
This sort of works, but always looses the first message.
from netmq.
@reiroldan I think the first message is lost because publisher sends the first message right after it has connected, so subscriber has no time to subscribe. To fix that you can move Thread.Sleep(100)
before pub.Send("Hello");
from netmq.
And that very sleep is what I really hate about zmq :(
Lets say "Connecting Publisher" is a valid way to connect pub / sub sockets (and I can't find anything opposing that right now):
In this case, shouldn't subscribe(...)
block until it is really subscribed? Then messages should not be lost when a connected publisher sends after connect
from netmq.
Actually Sleep
should be added only in proof-of-concept code, in production you should either assume that subscriber always connects to a continuous stream of events somewhere in the middle (weather update model), or you should introduce additional layer to catch up all the events you have missed (e.g. using Getting an Out-of-Band Snapshot).
In base ZMQ protocol publisher never knows whether those 2 subscribers that just subscribed (even if we watch for them) are all the subscribers that want to track its updates.
And concerning blocking subscribe
... subscribers are usually running on different threads or (more likely) different processes or servers. So even if subscriber blocks until is has actually subscribed (that is, blocking until publisher is actually up, running, and "confirmed" the reception of subscription, which goes against the asynchronous concept), most likely it has already missed all the events published during the time (maybe even months) that publisher was working without knowing about new subscriber.
from netmq.
@almazik I know and understand everything you just wrote but my point was that:
- if it is legal to bind sub-sockets
- sending publishers message should not be lost: the messages were sent after the call to connect and that call to connect happened after the subscriber socket had been bound.
If it worked that way, we wouldn't need sleeps in the examples, tests etc.
I know the patterns around those sleeps (and you mentioned some of them) but it is really a pity that zmq beginners will fall into this "subscriber won't receive all messages"-pit, read sample code with loads of sleeps in it only to realize that they are "doing it all wrong" and should be using the proper patterns.
But maybe this issue#94 isn't the right place to discuss this topic ;) Sorry if I distracted from the issue at hand...
from netmq.
@tobi-tobsen Yes it is, all socket types allow for bind/connect semantics. Its not a problem of who binds or who connects, the current implementation is not correct. What I've implemented is just a copy of something I have working in php/c++/python/clrzmq. I've done the same thing in the 4 languages to run a benchmark. There is only an issue with the netmq implementation.
@almazik Negative on that one, its not a timing issue. You can sleep it for a day, and still loose the first message.
I'll start doing some serious digging to see if I can catch whats wrong.
from netmq.
@reiroldan I just tested the code that @somdoron submitted, and none of the messages were lost if Thread.Sleep
stands before pub.Send
... To make sure we're on the same page here is the complete test that passes successfully on my environment (basically the original test with extra using
and Thread.Sleep
before pub.Send
):
[Test]
public void BindSubscriber()
{
using (NetMQContext context = NetMQContext.Create())
using (var sub = context.CreateSubscriberSocket())
{
sub.Bind("tcp://127.0.0.1:5002");
sub.Subscribe("");
Task.Factory.StartNew(() =>
{
// ReSharper disable once AccessToDisposedClosure
using (var pub = context.CreatePublisherSocket())
{
pub.Connect("tcp://127.0.0.1:5002");
for (int i = 0; i < 5; i++)
{
Thread.Sleep(100); // let the subscriber connect to the publisher before sending a message
pub.Send("Hello " + i);
}
}
});
for (int i = 0; i < 5; i++)
{
bool more;
string m = sub.ReceiveString(out more);
Assert.AreEqual("Hello " + i, m);
Assert.False(more);
}
}
}
from netmq.
@somdoron @almazik Yes this does work, in fact you can change it to
pub.Connect("tcp://127.0.0.1:5002");
Thread.Sleep(100); // let the subscriber connect to the publisher before sending a message
for (int i = 0; i < 5; i++) {
pub.Send("Hello " + i);
}
and it will work, no need to sleep on each iteration. I think the issue I ran into was due to having the publisher and subscriber run on the same thread. I'll keep looking into what is going on, as I still have the issue on 2 separate processes.
from netmq.
Any news on this bug? I have to Connect a Publisher and adding such Thread.Sleep is quite annoying.
from netmq.
@Shamar There isn't a work around for this situation. The same happens with the c zmqlib. You have to give it a little time for it to connect.
from netmq.
can I close this?
from netmq.
Yes, if that's "by 0mq specification", we can't fix it here. :-(
from netmq.
In this code the Subscriber socket binds , and publisher socket connects .
Both the codes give zmq error when run.This is CPP code. Why?
publisher.cpp
#include
#include <zmq.hpp>
#include <zhelpers.hpp>
using namespace std;
int main () {
zmq::context_t context (1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("tcp://*:5556");
while (1) {
zmq::message_t request (12);
memcpy (request.data (), "Pub-1 Data", 12);
sleep(1);
publisher.send (request);
}
return 0;
}
subcriber.cpp
#include
#include <zmq.hpp>
int main (int argc, char *argv[])
{
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.bind("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);//subscribe to all messages
// Process 10 updates
int update_nbr;
for (update_nbr = 0; update_nbr < 10 ; update_nbr++) {
zmq::message_t update;
subscriber.recv (&update);
std::string updt = std::string(static_cast<char*>(update.data()), update.size());
std::cout << "Received Update/Messages/TaskList " << update_nbr <<" : "<< updt << std::endl;
}
return 0;
}
from netmq.
Related Issues (20)
- RobiniaDocs API Explorer
- ZMQ_ROUTER_NOTIFY this?
- Critical CVE-2021-24112
- Feature. NetMQSocket TryReceive with CancellationToken
- IPC not working between .net7 and .net461 HOT 1
- NetMQ.NetMQException: Exception of type 'NetMQ.NetMQException' was thrown at NetMQ.Core.Ctx.CreateSocket(ZmqSocketType type)
- netmq is missing NuGet package README file
- Setting RouterMandatory on RouterSocket causes InvalidException
- "Closed" callback is called after the "Connected" callback when changing socket address.
- SubscriberSocket System.InvalidOperationException: Operation already in progress
- Poller thorws exception when signal fails on Recv() when resource temporarily unavailable
- Fix Division by Zero Error in FairQueueing Class When m_active is Zero
- Leverage framework dependencies where possible HOT 4
- Mailbox.TryRead still throwing in debug mode
- Process crash due to an unhandled exception in Mechanism.Encode
- StreamSocket does not send empty connect/disconnect message when peers connect/disconnect HOT 2
- Update of license from LGPL to MPL
- whether NetMQ supports queue data persistence ?
- Simplest example doesn't work
- MQ Client fails at sending
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from netmq.