掘金 后端 ( ) • 2024-04-16 11:33

一、简介

mica-mqtt 基于 java aio 实现的简单低延迟高性能 的 mqtt 物联网开源组件。 mica-mqtt 更加易于集成到已有服务和二次开发,降低自研物联网平台开发成本。

二、功能

  • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
  • 支持 websocket mqtt 子协议(支持 mqtt.js)。
  • 支持 http rest api,http api 文档详见
  • 支持 MQTT client 客户端。
  • 支持 MQTT server 服务端。
  • 支持 MQTT 遗嘱消息。
  • 支持 MQTT 保留消息。
  • 支持自定义消息(mq)处理转发实现集群。
  • MQTT 客户端 阿里云 mqtt 连接 demo。
  • 支持 GraalVM 编译成本机可执行程序。
  • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
  • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。
  • 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块

三、使用场景

  • 物联网(云端 mqtt broker)
  • 物联网(边缘端消息通信)
  • 群组类 IM
  • 消息推送
  • 简单、易用的 mqtt client 客户端

四、更新记录

v2.2.11 - 2024-04-13

  • ✨ mica-mqtt-client-spring-boot-starter 简化 MqttClientTemplate 构造,方便自定义。
  • ✨ mica-mqtt-client-spring-boot-starter 优化 spring event mqtt client 连接监听。
  • ✨ mica-mqtt-client-spring-boot-starter 优化注解订阅。
  • 🐛 mqtt-client 修复 mqtt5 props 和遗嘱同时配置时连接编码问题。

五、自定义 MqttClientTemplate

在某些场景下我们需要启动多个 mqtt client 来处理和转发多个 mqtt broker 间的数据。 mqtt转发1.jpg 当然如果仅仅做数据处理和转发,建议还是直接 main 方法直接启动 2 个 mica-mqtt client。这样更加简洁,这样不依赖 Spring 打包出来的 jar 才 1M 左右,启动飞快。

/**
 * 2 个 mqtt 服务间,使用 2 个 client 做数据传输
 *
 * @author L.cm
 */
public class MqttClientProxy {

	public static void main(String[] args) {
		MqttClient client1 = MqttClient.create()
			.ip("ip1")
			.port(1883)
			.clientId("clientI")
			.username("admin&admin")
			.password("6b408b34171b0423160dcb8b70decefe")
			.debug()
			.connectSync();

		MqttClient client2 = MqttClient.create()
			.ip("ip2")
			.port(1883)
			.clientId("client2")
			.username("admin&admin")
			.password("6b408b34171b0423160dcb8b70decefe")
			.debug()
			.connectSync();

		String[] topics = new String[]{
			"$share/test/link/product1/+/event/+/post",
			"$share/test/link/product2/+/event/+/post",
			"$share/test/link/product3/+/event/+/post"
		};
		client1.subscribe(topics, MqttQoS.AT_MOST_ONCE, (context, topic, message, payload) -> {
			client2.publish(topic, payload);
		});
	}

}

自定义 MqttClientTemplate Bean:

a. 自定义 MqttClientTemplate bean 2.2.11 开始已简化,老版本建议先升级。

@Configuration
public class OtherMqttClientConfiguration {

	@Bean("mqttClientTemplate1")
	public MqttClientTemplate mqttClientTemplate1() {
		MqttClientCreator mqttClientCreator1 = MqttClient.create()
			.ip("mqtt.dreamlu.net")
			.username("mica")
			.password("mica");
		return new MqttClientTemplate(mqttClientCreator1);
	}

}

b. 修改 starter 自带的 MqttClientTemplate Bean 引入

由于现在加入了一个新的名为 mqttClientTemplate1 MqttClientTemplate,老的 starter 内置的 MqttClientTemplate 引入也需要添加 bean name。

@Autowired
@Qualifier(MqttClientTemplate.DEFAULT_CLIENT_TEMPLATE_BEAN)
private MqttClientTemplate mqttClientTemplate;

c. 新加入的 mqttClientTemplate1 MqttClientTemplate bean 引入

@Autowired
@Qualifier("mqttClientTemplate1")
private MqttClientTemplate mqttClientTemplate;

d. 新加入的 mqttClientTemplate1 注解订阅

注意:由于 @MqttClientSubscribe clientTemplateBean 默认是 MqttClientTemplate.DEFAULT_CLIENT_TEMPLATE_BEAN,所以新增的 mqttClientTemplate1 注解订阅的时候也需要配置。

@MqttClientSubscribe(
    value = "/#", 
    clientTemplateBean = "mqttClientTemplate1"
)
public void sub1(String topic, byte[] payload) {
    logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
}