掘金 后端 ( ) • 2024-06-27 16:56

[!前言] 本项目采用spring boot框架,使用netty做网络应用程序框架。下面做全流程分析。项目源码地址:https://github.com/chenqi92/pc-electric-fence.git

报文协议

这个应该是厂商自家定义的报文协议,发过来就是一个txt,协议原文内容如下😂

一、通讯方式
  1. 串口方式: 波特率为9600bps,采用8-N-1格式
  2. 网络方式: PC端做TCP服务器,默认监听端口5000

二、数据包格式
  所有数据都以回车符为结束符,数据内容以空格分隔,所有数据都是有应答。
  
1. 应答(布撤防才会有成功和失败之说,其他的指令都回A 1)
  内容: A 结果
  结果: 0: 失败; 1: 成功 2: 执行成功
注:每条指令都需要回复A 1,并以回车键结束。
2. 心跳
  内容: H 设备编号 通讯方式 设备类型 
  设备编号: 为报警主机编号
  通讯方式: 0: 串口/网络; 2: GPRS
  设备类型: 0: 接警机; 1: 主机
  设备类型为非接警机时,设备跟PC连接建立后,立刻发送心跳包。
  心跳间隔默认为10秒。
举例:
  主机编号为0时:H 0 0 1
  主机编号为1时:H 1 0 1
3. 事件上传
  内容: E 主机编号 防区编号 事件代码 子系统号 月-日-时-分    
  主机编号: 接警机上报时,为接警机编号-通讯机编号-终端设备编号;主机上传时为用户编号。
  防区编号: 为0时,为非防区事件。

总线主机  事件代码参考如下:

      0:  防区布防事件
      1:  防区撤防事件
      2:  防区报警事件
      3:  防区报警恢复事件
      4:  设备被撬事件
      5:  设备被撬恢复事件
      6:  设备欠压事件
      7:  设备欠压恢复事件
      8:  设备连接故障事件
      9:  设备连接恢复事件
      10: 设备布防
      11: 设备撤防
      12: 设备挟持
      13: 通讯机连接故障事件
      14: 通讯机连接恢复事件
      15: 防区旁路事件
      16: 防区旁路恢复事件
2013.11.23
      17: 设备紧急事件
      18: 设备紧急恢复事件
      19: 设备火警事件
      20: 设备火警恢复事件
      21: 防区布防状态
      22: 防区撤防状态
      23: 防区未准备
      24: 防区未准备恢复
      25:留守布防
      26: 防区故障
      27:防区故障恢复
      28: 电话线故障
      29: 电话线故障恢复
      30: 电池故障
      31: 电池恢复
      32: 交流故障
      33: 交流恢复
      34: 围栏断线报警
      35:围栏短路报警
      36: 围栏电压子系统号:为围栏的电压值 (KV)  
      37:触网报警


围栏电压例子
  E 0001 1 36 50//1号主机的1号围栏 电压为5.0KV
      



  子系统号:可选字节,当主机有子系统时,带上子系统;没有时,不用带。


  E 6130-0-1 4 2 0 11-7-13-2 表示6130主机1号模块4防区在11月7号13点2分 
发生报警了

  主机编号: 接警机上报时,为接警机编号-通讯机编号-终端设备编号;
   6130-0-1  : 接警机是6130,即总线主机的系统地址, 
              通讯机编号是0: 表示通讯口1或2的设备,  通讯机编号是1: 表示键盘总线的设备, 
     终端设备编号: 当通讯机编号为0时, 终端设备编号 0-64, 其中0表示主机主板防区, 1-64表示扩展的1-64号设备
             当通讯机编程为1时,表示键盘设备, 128-160 ,128表示主键盘, 129-160表示1-31号分键盘

     4:表示防区4
     2: 表示防区报警事件  (具体请参考 总线主机  事件代码)
     0: 子系统号 固定为0
     11-7-13-2: 11月7号13点2分

 E 6130-1-128 0 11 0 11-7-13-3    表示 6130号主机的128号键盘(即主键盘)撤防操作在11月7号13点3分 


4. 主机控制
  内容: C 主机编号 防区编号 控制类型 密码
  防区编号: 为0时是设备控制,否则是防区控制
  控制类型: 0: 撤防 1: 布防 2: 旁路 3: 解除旁路 4:留守布防  5:高压布防  6:低压布防

  例如:主机编号为6130 ,  
