MQTT Client Rx

Please star this project if you find it useful. Thank you!

Credits

This project is based on MQTTnet by Christian Kratky. Without his work, this library would not exist. It is essentially an Rx wrapper around MQTTnet.

Why this library

MQTT and Reactive Extensions (a.k.a. ReactiveX or just Rx) are a perfect fit for each other! Rx is an API for asynchronous programming with observable streams, while MQTT is a protocol that produces asynchronous streams.

Version 3.2 and later

This version introduced some breaking changes. See the examples below for how to use it. It also includes a number of stability and performance improvements and bug fixes, and the code has been cleaned up quite a bit.

How to use

Using the library is easy. Rx makes it easy to write code in a declarative manner, which IMHO is more elegant than the alternatives.

Housekeeping

First, some housekeeping. The library is very flexible and is created using Interface Driven Development.

To use this library, start by creating four classes, one for each of these interfaces:

  • IMQTTMessage
  • IClientOptions
  • ITopicFilter
  • IWillMessage

Like this:

using System;
using System.Collections.Generic;

// A message received from, or published to, the broker.
internal class MQTTMessage : IMQTTMessage
{
    public string Topic { get; internal set; }
    public byte[] Payload { get; internal set; }
    public QoSLevel QualityOfServiceLevel { get; internal set; }
    public bool Retain { get; internal set; }
}

// Connection options. The TLS settings are inherited from TlsOptions.
internal class Options : TlsOptions, IClientOptions
{
    public Uri Uri { get; internal set; }
    public string UserName { get; internal set; }
    public string Password { get; internal set; }
    public string ClientId { get; internal set; }
    public bool CleanSession { get; internal set; }
    public TimeSpan KeepAlivePeriod { get; internal set; }
    public TimeSpan DefaultCommunicationTimeout { get; internal set; }
    public ProtocolVersion ProtocolVersion { get; internal set; }
    public ConnectionType ConnectionType { get; internal set; }
}

// TLS settings, including the switches that relax certificate validation.
internal class TlsOptions : ITlsOptions
{
    public bool UseTls { get; internal set; }
    public IEnumerable<byte[]> Certificates { get; internal set; }
    public bool IgnoreCertificateChainErrors { get; internal set; }
    public bool IgnoreCertificateRevocationErrors { get; internal set; }
    public bool AllowUntrustedCertificates { get; internal set; }
}

// The will message passed when creating the client.
internal class WillMessage : IWillMessage
{
    public string Topic { get; internal set; }
    public byte[] Payload { get; internal set; }
    public QoSLevel QualityOfServiceLevel { get; internal set; }
    public bool Retain { get; internal set; }
}

Implementing these classes is straightforward, and because the library does not depend on concrete classes, you are free to implement and use them in your own project as you see fit.
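The listing above does not include a class for ITopicFilter, although the usage example later constructs a TopicFilter with Topic and QualityOfServiceLevel set. A minimal sketch of such a class could look like the following. The two members are inferred from that usage, and the namespace is assumed to be the one imported in the usage example. If the interface declares more members, they would need to be added.

```csharp
using IMQTTClientRx.Model; // assumed namespace for ITopicFilter and QoSLevel

// Hypothetical implementation: the members are inferred from how TopicFilter
// is initialized in the usage example, not from the interface definition.
internal class TopicFilter : ITopicFilter
{
    public string Topic { get; internal set; }
    public QoSLevel QualityOfServiceLevel { get; internal set; }
}
```

The internal setters follow the same pattern as the other classes, so instances can be created with object initializers inside your own assembly.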

Observing MQTT

Now you are ready to start using this library.

Here is an example of using the MQTT Client Rx:

Using

using System;
using System.Text;
using IMQTTClientRx.Model;
using MQTTClientRx.Service;

var mqttService = new MQTTService();

var mqttClientOptions = new Options
{
    Uri = new Uri("mqtt://test.mosquitto.org:1883"), // Test server
    ConnectionType = ConnectionType.Tcp
};

var topic1 = new TopicFilter
{
    QualityOfServiceLevel = QoSLevel.AtMostOnce,
    Topic = "PP/#" // Try another topic if nothing is being published to this one on the test server when you run this.
};

var topic2 = new TopicFilter
{
    QualityOfServiceLevel = QoSLevel.AtMostOnce,
    Topic = "EFM/#" // Try another topic if nothing is being published to this one on the test server when you run this.
};

ITopicFilter[] topicFilters =
{
    topic1,
    topic2
};

// The topic filters are optional; you can subscribe to the topics you want to monitor later.
var MQTTService = mqttService.CreateObservableMQTTClient(mqttClientOptions, willMessage: null, topicFilters: topicFilters);

_disposable = MQTTService.observableMessage.Subscribe(
    msg =>
    {
        // Just some colour coding to make it easier to see what topic is what
        if (msg.Topic.Contains("PP"))
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
        }
        else
        {
            Console.ForegroundColor = ConsoleColor.Blue;
        }
        
        Console.WriteLine($"{Encoding.UTF8.GetString(msg.Payload)}, " +
                            $"{msg.QualityOfServiceLevel.ToString()}, " +
                            $"Retain: {msg.Retain}, " +
                            $"Topic: {msg.Topic}");
    },
    ex =>
    {
        // Exceptions raised by the observable can be handled here
        Console.WriteLine($"{ex.Message} : inner {ex.InnerException?.Message}");
    },
    () =>
    {
        // This runs when the observable completes
        // Example: The observable will complete if the connection is ended by the server
        Console.WriteLine("Completed...");
    });

// IMPORTANT: This is new in version 3.2 and later. You have to connect to the MQTT server explicitly.
await MQTTService.client.ConnectAsync();
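Because the messages arrive as an observable, the colour-coding branch in the handler above could also be expressed declaratively with standard Rx operators. The sketch below assumes that MQTTService.observableMessage is an IObservable of the message type (as its use with Subscribe suggests) and that the code runs in the same scope as the example. The names ppPayloads and ppSubscription are illustrative only.

```csharp
using System;
using System.Reactive.Linq; // Where and Select operators from System.Reactive
using System.Text;

// Keep only messages whose topic starts with "PP/" and decode the payload.
// Assumption: observableMessage exposes Topic and Payload as in the example above.
var ppPayloads = MQTTService.observableMessage
    .Where(msg => msg.Topic.StartsWith("PP/"))
    .Select(msg => Encoding.UTF8.GetString(msg.Payload));

var ppSubscription = ppPayloads.Subscribe(
    text => Console.WriteLine($"PP payload: {text}"));

// Disposing the returned IDisposable ends this subscription.
// ppSubscription.Dispose();
```

Composing operators this way keeps the filtering logic out of the handler, which is the declarative style the library is aiming for.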

Connecting via WebSocket

var mqttClientOptions = new Options
{
    Uri = new Uri("ws://broker.mqttdashboard.com:8000/mqtt"), // Test server
    ConnectionType = ConnectionType.WebSocket
};

Using TLS

The library supports TLS, along with options to ignore certain types of certificate errors and to allow untrusted certificates. These options are useful for testing and should be used with care anywhere else.

var mqttClientOptions = new Options
{
    Uri = new Uri("mqtts://test.mosquitto.org:8883"), // Test server
    UseTls = true,
    IgnoreCertificateChainErrors = true,
    IgnoreCertificateRevocationErrors = true,
    AllowUntrustedCertificates = true,
    ConnectionType = ConnectionType.Tcp
};

Subscribing to more Topic Filters

Subscribing to additional Topic Filters is easy:

await MQTTService.client.SubscribeAsync("My/NewFilter");

You can subscribe to Topic Filters after creating the MQTTService.

Unsubscribing from Topic Filters

Unsubscribing from Topic Filters is just as easy:

await MQTTService.client.UnsubscribeAsync("My/NewFilter");

You can unsubscribe from Topic Filters after creating the MQTTService.

Publish

Publishing is easy too:

var newMessage = new MQTTMessage
{
    Payload = Encoding.UTF8.GetBytes("Hello MQTT"),
    QualityOfServiceLevel = QoSLevel.AtMostOnce,
    Retain = false,
    Topic = "MQTTClientRx/Test"
};

await MQTTService.client.PublishAsync(newMessage);

You can publish messages after creating the MQTTService.

Contributors

1iveowl
