theme: healer-readable highlight: a11y-dark
我们的项目需要接收设备模组数据,生成数据报告,并提供给客户查看。设备模组走的是MQTT协议,那么很显然我们需要部署MQTT消息服务,来接收设备端的MQTT连接以及接收设备数据。所以我们得部署MQTT消息服务。除了阿里云、腾讯云等物联网平台之外,我们还可以自己部署MQTT消息服务,在设备接入量很多时成本更低。
1.部署EMQX集群
我们部署服务肯定不能是单机的,所以我们部署MQTT消息服务集群。
- 软件版本选择
- 节点分配
- 安装EMQX
我们在EMQX官网上,根据操作系统版本选择EMQX版本。我们的CentOS7,所以选择了emqx-centos7-4.2.14-x86_64.zip版本。
我们下载安装EMQX。
wget https://www.emqx.com/en/downloads/broker/4.2.14/emqx-centos7-4.2.14-x86_64.zip
unzip emqx-centos7-4.2.14-x86_64.zip
- 安装HAProxy
# 在200机器上安装HAProxy
yum -y install haproxy
- 配置节点
配置202节点
# 配置节点名称
node.name = [email protected]
# 集群策略为static,无需手动添加节点
cluster.discovery = static
# 配置节点列表
cluster.static.seeds = [email protected],[email protected]
# 获取IP地址,需要设置proxy_protocol
listener.tcp.external.proxy_protocol = on
然后配置203节点类似,只是node.name节点名称不一样。
- 配置HAProxy
/etc/haproxy/haproxy.cfg
# 配置代理转发到EMQX节点
frontend frontend_emqx_tcp
bind *:1883
option tcplog
mode tcp
default_backend backend_emqx_tcp
backend backend_emqx_tcp
mode tcp
balance roundrobin
server emqx_node_1 192.168.56.202:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
server emqx_node_2 192.168.56.203:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
# 配置代理转发到控制台节点
frontend frontend_emqx_dashboard
bind *:18083
option tcplog
mode tcp
default_backend backend_emqx_dashboard
backend backend_emqx_dashboard
balance roundrobin
server emqx_node_1 192.168.56.202:18083 check
server emqx_node_2 192.168.56.203:18083 check
- 启动查看集群状态
# 启动
./bin/emqx start
# 查看集群状态
./bin/emqx_ctl cluster status
Cluster status: #{running_nodes =>
['[email protected]','[email protected]'],
stopped_nodes => []}
- 启动HAProxy
service haproxy start
发现HAProxy启动失败,查看服务状态,发现报错:cannot bind socket [0.0.0.0:1883]
[root@xg-200 ~]# service haproxy status
Redirecting to /bin/systemctl status haproxy.service
● haproxy.service - HAProxy Load Balancer
Loaded: loaded (/usr/lib/systemd/system/haproxy.service; disabled; vendor preset: disabled)
Active: failed (Result: exit-code) since 二 2024-04-16 21:23:00 CST; 55s ago
Process: 2365 ExecStart=/usr/sbin/haproxy-systemd-wrapper -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid $OPTIONS (code=exited, status=1/FAILURE)
Main PID: 2365 (code=exited, status=1/FAILURE)
4月 16 21:23:00 xg-200 systemd[1]: Started HAProxy Load Balancer.
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: haproxy-systemd-wrapper: executing /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/ha....pid -Ds
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: [ALERT] 106/212300 (2366) : Starting frontend frontend_emqx_tcp: cannot bind socket [0.0.0.0:1883]
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: [ALERT] 106/212300 (2366) : Starting frontend frontend_emqx_dashboard: cannot bind socket [...0:18083]
我们关闭selinux,然后重启
sed -i s#SELINUX=enforcing#SELINUX=disabled# /etc/selinux/config
最后再次启动HAProxy,成功了!!!
[root@xg-200 ~]# service haproxy start
Redirecting to /bin/systemctl start haproxy.service
[root@xg-200 ~]# service haproxy status
Redirecting to /bin/systemctl status haproxy.service
● haproxy.service - HAProxy Load Balancer
Loaded: loaded (/usr/lib/systemd/system/haproxy.service; disabled; vendor preset: disabled)
Active: active (running) since 二 2024-04-16 21:46:14 CST; 8s ago
Main PID: 2197 (haproxy-systemd)
Tasks: 3
Memory: 3.1M
CGroup: /system.slice/haproxy.service
├─2197 /usr/sbin/haproxy-systemd-wrapper -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid
├─2198 /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -Ds
└─2199 /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -Ds
4月 16 21:46:14 xg-200 systemd[1]: Started HAProxy Load Balancer.
4月 16 21:46:14 xg-200 haproxy-systemd-wrapper[2197]: haproxy-systemd-wrapper: executing /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/ha....pid -Ds
Hint: Some lines were ellipsized, use -l to show in full
访问HAProxy代理的18083端口,就会访问到dashboard
访问dashboard http://192.168.56.200:18083
默认用户名/密码:admin/public
然后我们连接HAProxy代理的1883端口,就会代理转发到EMQX节点。
我们看到203节点上面,接收了一个客户端连接。
我们在创建一个连接,看到连接到了202节点。
至此,我们看到使用HAProxy代理转发到EMQX节点,实现了连接的负载均衡和动态扩容
。
2.连接认证
我们使用HTTP服务认证方式来认证。
- 先开启EMQX的emqx_auth_http认证插件
- 修改认证配置文件
# 修改认证服务器地址、请求参数
vi etc/plugins/emqx_auth_http.conf
# 认证服务接口地址
auth.http.auth_req = http://192.168.56.1:80/mqtt/auth
# 请求post | get
auth.http.auth_req.method = post
# 请求参数
auth.http.auth_req.params = clientid=%c,username=%u,password=%P
同理修改203节点配置文件。
- 最后重启EMQX服务。
./bin/emqx stop
ok
[root@xg-202 emqx]#
[root@xg-202 emqx]# ./bin/emqx start
EMQ X Broker 4.2.14 is started successfully!
- 开发认证服务
单独创建一个工程作为http认证服务。主要是编写认证接口,对设备用户名密码进行验证。然后启动认证服务。
/**
* @description: 执行设备认证
* @author:xg
* @date: 2024/4/18
* @Copyright:
*/
@RestController
@RequestMapping("/mqtt")
@Slf4j
public class AuthController {
private Map<String,String> users;
@PostConstruct
public void init(){
users = new HashMap<>();
users.put("user","123456");
}
@PostMapping("/auth")
public ResponseEntity<?> auth(@RequestParam("clientid") String clientid,
@RequestParam("username") String username,
@RequestParam("password") String password) {
log.info("调用认证服务认证,clientid:{},username:{},password: {}",clientid,username,password);
// 模拟认证逻辑,实际不是这样简单处理
String userPassword = users.get(username);
if (StringUtils.isEmpty(userPassword) || !userPassword.equals(password)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body(HttpStatus.UNAUTHORIZED);
}
return ResponseEntity.ok(HttpStatus.OK);
}
}
- 客户端连接认证
输入用户名、密码,点击连接。看看是否认证成功,之后客户端是否连接成功。 发现不输入用户名、密码都可以直接连接。这不对啊。我们发现auth_http插件被关闭了。启动此插件时报错:
{emqx_auth_http,{different_server,{emqx_auth_http_app,start,[normal,[]]}}}
我们需要在认证配置文件emqx_auth_http.conf中把跟auth.http.acl_req相关的配置(EMQX配置acl权限控制)注释掉,如果不注释后面启动发布订阅MQTT的工程时会报错:
Error connecting or subscribing to [test]
#auth.http.acl_req = http://192.168.56.1:80/mqtt/acl
#auth.http.acl_req.method = get
#auth.http.acl_req.content_type = x-www-form-urlencoded
#auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m
然后重启EMQX,再次开启auth_http插件,成功。
然后,我们输入错误的密码,看到连接失败。
输入正确的用户名密码,连接成功。查看客户端连接信息。
其他认证方式,我们后面再说。
3.设备上报数据,订阅者订阅接收数据
如下图模拟设备上报数据,MQTT服务调用http认证服务执行认证,然后MQTT接收转发,然后订阅系统接收处理数据。
首先创建一个单独工程,用于开发模拟设备发布数据和订阅数据。
需要在SpringBoot集成mqtt。
- 先添加mqtt依赖
<!--mqtt依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
- 加载mqtt配置文件
mqtt配置文件
# mqtt配置
mqtt:
# HAProxy代理的地址(代理均衡转发到EMQX节点)
url: tcp://192.168.56.200:1883
# 认证的用户名密码
username: user
password: 123456
# 多长时间发送一次心跳包以维持连接(设置30s)
keep-alive: 30
connection-timeout: 3000
producerClientId: test-producer
producerQos: 1
consumerClientId: test-consumer
consumerQos: 1
deafultTopic: test
多长时间发送一次心跳包以维持连接 keep-alive: 30,这个配置尤其要注意,不能设置的太短,我们之前设置为3s,启动订阅MQTT的工程一直报错:Timed out as no activity。所以我们设置30s。
**
* @description: 加载mqtt配置文件
* @author:xg
* @date: 2024/4/22
* @Copyright:
*/
@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties implements Serializable {
private static final long serialVersionUID = -1425980007744001158L;
private String url;
private String username;
private String password;
private int keepAlive;
private int connectionTimeOut;
private String producerClientId;
private String producerQos;
private String consumerClientId;
private String consumerQos;
private String consumerTopic;
private int completionTimeout;
private String defaultTopic;
//get、set方法省略
}
- 发布订阅配置
配置连接器
/**
* MQTT消息发布订阅配置
*/
@Configuration
@Slf4j
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
/**
* 连接器
* @return
*/
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置超时时间
mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
// 多长时间发送一次心跳包以维持连接(30s)
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
mqttConnectOptions.setAutomaticReconnect(true);
// 设置连接的用户名
mqttConnectOptions.setUserName(mqttProperties.getUsername());
// 设置连接的密码
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
//服务器地址
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
return mqttConnectOptions;
}
配置客户端
/***
* MQTT客户端
* @return
*/
@Bean("mqttClientFactory")
public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions getMqttConnectOptions) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions);
return factory;
}
定义生产者发布通道
/**
* MQTT生产端发布通道
* @return
*/
@Bean("mqttOutboundChannel")
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
定义生产者处理器
/**
* MQTT生产者处理器
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory);
messageHandler.setAsync(true);
return messageHandler;
}
定义消费端订阅通道
/**
* MQTT消费端订阅通道
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = "mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
消费端通道适配器
/**
* MQTT消费端通道适配器
*
* @param channel {@link org.springframework.messaging.MessageChannel}
* @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound(
@Qualifier("mqttInboundChannel") MessageChannel channel,
@Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
adapter.setCompletionTimeout(30000);
adapter.setConverter(new DefaultPahoMessageConverter());
// 0 至多一次,数据可能丢失
// 1 至少一次,数据可能重复
// 2 只有一次,且仅有一次,最耗性能
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(channel);
return adapter;
}
消费端接收处理消息
/**
* 消费端接收处理消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
log.info("接收到消息:{}", message.getPayload());
}
};
}
- 编写消息发送网关
发送网关关联输出通道,同时在网关中定义发送方法。
/**
* @description: mqtt网关
* @author:xg
* @date: 2024/4/22
* @Copyright:
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 往topic中发布消息
* @param topic
* @param payload
*/
void send(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
- 模拟设备上报数据
/**
* @description: mqtt测试
* @author:xg
* @date: 2024/4/22
* @Copyright:
*/
@RestController
@RequestMapping("/mqtt")
@Slf4j
public class MqttTestController {
@Value("${mqtt.deafultTopic}")
private String topic;
@Autowired
private MqttGateway mqttGateway;
/**
* 模拟设备上报数据
* @return
*/
@GetMapping("/publishData")
public String publishData() throws InterruptedException {
for(int i = 0; i < 10; i++) {
String data = "data:" + i;
log.info("上报数据:{}, TOPIC:{}", data, topic);
mqttGateway.send(topic, data);
Thread.sleep(5000);
}
return "OK";
}
}
- 验证订阅接收设备数据
验证订阅接收设备数据。我们发现订阅者已经订阅接收到了设备数据。
2024-04-24 22:03:04.730 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:0, TOPIC:test
2024-04-24 22:03:04.738 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:0
2024-04-24 22:03:09.731 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:1, TOPIC:test
2024-04-24 22:03:09.739 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:1
2024-04-24 22:03:14.733 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:2, TOPIC:test
2024-04-24 22:03:14.743 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:2
2024-04-24 22:03:19.734 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:3, TOPIC:test
2024-04-24 22:03:19.743 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:3
2024-04-24 22:03:24.735 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:4, TOPIC:test
2024-04-24 22:03:24.750 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:4
2024-04-24 22:03:29.738 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:5, TOPIC:test
2024-04-24 22:03:29.750 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:5
2024-04-24 22:03:34.740 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:6, TOPIC:test
2024-04-24 22:03:34.749 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:6
2024-04-24 22:03:39.741 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:7, TOPIC:test
2024-04-24 22:03:39.749 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:7
2024-04-24 22:03:44.743 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:8, TOPIC:test
2024-04-24 22:03:44.751 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:8
2024-04-24 22:03:49.743 INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController : 上报数据:data:9, TOPIC:test
2024-04-24 22:03:49.751 INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig : 接收到消息:data:9
4. 总结
本文我们主要:
- 搭建了MQTT消息集群,用于接收转发设备数据,同时实现连接的
负载均衡和动态扩容
- 配置设备接入认证,保障接入的安全性。
- 模拟设备上报数据,订阅接收数据。