C 6130-1-128 0 0 123456    表示 控制6130号主机的128号键盘(即主键盘)撤防
C 6130-1-128 0 1 123456    表示 控制6130号主机的128号键盘(即主键盘)布防

C 6130-0-1 0 1 123456    表示 控制6130号主机的1号设备布防
C 6130-0-1 1 1 123456    表示 控制6130号主机的1号设备防区1 布防

C 6130-0-1 0 0 123456    表示 控制6130号主机的1号设备撤防
C 6130-0-1 1 0 123456    表示 控制6130号主机的1号设备防区1 撤防


5. 输出点控制
  内容: O 主机编号 输出点类型 输出点号 控制类型 控制时间
  输出点类型: 0: 输出点 1: 灯   2: LED
  控制类型: 0:断开  1:闭合

O 6130-0-1 0 1 1 10    表示 控制6130号主机的1号设备的1号输出闭合10秒
O 6130-0-1 0 1 0 0      表示 控制6130号主机的1号设备的1号输出断开

协议接收考虑

对方给了一个现场如下配置 image.png

因为不可预知对方的主机类型,从报文来看最好的方式肯定是做一个TCP服务端来接收数据,让对方配合修改客户端的发送地址即可。

开发一个服务端

开发准备

因为服务器装的jdk1.8所有spring boot也不能选择3.0+,使用的maven作为依赖管理,pom内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lyc</groupId>
    <artifactId>pc-electric-fence</artifactId>
    <version>0.0.1</version>
    <name>pc-electric-fence</name>
    <description>电子围网报警数据接收</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>8</java.version>
        <influx.version>2.24</influx.version>
        <allbs-influx.version>2.0.2</allbs-influx.version>
        <netty.version>4.1.111.Final</netty.version>
        <spring-boot.version>2.7.18</spring-boot.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.version>3.8.1</maven.compiler.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.allbs</groupId>
            <artifactId>allbs-influx</artifactId>
            <version>${allbs-influx.version}</version>
        </dependency>
        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>${influx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <!-- 环境标识,需要与配置文件的名称相对应 -->
                <profiles.active>dev</profiles.active>
            </properties>
            <activation>
                <!-- 默认环境 -->
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>prod</id>
            <properties>
                <profiles.active>prod</profiles.active>
            </properties>
        </profile>
    </profiles>

    <build>
        <finalName>${project.name}-${project.version}</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
        <pluginManagement>
            <plugins>
                <!--spring boot 默认插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring-boot.version}</version>
                    <configuration>
                        <executable>true</executable>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.version}</version>
                <configuration>
                    <target>${maven.compiler.target}</target>
                    <source>${maven.compiler.source}</source>
                    <encoding>UTF-8</encoding>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

其中allbs-influx这个包是我自己封装的处理influxDb存和取操作的,如果没有需要可以去除,包括下面的influxdb-java

项目结构如下

pc-electric-fence
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── lyc
│   │   │           └── pcelectricfence
│   │   │               ├── constant
│   │   │               │   └── CommonConstant.java
│   │   │               ├── enums
│   │   │               │   ├── CommunicationMode.java
│   │   │               │   ├── ControlType.java
│   │   │               │   ├── DeviceEnum.java
│   │   │               │   ├── DeviceType.java
│   │   │               │   ├── EventType.java
│   │   │               │   ├── OutputControlType.java
│   │   │               │   ├── OutputPointType.java
│   │   │               │   └── ResponseType.java
│   │   │               ├── netty
│   │   │               │   ├── NettyServer.java
│   │   │               │   ├── NettyServerHandlerInitializer.java
│   │   │               │   └── ProtocolHandler.java
│   │   │               ├── properties
│   │   │               │   └── NettyServerProperties.java
│   │   │               ├── utils
│   │   │               │   └── CommandParserUtil.java
│   │   │               └── PcElectricFenceApplication.java
│   │   └── resources
│   │       ├── application-dev.yml
│   │       ├── application-prod.yml
│   │       ├── application.yml
│   │       └── logback-spring.xml
│   └── test
│       └── java
│           └── com
│               └── lyc
│                   └── pcelectricfence
│                       └── PcElectricFenceApplicationTests.java
├── README.md
├── pom.xml
└── 协议.md

文件内容一览

CommonConstant

