12 Star 53 Fork 17

护国神将-小将 / iot-harbor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

项目暂停维护

请使用新项目

MQTT库

git地址

采用技术

使用开源reactor-netty库,实现MQTT server。集成了springboot autoconfig实现快速注入容器。 框架采用反应式reactor3库,是代码具有低延迟,高吞吐量等特点。

目前实现功能

  • qos 0,1,2完整实现
  • 遗嘱消息,保留消息实现
  • 客户端重连机制
  • 密码校验,以及版本校验
  • 支持ssl加密
  • spring容器支持
  • channel存储,topic存储,保留消息等外部接口支持
  • MQTT 协议同时支持WS/TCP 传输,默认MQTT协议打开WS 8443端口

服务端使用说明

          RsocketServerSession serverSession=TransportServer.create("192.168.100.237",1884)
                  .auth((s,p)->true)
                  .heart(100000)
                  .protocol(ProtocolType.MQTT)
                  .ssl(false)
                  .auth((username,password)->true)
                  .log(true)
                  .messageHandler(new MemoryMessageHandler())
                  .exception(throwable -> System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&"+throwable))
                  .start()
                  .block();
            serverSession.closeConnect("device-1").subscribe();// 关闭设备端
            List<TransportConnection> connections= serverSession.getConnections().block(); // 获取所有链接
 

客户端使用说明

        RsocketClientSession clientSession= TransportClient.create("127.0.0.1",1884)
                  .heart(10000)
                  .protocol(ProtocolType.MQTT) // 指定协议 MQTT 包含 TCP/WS 两个端口 默认WS走的8443     WS协议 仅仅启动TCP协议
                  .ssl(false)  // 开发tls加密
                  .log(true)  // 打印报文日志
                  .onClose(()->{}) // 客户端关闭事件
                  .clientId("Comsumer_3") // 客户端id
                  .password("12") // 密码
                  .username("123") // 用户名
                  .willMessage("123") // 遗嘱消息
                  .willTopic("/lose") // 遗嘱消息topic
                   .willQos(MqttQoS.AT_LEAST_ONCE) // 遗嘱消息qos
                  .exception(throwable -> System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&"+throwable)) // 异常处理
                  .messageAcceptor((topic,msg)->{
                        System.out.println(topic+":"+new String(msg));
                   }) // 消息接收处理
                  .connect()
                  .block();
             clientSession.sub("test").subscribe(); // 订阅
             clientSession.pub("test","Producer_3".getBytes()).subscribe(); // 发布qos0消息
             clientSession.pub("test","Producer_1".getBytes(),1).subscribe();  // 发布qos1消息
             clientSession.pub("test","Producer_1".getBytes(),true,1).subscribe();  // 发布qos1消息 保留消息
            

服务端使用说明

yaml 配置:
iot:
  mqtt:
    server:
      enable: true
      host: 192.168.100.237
      port: 8081
      log: false
      protocol: MQTT
      heart: 100000
      ssl: false
设备校验:实现AuthencationSession接口注入容器即可
异常处理:实现ExceptorAcceptor接口注入容器即可
保留消息处理:实现RsocketMessageHandler接口注入容器即可,默认使用内存。      
            

qq群号: 789331252

关注公众号,输入 物联网 扫码加入微信交流群

image

空文件

简介

基于reactor3实现的mqtt库,代码精简。 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/lxrv587/iot-harbor.git
git@gitee.com:lxrv587/iot-harbor.git
lxrv587
iot-harbor
iot-harbor
master

搜索帮助