Giter VIP home page Giter VIP logo

chumak's Introduction

chumak

Chumaki

What is chumak?

chumak is a library written in Erlang. It implements the ZeroMQ Message Transport Protocol (ZMTP). chumak supports ZMTP version 3.1.

Goal

The goal of chumak application is to provide up-to-date native Erlang implementation of ZMTP.

Features

  1. Resource Property (NEW in 3.1!)
  2. Request-Reply pattern
  3. Publish-Subscribe pattern
  4. Pipeline Pattern
  5. Exclusive Pair Pattern
  6. Version Negotiation
  7. NULL Security Mechanism
  8. CURVE Security Mechanism
  9. Error Handling
  10. Framing
  11. Socket-Type Property & Identity Property
  12. Backwards Interoperability with ZMTP 3.0

Install

You can install chumak from hex.pm by including the following in your rebar.config:

{deps,[
	{chumak, "X.Y.Z"}
]}.

where X.Y.Z is one of the release versions.

For more info on rebar3 dependencies see the rebar3 docs.

Usage

See examples. Otherwise use just like a regular Erlang/OTP application.

If you would like to use python tests to try language interop, you need to have pyzmq installed.

Build

$ rebar3 compile

By default, this will try to build a version of the application that does not include support for the CURVE security model.

The environment variable CHUMAK_CURVE_LIB can be used to specify a NIF that implements the encryption functions that are required to support the CURVE security model.

The following values for CHUMAK_CURVE_LIB are supported:

  • nacerl - this is the minimal variant using the tweetnacl C library. By default it is fetched and built from https://github.com/willemdj/NaCerl.

         Compilation of nacerl requires gcc and make. Since these tools
         may not be available on windows systems, a check on the
         availability of these tools will be done. If they are not
         available the dependency will not be fetched and there will be
         no support for the CURVE security model.
    
  • nacl - this is similar to nacerl, but it depends on libsodium. The repository for this is https://github.com/tonyg/erlang-nacl. The the build process for Chumak will not automatically fetch and build it, but if CHUMAK_CURVE_LIB is set to "nacl", it will be assumed that this library is available and it will be used.

  • enacl - this also depends on libsodium, but it also requires an Erlang VM that supports dirty schedulers. The repository is https://github.com/jlouis/enacl. The build process for Chumak will not automatically fetch and build it, but if CHUMAK_CURVE_LIB is set to "enacl", it will be assumed that this library is available and it will be used.

Test

$ rebar3 eunit -c

The -c will allow you to see the test coverage by running the command below.

Coverage

$ rebar3 cover

Generate Docs

$ rebar3 edoc

Architecture

Architecture describes the system structure.

Help Wanted

Would you like to help with the project? Pick any of the issues tagged help wanted and contribute!

Contributing

See Contributing.

FAQ

  1. Why another Erlang implementation?

    Because the existing Erlang implementations and bindings are out of date.

  2. Can I use chumak for free?

    Yes, as long as you abide by the terms of the MPLv2 license. In short, you can include this code as a part of a larger work, even commercial. It is only when you modify chumak source code itself that you have to make that change available. Please read the license, as this description is not complete by any means.

  3. Do I have to sign over my copyright when contributing?

    No. Everyone owns the piece of code they contribute. Please see Contributing for details.

License

This project is licensed under Mozilla Public License Version 2.0. See license for complete license terms.

Etymology

From Wikipedia:

Chumak (Ukrainian: чумак) is a historic occupation on the territory of the modern Ukraine as merchants or traders, primarily known for the trade in salt.

How to publish new Hex.pm version

This info is here for maintainers - since I keep forgetting how to do this.

  1. Adjust the version of the package in src/chumak.app.src
  2. Login to hex.pm: rebar3 hex user auth
  3. Put in your hex.pm username and your password (ignore the warning) - enter it 2 more times! (weird)
  4. Publish: rebar3 hex publish

chumak's People

Contributors

c-rack avatar davidalphafox avatar dcheckoway avatar drozzy avatar eshurakov avatar filmor avatar islandusurper avatar kevinwilson541 avatar kianmeng avatar osmuogar avatar prots avatar sappo avatar shishirpy avatar siiky avatar somdoron avatar srevenant avatar sztheory avatar thalesmg avatar visciang avatar willemdj avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

chumak's Issues

exception exit