一个常量定义,只有influxdb需要储存的表名和日期格式化

public interface CommonConstant {

    /**
     * 电子围栏数据表
     */
    String INFLUXDB_DATABASE_MEASUREMENT = "cl_electronic_patrol_alarm";

    /**
     * 时间格式
     */
    String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * 报警信息中的时间格式
     */
    String ALARM_DATETIME_PATTERN = "yyyy年MM月dd日HH时mm分";
}

所有的枚举

这个没有什么好说的就是根据报文协议抽出来的东西,下面就不列了,有需要的可以到文末的源码上去看

NettyServer

主要是用来启动项目时启动server监听指定端口,还有就是编码器和解码器加上接收数据处理的handler,详细代码为如下,基本每行都有注解,就不一一阐述了。

package com.lyc.pcelectricfence.netty;

import com.lyc.pcelectricfence.properties.NettyServerProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * 类 NettyServer
 *
 * @author ChenQi
 * @date 2024/6/20
 */
@Slf4j
@Component
public class NettyServer {

    @Resource
    private NettyServerProperties nettyServerProperties;

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

    /**
     * boss 线程组,用于服务端接受客户端的连接
     */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    /**
     * worker 线程组,用于服务端接受客户端的数据读写
     */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    @Resource
    private ProtocolHandler protocolHandler;

    /**
     * Netty Server Channel
     */
    private Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // 创建 ServerBootstrap 对象,用于 Netty Server 启动
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 作为分隔符的数据包,防止粘包,虽然报文说是回车分隔,实际上没有,所以未使用,下面的代码注释掉同理
//        ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
        serverBootstrap.group(bossGroup, workerGroup)
                // 指定 Channel 为服务端 NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                // 端口
                .localAddress(new InetSocketAddress(nettyServerProperties.getPort()))
                // 服务端接收队列的大小
                .option(ChannelOption.SO_BACKLOG, 1024)
                // TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 允许较小的数据包的发送,降低延迟
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // 空闲检测
                        ch.pipeline().addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS));
