掘金 后端 ( ) • 2024-04-25 09:57

theme: healer-readable highlight: a11y-dark

我们的项目需要接收设备模组数据,生成数据报告,并提供给客户查看。设备模组走的是MQTT协议,那么很显然我们需要部署MQTT消息服务,来接收设备端的MQTT连接以及接收设备数据。所以我们得部署MQTT消息服务。除了阿里云、腾讯云等物联网平台之外,我们还可以自己部署MQTT消息服务,在设备接入量很多时成本更低。

1.部署EMQX集群

我们部署服务肯定不能是单机的,所以我们部署MQTT消息服务集群。

  1. 软件版本选择
软件 版本 EMQX v4.2.5 HAProxy v2.2+
  1. 节点分配
节点ip 节点描述 节点作用 192.168.56.200 HAProxy 做代理转发,以实现连接数据发送的负载均衡和动态扩展 192.168.56.202 EMQX节点1 MQTT消息服务节点 192.168.56.203 EMQX节点2 MQTT消息服务节点
  1. 安装EMQX

我们在EMQX官网上,根据操作系统版本选择EMQX版本。我们的CentOS7,所以选择了emqx-centos7-4.2.14-x86_64.zip版本。

我们下载安装EMQX。

wget https://www.emqx.com/en/downloads/broker/4.2.14/emqx-centos7-4.2.14-x86_64.zip
unzip emqx-centos7-4.2.14-x86_64.zip
  1. 安装HAProxy
# 在200机器上安装HAProxy
 yum -y install haproxy
  1. 配置节点

配置202节点

# 配置节点名称
node.name = [email protected]

# 集群策略为static,无需手动添加节点
cluster.discovery = static

# 配置节点列表
cluster.static.seeds = [email protected],[email protected]

# 获取IP地址,需要设置proxy_protocol
listener.tcp.external.proxy_protocol = on

然后配置203节点类似,只是node.name节点名称不一样。

  1. 配置HAProxy
/etc/haproxy/haproxy.cfg

# 配置代理转发到EMQX节点
frontend frontend_emqx_tcp
    bind *:1883
    option tcplog
    mode tcp
    default_backend backend_emqx_tcp

backend backend_emqx_tcp
    mode tcp
    balance roundrobin
    server emqx_node_1 192.168.56.202:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
    server emqx_node_2 192.168.56.203:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
    
# 配置代理转发到控制台节点
frontend frontend_emqx_dashboard
    bind *:18083
    option tcplog
    mode tcp
    default_backend backend_emqx_dashboard

backend backend_emqx_dashboard
    balance roundrobin
    server emqx_node_1 192.168.56.202:18083 check
    server emqx_node_2 192.168.56.203:18083 check
  1. 启动查看集群状态
# 启动
./bin/emqx start
# 查看集群状态
./bin/emqx_ctl cluster status

Cluster status: #{running_nodes =>
                      ['[email protected]','[email protected]'],
                  stopped_nodes => []}
  1. 启动HAProxy
service haproxy start

发现HAProxy启动失败,查看服务状态,发现报错:cannot bind socket [0.0.0.0:1883]

[root@xg-200 ~]#  service haproxy status
Redirecting to /bin/systemctl status haproxy.service
● haproxy.service - HAProxy Load Balancer
   Loaded: loaded (/usr/lib/systemd/system/haproxy.service; disabled; vendor preset: disabled)
   Active: failed (Result: exit-code) since 二 2024-04-16 21:23:00 CST; 55s ago
  Process: 2365 ExecStart=/usr/sbin/haproxy-systemd-wrapper -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid $OPTIONS (code=exited, status=1/FAILURE)
 Main PID: 2365 (code=exited, status=1/FAILURE)

4月 16 21:23:00 xg-200 systemd[1]: Started HAProxy Load Balancer.
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: haproxy-systemd-wrapper: executing /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/ha....pid -Ds
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: [ALERT] 106/212300 (2366) : Starting frontend frontend_emqx_tcp: cannot bind socket [0.0.0.0:1883]
4月 16 21:23:00 xg-200 haproxy-systemd-wrapper[2365]: [ALERT] 106/212300 (2366) : Starting frontend frontend_emqx_dashboard: cannot bind socket [...0:18083]

我们关闭selinux,然后重启

sed -i s#SELINUX=enforcing#SELINUX=disabled# /etc/selinux/config

最后再次启动HAProxy,成功了!!!

[root@xg-200 ~]# service haproxy start
Redirecting to /bin/systemctl start haproxy.service
[root@xg-200 ~]# service haproxy status
Redirecting to /bin/systemctl status haproxy.service
● haproxy.service - HAProxy Load Balancer
   Loaded: loaded (/usr/lib/systemd/system/haproxy.service; disabled; vendor preset: disabled)
   Active: active (running) since 二 2024-04-16 21:46:14 CST; 8s ago
 Main PID: 2197 (haproxy-systemd)
    Tasks: 3
   Memory: 3.1M
   CGroup: /system.slice/haproxy.service
           ├─2197 /usr/sbin/haproxy-systemd-wrapper -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid
           ├─2198 /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -Ds
           └─2199 /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -Ds