Hello.
I tried to run req client and req/rep server.
And I got an error:
req_server:main().
** exception exit: {noproc,
{gen_server,call,
[chumak_sup,
{start_child,
#{id => 'chumak_socket_my-req',restart => transient,
start => {chumak_socket,start_link,[req,"my-req"]}}},
infinity]}}
in function gen_server:call/3 (gen_server.erl, line 223)
in call from chumak_sup:start_socket/2 (chumak_sup.erl, line 30)
in call from req_server:main/0 (req_server.erl, line 9)

Can you help me please?

Fix dialyzer errors

Currently (July 27, 2021) there are these dialyzer errors. I'm sure some are not important, but I'm placing them here just as a reference.

src/chumak_command.erl
Line 295 Column 1: Function decode_hello_message/2 has no local return
Line 320 Column 1: Function decode_welcome_message/2 has no local return
Line 353 Column 1: Function decode_initiate_message/2 has no local return
Line 408 Column 1: Function decode_curve_ready_message/2 has no local return
Line 430 Column 1: Function decode_curve_message/2 has no local return

src/chumak_curve.erl
Line 141 Column 1: Function receive_command/1 will never be called
Line 191 Column 1: Function send_hello_step/2 will never be called
Line 220 Column 1: Function send_welcome_step/2 will never be called
Line 230 Column 1: Function send_initiate_step/3 will never be called
Line 240 Column 1: Function send_ready_step/3 will never be called
Line 251 Column 1: Function validate_client_curve_data/1 has no local return
Line 261 Column 1: Function validate_server_curve_data/1 has no local return

src/chumak_curve_if.erl
Line 40 Column 1: Function randombytes/1 has no local return
Line 44 Column 9: The variable _ can never match since previous clauses completely covered the type 'none'
Line 50 Column 1: Function box_keypair/0 has no local return
Line 54 Column 9: The pattern 'nacl' can never match the type 'none'
Line 57 Column 9: The variable _ can never match since previous clauses completely covered the type 'none'
Line 71 Column 1: Function box/4 has no local return
Line 75 Column 9: The pattern 'nacl' can never match the type 'none'
Line 78 Column 9: The variable _ can never match since previous clauses completely covered the type 'none'
Line 92 Column 1: Function box_open/4 has no local return
Line 96 Column 9: The pattern 'nacl' can never match the type 'none'
Line 99 Column 9: The variable _ can never match since previous clauses completely covered the type 'none'

src/chumak_dealer.erl
Line 100 Column 13: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}

