掘金 后端 ( ) • 2024-06-21 10:00

在 RocketMQ 中,消费者消费消息的过程涉及多个步骤,其中包括从 ConsumeQueue 中获取消息索引,然后根据索引从 CommitLog 中读取消息。以下是详细的步骤说明:

output (2).png

消费者消费消息的详细过程

  1. 消费者拉取消息请求

    • 消费者向 Broker 发送拉取消息的请求,指定要消费的 Topic 和 Queue,以及消费的偏移量。
  2. Broker 处理拉取请求

    • Broker 接收到消费者的拉取请求后,根据请求的 Topic 和 Queue 定位到对应的 ConsumeQueue 文件。
  3. 从 ConsumeQueue 获取消息索引

    • Broker 根据消费者提供的消费偏移量,从 ConsumeQueue 文件中读取消息索引。ConsumeQueue 文件中的每条记录都是固定大小的(20 字节),包含以下信息:
      • 消息在 CommitLog 中的物理偏移量(8 字节)
      • 消息大小(4 字节)
      • 消息的 Tag hashcode(8 字节,用于消息过滤)
  4. 根据索引从 CommitLog 中读取消息

    • Broker 根据从 ConsumeQueue 中读取到的 CommitLog 偏移量和消息大小,从 CommitLog 文件中读取实际的消息数据。
  5. 返回消息给消费者

    • Broker 将从 CommitLog 中读取到的消息数据返回给消费者。

消息拉取示例

假设消费者要消费 TopicA 的 Queue0 中的消息,消费者提供的消费偏移量为 100。

消费过程详细步骤

  1. 消费者发送拉取请求

    • 消费者发送请求:
      拉取消息请求:
      - Topic: TopicA
      - QueueId: 0
      - Offset: 100
      
  2. Broker 定位到对应的 ConsumeQueue

    • Broker 根据 TopicA 和 QueueId 0 定位到对应的 ConsumeQueue 文件。
  3. 从 ConsumeQueue 读取消息索引

    • Broker 根据偏移量 100,从 ConsumeQueue 文件中读取第 100 条记录,假设读取到的索引信息如下:
      - CommitLog Offset: 12345678
      - Message Size: 128
      - Tag Hashcode: 987654321
      
  4. 从 CommitLog 读取消息

    • Broker 根据 CommitLog 偏移量 12345678 和消息大小 128,从 CommitLog 文件中读取实际的消息数据。
  5. 返回消息给消费者

    • Broker 将读取到的消息数据返回给消费者,消费者完成消息消费。

代码示例

以下是一个简化的 Java 代码示例,展示了如何从 ConsumeQueue 中读取消息索引并从 CommitLog 中读取消息的过程:

import java.nio.ByteBuffer;

public class MessageConsumer {
    public static void main(String[] args) {
        // 示例:读取 ConsumeQueue
        int queueOffset = 100;
        ByteBuffer consumeQueueBuffer = getConsumeQueueBuffer("TopicA", 0, queueOffset);
        
        // 从 ConsumeQueue 读取消息索引
        long commitLogOffset = consumeQueueBuffer.getLong();
        int messageSize = consumeQueueBuffer.getInt();
        long tagHashcode = consumeQueueBuffer.getLong();
        
        // 示例:从 CommitLog 读取消息
        ByteBuffer commitLogBuffer = getCommitLogBuffer(commitLogOffset, messageSize);
        byte[] messageBytes = new byte[messageSize];
        commitLogBuffer.get(messageBytes);
        
        // 处理消息
        String message = new String(messageBytes);
        System.out.println("Consumed Message: " + message);
    }

    private static ByteBuffer getConsumeQueueBuffer(String topic, int queueId, int offset) {
        // 模拟从 ConsumeQueue 文件中读取索引数据
        ByteBuffer buffer = ByteBuffer.allocate(20);
        buffer.putLong(12345678L);  // CommitLog Offset
        buffer.putInt(128);         // Message Size
        buffer.putLong(987654321L); // Tag Hashcode
        buffer.flip();
        return buffer;
    }

