Giter VIP home page Giter VIP logo

netty-mqtt-client's Introduction

netty-mqtt-client

1. 介绍

1.1 基本概况

该项目是基于Netty实现的MQTT协议的客户端,创建目的是为了学习和使用MQTT协议及Netty

1.2 技术栈

Java + Netty + MQTT

1.3 组件介绍

MqttConfiguration:MQTT全局配置组件,可支持配置TCP连接参数,代理工厂,拦截器,IO线程数,组件创建器及消息存储器

MqttClientFactory:MQTT客户端工厂,用于创建客户端,只需要传递连接参数,即可根据全局配置创建对应的MQTT客户端

MqttMsgStore:MQTT消息存储器,默认是用内存存储器,如果需要持久化,可自行实现

MqttClient:MQTT客户端,面向用户的API接口。

MqttConnectParameter:MQTT连接参数,通过设置不同的参数,可创建不同的客户端

MqttCallback:MQTT回调器,包含MQTT协议中的所有回调

MqttRetrier:MQTT重试器,重试QOS1和QOS2的消息

MqttDelegateHandler:MQTT消息委托器,即MQTT客户端和Netty之间的桥梁,主要是把MQTT的消息和Netty之间的消息进行转换处理

MqttConnector:MQTT连接器,用于连接MQTTBroker

MqttChannelHandler:MQTT在Netty中的出入栈处理器

MqttMsgIdCache:MQTT消息ID缓存器,用于生成MQTT协议层消息的ID

ObjectCreator:对象创建器,支持自定义创建MqttClient、MqttConnector、MqttDelegateHandler三大组件的实现

ProxyFactory:代理工厂,主要是用于拦截器,支持多种实现,默认使用JDK动态代理的实现

Interceptor:拦截器,仅支持拦截MqttClient、MqttConnector、MqttDelegateHandler三大接口

1.4 特色

1.基于高性能的网络开发框架Netty实现,性能更高

2.支持多个客户端使用同一个线程组,支持配置线程数量,占用的资源更少

3.目前支持MQTT 3.1.1版本(后续会开发5.0版本)

4.支持单向及双向SSL认证

5.支持自定义实现扩展组件

6.支持组件拦截

7.代码全中文注释

8.支持消息持久化(目前支持Redis和内存),仅保存不清理会话且未完成的客户端消息

9.支持遗嘱消息

10.支持QOS等级为:0、1、2

11.支持自定义配置消息重试

1.5 示例

客户端操作

初始化:

        MqttClientFactory mqttClientFactory = new DefaultMqttClientFactory();
        //使用redis消息存储器
        //MqttMsgStore mqttMsgStore = new RedisMqttMsgStore(new JedisPool("127.0.0.1", 6379));
        //mqttClientFactory.setMqttMsgStore(mqttMsgStore);
	//使用内存消息存储器
        MqttMsgStore mqttMsgStore = new MemoryMqttMsgStore();
        mqttClientFactory.setMqttMsgStore(mqttMsgStore);
        //创建连接参数,设置客户端ID
        MqttConnectParameter mqttConnectParameter = new MqttConnectParameter("xzc_test");
        //是否自动重连
        mqttConnectParameter.setAutoReconnect(true);
        //Host
        mqttConnectParameter.setHost("broker.emqx.io");
        //端口
        mqttConnectParameter.setPort(1883);
        //是否使用SSL/TLS
        mqttConnectParameter.setSsl(false);
        //是否清除会话
        mqttConnectParameter.setCleanSession(true);
        //心跳间隔
        mqttConnectParameter.setKeepAliveTimeSeconds(60);
        //连接超时时间
        mqttConnectParameter.setConnectTimeoutSeconds(30);
        //创建一个客户端
        MqttClient mqttClient = mqttClientFactory.createMqttClient(mqttConnectParameter);
        //添加回调器
        mqttClient.addMqttCallback(new DefaultMqttCallback());

连接:

        //阻塞连接至完成或超时
        mqttClient.connect();
	//非阻塞连接
	MqttFutureWrapper connectFuture = mqttClient.connectFuture();
	//阻塞至连接完成
        connectFuture.awaitComplete();

断开连接:

        //阻塞断开连接至完成
        mqttClient.disconnect();
	//非阻塞断开连接
	MqttFutureWrapper disconnectFuture = mqttClient.disconnectFuture();
	//阻塞至断开连接完成
        disconnectFuture.awaitComplete();

关闭客户端:

	//关闭客户端,关闭之后无法再进行操作
	mqttClient.close();

订阅:

        //阻塞订阅至完成
        mqttClient.subscribe("testMqttClient", MqttQoS.AT_LEAST_ONCE);
        //非阻塞订阅
        MqttFutureWrapper subscribeFuture = mqttClient.subscribeFuture("testMqttClient", MqttQoS.AT_LEAST_ONCE);
        //阻塞至订阅完成
        subscribeFuture.awaitComplete();

取消订阅:

        //阻塞取消订阅至完成
        mqttClient.unsubscribe("testMqttClient");
        //非阻塞取消订阅
        MqttFutureWrapper unsubscribeFuture = mqttClient.unsubscribeFuture("testMqttClient");
        //阻塞至取消订阅完成
        unsubscribeFuture.awaitComplete();