src/chumak_peer.erl
Line 343 Column 9: The pattern {'ok', NewState} can never match the type {'error',{'error',{'invalid_command_before_ready',_}} | {'shutdown','invalid_resource' | {'server_error',_}},#state{step::'ready' | 'waiting_peer' | 'waiting_ready',host::'nil' | string(),port::'nil' | number(),conn_side::'client' | 'server',resource::string(),type::'dealer' | 'pair' | 'pub' | 'pull' | 'push' | 'rep' | 'req' | 'router' | 'sub' | 'xpub' | 'xsub',identity::string(),peer_identity::string(),peer_version::{integer(),integer()},socket::'nil' | port() | {'$inet',atom(),_},decoder::{'decoder','command_ready' | 'initial' | 'message_ready' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version',integer(),'nil' | binary(),'command_ready' | 'initial' | 'message_ready' | 'nil' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version','nil' | {_,_},'nil' | {_,_},map(),'curve' | 'nil' | 'null',boolean()},parent_pid::pid(),incoming_queue::'nil' | queue:queue(_),msg_buf::[any()],pub_compatible_layer::boolean(),multi_socket_type::boolean(),as_server::boolean(),mechanism::'curve' | 'null',security_data::#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()}}}
Line 352 Column 2: Invalid type specification for function chumak_peer:handshake/1. The success typing is (#state{step::'ready' | 'waiting_peer' | 'waiting_ready',host::'nil' | string(),port::'nil' | number(),conn_side::'client' | 'server',resource::string(),type::'dealer' | 'pair' | 'pub' | 'pull' | 'push' | 'rep' | 'req' | 'router' | 'sub' | 'xpub' | 'xsub',identity::string(),peer_identity::string(),peer_version::{integer(),integer()},socket::port() | {'$inet',atom(),_},decoder::{'decoder','ready',integer(),binary(),'command_ready' | 'initial' | 'message_ready' | 'nil' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version',{'some',integer()},{'some',integer()},#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()},'curve' | 'nil' | 'null',boolean()},parent_pid::pid(),incoming_queue::'nil' | queue:queue(_),msg_buf::[any()],pub_compatible_layer::boolean(),multi_socket_type::boolean(),as_server::boolean(),mechanism::'curve' | 'null',security_data::#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()}}) -> {'error',{'error',{'invalid_command_before_ready',_}} | {'shutdown','invalid_resource' | {'server_error',_}},#state{step::'ready' | 'waiting_peer' | 'waiting_ready',host::'nil' | string(),port::'nil' | number(),conn_side::'client' | 'server',resource::string(),type::'dealer' | 'pair' | 'pub' | 'pull' | 'push' | 'rep' | 'req' | 'router' | 'sub' | 'xpub' | 'xsub',identity::string(),peer_identity::string(),peer_version::{integer(),integer()},socket::'nil' | port() | {'$inet',atom(),_},decoder::{'decoder','command_ready' | 'initial' | 'message_ready' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version',integer(),'nil' | binary(),'command_ready' | 'initial' | 'message_ready' | 'nil' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version','nil' | {_,_},'nil' | {_,_},map(),'curve' | 'nil' | 'null',boolean()},parent_pid::pid(),incoming_queue::'nil' | queue:queue(_),msg_buf::[any()],pub_compatible_layer::boolean(),multi_socket_type::boolean(),as_server::boolean(),mechanism::'curve' | 'null',security_data::#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()}}}
Line 392 Column 9: The pattern {'ok', ReadyState} can never match the type {'error',{'error',{'invalid_command_before_ready',_}} | {'shutdown','invalid_resource' | {'server_error',_}},#state{step::'ready' | 'waiting_peer' | 'waiting_ready',host::'nil' | string(),port::'nil' | number(),conn_side::'client' | 'server',resource::string(),type::'dealer' | 'pair' | 'pub' | 'pull' | 'push' | 'rep' | 'req' | 'router' | 'sub' | 'xpub' | 'xsub',identity::string(),peer_identity::string(),peer_version::{integer(),integer()},socket::'nil' | port() | {'$inet',atom(),_},decoder::{'decoder','command_ready' | 'initial' | 'message_ready' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version',integer(),'nil' | binary(),'command_ready' | 'initial' | 'message_ready' | 'nil' | 'ready' | 'require_size' | 'waiting_as_server' | 'waiting_filler' | 'waiting_mechanism' | 'waiting_minor_version','nil' | {'some',integer()},'nil' | {'some',integer()},#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()},'curve' | 'nil' | 'null',boolean()},parent_pid::pid(),incoming_queue::'nil' | queue:queue(_),msg_buf::[any()],pub_compatible_layer::boolean(),multi_socket_type::boolean(),as_server::boolean(),mechanism::'curve' | 'null',security_data::#{'client_nonce'=>integer(), 'client_public_transient_key'=>binary(), 'client_secret_transient_key'=>binary(), 'cookie_public_key'=>binary(), 'cookie_secret_key'=>binary(), 'curve_publickey'=>binary(), 'curve_secretkey'=>binary(), 'curve_serverkey'=>binary(), 'mechanism'=>'curve', 'role'=>'client' | 'server', 'server_nonce'=>integer(), 'server_public_transient_key'=>binary(), 'server_secret_transient_key'=>binary()}}}
Line 445 Column 1: Function handle_ready_response2/2 has no local return
Line 460 Column 1: Function validate_peer_socket_type/2 has no local return
Line 466 Column 10: The call PatternModule:'valid_peer_type'(PeerSocketType::atom()) requires that PatternModule is of type atom() not {'error','invalid_socket_type'}
Line 587 Column 1: Function send_invalid_socket_type_error/3 will never be called

src/chumak_protocol.erl
Line 158 Column 1: Function build_hello_frame/1 has no local return
Line 224 Column 1: Function build_welcome_frame/1 has no local return
Line 319 Column 1: Function build_initiate_frame/2 has no local return
Line 395 Column 1: Function build_ready_frame/2 has no local return
Line 673 Column 2: Invalid type specification for function chumak_protocol:encode_more_message/3. The success typing is (binary(),_,_) -> {nonempty_binary(),_}
Line 685 Column 2: Invalid type specification for function chumak_protocol:encode_last_message/3. The success typing is (binary(),_,_) -> {nonempty_binary(),_}

src/chumak_pub.erl
Line 128 Column 9: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}

src/chumak_pull.erl
Line 120 Column 9: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}

src/chumak_rep.erl
Line 141 Column 9: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}