    private static ByteBuffer getCommitLogBuffer(long offset, int size) {
        // 模拟从 CommitLog 文件中读取消息数据
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.put("This is a test message".getBytes()); // 示例消息内容
        buffer.flip();
        return buffer;
    }
}

总结

  • 从 ConsumeQueue 读取索引:消费者的消费请求首先从 ConsumeQueue 文件中读取消息索引,这些索引包含消息在 CommitLog 中的物理偏移量和消息大小。
  • 从 CommitLog 读取消息:根据消息索引中的偏移量和大小,从 CommitLog 文件中读取实际的消息数据。
  • 返回消息给消费者:Broker 将读取到的消息数据返回给消费者进行消费。

这种设计使得消息的查找和读取效率非常高,充分利用了顺序读写的性能优势。


Topic、 QueueId 和 Offset

在 RocketMQ 中,消费者消费消息时,需要使用 QueueId 和 Offset 来确定从哪里开始拉取消息。以下是详细的步骤,说明 QueueId 和 Offset 是如何确定和获取的。

QueueId 的确定

当消费者开始消费一个 Topic 时,需要知道这个 Topic 下面有哪些 Queue。以下是步骤:

  1. 获取 Topic 下的 Queue 列表
    • 消费者启动时,会向 Broker 或 NameServer 请求获取指定 Topic 下的所有 Queue 列表。每个 Queue 都有一个唯一的 QueueId。

Offset 的获取

消费者需要知道每个 Queue 的消费进度(Offset),以便继续从上次消费的位置开始。Offset 的获取过程如下:

  1. 首次消费

    • CONSUME_FROM_LAST_OFFSET:如果消费者配置为从最新位置开始消费(默认配置),会从每个 Queue 的末尾开始消费。
    • CONSUME_FROM_FIRST_OFFSET:如果消费者配置为从最早位置开始消费,会从每个 Queue 的开头开始消费。
    • CONSUME_FROM_TIMESTAMP:还可以配置为从指定时间点开始消费。
  2. 非首次消费

    • 存储在 Broker 端的消费进度:RocketMQ 提供了一个集群模式,在该模式下,消费者的消费进度(Offset)是由 Broker 端管理的(consumerOffset.json)。消费者启动时,会从 Broker 端获取上次消费的进度。
    • 存储在本地的消费进度:在某些情况下,消费者也可以将消费进度存储在本地磁盘或数据库中。消费者启动时,从本地存储中读取上次消费的进度。

消费消息的过程

以下是详细步骤,描述了消费者如何确定 QueueId 和 Offset,并消费消息:

  1. 启动消费者

    • 消费者启动时,会向 NameServer 请求获取指定 Topic 下的所有 Queue 列表。
    • 消费者会根据配置或本地存储获取每个 Queue 的消费进度(Offset)。
  2. 发送拉取消息请求

    • 消费者根据获取到的 QueueId 和 Offset,向 Broker 发送拉取消息的请求。
    • 请求包含要拉取的 Topic、QueueId 和起始 Offset。
  3. Broker 处理请求

    • Broker 接收到请求后,根据 Topic 和 QueueId 定位到对应的 ConsumeQueue 文件。
    • 根据 Offset 从 ConsumeQueue 文件中读取消息索引。
    • 根据消息索引中的 CommitLog 偏移量,从 CommitLog 文件中读取实际的消息数据。
  4. 返回消息给消费者

    • Broker 将读取到的消息数据返回给消费者。
  5. 更新消费进度

    • 消费者处理完消息后,会更新本地存储的消费进度。
    • 定期或在某些条件下(例如消费批次结束),消费者会将最新的消费进度同步到 Broker。

代码示例

