掘金 后端 ( ) • 2024-05-03 15:59

theme: healer-readable highlight: a11y-dark

1. 业务背景

我们的项目中设备数据接收系统从MQTT消息服务订阅到设备数据之后,需要推到大数据平台,以便于大数据平台进行实时计算生产数据报告,给用户展示。所以我们数据接收系统跟大数据平台通信我们需要使用消息队列来异步解耦。

现在主流的MQ有2种:Kafka和RocketMQ。毫无疑问我们项目接收的设备数据量大、频率快,同时后面要基于设备上报的数据做流式数据分析处理,很显然使用Kafka更合适。因为kafka更适合大数据量日志数据收集、流式处理分析,对接Flink等大数据处理工具分析处理生成报告,同时吞吐量高,适用场景比较符合我们的业务场景。RocketMQ适用于可靠性要求高,消息顺序和事务消息、延迟消息等业务处理场景。同时业界主流物联网设备数据收集分析处理在消息队列选型上面也是使用Kafka,所以最终我们选择Kafka作为我们业务场景中的消息队列。

2. 部署Kafka集群

我们部署Kraft模式集群。使用3个节点来部署集群,节点规划如下:

节点 ip 角色 节点1 192.168.56.200 controller,broker 节点2 192.168.56.202 controller,broker 节点3 192.168.56.203 controller,broker

先下载kafka,我们下载kafka_2.13-3.1.0.tgz这个版本。

  1. 修改配置文件
vi config/kraft/server.properties

主要配置内容如下:

  • 配置角色
  • 节点id
  • 配置controller列表
  • 配置log目录(数据存储目录)
  • 本节点监听器地址
  • 对客户端公布的地址

节点1的配置

# 节点角色
process.roles=broker,controller
 
# 节点ID,和节点所承担的角色想关联
node.id=1
 
# controller列表
[email protected]:9093,[email protected]:9093,[email protected]:9093
 
# 本节点监听器地址
listeners=PLAINTEXT://192.168.56.200:9092,CONTROLLER://192.168.56.200:9093
 
# 对客户端公布的地址
advertised.listeners=PLAINTEXT://192.168.56.200:9092

# 日志文件的路径
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas

节点2配置

process.roles=broker,controller
node.id=2
[email protected]:9093,[email protected]:9093,[email protected]:9093
listeners=PLAINTEXT://192.168.56.202:9092,CONTROLLER://192.168.56.202:9093
advertised.listeners=PLAINTEXT://192.168.56.202:9092
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas

节点3配置

process.roles=broker,controller
node.id=3
[email protected]:9093,[email protected]:9093,[email protected]:9093
listeners=PLAINTEXT://192.168.56.203:9092,CONTROLLER://192.168.56.203:9093
advertised.listeners=PLAINTEXT://192.168.56.203:9092
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas

最后在每个节点创建数据目录:/home/xiangguo/kafka_2.13-3.1.0/datas

  1. 格式化存储目录

生成唯一id

bin/kafka-storage.sh random-uuid

当然,我们需要安装jdk,不然执行上面这个脚本会报错。

# 查看Java相关套件
rpm -qa | grep java
# 显示如下
javapackages-tools-3.4.1-11.el7.noarch
python-javapackages-3.4.1-11.el7.noarch
tzdata-java-2024a-1.el7.noarch
java-1.8.0-openjdk-headless-1.8.0.412.b08-1.el7_9.x86_64

# 卸载
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.412.b08-1.el7_9.x86_64

# 系统自带版本已经被卸载
[root@xg-200 kafka_2.13-3.1.0]#  java -version
-bash: /usr/bin/java: 没有那个文件或目录

# 下载jdk8然后安装
jdk-8u361-linux-x64.tar.gz
# 解压到目录/usr/local/java/jdk1.8.0_361

配置环境变量

vi /etc/profile
# java环境变量
export JAVA_HOME=/usr/local/java/jdk1.8.0_361
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin

使修改生效

source /etc/profile

查看版本

[root@xg-200 xiangguo]# java -version
java version "1.8.0_361"
Java(TM) SE Runtime Environment (build 1.8.0_361-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.361-b09, mixed mode)

再次执行生产唯一id,看到生成了唯一id