src/chumak_router.erl
Line 108 Column 9: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}
Line 125 Column 1: Function terminate_lbs/1 will never be called
Line 131 Column 1: Function terminate_lb/1 will never be called

src/chumak_socket.erl
Line 43 Column 9: The variable ModuleName can never match since previous clauses completely covered the type {'error','invalid_socket_type'}

src/chumak_sub.erl
Line 130 Column 9: The pattern {'error', Info} can never match the type 'empty' | {'out',[any()]}
===> Warnings written to _build/default/24.0.4.dialyzer_warnings
===> Warnings occurred running dialyzer: 45

Add RELEASES.md

Which documents changes made to each release. E.g.:

1.2.0

  • Added blah
  • Did blah
  • Made that good

1.1.1

  • Fixed bug of some sort

ZMTP 1.0/2.0 Backwards Interoperability

I'm supporting some applications that still use libzmq 2.2 and I'm wondering if you would be receptive to adding backwards interoperability with earlier versions of the protocol. After a few passes over the specifications and the chumak codebase, it appears this would entail (at the highest level):

  • updating build_greeting_frame to follow the procedure mentioned in the Backwards Interoperability section of the specification
  • expanding the various encode and decode methods used for framing

chumak_protocol.erl is already approaching 800 lines and I don't yet have a strong idea how much additional code this would require. Before I flew off the handle and tried to bash out the code, I thought it might be wise to check with the maintainers. 😃

From zeromq.org:

We recommend anyone using 3.2.x or 2.2.x to upgrade to 4.1.x.

So the official wisdom seems to be that I should upgrade my applications instead of shoehorning support into this library, but I'm curious what everyone here thinks. Is there any value in this?

Is it possible to connect a dealer - dealer?

I'm trying to make a simple application which has a erlang client and a node server. Both need to be dealers connected and I cant get them to work.

Client

  application:start(chumak),
  {ok, Socket } = chumak:socket(dealer),
  Res = chumak:connect(Socket, tcp, "127.0.0.1", 9000),
  io:format("Connected Socket ~p~n",[Res]),
  Message = [<<"Hello">>, <<"world">>],
  chumak:send_multipart(Socket, Message),
  io:format("Sended multipart message ~p~n",[Message]).

Server

var zmq = require('zeromq');
var socket = zmq.socket('dealer');

socket.on('message', (message) => {
  console.log('received: %s', JSON.stringify(message));
  console.log('message to string ' + message.toString("utf8"));
});

socket.bind('tcp://127.0.0.1:9000', (error) => {
  if (error) {
    console.error("Error binding the socket to the port: " + error);
  }
  console.log('Socket listening on port 9000');
});

If the type of the server socket changes, then something is received, else no message is received.

This is part of a bigger project and the socket types on the server can't be changed.. Am I doing something wrong?

Can't use anything but Nacerl on non-Windows

rebar.config.script only checks whether it can compile the curve libraries on Windows, which is reasonable. But if you aren't on Windows, you can't specify that you know what you're doing and have compiled and installed libsodium so that the other libraries work.

Publish new release

There have been a few nice bugfixes since the last release, would you mind publishing a new release to Hex?

Allow custom curve implementation module

Hi,