以下是一个简化的 Java 代码示例,展示了消费者如何确定 QueueId 和 Offset,并消费消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.Set;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 订阅 Topic
        consumer.subscribe("TopicA", "*");

        // 获取 Topic 下的 Queue 列表
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicA");
        for (MessageQueue mq : messageQueues) {
            System.out.println("QueueId: " + mq.getQueueId());
        }

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 获取 QueueId 和 Offset
                    int queueId = msg.getQueueId();
                    long offset = msg.getQueueOffset();
                    
                    // 处理消息
                    String messageBody = new String(msg.getBody());
                    System.out.println("QueueId: " + queueId + ", Offset: " + offset + ", Message: " + messageBody);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

总结

  • QueueId 确定:消费者启动时,通过向 NameServer 或 Broker 请求获取指定 Topic 下的所有 Queue 列表,每个 Queue 有唯一的 QueueId。
  • Offset 获取:消费者启动时,从 Broker 或本地存储中获取每个 Queue 的消费进度(Offset),首次消费可以从最早或最新位置开始。
  • 消费过程:消费者根据 QueueId 和 Offset 向 Broker 发送拉取消息请求,Broker 返回消息数据,消费者处理消息并更新消费进度。

通过这种设计,RocketMQ 实现了高效的消息消费机制,保证了消息的可靠传递和高效处理。

consumerOffset.json

在 RocketMQ 中,存储在 Broker 端的消费进度是存储在一个名为 consumerOffset.json 的文件中。这个文件存储了消费者组的消费进度,用于记录每个消费组在每个队列中的消费位置(即 Offset)。

消费进度文件的位置和结构

文件位置

consumerOffset.json 文件通常存储在 Broker 的 store/config 目录下。例如,假设 RocketMQ 的存储目录为 /home/rocketmq/store,那么消费进度文件的位置就是:

/home/rocketmq/store/config/consumerOffset.json

文件结构

consumerOffset.json 文件采用 JSON 格式来存储消费进度信息。文件的结构大致如下:

{
    "offsetTable": {
        "groupA@TopicA": {
            "0": 100,
            "1": 200
        },
        "groupB@TopicB": {
            "0": 150,
            "1": 250
        }
    }
}
  • offsetTable:是一个包含所有消费组和 Topic 的对象。
  • 每个键是由消费组和 Topic 组成的字符串,格式为 groupName@topicName
  • 每个值是一个包含队列 ID 和对应消费进度(Offset)的对象。

消费进度的更新

消费者在消费消息时,会不断更新消费进度,并定期将最新的消费进度同步到 Broker。更新和同步过程如下:

  1. 消费消息:消费者拉取并成功消费消息后,更新本地的消费进度。
  2. 定期同步:消费者会定期(默认每 5 秒)将本地的消费进度同步到 Broker。
  3. Broker 更新文件:Broker 接收到消费者的进度同步请求后,会将新的消费进度写入 consumerOffset.json 文件中。

示例代码

以下是一个简化的 Java 代码示例,展示了消费者如何更新和同步消费进度:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 订阅 Topic
        consumer.subscribe("TopicA", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 获取 QueueId 和 Offset
                    int queueId = msg.getQueueId();
                    long offset = msg.getQueueOffset();
                    
                    // 处理消息
                    String messageBody = new String(msg.getBody());
                    System.out.println("QueueId: " + queueId + ", Offset: " + offset + ", Message: " + messageBody);

                    // 更新消费进度
                    // 在实际实现中,RocketMQ 会自动管理和同步消费进度
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

在实际应用中,RocketMQ 会自动管理和同步消费进度,开发者无需手动处理进度更新和同步。

总结

  • 存储位置:Broker 端的消费进度存储在 store/config/consumerOffset.json 文件中。
  • 文件结构:采用 JSON 格式,包含每个消费组和 Topic 的消费进度。
  • 进度更新:消费者消费消息后会更新本地进度,并定期同步到 Broker,Broker 会将进度写入 consumerOffset.json 文件。

通过这种机制,RocketMQ 能够有效管理和追踪消费者的消费进度,保证消息的可靠传递和消费。