发送消息:

        //阻塞发送消息至完成
        mqttClient.publish("hello world!".getBytes(StandardCharsets.UTF_8), "testMqttClient", MqttQoS.EXACTLY_ONCE);
        //非阻塞发送消息
        MqttFutureWrapper publishFuture = mqttClient.publishFuture("hello world!".getBytes(StandardCharsets.UTF_8), "testMqttClient", MqttQoS.EXACTLY_ONCE);
        //阻塞至发送消息完成
        publishFuture.awaitComplete();

拦截器操作

支持拦截的接口:MqttClient、MqttConnector、MqttDelegateHandler

使用方式:

​ 1.实现拦截器接口Interceptor

​ 2.类上添加注解@Intercepts,并在type值中添加支持拦截的接口

​ 3.在intercept方法中进行拦截

​ 4.调用Invocation的proceed()执行目标方法

代码:

@Intercepts(type = {MqttClient.class})
public class LogInterceptorExample implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object target = invocation.getTarget();
        Object[] args = invocation.getArgs();
        Method method = invocation.getMethod();
        //执行目标方法
        Object returnObj = invocation.proceed();
        LogUtils.info(LogInterceptorExample.class, "拦截目标:" + target.getClass().getSimpleName() + ",拦截方法:" + method.getName() + ",拦截参数:" + Arrays.toString(args) + ",拦截返回值:" + returnObj.toString());
        return returnObj;
    }
}

更多示例请参考包:com.github.netty.mqtt.client.example

1.6 配置参数说明

MqttConfiguration配置参数:

字段/方法 类型 默认值 说明
proxyFactory ProxyFactory JdkProxyFactory 代理工厂,用于创建三大组件(MqttClient、MqttConnector、MqttDelegateHandler)的代理对象
maxThreadNumber int 1 处理IO的最大线程数即NioEventLoopGroup中的线程数量,多个客户端时可以设置为多个
mqttClientObjectCreator ObjectCreator MqttClientObjectCreator MQTT客户端的对象创建器
mqttConnectorObjectCreator ObjectCreator MqttConnectorObjectCreator MQTT连接器的对象创建器
mqttDelegateHandlerObjectCreator ObjectCreator MqttDelegateHandlerObjectCreator MQTT委托处理器的对象创建器
mqttMsgStore MqttMsgStore MemoryMqttMsgStore MQTT消息存储器
option(ChannelOption option, Object value) ChannelOption、Object Netty中的TCP连接参数
addInterceptor(Interceptor interceptor) Interceptor 拦截器,用于拦截MqttClient、MqttConnector、MqttDelegateHandler

注意:MqttClientFactory中的配置,会放入到MqttConfiguration中。

MqttConnectParameter配置参数:

字段/方法 类型 默认值 说明
clientId String 客户端ID,不能为null,也不能重复
mqttVersion MqttVersion MQTT_3_1_1 客户端版本号
host String localhost MQTTBroker的host
port int 1883 MQTTBroker的端口
username String MQTT的连接账号
password char[] MQTT的连接密码
willMsg MqttWillMsg MQTT的遗嘱消息
retryIntervalMillis long 1000毫秒 消息重试器的重试间隔,单位毫秒
retryIntervalIncreaseMillis long 1000毫秒 每次消息重试失败时,增大其重试间隔值,单位毫秒
retryIntervalMaxMillis long 15000毫秒 重试间隔的最大值,单位毫秒
keepAliveTimeSeconds int 30秒 MQTT心跳间隔,单位秒
keepAliveTimeCoefficient BigDecimal 0.75 MQTT心跳间隔系数,由于某些Broker读超时时间为心跳间隔时间,中间发报文需要时间,可能在网络不好的情况下会导致超时,所以增加该系数,即发送心跳的时间为 心跳间隔 * 系数 ,默认0.75
connectTimeoutSeconds long 30秒 MQTT连接超时时间,单位秒
autoReconnect boolean false 是否自动重连
cleanSession boolean true 是否清理会话
ssl boolean false 是否开启SSL/TLS
rootCertificateFile File 根证书文件
clientPrivateKeyFile File 客户端私钥文件,双向SSL时需要
clientCertificateFile File 客户端证书文件,双向SSL时需要

注意:在SSL相关的参数中,rootCertificateFile不是必须的,前提是 Broker 的证书是权威CA认证的话就不需要,如果是自签名的证书就需要该文件;并且在双向认证中,如果你使用的是jks或pkcs后缀的文件(私钥和证书的结合体),那么请将其转换为证书和私钥两个文件。

2. 其它

2.1 注意事项

1.需要JDK版本1.8及以上

2.日志需要导入日志框架,如果没有日志框架,则会在控制台打印日志

3.如需打包为jar包,可自行用maven插件或maven命令打包

2.2 issue

如果产生问题,请提issue

格式:问题描述+复现代码示例

netty-mqtt-client's People

Contributors

xzc-coder avatar zizaimotou avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

netty-mqtt-client's Issues

双向验证——急急急

作者大大你好,我已经用了你的这个客户端包,公司今天要求双向验证,但是我不知道怎么改代码加啊,急急急,端午后就要用了。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.