I was trying to use chumak and Kcl project (https://hex.pm/packages/kcl) from Elixir to have a completely NIF-free solution, but currently it's not easy to provide your own NaCl compatible module without changing the library.

What do you think of making the interface exposed by chumak_curve_if a behaviour and letting the application provide a custom implementation module at configuration time, or as an optional parameter when instantiating a socket?

Flushing "old" messages

This is a question, so you know I am newer to Elixir and this is the first time I have delved into pure Erlang code.

I was wondering if there is a simple way to flush the un-received messages, or get the most recent message, or get the length of queued messages?

A related question I have is if there are any issues with having connection to say a publisher and not checking the messages?

I am running a zmq publisher (in nodejs) and have the following subscriber:

-- Converted my Elixir to Erlang, hopefully no syntax errors --

pub_socket_url = '127.0.0.1'
pub_socket_port = 3400
{ok, pub_socket} = chumak:socket(:sub)
chumak:connect(pub_socket, :tcp, pub_socket_url, pub_socket_port)
chumak:subscribe(pub_socket, <<"node_publisher">>)
chumak:recv(pub_socket)

This works and I get the expected msg from the publisher, but if I do not continue to execute chumak:recv(pub_socket) while the publisher is continuing to publish msgs, then when I do execute chumak:recv(pub_socket) it is an old msg. I would like to be able to get the most recent msg and flush the rest. Is that possible? Or am I suppose to be continuously checking?

Thx

Attempting to get contribute, having some issues

Hi

I'm trying to look into multipart support as that is the only feature I believe is lacking for me to be able to try using this.

But I come from Elixir so while I grasp most concepts and can figure out most code along the way I'm not very familiar with the tooling. I have no problem running the python samples as I work with Python.

So a few questions that I hope someone has time for :)

Tests do not seem to run

I get this:
===> Verifying dependencies...
===> Compiling chumak
===> Performing EUnit tests...

Finished in 0.284 seconds
0 tests

Similarly the coverage thing reports no coverage.

How do I run a particular example?

I'm trying to run chumak_req.erl but failing :)

Hope you can help get me started.

Specify version of Erlang and rebar3 needed

I think we need to set a specific version of Erlang as requirement, as I had some people run into issues when running old version of Erlang. I had the same problem myself (forgot to document which version it was). It might have been the combination of bad rebar3 version and erlang combined.

I recall 19.1 or 19.2 was not very good for some reason, can't remember why. Might be good idea to set 19.3 or even 20 as min requirement.
Similarly, figure out which rebar3 version we need (we do supply our own rebar3 binary - so make sure it matches the required erlang version)

Delivery feedback when sending multipart message?

Hi,

I'm using the library in a simple implementation of the 7/MDP protocol (https://rfc.zeromq.org/spec/7/) and using ROUTER/DEALER sockets combination.
The issue I'm experiencing is when client is disconnected - is there a way to monitor for disconnect knowing the identity?
Relying on MDP heartbeats also doesn't help as :chumak.send_multipart/2 always returns :ok, also for disconnected parties.

Thank you in advance!

when another side exited, implementations should handle it

In server mode chumak_socket should not exit when a peer shutdown except pair mode.
I think this should be handled in implementation of each patterns, every implementation should decide exit or not.
So the implementations should add another argument in peer_disconected to handle the reason of exit.
But I found a lot of shutdown tuples in chumak_peer and the code of below in chumak_socket

 handle_info({'EXIT', PeerPid, {shutdown, _Reason}}, State) ->
    exit_peer(PeerPid, State),
   {stop, normal, State};

My question is that there is any intended to use shutdown as a special operation to stop chumak_socket ?

"Memory leak" in PubSub pattern

Here is the chumak’s process tree:

{your process goes here}
         |
         +--------> chumak_socket <----------+
                          |                  | handle_info({tcp, _Port, Frame}, State) ->
                          |                  |     decode_and_send_to_parent(Frame, State). 
                          |                  |
                          +----------> chumak_peer (holds the gen_tcp)

chumak_peer reads frames from gen_tcp in active mode (more specifically {active, once}, eventually enqueueing messages in recv_queue), expecting that {your process} calls chumak:recv fast enough to consume all of that.

If your consumer isn’t capable of handling all messages as they arrive at chumak_socket; they will accumulate indefinitely!

Silence :connection_error

Is it possible to silence :connection_error messages?

In my case I am using Chumak via Elixir. In the application I am using a push socket, but the socket to connect to (pull) may not have bound yet etc.

When the pull socket is not bound I get a done of [{:host, 'localhost'}, {:port, 3400}, :connection_error, {:error, :econnrefused}] in the console when developing or in logs.

Example code:

socket_port = 3400
{:ok, socket} = :chumak.socket(:push, 'pushid')
:chumak.connect(socket, :tcp, 'localhost', socket_port)

router-dealer packet loss on dealer reconnect

I don't know if this is chumak or zeromq spec limitation but this happens:

Assume we have router and dealers (the main reason for me for using this pattern is message passing - i have one router and multiple dealers and any of these can send message at any time). Dealers connects to topics on router - each dealer has it's own topic.