[root@xg-200 kafka_2.13-3.1.0]# bin/kafka-storage.sh random-uuid
sQNj3XoUQv6BHx_KCpnx4w

使用此id格式化存储目录(每个节点都要执行)

bin/kafka-storage.sh format -t sQNj3XoUQv6BHx_KCpnx4w -c /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties

# 看到格式化完成
Formatting /home/xiangguo/kafka_2.13-3.1.0/datas

同理,在另外两个节点执行格式化。

  1. 配置kafka环境变量
vi /etc/profile
# KAFKA环境变量
export KAFKA_HOME=/home/xiangguo/kafka_2.13-3.1.0
export PATH=$PATH:$KAFKA_HOME/bin

使环境变量生效

source /etc/profile

另外两个节点,同理设置环境变量。

  1. 创建启动/停止脚本
vi /usr/bin/kafka

输入内容如下

#! /bin/bash

if [ $# -lt 1 ]; then
	echo "No Args Input..."
	exit
fi

case $1 in
"start") {
	for i in 192.168.56.200 192.168.56.202 192.168.56.203; do
		echo " --------启动 $i Kafka-------"
		ssh $i "source /etc/profile;kafka-server-start.sh -daemon /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties"
	done
} ;;
"stop") {
	for i in 192.168.56.200 192.168.56.202 192.168.56.203; do
		echo " --------停止 $i Kafka-------"
		ssh $i "source /etc/profile;kafka-server-stop.sh"
	done
} ;;
*)
	echo "Input Args Error..."
	;;
esac

然后添加执行权限

chmod +x /usr/bin/kafka

启动/停止集群

kafka start/stop

启动kafka节点之后,过段时间服务会挂掉,如下9092端口都没有了。

[root@xg-200 ~]# netstat -anptu | grep 9092
tcp6       0      0 192.168.56.200:9092     :::*                    LISTEN      8211/java
[root@xg-200 ~]# netstat -anptu | grep 9092

查看下kafka日志,排查下问题。我们不以demo方式启动以此来查看日志。

kafka-server-start.sh /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties

果然,我们发现了关键的报错信息。意思是我们的集群id不一致。

Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse

我们把各个节点的datas目录下面的meta.properties文件中的cluster.id都统一设置为节点1中的cluster.id。

# 节点3的配置
#Wed May 01 14:25:08 CST 2024
cluster.id=sQNj3XoUQv6BHx_KCpnx4w
version=1
node.id=3

然后再次启动3个节点,此时我们发现3个节点启动运行正常,也没有中途挂掉的问题。

[2024-05-02 09:35:40,712] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
[2024-05-02 09:35:40,728] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
[2024-05-02 09:35:40,728] INFO [Controller 1] QuorumController#beginShutdown: shutting down event queue. (org.apache.kafka.queue.KafkaEventQueue)
[2024-05-02 09:35:40,729] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Shutting down socket server (kafka.network.SocketServer)
[2024-05-02 09:35:40,766] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Shutdown completed (kafka.network.SocketServer)
[2024-05-02 09:35:40,767] INFO [data-plane Kafka Request Handler on Broker 1], shutting down (kafka.server.KafkaRequestHandlerPool)
[2024-05-02 09:35:40,770] INFO [data-plane Kafka Request Handler on Broker 1], shut down completely (kafka.server.KafkaRequestHandlerPool)
[2024-05-02 09:35:40,770] INFO [ExpirationReaper-1-AlterAcls]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,935] INFO [ExpirationReaper-1-AlterAcls]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,936] INFO [ExpirationReaper-1-AlterAcls]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,937] INFO [ThrottledChannelReaper-Fetch]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,433] INFO [ThrottledChannelReaper-Fetch]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,434] INFO [ThrottledChannelReaper-Fetch]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,434] INFO [ThrottledChannelReaper-Produce]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Produce]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Produce]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Request]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,435] INFO [ThrottledChannelReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,436] INFO [ThrottledChannelReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,436] INFO [ThrottledChannelReaper-ControllerMutation]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,436] INFO [ThrottledChannelReaper-ControllerMutation]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,437] INFO [ThrottledChannelReaper-ControllerMutation]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,437] INFO [Controller 1] closed event queue. (org.apache.kafka.queue.KafkaEventQueue)
[2024-05-02 09:35:44,445] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)