4月 16 21:46:14 xg-200 systemd[1]: Started HAProxy Load Balancer.
4月 16 21:46:14 xg-200 haproxy-systemd-wrapper[2197]: haproxy-systemd-wrapper: executing /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/ha....pid -Ds
Hint: Some lines were ellipsized, use -l to show in full

访问HAProxy代理的18083端口,就会访问到dashboard

访问dashboard http://192.168.56.200:18083
默认用户名/密码:admin/public

然后我们连接HAProxy代理的1883端口,就会代理转发到EMQX节点。

image.png 我们看到203节点上面,接收了一个客户端连接。

image.png

我们在创建一个连接,看到连接到了202节点。

image.png

至此,我们看到使用HAProxy代理转发到EMQX节点,实现了连接的负载均衡和动态扩容

2.连接认证

我们使用HTTP服务认证方式来认证。

  1. 先开启EMQX的emqx_auth_http认证插件

image.png

  1. 修改认证配置文件
# 修改认证服务器地址、请求参数
vi etc/plugins/emqx_auth_http.conf

# 认证服务接口地址
auth.http.auth_req = http://192.168.56.1:80/mqtt/auth

# 请求post | get
auth.http.auth_req.method = post

# 请求参数
auth.http.auth_req.params = clientid=%c,username=%u,password=%P

同理修改203节点配置文件。

  1. 最后重启EMQX服务。
 ./bin/emqx stop
ok
[root@xg-202 emqx]#
[root@xg-202 emqx]# ./bin/emqx start
EMQ X Broker 4.2.14 is started successfully!
  1. 开发认证服务

单独创建一个工程作为http认证服务。主要是编写认证接口,对设备用户名密码进行验证。然后启动认证服务。

/**
 * @description: 执行设备认证
 * @author:xg
 * @date: 2024/4/18
 * @Copyright:
 */
@RestController
@RequestMapping("/mqtt")
@Slf4j
public class AuthController {

    private Map<String,String> users;

    @PostConstruct
    public void init(){
        users = new HashMap<>();
        users.put("user","123456");
    }

    @PostMapping("/auth")
    public ResponseEntity<?> auth(@RequestParam("clientid") String clientid,
                                  @RequestParam("username") String username,
                                  @RequestParam("password") String password) {
        log.info("调用认证服务认证,clientid:{},username:{},password: {}",clientid,username,password);
        // 模拟认证逻辑,实际不是这样简单处理
        String userPassword = users.get(username);
        if (StringUtils.isEmpty(userPassword) || !userPassword.equals(password)) {
            return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body(HttpStatus.UNAUTHORIZED);
        }
        return ResponseEntity.ok(HttpStatus.OK);
    }
}
  1. 客户端连接认证

image.png 输入用户名、密码,点击连接。看看是否认证成功,之后客户端是否连接成功。 发现不输入用户名、密码都可以直接连接。这不对啊。我们发现auth_http插件被关闭了。启动此插件时报错:

{emqx_auth_http,{different_server,{emqx_auth_http_app,start,[normal,[]]}}}

我们需要在认证配置文件emqx_auth_http.conf中把跟auth.http.acl_req相关的配置(EMQX配置acl权限控制)注释掉,如果不注释后面启动发布订阅MQTT的工程时会报错:

Error connecting or subscribing to [test]
#auth.http.acl_req = http://192.168.56.1:80/mqtt/acl
#auth.http.acl_req.method = get
#auth.http.acl_req.content_type = x-www-form-urlencoded
#auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m

然后重启EMQX,再次开启auth_http插件,成功。

然后,我们输入错误的密码,看到连接失败。

image.png

输入正确的用户名密码,连接成功。查看客户端连接信息。

image.png

image.png

其他认证方式,我们后面再说。

3.设备上报数据,订阅者订阅接收数据

如下图模拟设备上报数据,MQTT服务调用http认证服务执行认证,然后MQTT接收转发,然后订阅系统接收处理数据。

发布订阅.png

首先创建一个单独工程,用于开发模拟设备发布数据和订阅数据。

需要在SpringBoot集成mqtt。

  1. 先添加mqtt依赖
<!--mqtt依赖-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  1. 加载mqtt配置文件

mqtt配置文件

# mqtt配置
mqtt:
  # HAProxy代理的地址(代理均衡转发到EMQX节点)
  url: tcp://192.168.56.200:1883
  # 认证的用户名密码
  username: user
  password: 123456
  # 多长时间发送一次心跳包以维持连接(设置30s)
  keep-alive: 30
  connection-timeout: 3000
  producerClientId: test-producer
  producerQos: 1
  consumerClientId: test-consumer
  consumerQos: 1
  deafultTopic: test

多长时间发送一次心跳包以维持连接 keep-alive: 30,这个配置尤其要注意,不能设置的太短,我们之前设置为3s,启动订阅MQTT的工程一直报错:Timed out as no activity。所以我们设置30s。

**
 * @description: 加载mqtt配置文件
 * @author:xg
 * @date: 2024/4/22
 * @Copyright:
 */
