bytebeamio / rumqtt
The MQTT ecosystem in Rust
License: Apache License 2.0
Something I tested, which affects both rumqttc and the previous rumq-client crate, is that the client pings the broker even while the data connection is still in use.
The documentation for the set_keep_alive method says:
Set number of seconds after which client should ping the broker if there is no other data exchange
But from testing, pinging is happening while subscribed, receiving data, and while publishing.
I found this because the application errored when it did not receive a ping response, probably because the broker limits the number of messages it handles for the client.
As a temporary measure, I just poll the eventloop and match on the error:
match eventloop.poll().await {
    Ok(mut notification) => {},
    Err(e) => { error!("{:?}", e); }
}
and I increased the keep_alive value so it pings less frequently.
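The behaviour the documentation describes, pinging only after a quiet period, can be sketched as a timer that every packet resets. This is a minimal, standard-library-only illustration; KeepAlive and its methods are hypothetical names and are not taken from rumqttc's internals.

```rust
use std::time::{Duration, Instant};

/// Hypothetical keep-alive tracker (not part of rumqttc).
pub struct KeepAlive {
    interval: Duration,
    last_activity: Instant,
}

impl KeepAlive {
    pub fn new(interval: Duration, now: Instant) -> Self {
        KeepAlive { interval, last_activity: now }
    }

    /// Call whenever any packet is sent or received.
    pub fn record_activity(&mut self, now: Instant) {
        self.last_activity = now;
    }

    /// True only if the connection has been idle for a full interval.
    pub fn should_ping(&self, now: Instant) -> bool {
        now.duration_since(self.last_activity) >= self.interval
    }
}
```

With this shape, a client that publishes or receives data more often than the keep-alive interval would never need to send a ping.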
Example:
rumq_client::publish does not publish anything; it creates a Publish object that later has to be sent to the event stream. The same goes for rumq_client::subscribe.
It would make more sense to have them as Publish::new(...) and Subscribe::new(..), or just rename them to create_publish and create_subscribe.
There are numerous such issues that should be addressed before a 1.0 release, to make it easier for newcomers to navigate the library. Let's collect those issues here and discuss them.
Currently the timeout is hardcoded which is not ideal. It should be configurable with a default value and users should be able to override it.
@tekjar , I am taking this up.
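A configurable timeout with a default could look roughly like the sketch below. The Options type, the set_connection_timeout method, and the 5-second default are all assumptions made for illustration; the source does not state the hardcoded value or the final API.

```rust
use std::time::Duration;

/// Hypothetical options type; the field and method names are illustrative.
#[derive(Debug, Clone)]
pub struct Options {
    connection_timeout: Duration,
}

impl Default for Options {
    fn default() -> Self {
        // Assumed default; the source does not say what the hardcoded value is.
        Options { connection_timeout: Duration::from_secs(5) }
    }
}

impl Options {
    /// Override the default timeout; returns `&mut Self` so calls can be chained.
    pub fn set_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.connection_timeout = timeout;
        self
    }

    pub fn connection_timeout(&self) -> Duration {
        self.connection_timeout
    }
}
```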
Nice to have sync client as well in the docs.
Not clear whether this is possible. But it will be cool to give users of rumqttd the ability to directly subscribe and publish to Broker. e.g. see this Javascript library
Dear all,
we're currently using the rumqtt crate but have hit kind of a road-block with that, and since that crate is archived we are considering switching to this crate.
The problem we're facing is that we need guaranteed message delivery, even when the software crashes or the system reboots, hence we must persist outgoing messages to disk and remove them only when we receive an acknowledgement from the broker.
There was some discussion in the (locked) issue tracker of rumqtt where the main author @tekjar stated that they prefer to implement such a functionality outside of the crate. However, we were not able to do this, since there seems to be no way of knowing when a message is ack'ed by the broker.
So the question is: would this be possible to implement using rumq? Or, even better, does anybody have a tip on how to make this work using the "old" rumqtt crate? Sadly, we could not ask over there since the issue tracker is locked...
Best wishes, and thank you very much!
Michael
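For illustration, the bookkeeping described above (keep a message until the broker acknowledges it) can be sketched with the standard library alone. PendingStore is a hypothetical type; the sketch keeps entries in memory and assumes the application is told the packet identifier of each acknowledgement, which is exactly the capability the question asks about. A real implementation would write each entry to disk before sending.

```rust
use std::collections::BTreeMap;

/// Hypothetical store of unacknowledged publishes, keyed by packet identifier.
/// In-memory only; a durable version would persist every insert and removal.
#[derive(Default)]
pub struct PendingStore {
    pending: BTreeMap<u16, Vec<u8>>,
}

impl PendingStore {
    /// Record a message before it is handed to the network.
    pub fn insert(&mut self, pkid: u16, payload: Vec<u8>) {
        self.pending.insert(pkid, payload);
    }

    /// Drop a message once the broker acknowledges it.
    /// Returns the payload if the identifier was known.
    pub fn ack(&mut self, pkid: u16) -> Option<Vec<u8>> {
        self.pending.remove(&pkid)
    }

    /// Identifiers still waiting for an acknowledgement, in ascending order.
    pub fn unacked_ids(&self) -> Vec<u16> {
        self.pending.keys().copied().collect()
    }
}
```

After a crash or reboot, whatever is still listed by unacked_ids would be the set of messages to resend.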
Hi,
Thank you for your hard work. I still have projects using the old rumqtt and would like to (very) passively maintain it (e.g. bump some dependencies, apply a few trivial merge requests).
Would it be possible for you to transfer ownership of the github repo and the crate to me?
Setup description:
Rumqttc client with ca-chain and client-auth. Start with any small payload size, then try a payload size >= 4096; this produces the error
ERROR rustls::session > TLS alert received: Message { typ: Alert, version: TLSv1_3, payload: Alert( AlertMessagePayload { level: Fatal, description: InternalError, }, ), }
Sample code for client configuration
use std::fs::File;
use std::io::Read;

// Read the CA, client certificate and client key from disk and apply them to the options.
fn set_tls(mqttoptions: &mut MqttOptions, ca_file: Option<String>, client_file: Option<String>, client_key: Option<String>, use_ssl: i16) {
    let mut ca_data: Vec<u8> = Vec::new();
    let mut cert_data = Vec::new();
    let mut key_data = Vec::new();
    // Unwrap or handle?
    match ca_file {
        Some(file_path) => {
            let mut ca_file = File::open(file_path).unwrap();
            ca_file.read_to_end(&mut ca_data).unwrap();
        },
        None => println!("No ca file provided"),
    }
    mqttoptions.set_ca(ca_data);
    match client_file {
        Some(file_path) => {
            let mut cert_file = File::open(file_path).unwrap();
            cert_file.read_to_end(&mut cert_data).unwrap();
        },
        None => println!("No client_cert provided"),
    };
    match client_key {
        Some(file_path) => {
            let mut key_file = File::open(file_path).unwrap();
            key_file.read_to_end(&mut key_data).unwrap();
        },
        None => println!("No client key provided"),
    };
    // Client authentication is only enabled when use_ssl == 2.
    if use_ssl == 2 {
        mqttoptions.set_client_auth(cert_data, key_data);
    }
}
mqttwrk can also be used (please use the forked one mentioned in the link, as that doesn't unwrap on event loop poll).
./target/debug/mqttwrk -s <server_addr> -P <server_port> -t 2 -R <path to Ca-Chain> -C <path to client cert> -K <path to client key> -c 1 -p 4096.
I'll use this issue to articulate my thoughts around the current design and what the high-level client should look like to make the most out of the client with little effort. This issue won't be closed.
The current async API looks like this
let (requests_tx, requests_rx) = channel(10);
let mut eventloop = eventloop(mqttoptions, requests_rx);
// connect and poll
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
    println!("Received = {:?}", item);
}
This pattern offers a lot of flexibility.
eventloop() takes request Stream
Stream being a trait allows the library to not make any assumptions about the request pattern. E.g.
Stream for robustness across reboots
select!s internally to tackle request priorities
connect returns a Stream
This stream does everything MQTT. A poll on this stream will do the following internally
NOTE: These are necessary for a robust, persistent mqtt connection. I've seen a lot of implementations ignoring these only to get bitten at some point of time. Even if you are using different clients from other languages, keep these in mind
Now this Stream can be plugged into async/sync code as they see fit
state separated from user Stream
But the important thing here is that mqtt state is separated from the stream. It takes state as a reference. This allows users to freely create and destroy connections and resume from where we left. This makes reconnections simple. Just wrap the above code in a loop
loop {
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}
Again, how client should reconnect (10 times or infinite) is left to the user.
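As an illustration of leaving the reconnection policy to the user, here is a small synchronous sketch of a bounded-or-infinite retry helper. connect_with_retries is a hypothetical name and the connect closure stands in for whatever call establishes the connection; nothing here is taken from the library.

```rust
/// Hypothetical helper: call `connect` until it succeeds or `max_attempts` is used up.
/// `None` means retry forever. The closure receives the 1-based attempt number.
pub fn connect_with_retries<T, E>(
    max_attempts: Option<u32>,
    mut connect: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt: u32 = 0;
    loop {
        attempt = attempt.saturating_add(1);
        match connect(attempt) {
            Ok(connection) => return Ok(connection),
            // Give up and report the last error once the limit is reached.
            Err(e) if max_attempts.map_or(false, |max| attempt >= max) => return Err(e),
            // A real client would usually sleep or back off here before retrying.
            Err(_) => continue,
        }
    }
}
```

Passing Some(10) gives the "10 times" policy mentioned above, and None gives the infinite one.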
high level client
All these features are great if you have sophisticated uses, but not everyone has to understand all the details when they start. They can reach this stage of customization eventually. When they do, the solution is within reach.
Getting started with the library should be fun and easy. They don't have to know that they should poll the eventloop Stream with tokio, read about async rust or why there are 3 (and not 1) steps to start the eventloop.
rough sketch of client APIs
This spawns a background thread where the eventloop runs. tx is used to send mqtt requests and rx is used to receive notifications, just like a channel.
let (tx, rx) = rumq_client::spawn_eventloop(options)
And this if the eventloop should run on the current thread
let eventloop = rumq_client::eventloop(options);
let (tx, rx) = eventloop.handles();
eventloop.run()
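To make the handle-based idea concrete, here is a toy version built only on std::sync::mpsc and std::thread. The Request and Notification enums and this spawn_eventloop are stand-ins invented for the sketch; the background thread only echoes requests back instead of talking to a broker.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Stand-ins for the library's request and notification types (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Request {
    Publish(String),
    Disconnect,
}

#[derive(Debug, PartialEq)]
pub enum Notification {
    Sent(String),
    Disconnected,
}

/// Run a toy "eventloop" on a background thread and hand back channel handles.
pub fn spawn_eventloop() -> (Sender<Request>, Receiver<Notification>) {
    let (requests_tx, requests_rx) = channel::<Request>();
    let (notifications_tx, notifications_rx) = channel::<Notification>();
    thread::spawn(move || {
        // A real eventloop would talk to the broker; this one only echoes.
        for request in requests_rx {
            let notification = match request {
                Request::Publish(topic) => Notification::Sent(topic),
                Request::Disconnect => Notification::Disconnected,
            };
            let done = notification == Notification::Disconnected;
            // Stop when the user dropped the receiver or asked to disconnect.
            if notifications_tx.send(notification).is_err() || done {
                break;
            }
        }
    });
    (requests_tx, notifications_rx)
}
```

The caller never sees the thread or the loop, only the two channel ends, which is the ease-of-use goal described above.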
Max packet size is hardcoded (I think here).
I'm receiving Abort(Rumq(PayloadSizeLimitExceeded)) errors.
#99 seems to have broken Publish::set_retain(true). Before this change it worked; after it, messages that I try to send as retained are not retained.
ECC is less compute intensive which helps in constrained devices.
https://www.leaderssl.com/articles/345-what-is-ecc-and-why-you-should-use-it
Hello, thanks for your effort.
I can't seem to find any of the subj.
I'm considering using this (or alternatives) for simple embedded MQTT client-server communication.
rustls still has some limitations, some old certificates are not supported.
Do you have a plan to add native-tls as a feature?
Just saw that EventLoopError doesn't implement std::error::Error. Is this a known limitation or something I can easily add?
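Implementing the trait by hand needs only Debug, Display, and an optional source. The sketch below uses a stand-in enum; its variants are assumptions for illustration and do not mirror the library's actual EventLoopError.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical stand-in for the library's error enum; variants are illustrative.
#[derive(Debug)]
pub enum EventLoopError {
    Io(io::Error),
    StreamDone,
}

impl fmt::Display for EventLoopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EventLoopError::Io(e) => write!(f, "I/O error: {}", e),
            EventLoopError::StreamDone => write!(f, "stream done"),
        }
    }
}

impl Error for EventLoopError {
    /// Expose the wrapped I/O error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            EventLoopError::Io(e) => Some(e),
            EventLoopError::StreamDone => None,
        }
    }
}

impl From<io::Error> for EventLoopError {
    fn from(e: io::Error) -> Self {
        EventLoopError::Io(e)
    }
}
```

Once the trait is implemented, the error can be boxed as Box<dyn Error> and propagated with the ? operator alongside other error types.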
When using rumq-client (version alpha.5) in Cargo.toml, it fails to compile due to a bunch of missing functions such as time::throttle and in tokio ...
Environment: Windows 10
Toolchain: stable-x86_64-pc-windows-msvc
The library is already plenty fast when compared to go paho client but it's nice to get away with this locking API
https://github.com/tekjar/rumq/blob/master/rumq-client/src/eventloop.rs#L99
Results in +5MB/s throughput
I am creating a Sparkplug™ B client as my first big Rust project. The Sparkplug™ B specification requires the LastWill message to be a binary Protocol Buffers type. But the type on rumqttc LastWill is String.
Is there any other way to create the connection using a Vec<u8> or Bytes or something of that nature as the data type for the message field of the LastWill struct?
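One way such a struct could accept both text and binary payloads is to store bytes and take anything convertible to Vec<u8>. This is only a sketch of the idea: the struct below is a hypothetical variant, not rumqttc's LastWill, and it leaves out fields such as QoS.

```rust
/// Hypothetical variant of a last-will struct whose payload is raw bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct LastWill {
    pub topic: String,
    pub message: Vec<u8>,
    pub retain: bool,
}

impl LastWill {
    /// Accepts anything convertible to bytes: `Vec<u8>`, `&[u8]`, `String`, `&str`.
    pub fn new(topic: impl Into<String>, message: impl Into<Vec<u8>>, retain: bool) -> Self {
        LastWill {
            topic: topic.into(),
            message: message.into(),
            retain,
        }
    }
}
```

Existing callers that pass a String keep working, while a serialized Protocol Buffers message can be passed as a Vec<u8> without a lossy text conversion.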
Connecting to an MQTT broker via TLS works on all platforms
Succeeds on Darwin:
2020-08-20 10:01:56.213 - DEBUG - Received a message from MQTT broker: (Some(Connected), None)
Fails on Yocto linux:
2020-08-20 11:53:00.441 - ERROR - Connection error: Network(Io(Custom { kind: InvalidData, error: WebPKIError(UnknownIssuer) }))
It's the exact same code, where we read the CA if present:
let ca = fs::read(&mqttpub_config.ca_pem_path).await.map_err(|e| {
    let error = format!(
        "Could not read CA file \"{}\": {}",
        mqttpub_config.ca_pem_path, e
    );
    io::Error::new(io::ErrorKind::Other, error)
})?;
mqttoptions.set_ca(ca);
I observed that on the two platforms, rustls is using different TLSv1.3 encrypted extensions:
# OSX
2020-08-20 10:01:56.123 - DEBUG - rustls::client::tls13 - TLS1.3 encrypted extensions: []
# Linux / yocto
2020-08-20 10:01:56.123 - DEBUG - rustls::client::tls13 - TLS1.3 encrypted extensions: [ServerNameAck]
I'm planning to rename this repo to rumqtt. I've waited long enough for my previous employer to respond regarding ownership transfer. I thought to migrate the code when that happens but I'm just going to rename.
Coming to the structure, this is what I've in mind (mostly copied from tokio)
rumqtt
- mqtt4bytes # bare bones no std compatible mqtt 4 serialization and deserialization
- mqtt5bytes # bare bones no std compatible mqtt 5 serialization and deserialization
- rumqttc # client library (along with high level APIs)
- rumqttd # embeddable broker library
- rumqtt # wraps rumqttc and rumqttd with feature flags
Additionally a rumqtt binary which can do what mosquitto command line utility does + more
rumqtt client {options}
rumqtt broker {options}
rumqtt test
mqtt4bytes and mqtt5bytes versions will increase independently while rumqttc, rumqttd and rumqtt will be lock-stepped.
Also a (rate limited) public broker will be hosted at broker.rumqtt.com
The Notification enum should not contain the variant Connected, since this can never happen once the stream is created. It should not be mixed with the other MQTT stream logic, since it will only happen once, and only as the first thing when creating the eventloop.
When creating the eventloop or stream this should be reported back in the form of a Result<Connected, ConnectionError>
Example:
let mut eventstream = eventloop(mqtt_options, rx).stream();
// this will always be the first event in a successful connection
if let Some(Notification::Connected) = eventstream.next().await {
    while let Some(notif) = eventstream.next().await {
        match notif {
            Notification::Connected => {
                // this can never happen again, and should thus not be part of this enum
                // and having it here kind of bloats the user logic
            },
            _ => { /* all interesting events */ },
        }
    }
}
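The proposed shape, where the connection outcome is returned once as a Result and the event enum has no Connected variant, might look like this toy sketch. Every type and function here is hypothetical and exists only to show how the user-side match shrinks.

```rust
/// Hypothetical types; none of these are the library's actual definitions.
#[derive(Debug, PartialEq)]
pub struct Connected;

#[derive(Debug, PartialEq)]
pub enum ConnectionError {
    Refused,
}

/// Events that can occur after the connection is up; there is no `Connected` variant.
#[derive(Debug, PartialEq)]
pub enum Notification {
    Publish(Vec<u8>),
    Disconnected,
}

/// Toy connect step: the outcome is reported once, through the return value.
pub fn connect(broker_accepts: bool) -> Result<Connected, ConnectionError> {
    if broker_accepts {
        Ok(Connected)
    } else {
        Err(ConnectionError::Refused)
    }
}

/// User logic only has to handle events that can actually happen.
pub fn describe(notification: &Notification) -> &'static str {
    match notification {
        Notification::Publish(_) => "publish",
        Notification::Disconnected => "disconnected",
    }
}
```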
Hi, I'm trying to migrate a client application from rumqtt (that I keep in a local copy, manually updating its dependencies - but I hit a wall today with the "ring" dependency of rustls / webpki).
I think I understand how to handle the async stream by running tokio in a thread, but the problem is the notifications - I need to crossbeam_channel::select! over multiple channels, one of them being the rumqtt notifications channel.
So far, the only solution I came up with is to run a thread that receives messages from the tokio channel and re-sends them to a crossbeam channel, but it seems dirty. What do you think?
(I really don't want to rewrite everything using tokio::sync::mpsc)
We had an issue wherein the network we were trying to talk to had an MTU set to a value less than the payload we were trying to send.
OpenSSL and Golang set the MTU to the minimum threshold, whereas rustls sets the MTU to the maximum possible value in the spec. rustls, however, exposes a method to set the MTU. We need to utilize this and have an option to set the MTU that percolates down to rustls.
References
In https://github.com/tekjar/rumq/blob/8c20d58523bcd5ca74c823a9f20c0f3dc5a04393/rumq-client/src/eventloop.rs#L72-L76, it is alleged that options, state and requests can be accessed and even updated, at least in case of error. However, these fields of MqttState are private, and none of the components are returned in error variants or anything.
Maybe fn stream should consume the MqttState so that it can only be accessed when the stream isn't polled?
Hi there.
I am trying to run the following code (actually taken from the syncpubsub example), just wrapped in a loop (in an attempt to reconnect).
I am using rumqttc "0.0.4" and used "cross" to compile on MacOS (via Rust 1.43.1) for ARM, testing on a Raspberry Pi 4.
The code runs fine for a while, consuming and processing MQTT messages; however, after about 70 minutes the invoke value resolves to an Error("Stream done"). I tried to wrap the connection.iter in a loop hoping it would reconnect, which seems to have worked. However, it does not receive messages from the subscribed topic anymore.
Any idea what I am doing wrong? btw. you can find the whole code here
Snippet:
let (mut client, mut connection) = Client::new(mqtt_options, 10);
client.subscribe(event_topic.as_str(), QoS::AtMostOnce)
    .expect("Failed to subscribe to mqtt event topic");
loop {
    for (_i, invoke) in connection.iter().enumerate() {
        match invoke {
            Err(e) => {
                error!("mqtt error {}", e);
                continue;
            },
            _ => ()
        };
        let (inc, _out) = invoke.unwrap();
        if inc.is_none() {
            continue;
        }
        match inc.unwrap() {
            Incoming::Publish(message) => {
                let payload = str::from_utf8(&message.payload);
                if payload.is_err() {
                    error!("Failed to decode mqtt payload {:?}", payload);
                    continue;
                }
                let payload = payload.unwrap();
                /* some business logic here.. */
            },
            _ => continue
        }
    }
}
thx
Network reads and channel reads (for user requests) are orchestrated concurrently with select!. The function abstracting the above select returns packets to write to the network. But this network write is in a synchronous path, which might be affecting the throughput as network reads cannot happen during this time. Find ways to fix this.
We can have very many config options for Rustls. It's not practical to use MqttOptions for all configs related to Rustls. A more pragmatic option would be for the user to create their own ClientConfig with all the desired configs.
If I do something like this:
let event_loop = EventLoop::new(mqtt_options, REQUESTS_CAP).await;
let requests_tx = event_loop.handle();
requests_tx.send(Request::Disconnect).await;
It panics at state.rs:138 because it is unimplemented.
I'll send a patch which seems to fix this.
Network::new just hardcodes the max packet size to 1024, making rumqttc unusable for many use cases.
Fixed header parsing should have more tests and benchmarks
There is a perf degradation in commit 07614a5
Id = rumqtt-0
Outgoing publishes : Received = 1000000 Throughput = 330141.97 messages/s
Incoming publishes : Received = 0 Throughput = NaN messages/s
Reconnects : 0
vs
commit c099f42
Id = rumqtt-0
Outgoing publishes : Received = 1000000 Throughput = 674308.8 messages/s
Incoming publishes : Received = 0 Throughput = NaN messages/s
Reconnects : 0
I'm using mqttwrk to test this
Steps to reproduce
cd rumqttd && cargo run --release
cd mqttwrk && cargo run --release
As per [MQTT-2.3.1-1] requirement packet identifiers MUST be non-zero. However not only can it be zero as per https://github.com/tekjar/rumq/blob/30725bee1d71de0344fc1530a4e2952a08593621/rumq-core/src/packets.rs#L7
it also defaults to 0 in a few places, e.g. in:
https://github.com/tekjar/rumq/blob/30725bee1d71de0344fc1530a4e2952a08593621/rumq-core/src/packets.rs#L147
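One way to make a zero identifier unrepresentable is std::num::NonZeroU16 combined with an allocator that skips 0 on wrap-around. The PacketIds type below is a hypothetical sketch and is not code from rumq-core.

```rust
use std::num::NonZeroU16;

/// Hypothetical packet-identifier allocator that can never hand out 0.
pub struct PacketIds {
    last: u16,
}

impl PacketIds {
    pub fn new() -> Self {
        PacketIds { last: 0 }
    }

    /// Returns 1, 2, ..., 65535 and then wraps back to 1, skipping 0.
    pub fn next_id(&mut self) -> NonZeroU16 {
        self.last = if self.last == u16::MAX { 1 } else { self.last + 1 };
        NonZeroU16::new(self.last).expect("identifier is always in 1..=65535")
    }
}
```

Because the return type is NonZeroU16, a packet struct that stores it cannot be constructed with identifier 0, and there is no Default value that silently yields 0.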
@TotalKrill Can you try the below link? I'm still figuring out microsoft teams
https://teams.microsoft.com/join/qu6o42ji1nm9
I can also add by email if that's not a problem
PS: We can switch to a different method of communication if this is too messy
When using eventloop.poll() inside of an async function/spawned task, it causes the program to panic. The error is as follows:
thread 'tokio-runtime-worker' panicked at 'removal index (is 0) should be < len (is 0)', src/liballoc/vec.rs:1057:13
The thread should start executing as expected.
Put the async example for the client inside of an async function and spawn it from main
The function
// .. do stuff here to get CA certs
// Create the options for the Mqtt client
let mut opt = MqttOptions::new("server", host, port.parse::<u16>().unwrap());
opt.set_keep_alive(5);
opt.set_ca(ca_cert_buf);
opt.set_client_auth(server_cert_buf, private_key_buf);
let mut eventloop = EventLoop::new(opt, 10).await;
let tx = eventloop.handle();
// .. spawn loop function for communicating with separate message broker..
// .. (tx used here)
loop {
    if let Ok((incoming, outgoing)) = eventloop.poll().await {
        println!("Incoming = {:?}, Outgoing = {:?}", incoming, outgoing);
    };
}
In main itself:
let task = task::spawn(mqtt_run());
Latest commit f94e554d95
First of all: I like the new design - thanks for all the work.
The builders used to create Requests feel too limiting for certain use-cases.
Here are some examples:
subscribe allows only a single topic to be passed. During mass-subscribe, I have a Vec of topics already. It would be nice to have a builder which allows me to pass this directly.
Publish uses an Arc<Vec<u8>> as payload, but the builder does not allow passing an Arc. I've got an Arc already, as I'm publishing the same message in a loop. It would be nice to avoid the clones.
I'm not really sure what's the best way to design such an API. But maybe we can use this issue to collect some ideas/use-cases.
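One possible shape for such constructors, offered as an idea rather than the library's API, is to accept generic conversions: Into<Arc<Vec<u8>>> covers both a fresh Vec<u8> and an existing Arc, and IntoIterator covers a Vec of topics. The Publish and Subscribe structs below are simplified, hypothetical stand-ins.

```rust
use std::sync::Arc;

/// Simplified, hypothetical publish request.
#[derive(Debug, Clone, PartialEq)]
pub struct Publish {
    pub topic: String,
    pub payload: Arc<Vec<u8>>,
}

impl Publish {
    /// A `Vec<u8>` is wrapped once; an existing `Arc<Vec<u8>>` is reused without copying.
    pub fn new(topic: impl Into<String>, payload: impl Into<Arc<Vec<u8>>>) -> Self {
        Publish {
            topic: topic.into(),
            payload: payload.into(),
        }
    }
}

/// Simplified, hypothetical subscribe request.
#[derive(Debug, Clone, PartialEq)]
pub struct Subscribe {
    pub topics: Vec<String>,
}

impl Subscribe {
    /// Accepts a `Vec`, an array, or any other iterator of topic filters.
    pub fn new<I, S>(topics: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Subscribe {
            topics: topics.into_iter().map(Into::into).collect(),
        }
    }
}
```

Publishing the same message in a loop then only bumps the Arc's reference count on each iteration instead of copying the payload bytes.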
The keys and signatures generated using elliptic curves in place of RSA are smaller and hence can boost performance on constrained devices.
@tekjar , I can take this up.
I'm super confused. When pulling in rumq-client as a dependency along with tokio in an otherwise completely empty project:
[dependencies]
tokio = "0.2"
rumq-client = { git = "https://github.com/tekjar/rumq.git" }
I get a strange compilation error regarding tokio::select.
~/asdf ‹master*› cargo build
Updating git repository `https://github.com/tekjar/rumq.git`
Updating crates.io index
Compiling rumq-client v0.1.0-alpha.6 (https://github.com/tekjar/rumq.git#b0adb18b)
error[E0432]: unresolved import `tokio::select`
--> /Users/felix/.cargo/git/checkouts/rumq-df81ca706f7bfd29/b0adb18/rumq-client/src/eventloop.rs:6:5
|
6 | use tokio::select;
| ^^^^^^^^^^^^^ no `select` in the root
error: cannot determine resolution for the macro `select`
--> /Users/felix/.cargo/git/checkouts/rumq-df81ca706f7bfd29/b0adb18/rumq-client/src/eventloop.rs:120:21
|
120 | select! {
| ^^^^^^
|
= note: import resolution is stuck, try simplifying macro imports
error: aborting due to 2 previous errors
For more information about this error, try `rustc --explain E0432`.
error: could not compile `rumq-client`.
@mojzu had something similar in #34. Pinning the tokio dependency to the latest 0.2.13 ends up with the same error. The toolchain is latest stable and rumq builds fine when built standalone.
I tracked down the change that causes the error to #42 which changes nothing regarding deps, versions etc...
Any ideas?
When decoder reads data and returns with EoF due to insufficient data, use this information in the codec to early return instead of parsing again only to get triggered by insufficient data again
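The idea can be sketched for the MQTT fixed header, whose remaining-length field is a 1 to 4 byte variable-length integer. In this hypothetical parser (not the crate's decoder), insufficient data is not an error but a value stating how many bytes are needed, so the codec can return early until the buffer has grown that far.

```rust
/// Outcome of trying to parse a fixed header from a partially filled buffer.
#[derive(Debug, PartialEq)]
pub enum Header {
    /// Header parsed: (header length in bytes, remaining length).
    Complete(usize, usize),
    /// Not enough data yet; at least this many total bytes are needed before retrying.
    NeedAtLeast(usize),
}

#[derive(Debug, PartialEq)]
pub struct MalformedRemainingLength;

/// Parse the MQTT fixed header (1 control byte + 1..=4 length bytes).
pub fn parse_fixed_header(buf: &[u8]) -> Result<Header, MalformedRemainingLength> {
    let mut remaining_len = 0usize;
    for i in 0..4 {
        // Byte 0 is the control byte, so length byte `i` lives at index `i + 1`.
        let byte = match buf.get(i + 1) {
            Some(b) => *b,
            None => return Ok(Header::NeedAtLeast(i + 2)),
        };
        // The low 7 bits carry data; the high bit says another length byte follows.
        remaining_len |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(Header::Complete(i + 2, remaining_len));
        }
    }
    // A fifth length byte is not allowed by the protocol.
    Err(MalformedRemainingLength)
}
```

A codec could cache the NeedAtLeast value (or header length plus remaining length once the header is complete) and skip parsing entirely while the buffer is shorter than that.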
Stuff to do before 1.0 release on May 1st week
Related to #64
My editor is set to automatically run rustfmt on save. However, the rumqtt code hasn't been run through rustfmt before, so this always generates a bunch of spurious changes which I have to revert. How would people feel about running rustfmt over everything, to avoid this problem in future?
Alternatively, it might be possible to add a rustfmt.toml which disables it.
I have successfully subscribed to a topic and received a message, but I can't figure out how to get the content of the message?
Hi
Is there any example or documentation on how to set up an MQTT broker with authentication handling using rumq-broker?
We started using Rumqtt at work last year, and now I see that it has been archived and ownership has been transferred here. Since this is in alpha state, is it a rewrite?
Hi, I'm in the process of migrating from the rumqtt client to this library and running into reconnection problems during testing. When I disconnect the client from the network the broker is on I get the following error:
{
"file": "/build/.cargo/registry/src/github.com-1ecc6299db9ec823/rumq-client-0.1.0-alpha.4/src/state.rs",
"level": "ERROR",
"line": 263,
"message": "Error awaiting for last ping response",
"module_path": "rumq_client::state",
"target": "rumq_client::state",
"time": "2020-02-12T14:30:57.027601409+00:00"
}
However when I reconnect the client to the network I continue getting the same error and the client doesn't reconnect. Based on the logs I'm not receiving a Reconnection or Disconnection notification, but with each error I am getting a StreamEnd:
{
"file": "app/src/mqtt/mod.rs",
"level": "DEBUG",
"line": 551,
"message": "notification: StreamEnd(MqttState(AwaitPingResp))",
"module_path": "app::mqtt",
"target": "app::mqtt",
"time": "2020-02-12T15:28:10.837594182+00:00"
}
In case I've done something wrong here is my code for setting up the client options.
fn rumq_options(&self) -> rumq_client::MqttOptions {
    let mut options = rumq_client::MqttOptions::new(&self.client_id, &self.host, self.port);
    options
        .set_keep_alive(10)
        .set_throttle(Duration::from_secs(1))
        .set_clean_session(false)
        .set_request_channel_capacity(128)
        .set_notification_channel_capacity(128);
    if let Some(user_password) = &self.user_password {
        options.set_credentials(&user_password.user, &user_password.password);
    }
    if let Some(tls) = &self.tls {
        options.set_ca(tls.ca_pem.clone());
        // mqttoptions.set_client_auth(client_cert, client_key);
    }
    options
}
And my code for spawning the tasks for sending/receiving.
/// Start asynchronous loops to handle events.
pub async fn start(self) -> (task::JoinHandle<()>, task::JoinHandle<()>) {
    let mqtt = Arc::new(self);
    let (mut requests_tx, requests_rx) = channel(10);
    let mut eventloop = eventloop(mqtt.options.rumq_options(), requests_rx);
    mqtt.metric_connected().set(1);
    mqtt.rumq_subscribe(&mut requests_tx).await;
    let mqtt1 = mqtt.clone();
    let mut requests_tx1 = requests_tx.clone();
    let poll_task = task::spawn(async move {
        loop {
            let mqtt = mqtt1.clone();
            mqtt.poll_handler(&mut requests_tx1).await;
            time::delay_for(Duration::from_secs(1)).await;
        }
    });
    let notification_task = task::spawn(async move {
        loop {
            let mqtt = mqtt.clone();
            let mut stream = eventloop.stream();
            while let Some(item) = stream.next().await {
                mqtt.notification_handler(item, &mut requests_tx).await;
            }
            time::delay_for(Duration::from_secs(1)).await;
        }
    });
    (poll_task, notification_task)
}
If there's any other information I can provide let me know. Thanks for the help and the great library, migrating was much easier than I expected and the new interface works very well with my other async code 👍
Is it possible to use websockets (both secure and not) with this?
Could you document an example if so?
Thanks for your work!
I am trying to subscribe to a topic with certificates but I am getting this error:
Err(Network(Io(Custom { kind: InvalidData, error: WebPKIError(BadDER) })))
I am not sure I am configuring correctly the MqttOptions. Here is what I did:
let ca: Vec<u8> = fs::read("/etc/mosquitto/certs/mqtt-ca.crt").expect("Something went wrong reading certificate!");
let mut mqttoptions = MqttOptions::new("air", "mysite.com", 8884);
mqttoptions.set_ca(ca);
mqttoptions.set_credentials("my_username", "my_password");
mqttoptions
.set_keep_alive(5)
.set_throttle(Duration::from_secs(2));
let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("read/+/meas", QoS::ExactlyOnce).unwrap();
// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
println!("Notification {} = {:?}", i, notification);
}
For reference, the mosquitto command I am using and which is working.
mosquitto_sub --cafile /etc/mosquitto/certs/mqtt-ca.crt -h mysite.com -p 8884 -u 'my_username' -P 'my_password' -t 'read/+/error' -v
As I am also beginning with Rust, I am not sure the ca should be provided as I did.
Any idea what I could do wrong?
This example demonstrates how to use sync APIs
Please give me feedback if this improves the ease of use. Will be merged by the end of this week if there are no comments. Feel free to tag other people who might be interested