然后,我们停止掉服务,使用上面的启动脚本一键启动/停止kafka集群服务。

kafka start/stop

启动成功,端口9092已经起来。

[root@xg-200 datas]# kafka start
 --------启动 192.168.56.200 Kafka-------
[email protected]'s password:
 --------启动 192.168.56.202 Kafka-------
[email protected]'s password:
 --------启动 192.168.56.203 Kafka-------
[email protected]'s password:
[root@xg-200 datas]# netstat -anptu | grep 9092
tcp6       0      0 192.168.56.200:9092     :::*                    LISTEN      4804/java

另外2个节点服务也已经启动成功。

[root@xg-202 datas]#  netstat -anptu | grep 9092
tcp6       0      0 192.168.56.202:9092     :::*                    LISTEN      3421/java

3. 简单测试验证

首先,创建topic

kafka-topics.sh --create --topic test_topic --bootstrap-server 192.168.56.200:9092

查看topic

kafka-topics.sh --describe --topic test_topic --bootstrap-server 192.168.56.200:9092

查看到的信息如下

[root@xg-200 datas]# kafka-topics.sh --describe --topic test_topic --bootstrap-server 192.168.56.200:9092
Topic: test_topic       TopicId: dH-NJGShTPKLmrsU7Gw4jQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test_topic       Partition: 0    Leader: 3       Replicas: 3     Isr: 3

生产消息

kafka-console-producer.sh  --topic test_topic --bootstrap-server 192.168.56.200:9092
>hello
>aaa
>bbb
>ccc

消费消息

kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server 192.168.56.200:9092

hello
aaa
bbb
ccc

4. 与SpringBoot整合以及发布订阅

  1. 引入spring-kafka依赖
<!--kafka依赖-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  1. 配置
spring:
  kafka:
    # 指定kafka server的地址,集群中间,逗号隔开
    bootstrap-servers: 192.168.56.200:9092,192.168.56.202:9092,192.168.56.203:9092
    producer:
      # 消息重发的次数
      retries: 3
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

  1. 消息生产

使用异步回调,避免消息发送阻塞等待。

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    /**
     * 发送消息到topic
     */
    public static final String TOPIC_TEST = "topic.test";


    /**
     * 消费者组
     */
    public static final String CONSUMER_GROUP = "consumer.group";


    /**
     * 发送消息
     * @param msg
     */
    public void send(Object msg) {
        String msgString = JSONUtil.toJsonStr(msg);
        log.info("发送消息:{}", msgString);

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 发送失败的处理
                log.error("生产者发送消息失败,topic:{}, throwable: {}", TOPIC_TEST, throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // 成功的处理
                log.info("生产者发送消息成功, topic:{}, SendResult:{}",TOPIC_TEST, stringObjectSendResult.toString());
            }
        });
    }
}
  1. 消息消费

从topic订阅接收、处理消息。

@Component
@Slf4j
public class KafkaConsumer {

    /**
     * 从topic订阅接收、处理数据
     * @param record
     * @param ack
     * @param topic
     */
    @KafkaListener(topics = TOPIC_TEST, groupId = CONSUMER_GROUP)
    public void handle(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者组:{} 从topic:{} 订阅接收、处理消息:{}", CONSUMER_GROUP, topic, msg);
            ack.acknowledge();
        }
    }

}
  1. 发布订阅验证

生产者发送测试数据

@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class AppTest {

    @Resource
    private KafkaProducer kafkaProducer;


    /**
     * kafka生产者发送消息
     */
    @Test
    public void sendMsg() throws InterruptedException {
        for(int i = 0; i<10; i++) {
            kafkaProducer.send("测试数据: " + i);
            Thread.sleep(1000);
        }
        
    }
}

消费到数据。

2024-05-03 10:34:03.034  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 0
2024-05-03 10:34:04.024  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 1
2024-05-03 10:34:05.030  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 2
2024-05-03 10:34:06.024  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 3
2024-05-03 10:34:07.029  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 4
2024-05-03 10:34:08.025  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 5
2024-05-03 10:34:09.029  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 6
2024-05-03 10:34:10.033  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 7
2024-05-03 10:34:11.030  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 8
2024-05-03 10:34:12.033  INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer          : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 9