@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties implements Serializable {
    private static final long serialVersionUID = -1425980007744001158L;

    private String url;

    private String username;

    private String password;

    private int keepAlive;

    private int connectionTimeOut;

    private String producerClientId;

    private String producerQos;

    private String consumerClientId;

    private String consumerQos;

    private String consumerTopic;

    private int completionTimeout;

    private String defaultTopic;

    //get、set方法省略
}
  1. 发布订阅配置

配置连接器

/**
 * MQTT消息发布订阅配置
 */
@Configuration
@Slf4j
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * 连接器
     * @return
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间
        mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
        // 多长时间发送一次心跳包以维持连接(30s)
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        mqttConnectOptions.setAutomaticReconnect(true);
        // 设置连接的用户名
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        // 设置连接的密码
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        //服务器地址
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});

        return mqttConnectOptions;
    }

配置客户端

/***
 * MQTT客户端
 * @return
 */
@Bean("mqttClientFactory")
public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions getMqttConnectOptions) {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions);
    return factory;
}

定义生产者发布通道

/**
 *  MQTT生产端发布通道
 * @return
 */
@Bean("mqttOutboundChannel")
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

定义生产者处理器

/**
 * MQTT生产者处理器
 *
 * @return {@link org.springframework.messaging.MessageHandler}
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory);
    messageHandler.setAsync(true);
    return messageHandler;
}

定义消费端订阅通道

/**
 * MQTT消费端订阅通道
 *
 * @return {@link org.springframework.messaging.MessageChannel}
 */
@Bean(name = "mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
}

消费端通道适配器

/**
 * MQTT消费端通道适配器
 *
 * @param channel {@link org.springframework.messaging.MessageChannel}
 * @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
 * @return {@link org.springframework.integration.core.MessageProducer}
 */
@Bean
public MessageProducer inbound(
        @Qualifier("mqttInboundChannel") MessageChannel channel,
        @Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
    adapter.setCompletionTimeout(30000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    // 0 至多一次,数据可能丢失
    // 1 至少一次,数据可能重复
    // 2 只有一次,且仅有一次,最耗性能
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(channel);
    return adapter;
}

消费端接收处理消息

/**
 * 消费端接收处理消息
 * @return
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            log.info("接收到消息:{}", message.getPayload());
        }
    };
}
  1. 编写消息发送网关

发送网关关联输出通道,同时在网关中定义发送方法。

/**
 * @description: mqtt网关
 * @author:xg
 * @date: 2024/4/22
 * @Copyright:
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 往topic中发布消息
     * @param topic
     * @param payload
     */
    void send(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
  1. 模拟设备上报数据
/**
 * @description: mqtt测试
 * @author:xg
 * @date: 2024/4/22
 * @Copyright:
 */
@RestController
@RequestMapping("/mqtt")
@Slf4j
public class MqttTestController {

    @Value("${mqtt.deafultTopic}")
    private String topic;

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * 模拟设备上报数据
     * @return
     */
    @GetMapping("/publishData")
    public String publishData() throws InterruptedException {

        for(int i = 0; i < 10; i++) {
            String data = "data:" + i;
            log.info("上报数据:{}, TOPIC:{}", data, topic);
            mqttGateway.send(topic, data);
            Thread.sleep(5000);
        }
        return "OK";
    }

}
  1. 验证订阅接收设备数据

验证订阅接收设备数据。我们发现订阅者已经订阅接收到了设备数据。

2024-04-24 22:03:04.730  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:0, TOPIC:test
2024-04-24 22:03:04.738  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:0
2024-04-24 22:03:09.731  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:1, TOPIC:test
2024-04-24 22:03:09.739  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:1
2024-04-24 22:03:14.733  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:2, TOPIC:test
2024-04-24 22:03:14.743  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:2
2024-04-24 22:03:19.734  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:3, TOPIC:test
2024-04-24 22:03:19.743  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:3
2024-04-24 22:03:24.735  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:4, TOPIC:test
2024-04-24 22:03:24.750  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:4
2024-04-24 22:03:29.738  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:5, TOPIC:test
2024-04-24 22:03:29.750  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:5
2024-04-24 22:03:34.740  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:6, TOPIC:test
2024-04-24 22:03:34.749  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:6
2024-04-24 22:03:39.741  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:7, TOPIC:test
2024-04-24 22:03:39.749  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:7
2024-04-24 22:03:44.743  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:8, TOPIC:test
2024-04-24 22:03:44.751  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:8
2024-04-24 22:03:49.743  INFO 78596 --- [nio-8080-exec-8] o.r.controller.MqttTestController        : 上报数据:data:9, TOPIC:test
2024-04-24 22:03:49.751  INFO 78596 --- [: test-consumer] org.redismysql.config.MqttConfig         : 接收到消息:data:9

4. 总结

本文我们主要:

  1. 搭建了MQTT消息集群,用于接收转发设备数据,同时实现连接的负载均衡和动态扩容
  2. 配置设备接入认证,保障接入的安全性。
  3. 模拟设备上报数据,订阅接收数据。