//                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(16 * 1024, false, delimiter));
                        // 解码器,因为客户端发送的是字符串所以直接用String即可,如果是其他的需要对应的解码
                        ch.pipeline().addLast(new StringDecoder());
                        // 跟编码器同理
                        ch.pipeline().addLast(new StringEncoder());
                        // 添加连接事件处理
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                log.info("{}远程客户端连接!", ctx.channel().remoteAddress());
                                super.channelActive(ctx);
                            }
                        });
                        // 处理收到报文的方法
                        ch.pipeline().addLast(protocolHandler);
                    }
                });

        // 绑定端口,并同步等待成功,即启动服务端
        ChannelFuture future = serverBootstrap.bind().sync();
        if (future.isSuccess()) {
            channel = future.channel();
            log.info("netty服务端已启动,启动端口为{}", nettyServerProperties.getPort());
        }
    }

    @PreDestroy
    public void stop() {
        log.info("{} 服务主动断开连接!", channel.localAddress());
        // 关闭 Netty Server
        if (channel != null) {
            channel.close();
        }
        // 优雅关闭两个 EventLoopGroup 对象
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

最后是处理的handler

这个类中包含的东西不多,只是集成SimpleChannelInboundHandler并重写channelRead0exceptionCaught。一个是用来处理报文的方法,一个是用来监听异常的方法,其他代码都是根据报文内容定制。

唯一需要注意的就是注解@ChannelHandler.Sharable这个注解的作用是标识一个 ChannelHandler 实例可以在多个ChannelPipeline 中共享使用。通常,ChannelHandler 实例是不可共享的,因为它们在处理不同的连接时可能会维护一些特定于连接的状态信息。而 @ChannelHandler.Sharable 的存在则表示这个 ChannelHandler 实例是无状态的,或者其状态对于多个连接是安全的,可以共享使用。简单点说就是为了让你这个服务端能够被多个客户端连上。

package com.lyc.pcelectricfence.netty;

import cn.allbs.influx.InfluxTemplate;
import com.lyc.pcelectricfence.enums.CommunicationMode;
import com.lyc.pcelectricfence.enums.DeviceType;
import com.lyc.pcelectricfence.enums.ResponseType;
import com.lyc.pcelectricfence.utils.CommandParserUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

import static com.lyc.pcelectricfence.constant.CommonConstant.INFLUXDB_DATABASE_MEASUREMENT;

/**
 * 类 ProtocolHandler
 *
 * @author ChenQi
 * @date 2024/6/20
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class ProtocolHandler extends SimpleChannelInboundHandler<String> {

    @Resource
    private InfluxTemplate influxTemplate;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("Received message: {}", msg);
        String[] parts = msg.trim().split(" ");
        switch (parts[0]) {
            case "A":
                handleResponse(ctx, parts);
                break;
            case "H":
                handleHeartbeat(ctx, parts);
                break;
            case "E":
                handleEventUpload(ctx, parts);
                break;
            case "C":
                handleControl(ctx, parts);
                break;
            case "O":
                handleOutputControl(ctx, parts);
                break;
            default:
                // Handle unknown command
                break;
        }
        // 发送回复消息
        ctx.writeAndFlush("A 1\r\n");
    }

    /**
     * 处理响应
     *
     * @param ctx   ChannelHandlerContext
     * @param parts 消息分割后的数组
     */
    private void handleResponse(ChannelHandlerContext ctx, String[] parts) {
        int result = Integer.parseInt(parts[1]);
        ResponseType responseType = ResponseType.values()[result];
        log.debug("Response: {}", responseType.getDescription());
    }

    /**
     * 处理心跳
     *
     * @param ctx   ChannelHandlerContext
     * @param parts 消息分割后的数组
     */
    private void handleHeartbeat(ChannelHandlerContext ctx, String[] parts) {
        int deviceNumber = Integer.parseInt(parts[1]);
        CommunicationMode communicationMode = CommunicationMode.values()[Integer.parseInt(parts[2])];
        DeviceType deviceType = DeviceType.values()[Integer.parseInt(parts[3])];
        log.debug("Heartbeat - Device Number: {}, Communication Mode: {}, Device Type: {}", deviceNumber, communicationMode.getDescription(), deviceType.getDescription());
    }

    /**
     * 处理事件上传
     *
     * @param ctx   ChannelHandlerContext
     * @param parts 消息分割后的数组
     */
    private void handleEventUpload(ChannelHandlerContext ctx, String[] parts) {
        // 事件上传,储存至influxDb
        String command = String.join(" ", parts);
        Map<String, Object> map = CommandParserUtil.parseEventUploadCommand(command);
        log.debug("Event Upload Command: {}", map);
        Map<String, String> tags = new HashMap<>();
        tags.put("type", "E");
        tags.put("typeName", "事件上传");
        // 储存至influxDb
        influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map);
    }

    /**
     * 处理控制
     *
     * @param ctx   ChannelHandlerContext
     * @param parts 消息分割后的数组
     */
    private void handleControl(ChannelHandlerContext ctx, String[] parts) {
        // 处理控制 TODO
        String command = String.join(" ", parts);
        Map<String, Object> map = CommandParserUtil.parseHostControlCommand(command);
        log.debug("Control Command: {}", map);
        Map<String, String> tags = new HashMap<>();
        tags.put("type", "C");
        tags.put("typeName", "主机控制");
        // 储存至influxDb
        influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map);
    }

    /**
     * 处理输出控制
     *
     * @param ctx   ChannelHandlerContext
     * @param parts 消息分割后的数组
     */
    private void handleOutputControl(ChannelHandlerContext ctx, String[] parts) {
        // 输出控制
        String command = String.join(" ", parts);
        Map<String, Object> map = CommandParserUtil.parseOutputControlCommand(command);
        log.debug("Output Control Command: {}", map);
        // 储存至influxDb
        Map<String, String> tags = new HashMap<>();
        tags.put("type", "O");
        tags.put("typeName", "输出点控制");
        influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            log.info("Read timeout occurred. {}", ctx.channel());
        } else {
            log.info(cause.getLocalizedMessage());
        }
        ctx.close();
    }
}

最后需要注意的问题

因为这个项目中并未使用spring-boot-start-web,也就是没有tomcat或者undertow等web框架,所以实际server.port本来设置的6767并未使用,后来我一想,不如直接当作netty server的监听地址吧,所以NettyServerProperties直接使用了本来配给netty server的5000端口作为监听端口,所以你会看到项目的md中有6767这个端口但又未实际使用。如果你的项目有web框架,肯定不能这么干,需要将netty sever重新配置一个另外的端口。