Now current implementation in chumak works the way, that if multiple dealers connect to same topic, message sent to one topic is routed in round-robin fashion probably as a form of load balancing.

If something happens on the dealer side that results in reconnect (dealer is restarted) now the router doesn't know that happened and there becomes two sockets available on that same topic (the old one, which is now invalid, and the new one). If you send message to such topic and the invalid socket is chosen, the OS replies that that socket is actually closed (as the dealer side replies that such socket doesn't exist) and that chumak removes the socket from the topic.

Problem is, that this way 1 message is always lost when reconnect of the dealer happens. I understand that this is how sockets behave, you can't know that the socket is actually closed unless you try to send something, but it would be nice that if that happens, the message is resent to another dealer probably (if available) instead of it being silently lost.

Subscriber topic filtering should match only the start not the entire string

According to the zeromq api guide: http://api.zeromq.org/master:zmq-setsockopt

ZMQ_SUBSCRIBE: Establish message filter

The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
Option value type binary data
Option value unit N/A
Default value N/A
Applicable socket types ZMQ_SUB

The subscription should filter out messages that do not start with the filter.

The current implementation searches for the given filter in the entire message and not just the beginning. The error can be reproduced using the following code:

-module(wuserver).
-export([main/0]).

main() ->
    application:start(chumak),
    {ok, Socket} = chumak:socket(pub),

    case chumak:connect(Socket, tcp, "localhost", 5555) of
        {ok, _BindPid} ->
            io:format("Binding OK wiht Pid: ~p\n", [Socket]);
        {error, Reason} ->
            io:format("Connection Failed for this reason: ~p\n", [Reason]);
        X ->
            io:format("Unhandled reply for bind ~p \n", [X])
    end,
    loop(Socket).


loop(Socket) ->
    Zipcode = rand:uniform(100000),
    Temperature = rand:uniform(135),
    Relhumidity = rand:uniform(50) + 10,

    BinZipCode = erlang:integer_to_binary(Zipcode),
    BinTemperature = erlang:integer_to_binary(Temperature),
    BinRelhumidity = erlang:integer_to_binary(Relhumidity),
    io:format(" SENDING: ~p\n", [[BinZipCode, " ", BinTemperature, " ", BinRelhumidity]]),
    ok = chumak:send(Socket, [BinZipCode, " ", BinTemperature, " ", BinRelhumidity]),
    timer:sleep(100),
    loop(Socket).
-module(wuclient).
-export([main/0]).

main() ->
    application:start(chumak),
    {ok, Socket} = chumak:socket(sub),
    Topic = ["12"],
    chumak:subscribe(Socket, Topic),
    case chumak:bind(Socket, tcp, "localhost", 5555) of
        {ok, _BindPid} ->
            io:format("Binding OK with Pid: ~p\n", [Socket]);
        {error, Reason} ->
            io:foramt("Connection failed for this reason: ~p\n", [Reason]);
        X ->
            io:format("Unhandled reply for bind ~p \n", [X])
    end,
    loop(Socket).

loop(Socket) ->
    {ok, Data1} = chumak:recv(Socket),
    io:format("Received ~p\n", [Data1]),
    loop(Socket).

Once, the code is compiled do the following:

  1. Open two erlang shells.
  2. In first shell start the client with wuclient:main().
  3. In the second shell start the server with wuserver:main().
  4. The result in the client will be something like this:
Binding OK with Pid: <0.191.0>
Received <<"78811 12 15">>
Received <<"27336 120 47">>
Received <<"45253 120 42">>
Received <<"76712 80 28">>
Received <<"7334 135 12">>
Received <<"92121 105 44">>
Received <<"49187 127 48">>
Received <<"58301 135 12">>
  1. We expect only the messages starting with 12 should be shown. But the messages show have 12 somewhere in the text.

Multipart send/recv support for `REQ`

Hello!

I'm working on a project in Erlang and Java where it would be very useful to have multipart message send/recv support for REQ sockets. I saw send_multipart/2 in the docs and was excited, but once I tried it I got the not_implemented_yet error. :'(

I checked the code out a bit, and apparently some other types of sockets already support it, and the code for the ROUTER and DEALER isn't even that different. So I'm wondering why it hasn't been implemented for all socket types: there was no time/interest/&c, or there were problems?

If the issue was time/interest, and the send_multipart code for REQ is basically the same for ROUTER/DEALER, I'd like to try my hand at it, so I can still use it for this project.

typo in src/chumak_router.erl?

Hi there,

I think that line 97 in src/chumak_router.erl:

    NewLBs = chumak_lb:delete(LBs, PeerPid),

is supposed to be

    NewLBs = chumak_lbs:delete(LBs, PeerPid),

With the existing version, a client dropping its connection to the router causes a :lists.delete exception, but with the modified version, it works just fine.

Cheers,

Neil

Chumak simply doesn't build

# git clone https://github.com/zeromq/chumak
Cloning into 'chumak'...
remote: Counting objects: 498, done.
remote: Total 498 (delta 0), reused 0 (delta 0), pack-reused 498
Receiving objects: 100% (498/498), 2.32 MiB | 0 bytes/s, done.
Resolving deltas: 100% (327/327), done.
# cd chumak/
# ./rebar3 compile
===> Verifying dependencies...
===> Compiling chumak
===> Compiling src/chumak_sup.erl failed
src/chumak_sup.erl:25: syntax error before: '{'
src/chumak_sup.erl:30: syntax error before: '{'

src/chumak_sup.erl:16: function init/1 undefined
src/chumak_sup.erl:17: function start_socket/2 undefined
src/chumak_sup.erl:27: spec for undefined function chumak_sup:start_socket/2


Is polling implemented?

I've downloaded all the examples but ACK doesn't respond with any line in the code implementing socket polling. In Python (pyzmq) for multiple sockets I can use a poller:

    # now open up all the sockets
    context = zmq.Context()
    outsub = context.socket(zmq.SUB)
    outsub.bind("tcp://" + myip + ":" + str(args.outsubport))
    outsub.setsockopt(zmq.SUBSCRIBE, b"")
    inreq = context.socket(zmq.ROUTER)  
    inreq.bind("tcp://" + myip + ":" + str(args.inreqport))
    outref = context.socket(zmq.ROUTER)  
    outref.bind("tcp://" + myip + ":" + str(args.outrefport))
    req = context.socket(zmq.ROUTER)  
    req.bind("tcp://" + myip + ":" + str(args.reqport))
    repub = context.socket(zmq.PUB)  
    repub.bind("tcp://" + myip + ":" + str(args.repubport))

    # sort out the poller
    poller = zmq.Poller() 
    poller.register(inreq, zmq.POLLIN)
    poller.register(outsub, zmq.POLLIN)
    poller.register(outref, zmq.POLLIN)
    poller.register(req, zmq.POLLIN)

    # UDP socket setup for broadcasting this server's address 
    cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    # housekeeping variables
    pulsecheck = datetime.utcnow() + timedelta(seconds = 1)
    alivelist = dict()
    pulsetimeout = 5


    # here must check if a master node is alread running on the network !!!
    # and only run this one if there isn't another. 

    while True: 
        polls = dict(poller.poll(1000))
        if inreq in polls:
            msg = inreq.recv_multipart()
            if msg[1] == b"pulse":           # handle pluse
                ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode())
                if not msg[0] in alivelist.keys():
                    handlechange(msg[0])
                alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout)
        if outsub in polls:
            msgin = outsub.recv_multipart()[0]
            repub.send(msgin) # republish
            msg = unpacker(msgin)
            if isinstance(msg, dict):
                valu = msg.get("value")
                print(".", end = "", flush = True)
            else:
                ansi("green", False, textout = msg)

        if req in polls:
            msg = req.recv_multipart()
            valmsg = validate_request(msg)
            if not valmsg[0]:
                ansi("red", True); print(valmsg[1]); ansi()
            elif len(alivelist) > 0:
                targetnode = random.choice(list(alivelist.keys()))
                inreq.send_multipart([targetnode, packer(valmsg[1])])
                ansi("blue", True, textout = "sent to " + targetnode.decode())
            else:
                ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO")
        if outref in polls:
            msg = outref.recv_multipart()
            destinataire, correlid = msg[1].split(b"/")
            req.send_multipart([destinataire, correlid, msg[2]])

Is there something similar in chumak? Using successfully from Elixir but I want non-blocking receives from a router socket to a dealer socket and I can't seem to see how I would do this using Chumak?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.