Juejin Backend • 2024-04-24 10:15

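TL;DR

This article covers how messages flow through RocketMQ: how namesrv maintains the relationship between topics and brokers, and how a received message is written to the CommitLog. It is long and dense, so use the table of contents to jump straight to the part you need.

The previous article introduced RocketMQ and how producers and consumers work locally.

The next article covers how consumers consume messages, including the pull and push modes.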

rocketmq

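Before we start, clone the RocketMQ source code.

From the deployment architecture diagram on the official site, we know that RocketMQ uses the nameserver to manage brokers and to provide topic routing, so let's look at the nameserver first.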

nameserver

Starting the nameserver


In the RocketMQ source tree, the main entry point of the namesrv module is org.apache.rocketmq.namesrv.NamesrvStartup.

public static void main(String[] args) {
    main0(args);
    controllerManagerMain();
}

public static NamesrvController main0(String[] args) {
    try {
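        // Parse the command-line arguments and config file, then apply them to nettyServer and nettyClient; the server listen port is set to 9876
        parseCommandlineAndConfigFile(args);
        // Create and start the NameServer controller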
        NamesrvController controller = createAndStartNamesrvController();
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

public static ControllerManager controllerManagerMain() {
    try {
        if (namesrvConfig.isEnableControllerInNamesrv()) {
            return createAndStartControllerManager();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}


public static NamesrvController createAndStartNamesrvController() throws Exception {

    // Build the NameServer controller instance
    NamesrvController controller = createNamesrvController();
    // Start it
    start(controller);
    // Print the startup information
    NettyServerConfig serverConfig = controller.getNettyServerConfig();
    String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());
    log.info(tip);
    System.out.printf("%s%n", tip);
    return controller;
}

public static NamesrvController createNamesrvController() {

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // Initialize the controller
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // Shut the controller down cleanly when the JVM is interrupted
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));

    // Start the controller: remoting server, remoting client, and route info manager
    controller.start();

    return controller;
}
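
The ordering used by start can be reduced to a small sketch. This is only an illustration, not RocketMQ code: StartupOrderSketch and RecordingController are hypothetical names, and boot returns false where the excerpt calls System.exit(-3).

```java
import java.util.ArrayList;
import java.util.List;

final class StartupOrderSketch {
    // Hypothetical controller that only records the order of lifecycle calls.
    static final class RecordingController {
        final List<String> calls = new ArrayList<>();
        private final boolean initOk;

        RecordingController(boolean initOk) {
            this.initOk = initOk;
        }

        boolean initialize() {
            calls.add("initialize");
            return initOk;
        }

        void start() {
            calls.add("start");
        }

        void shutdown() {
            calls.add("shutdown");
        }
    }

    // Same order as the excerpt: initialize, register a shutdown hook, then start.
    // Returns false on failed initialization instead of exiting the JVM.
    static boolean boot(RecordingController controller) {
        if (controller == null) {
            throw new IllegalArgumentException("controller is null");
        }
        if (!controller.initialize()) {
            controller.shutdown();
            return false;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown));
        controller.start();
        return true;
    }

    public static void main(String[] args) {
        RecordingController ok = new RecordingController(true);
        System.out.println(boot(ok) + " " + ok.calls);
        RecordingController failing = new RecordingController(false);
        System.out.println(boot(failing) + " " + failing.calls);
    }
}
```

Registering the hook before start means that a failure or interrupt during start still triggers shutdown.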

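As you can see, startup first builds the nameserver controller, then calls its #initialize method, and then its #start method. Let's look at the initialization method first.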

public boolean initialize() {
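    // Load the JSON config from file -> {user.home}\namesrv\kvConfig.json
    loadConfig();
    // Initialize the netty server and netty client, and set up broker connection housekeeping
    initiateNetworkComponents();
    // Initialize the thread pools
    initiateThreadExecutors();
    // Register processors; the choice depends on the cluster-test setting
    registerProcessor();
    // Start the scheduled tasks: scan for inactive brokers at the configured interval, print the config every 10 minutes, and print monitoring info every second
    startScheduleService();
    // Initialize the SSL context
    initiateSslContext();
    // Initialize the RPC hooks, mainly used for topic handling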
    return true;
}

With the server configured, let's see what start does.

public void start() throws Exception {
    // Start the remoting server
    this.remotingServer.start();

    // Handle the listen port: if it was configured as 0, use the port actually bound
    if (0 == nettyServerConfig.getListenPort()) {
        nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
    }

    // Record this nameserver's own address on the remoting client
    this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
        + ":" + nettyServerConfig.getListenPort()));
    // Start the remoting client
    this.remotingClient.start();

    // SSL: start the file watch service
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // Start the route info manager
    this.routeInfoManager.start();
}

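Startup therefore consists mostly of initializing the dependent components and starting the netty service.

Registering processors

The #initialize method calls #registerProcessor().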

private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {

        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
    } else {
        // Support get route info only temporarily for release-5.2.0 version
        // Register in the business processor table [processorTable]. ClientRequestProcessor handles client requests, currently only fetching route info
        ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);

        // Register as the default request processor [defaultRequestProcessorPair, used when processorTable has no matching processor]. DefaultRequestProcessor handles all other namesrv requests
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
    }
}

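As we can see, everything goes through two methods of the remotingServer object: registerProcessor, which registers a processor, and registerDefaultProcessor, which registers the default processor. Let's look at registerProcessor first.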

public interface RemotingServer extends RemotingService {

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    // ... other methods omitted
}

The implementation in NettyRemotingServer is:

@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    // If no executor is supplied, fall back to the shared public executor
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }
    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis);
    // Register the processor under its request code
    this.processorTable.put(requestCode, pair);
}

The registerDefaultProcessor method, by contrast, simply builds a processor pair and assigns it to the defaultRequestProcessorPair field.

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessorPair = new Pair<>(processor, executor);
}
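
Taken together, the two registration methods amount to a lookup table keyed by request code plus a single fallback entry. Here is a minimal sketch of that idea, assuming hypothetical names (ProcessorRegistrySketch, Handler) and leaving out the per-processor executor; the request codes 1 and 2 are arbitrary.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ProcessorRegistrySketch {
    // Hypothetical stand-in for NettyRequestProcessor: turns a request body into a reply.
    interface Handler {
        String handle(String body);
    }

    private final Map<Integer, Handler> processorTable = new ConcurrentHashMap<>();
    private volatile Handler defaultHandler;

    void register(int requestCode, Handler handler) {
        processorTable.put(requestCode, handler);
    }

    void registerDefault(Handler handler) {
        this.defaultHandler = handler;
    }

    // Picks the handler registered for the code, else the default one.
    // Returns an error string when neither exists.
    String dispatch(int requestCode, String body) {
        Handler matched = processorTable.get(requestCode);
        Handler chosen = (matched != null) ? matched : defaultHandler;
        if (chosen == null) {
            return "request type " + requestCode + " not supported";
        }
        return chosen.handle(body);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        registry.register(1, body -> "route:" + body);
        System.out.println(registry.dispatch(1, "TopicA"));
        System.out.println(registry.dispatch(2, "x"));
        registry.registerDefault(body -> "default:" + body);
        System.out.println(registry.dispatch(2, "x"));
    }
}
```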

Handling requests

In remotingServer's start() method, these handlers are added when the childHandler of the netty server is configured.

/**
 * config channel in ChannelInitializer
 *
 * @param ch the SocketChannel needed to init
 * @return the initialized ChannelPipeline, sub class can use it to extent in the future
 */
protected ChannelPipeline configChannel(SocketChannel ch) {
    return ch.pipeline()
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
        .addLast(defaultEventExecutorGroup,
            encoder,
            new NettyDecoder(),
            distributionHandler,
            new IdleStateHandler(0, 0,
                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
            connectionManageHandler,
            serverHandler
        );
}

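Of these, IdleStateHandler and connectionManageHandler manage the connected channels and evict connections that stay idle for too long. For request handling we need to look at serverHandler, which is initialized in prepareSharableHandlers.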

private void prepareSharableHandlers() {
    tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
    distributionHandler = new RemotingCodeDistributionHandler();
}

NettyServerHandler is an inner class of NettyRemotingServer.

@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
        // Look up the remoting server bound to the local port
        int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
        NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
        if (localPort != -1 && remotingAbstract != null) {
            // Hand the message to that server for processing
            remotingAbstract.processMessageReceived(ctx, msg);
            return;
        }
        // No server was found for this port, so close the channel
        // The related remoting server has been shutdown, so close the connected channel
        RemotingHelper.closeChannel(ctx.channel());
    }

    // Handle changes in the channel's writability
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        if (channel.isWritable()) {
            if (!channel.config().isAutoRead()) {
                channel.config().setAutoRead(true);
                log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
                    RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
            }
        } else {
            channel.config().setAutoRead(false);
            log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
                RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
        }
        super.channelWritabilityChanged(ctx);
    }
}

The actual message handling happens in processMessageReceived:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) {
    if (msg != null) {
        switch (msg.getType()) {
            // Request command
            case REQUEST_COMMAND:
                processRequestCommand(ctx, msg);
                break;
            // Response command
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, msg);
                break;
            default:
                break;
        }
    }
}

Let's glance at the response path first: it mainly fetches the pending future from the table and then completes it.

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // Look up the pending future by the request sequence number (opaque)
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        // Run the callback if there is one
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    }
}
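
The response path relies on correlating each response with its request through the opaque id. The sketch below shows that correlation in isolation. It swaps in java.util.concurrent.CompletableFuture for RocketMQ's ResponseFuture, and ResponseTableSketch, sendRequest, and onResponse are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ResponseTableSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();

    // Called when a request goes out: remember the future under a fresh opaque id.
    int sendRequest(CompletableFuture<String> future) {
        int opaque = nextOpaque.incrementAndGet();
        responseTable.put(opaque, future);
        return opaque;
    }

    // Called when a response comes in: complete and drop the matching future.
    // Returns false when no pending request matches the opaque id.
    boolean onResponse(int opaque, String body) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ResponseTableSketch table = new ResponseTableSketch();
        CompletableFuture<String> pending = new CompletableFuture<>();
        int opaque = table.sendRequest(pending);
        System.out.println(table.onResponse(opaque, "pong") + " " + pending.getNow(null));
        System.out.println(table.onResponse(opaque, "duplicate"));
    }
}
```

Removing the entry as part of the lookup means a duplicate or late response for the same id finds nothing and is reported as unmatched.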

Request handling looks like this:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Look up the processor by request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    // Fall back to the default processor if the code has no processor of its own
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessorPair : matched;
    // Request sequence number
    final int opaque = cmd.getOpaque();
    // If there is no processor at all, reply that the request code is not supported
    if (pair == null) {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        writeResponse(ctx.channel(), cmd, response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        return;
    }
    // Build the task that processes the request
    Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);

    if (isShuttingDown.get()) {
        if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
                "please go away");
            response.setOpaque(opaque);
            writeResponse(ctx.channel(), cmd, response);
            return;
        }
    }

    // Check whether the processor wants to reject the request
    if (pair.getObject1().rejectRequest()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
            "[REJECTREQUEST]system busy, start flow control for a while");
        response.setOpaque(opaque);
        writeResponse(ctx.channel(), cmd, response);
        return;
    }

    try {
        final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
        // Submit to the executor so the request is processed asynchronously
        pair.getObject2().submit(requestTask);
    } catch (RejectedExecutionException e) {
        if ((System.currentTimeMillis() % 10000) == 0) {
            log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                + ", too many requests and system thread pool busy, RejectedExecutionException "
                + pair.getObject2().toString()
                + " request code: " + cmd.getCode());
        }

        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
            "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        writeResponse(ctx.channel(), cmd, response);
    } catch (Throwable e) {
        AttributesBuilder attributesBuilder = RemotingMetricsManager.newAttributesBuilder()
            .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(cmd.getCode()))
            .put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED);
        RemotingMetricsManager.rpcLatency.record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
    }
}

Let's see what the run method of the request-processing task does.

private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingCommand cmd,
    Pair<NettyRequestProcessor, ExecutorService> pair, int opaque) {
    return () -> {
        Exception exception = null;
        RemotingCommand response;

        try {
            // Run the before-phase of the RPC hooks
            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            try {
                doBeforeRpcHooks(remoteAddr, cmd);
            } catch (AbortProcessException e) {
                throw e;
            } catch (Exception e) {
                exception = e;
            }

            // If no error occurred, invoke the processor to handle the request
            if (exception == null) {
                response = pair.getObject1().processRequest(ctx, cmd);
            } else {
                // Otherwise build an error response
                response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, null);
            }

            // Run the after-phase of the RPC hooks
            try {
                doAfterRpcHooks(remoteAddr, cmd, response);
            } catch (AbortProcessException e) {
                throw e;
            } catch (Exception e) {
                exception = e;
            }

            if (exception != null) {
                throw exception;
            }

            writeResponse(ctx.channel(), cmd, response);
        } catch (AbortProcessException e) {
            response = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
            response.setOpaque(opaque);
            writeResponse(ctx.channel(), cmd, response);
        } catch (Throwable e) {
            log.error("process request exception", e);
            log.error(cmd.toString());

            if (!cmd.isOnewayRPC()) {
                response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                    UtilAll.exceptionSimpleDesc(e));
                response.setOpaque(opaque);
                writeResponse(ctx.channel(), cmd, response);
            }
        }
    };
}

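The overall request-handling flow is:

Netty receives a command and looks up the processor by requestCode, falling back to the default processor if none is registered. It then builds a task to handle the request, submits that task to the processor's thread pool, and writes the response to the channel of the corresponding ChannelHandlerContext once processing completes.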
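
The submit step is also where overload shows up: when the processor's executor refuses a task, processRequestCommand answers with SYSTEM_BUSY instead of processing. The sketch below reproduces that path with a deliberately tiny pool. BusyRejectSketch and its string results are hypothetical, and the pool sizes are chosen only to force a rejection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BusyRejectSketch {
    // Tries to hand the task to the pool; a refusal maps to a "SYSTEM_BUSY" reply.
    static String submit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.submit(task);
            return "ACCEPTED";
        } catch (RejectedExecutionException e) {
            return "SYSTEM_BUSY";
        }
    }

    // One worker plus a queue of one: a third concurrent task has nowhere to go.
    static List<String> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> results = new ArrayList<>();
        try {
            results.add(submit(pool, blocked)); // taken by the single worker
            results.add(submit(pool, blocked)); // waits in the queue
            results.add(submit(pool, blocked)); // no room left
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```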

Managing the relationships among topics, brokers, and queues

The processRequest method of DefaultRequestProcessor defines how the default requests are handled.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        // Put a KV config
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        // Get a KV config
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        // Delete a KV config
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        // Query the data version
        case RequestCode.QUERY_DATA_VERSION:
            return this.queryBrokerTopicConfig(ctx, request);
        // Register a broker
        case RequestCode.REGISTER_BROKER:
            return this.registerBroker(ctx, request);
        // Unregister a broker
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        // Broker heartbeat
        case RequestCode.BROKER_HEARTBEAT:
            return this.brokerHeartbeat(ctx, request);
        // Get the broker member group
        case RequestCode.GET_BROKER_MEMBER_GROUP:
            return this.getBrokerMemberGroup(ctx, request);
        // Get the broker cluster info
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.ADD_WRITE_PERM_OF_BROKER:
            return this.addWritePermOfBroker(ctx, request);
        // Get the list of all topics
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return this.getAllTopicListFromNameserver(ctx, request);
        // Delete topic info
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return this.deleteTopicInNamesrv(ctx, request);
        // Register a topic
        case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
            return this.registerTopicToNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        // Get the topics of a cluster
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            String error = " request type " + request.getCode() + " not supported";
            return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
    }
}

Topic management

From the code name we can see that registering a topic calls the local registerTopicToNamesrv method.

case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
    return this.registerTopicToNamesrv(ctx, request);


private RemotingCommand registerTopicToNamesrv(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    final RegisterTopicRequestHeader requestHeader =
        (RegisterTopicRequestHeader) request.decodeCommandCustomHeader(RegisterTopicRequestHeader.class);

    TopicRouteData topicRouteData = TopicRouteData.decode(request.getBody(), TopicRouteData.class);
    if (topicRouteData != null && topicRouteData.getQueueDatas() != null && !topicRouteData.getQueueDatas().isEmpty()) {
        this.namesrvController.getRouteInfoManager().registerTopic(requestHeader.getTopic(), topicRouteData.getQueueDatas());
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

Specifically, it calls RouteInfoManager's public void registerTopic(final String topic, List<QueueData> queueDatas) method. The RouteInfoManager itself is initialized when the NamesrvController is constructed.

private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
    this.topicQueueTable = new ConcurrentHashMap<>(1024);
    this.brokerAddrTable = new ConcurrentHashMap<>(128);
    this.clusterAddrTable = new ConcurrentHashMap<>(32);
    this.brokerLiveTable = new ConcurrentHashMap<>(256);
    this.filterServerTable = new ConcurrentHashMap<>(256);
    this.topicQueueMappingInfoTable = new ConcurrentHashMap<>(1024);
    this.unRegisterService = new BatchUnregistrationService(this, namesrvConfig);
    this.namesrvConfig = namesrvConfig;
    this.namesrvController = namesrvController;
}

registerTopic is implemented as follows:

public void registerTopic(final String topic, List<QueueData> queueDatas) {
    if (queueDatas == null || queueDatas.isEmpty()) {
        return;
    }

    try {
        // Acquire the write lock
        this.lock.writeLock().lockInterruptibly();
        // If the table already has this topic, update its queue info
        if (this.topicQueueTable.containsKey(topic)) {
            // Fetch the existing record
            Map<String, QueueData> queueDataMap  = this.topicQueueTable.get(topic);
            for (QueueData queueData : queueDatas) {
                // Check that the broker hosting the queue is registered
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                // Update the table
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }
            log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
        } else {
            // Check that the broker hosting each queue is registered
            Map<String, QueueData> queueDataMap = new HashMap<>();
            for (QueueData queueData : queueDatas) {
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                // Build the queue info
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }

            // Write into the topic table
            this.topicQueueTable.put(topic, queueDataMap);
            log.info("Register topic route:{}, {}", topic, queueDatas);
        }
    } catch (Exception e) {
        log.error("registerTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}
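
The data structure behind registerTopic is a nested map, topic to brokerName to queue data, guarded by a read-write lock and validated against the set of known brokers. A minimal sketch under simplifying assumptions: TopicRouteSketch is a hypothetical class, an Integer queue count stands in for QueueData, and a plain set stands in for brokerAddrTable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class TopicRouteSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<String> knownBrokers = new HashSet<>();
    // topic -> (brokerName -> queue count); the Integer stands in for QueueData
    private final Map<String, Map<String, Integer>> topicQueueTable = new HashMap<>();

    void addBroker(String brokerName) {
        lock.writeLock().lock();
        try {
            knownBrokers.add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns false and changes nothing if any broker is unknown.
    boolean registerTopic(String topic, Map<String, Integer> queuesByBroker) {
        lock.writeLock().lock();
        try {
            if (!knownBrokers.containsAll(queuesByBroker.keySet())) {
                return false;
            }
            topicQueueTable.computeIfAbsent(topic, k -> new HashMap<>()).putAll(queuesByBroker);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns a copy of the topic's route so callers never see later mutations.
    Map<String, Integer> route(String topic) {
        lock.readLock().lock();
        try {
            Map<String, Integer> found = topicQueueTable.get(topic);
            return found == null ? new HashMap<>() : new HashMap<>(found);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        TopicRouteSketch routes = new TopicRouteSketch();
        routes.addBroker("broker-a");
        Map<String, Integer> queues = new HashMap<>();
        queues.put("broker-a", 4);
        System.out.println(routes.registerTopic("TopicA", queues) + " " + routes.route("TopicA"));
    }
}
```

Two choices here differ from the excerpt and are only one possible design: the sketch validates every broker before writing anything, whereas the excerpt's update branch writes entry by entry until it meets an unknown broker, and the sketch takes the lock before the try block so that unlock only runs when the lock is actually held.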

This step updates the nameserver's topicQueueTable according to the incoming request. There is one more topic-related request code:

// Delete topic info
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
    return this.deleteTopicInNamesrv(ctx, request);

Its handler is:

private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final DeleteTopicFromNamesrvRequestHeader requestHeader =
        (DeleteTopicFromNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicFromNamesrvRequestHeader.class);

    if (requestHeader.getClusterName() != null
        && !requestHeader.getClusterName().isEmpty()) {
        this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic(), requestHeader.getClusterName());
    } else {
        this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

When no clusterName is passed, the following code runs:

public void deleteTopic(final String topic) {
    try {
        // Acquire the write lock
        this.lock.writeLock().lockInterruptibly();
        // Remove the topic
        this.topicQueueTable.remove(topic);
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}

If a clusterName is passed, these steps run instead:

public void deleteTopic(final String topic, final String clusterName) {
    try {
        // Acquire the write lock
        this.lock.writeLock().lockInterruptibly();
        // Get all brokers in the cluster
        Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
        if (brokerNames == null || brokerNames.isEmpty()) {
            return;
        }
        // Get all queues bound to this topic
        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
        if (queueDataMap != null) {
            for (String brokerName : brokerNames) {
                // Remove this broker's queues
                final QueueData removedQD = queueDataMap.remove(brokerName);
                if (removedQD != null) {
                    log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic, removedQD);
                }
            }
            if (queueDataMap.isEmpty()) {
                log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);
                // Remove the topic itself
                this.topicQueueTable.remove(topic);
            }
        }
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}
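
The cluster-scoped delete only strips the topic's entries for brokers in that cluster, and drops the topic entirely once no broker is left. The sketch below isolates that rule on plain maps. DeleteTopicSketch is a hypothetical class, an Integer stands in for QueueData, and locking is left out for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DeleteTopicSketch {
    // Removes the topic's entries for every broker in the cluster.
    // Returns true if the topic ended up with no brokers and was dropped entirely.
    static boolean deleteTopic(Map<String, Map<String, Integer>> topicQueueTable,
                               Map<String, Set<String>> clusterAddrTable,
                               String topic, String clusterName) {
        Set<String> brokerNames = clusterAddrTable.get(clusterName);
        Map<String, Integer> queues = topicQueueTable.get(topic);
        if (brokerNames == null || brokerNames.isEmpty() || queues == null) {
            return false;
        }
        queues.keySet().removeAll(brokerNames);
        if (queues.isEmpty()) {
            topicQueueTable.remove(topic);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> clusters = new HashMap<>();
        clusters.put("cluster-1", new HashSet<>(Set.of("broker-a")));
        clusters.put("cluster-2", new HashSet<>(Set.of("broker-b")));

        Map<String, Integer> queues = new HashMap<>();
        queues.put("broker-a", 4);
        queues.put("broker-b", 4);
        Map<String, Map<String, Integer>> topics = new HashMap<>();
        topics.put("TopicA", queues);

        // TopicA still lives on broker-b after the first call, so it is kept.
        System.out.println(deleteTopic(topics, clusters, "TopicA", "cluster-1") + " " + topics);
        // The second call removes the last broker, so the topic disappears.
        System.out.println(deleteTopic(topics, clusters, "TopicA", "cluster-2") + " " + topics);
    }
}
```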

Broker management

In DefaultRequestProcessor, the broker registration command is handled as follows:

// Register a broker
case RequestCode.REGISTER_BROKER:
    return this.registerBroker(ctx, request);

The registerBroker method is:

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    // Build the response
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    // CRC32 check
    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }

    TopicConfigSerializeWrapper topicConfigWrapper = null;
    List<String> filterServerList = null;

    // Determine the broker version
    Version brokerVersion = MQVersion.value2Version(request.getVersion());


    // Depending on the broker version, unpack the request into the topic config and the filter server list
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        final RegisterBrokerBody registerBrokerBody = extractRegisterBrokerBodyFromRequest(request, requestHeader);
        topicConfigWrapper = registerBrokerBody.getTopicConfigSerializeWrapper();
        filterServerList = registerBrokerBody.getFilterServerList();
    } else {
        // RegisterBrokerBody of old version only contains TopicConfig.
        topicConfigWrapper = extractRegisterTopicConfigFromRequest(request);
    }

    // Register the broker
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        request.getExtFields().get(MixAll.ZONE_NAME),
        requestHeader.getHeartbeatTimeoutMillis(),
        requestHeader.getEnableActingMaster(),
        topicConfigWrapper,
        filterServerList,
        ctx.channel()
    );

    // Validate the result and build the matching response
    if (result == null) {
        // Register single topic route info should be after the broker completes the first registration.
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("register broker failed");
        return response;
    }

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    // Whether to return the order topic config to the broker
    if (this.namesrvController.getNamesrvConfig().isReturnOrderTopicConfigToBroker()) {
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
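
The checksum step rejects a registration whose body does not match the CRC32 value declared by the sender. The excerpt does not show how checksum computes or compares the value, so the sketch below is only an assumption about the general technique, using java.util.zip.CRC32 from the JDK. BodyChecksumSketch and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class BodyChecksumSketch {
    // Computes the CRC32 of the request body.
    static long crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    // True when the body still matches the checksum the sender declared.
    static boolean matches(byte[] body, long declaredCrc32) {
        return crc32(body) == declaredCrc32;
    }

    public static void main(String[] args) {
        byte[] body = "{\"topicConfigTable\":{}}".getBytes(StandardCharsets.UTF_8);
        long declared = crc32(body);
        System.out.println(matches(body, declared));

        // Flip one byte to simulate corruption in transit.
        byte[] corrupted = body.clone();
        corrupted[0] ^= 0x01;
        System.out.println(matches(corrupted, declared));
    }
}
```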

Let's look at the actual registration method, RouteInfoManager#registerBroker.

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        // Acquire the write lock
        this.lock.writeLock().lockInterruptibly();

        // Create or update the cluster info
        Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
        brokerNames.add(brokerName);

        // Whether this is the first registration
        boolean registerFirst = false;
        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }

        // Detect an old-version broker
        boolean isOldVersionBroker = enableActingMaster == null;
        brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
        brokerData.setZoneName(zoneName);

        // Get the address map for this broker name
        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

        // Work out whether the minimum brokerId changes
        boolean isMinBrokerIdChanged = false;
        long prevMinBrokerId = 0;
        if (!brokerAddrsMap.isEmpty()) {
            prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
        }

        if (brokerId < prevMinBrokerId) {
            isMinBrokerIdChanged = true;
        }

        // Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
        // Remove any entry that has the same IP:PORT under a different brokerId
        brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

        // Check the data version against the existing registration
        String oldBrokerAddr = brokerAddrsMap.get(brokerId);
        if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
            BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

            if (null != oldBrokerInfo) {
                long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                if (oldStateVersion > newStateVersion) {
                    log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                            "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                        clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                    // Remove the rejected broker address from the live broker table
                    brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                    return result;
                }
            }
        }

        if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
            log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
            return null;
        }

        String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
        registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

        // Whether this is the master
        boolean isMaster = MixAll.MASTER_ID == brokerId;

        // Whether this is the prime slave: new-version broker, not master, and the smallest brokerId
        boolean isPrimeSlave = !isOldVersionBroker && !isMaster
            && brokerId == Collections.min(brokerAddrsMap.keySet());

        if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

            // Topic configs reported by the broker
            ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();

            if (tcTable != null) {
                // Convert to the wrapper that also carries the topic-queue mapping info
                TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();

                // Delete from the current broker the topics that no longer exist in tcTable; static topics are not supported here yet
                if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
                    final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
                    final Set<String> newTopicSet = tcTable.keySet();
                    final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
                    for (final String toDeleteTopic : toDeleteTopics) {
                        Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
                        final QueueData removedQD = queueDataMap.remove(brokerName);
                        if (removedQD != null) {
                            log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
                        }

                        if (queueDataMap.isEmpty()) {
                            log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
                            topicQueueTable.remove(toDeleteTopic);
                        }
                    }
                }

                // Process the queues under each topic
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                        topicConfigWrapper.getDataVersion(), brokerName,
                        entry.getValue().getTopicName())) {
                        final TopicConfig topicConfig = entry.getValue();
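                        // During a master/slave switch, namesrv treats the slave with the smallest brokerId as the "acting" master and changes its broker permission to read-only.
                        if (isPrimeSlave && brokerData.isEnableActingMaster()) {
                            // Clear the write bit so the topic is read-only on the acting master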
                            topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                        }
                        // Create or update the queue data
                        this.createAndUpdateQueueData(brokerName, topicConfig);
                    }
                }

                // Check whether the broker's topic config has changed
                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    //the topicQueueMappingInfoMap should never be null, but can be empty
                    for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                        // If the topic is not present yet, create an empty map for it
                        if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                            topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                        }
                        //Note asset brokerName equal entry.getValue().getBname()
                        // Insert the new value
                        topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                    }
                }
            }
        }

        // Build the broker info and put it into the live table
        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
        }

        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddrInfo);
            } else {
                this.filterServerTable.put(brokerAddrInfo, filterServerList);
            }
        }

        // If this is not the master, return the master address and its HA server address
        if (MixAll.MASTER_ID != brokerId) {
            String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (masterAddr != null) {
                BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                if (masterLiveInfo != null) {
                    result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                    result.setMasterAddr(masterAddr);
                }
            }
        }

        if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(brokerAddrsMap, null,
                this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }

    return result;
}

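The main job of this step is to update the topic-broker-queue relationships according to the registration request. As we saw in the previous article, a client synchronizes topic route information when it is created.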
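As a rough illustration of the relationship being maintained here, the sketch below models a topic -> brokerName -> queue-info table with plain maps. The class `RouteTableSketch`, the nested `QueueInfo` type, and the permission constants are hypothetical stand-ins, not RocketMQ types; only the nested-map shape and the "clear the write bit" idea follow the code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the namesrv route table: topic -> (brokerName -> queue info).
final class RouteTableSketch {
    // Permission bits assumed for this sketch only.
    static final int PERM_WRITE = 1 << 1;
    static final int PERM_READ = 1 << 2;

    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;
        final int perm;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    private final Map<String, Map<String, QueueInfo>> topicQueueTable = new HashMap<>();

    // Create or overwrite the queue info a broker reports for a topic.
    void createOrUpdate(String topic, QueueInfo info) {
        topicQueueTable.computeIfAbsent(topic, k -> new HashMap<>()).put(info.brokerName, info);
    }

    // Remove one broker's entry; drop the topic once no broker serves it.
    void remove(String topic, String brokerName) {
        Map<String, QueueInfo> byBroker = topicQueueTable.get(topic);
        if (byBroker == null) {
            return;
        }
        byBroker.remove(brokerName);
        if (byBroker.isEmpty()) {
            topicQueueTable.remove(topic);
        }
    }

    // Clear the write bit and keep the other bits, i.e. make the permission read-only.
    static int withoutWrite(int perm) {
        return perm & ~PERM_WRITE;
    }

    Map<String, QueueInfo> lookup(String topic) {
        return topicQueueTable.get(topic);
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.createOrUpdate("TopicA", new QueueInfo("broker-a", 4, 4, PERM_READ | PERM_WRITE));
        table.createOrUpdate("TopicA", new QueueInfo("broker-b", 4, 4, withoutWrite(PERM_READ | PERM_WRITE)));
        System.out.println(table.lookup("TopicA").keySet());
        table.remove("TopicA", "broker-a");
        table.remove("TopicA", "broker-b");
        System.out.println(table.lookup("TopicA")); // null once no broker serves the topic
    }
}
```

The real method does all of this under a write lock and also tracks data versions; the sketch leaves both out to keep the data shape visible.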

Fetching the topic route

In namesrv's #initialize method, the call to #registerProcessor registers ClientRequestProcessor as the processor for RequestCode.GET_ROUTEINFO_BY_TOPIC. ClientRequestProcessor implements NettyRequestProcessor, so let's look at its #processRequest method.

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
    final RemotingCommand request) throws Exception {
    return this.getRouteInfoByTopic(ctx, request);
}

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Work out whether the nameserver has finished starting up
    boolean namesrvReady = needCheckNamesrvReady.get() && System.currentTimeMillis() - startupTimeMillis >= TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());

    if (namesrvController.getNamesrvConfig().isNeedWaitForService() && !namesrvReady) {
        log.warn("name server not ready. request code {} ", request.getCode());
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("name server not ready");
        return response;
    }

    // Assemble the route info
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Route info was fetched successfully, so there is no further need to check whether namesrv is ready
        if (needCheckNamesrvReady.get()) {
            needCheckNamesrvReady.set(false);
        }

        // Fetch the ordered-message config for the topic
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        // Encode the response body according to the client version
        byte[] content;
        Boolean standardJsonOnly = Optional.ofNullable(requestHeader.getAcceptStandardJsonOnly()).orElse(false);
        if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || standardJsonOnly) {
            content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
                SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                SerializerFeature.MapSortField);
        } else {
            content = topicRouteData.encode();
        }

        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

From the source we can see that it fetches the routeData for the topic and then applies the related configuration. Let's see what pickupTopicRouteData does.

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    List<BrokerData> brokerDataList = new LinkedList<>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        this.lock.readLock().lockInterruptibly();
        // Look up the queues for this topic
        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
        if (queueDataMap != null) {
            // Fill in the queue data
            topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
            foundQueueData = true;

            Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());

            // Collect the broker data and the filter-server info
            for (String brokerName : brokerNameSet) {
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    continue;
                }
                BrokerData brokerDataClone = new BrokerData(brokerData);

                brokerDataList.add(brokerDataClone);
                foundBrokerData = true;
                if (filterServerTable.isEmpty()) {
                    continue;
                }
                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                    List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                    filterServerMap.put(brokerAddr, filterServerList);
                }

            }
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    } finally {
        this.lock.readLock().unlock();
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    // If both broker and queue info were found
    if (foundBrokerData && foundQueueData) {

        // Attach the topic's queue mapping info, keyed by broker
        topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));

        if (!namesrvConfig.isSupportActingMaster()) {
            return topicRouteData;
        }

        if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
            return topicRouteData;
        }

        if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
            return topicRouteData;
        }

        boolean needActingMaster = false;

        // Check whether any broker group is missing its master
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            if (brokerData.getBrokerAddrs().size() != 0
                && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                needActingMaster = true;
                break;
            }
        }

        if (!needActingMaster) {
            return topicRouteData;
        }

        // For each group without a master, pick an acting master
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
            if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                continue;
            }

            // No master
            for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                    if (!PermName.isWriteable(queueData.getPerm())) {
                        final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                        final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                        brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                    }
                    break;
                }
            }

        }

        return topicRouteData;
    }

    return null;
}

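This method takes the queue info for the topic from topicQueueTable, works out the matching broker addresses (promoting an acting master where needed) and the filter-server lists, packs them into a TopicRouteData and returns it. At this point the caller has the route info for the topic's queues.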
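The acting-master step at the end of the method can be sketched in isolation. This is a simplified illustration: `ActingMasterSketch` and `promoteIfNoMaster` are hypothetical names, and the sketch skips the extra checks the real code makes (that acting master is enabled for the group and that the queue is not writeable). It only shows the re-keying of the smallest brokerId to the master id 0.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ActingMasterSketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // If the group has no master, re-key the slave with the smallest brokerId as the master.
    // Works on a copy so the caller's map is left untouched.
    static Map<Long, String> promoteIfNoMaster(Map<Long, String> brokerAddrs) {
        Map<Long, String> copy = new HashMap<>(brokerAddrs);
        if (copy.isEmpty() || copy.containsKey(MASTER_ID)) {
            return copy;
        }
        Long minBrokerId = Collections.min(copy.keySet());
        String actingMasterAddr = copy.remove(minBrokerId);
        copy.put(MASTER_ID, actingMasterAddr);
        return copy;
    }

    public static void main(String[] args) {
        Map<Long, String> group = new HashMap<>();
        group.put(1L, "10.0.0.1:10911");
        group.put(2L, "10.0.0.2:10911");
        Map<Long, String> routed = promoteIfNoMaster(group);
        System.out.println(routed.get(MASTER_ID)); // 10.0.0.1:10911
    }
}
```

Working on a copy matches the original, which clones each BrokerData before modifying it, so the route table itself still records the slave under its real brokerId.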

Receiving messages

broker

image.png

Image source: RocketMQ 4.X documentation, 初识RocketMQ

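From what we have learned so far, the smallest service node in RocketMQ is the Broker, and a Queue is a capability the Broker provides. To understand how messages are stored, we start from the broker.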

image.png

From the broker's project structure we can see that its startup entry point is BrokerStartup.

image.png

Here we can see that it starts much like namesrv: build the controller first, then call its start method. Let's look at BrokerController#start.

public void start() throws Exception {

    this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

    if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
        isIsolated = true;
    }

    if (this.brokerOuterAPI != null) {
        // Broker outer API service, built on netty
        this.brokerOuterAPI.start();
    }


    // Start the basic services: message store, message-processing servers, status monitoring, and so on
    startBasicService();

    if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
        changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
        this.registerBrokerAll(true, false, true);
    }

    scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
        @Override
        public void run0() {
            try {
                if (System.currentTimeMillis() < shouldStartTime) {
                    BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                    return;
                }
                if (isIsolated) {
                    BrokerController.LOG.info("Skip register for broker is isolated");
                    return;
                }
                // Register all topics and queues
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                BrokerController.LOG.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

    if (this.brokerConfig.isEnableSlaveActingMaster()) {
        // Periodically send heartbeats to the other brokers
        scheduleSendHeartbeat();

        scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run0() {
                try {
                    // Periodically sync the broker member group from namesrv
                    BrokerController.this.syncBrokerMemberGroup();
                } catch (Throwable e) {
                    BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
                }
            }
        }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
    }

    if (this.brokerConfig.isEnableControllerMode()) {
        // Periodic heartbeat
        scheduleSendHeartbeat();
    }

    if (brokerConfig.isSkipPreOnline()) {
        startServiceWithoutCondition();
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Periodically refresh metadata
                BrokerController.this.brokerOuterAPI.refreshMetadata();
            } catch (Exception e) {
                LOG.error("ScheduledTask refresh metadata exception", e);
            }
        }
    }, 10, 5, TimeUnit.SECONDS);
}

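The start method brings up a series of services, including:

  • the message store service
  • the message processors
  • the service status monitors
  • scheduled tasks for RocketMQ-specific features
  • the periodic heartbeat service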
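The periodic registration task in start clamps its period with `Math.max(10000, Math.min(period, 60000))`, so the broker re-registers no more often than every 10 seconds and no less often than every 60 seconds. A minimal sketch of that pattern follows; `RegisterScheduleSketch` and the printed message are hypothetical, and the initial delay is set to 0 here (the broker uses 10 seconds) so the demo prints straight away.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RegisterScheduleSketch {
    // Keep the configured period inside [10s, 60s], as the expression in start() does.
    static long clampPeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        long period = clampPeriodMillis(30_000L);
        // Stand-in for registerBrokerAll(...); it only prints in this sketch.
        pool.scheduleAtFixedRate(
            () -> System.out.println("register to namesrv"),
            0, period, TimeUnit.MILLISECONDS);
        Thread.sleep(100); // let the first run happen
        pool.shutdownNow();
    }
}
```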

Message storage

Message storage relies on a MessageStore implementation. When BrokerController is initialized, it calls BrokerController#initializeMessageStore to build the message store service.

public boolean initializeMessageStore() {
    boolean result = true;
    try {

        // Instantiate the default message store engine according to the config
        DefaultMessageStore defaultMessageStore;
        if (this.messageStoreConfig.isEnableRocksDBStore()) {
            defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
        } else {
            defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
        }

        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            DLedgerRoleChangeHandler roleChangeHandler =
                new DLedgerRoleChangeHandler(this, defaultMessageStore);
            ((DLedgerCommitLog) defaultMessageStore.getCommitLog())
                .getdLedgerServer().getDLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
        }

        this.brokerStats = new BrokerStats(defaultMessageStore);

        // Load store plugins, including various message counters and monitors
        MessageStorePluginContext context = new MessageStorePluginContext(
            messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
        this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
        this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        if (messageStoreConfig.isTimerWheelEnable()) {
            this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
            TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
            this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
            this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
            this.messageStore.setTimerMessageStore(this.timerMessageStore);
        }
    } catch (IOException e) {
        result = false;
        LOG.error("BrokerController#initialize: unexpected error occurs", e);
    }
    return result;
}

We'll use DefaultMessageStore to see how a message is written to the CommitLog. In the DefaultMessageStore constructor, the value of enableDLegerCommitLog decides which CommitLog object is instantiated; the default is CommitLog.

image.png

CommitLog implements the Swappable interface, and DefaultMessageStore implements MessageStore.

image.png

image.png

The MessageStore interface defines the message storage operations.

/**
 * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
 */
public interface MessageStore {

Let's look at its asynchronous store method #asyncPutMessage, since the synchronous #putMessage also just calls the asynchronous one.

@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {

    // Run the put-message hooks
    for (PutMessageHook putMessageHook : putMessageHookList) {
        PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
        if (handleResult != null) {
            return CompletableFuture.completedFuture(handleResult);
        }
    }

    // Validate the message
    if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
        && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
        LOGGER.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }

    if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
        Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
        if (!QueueTypeUtils.isBatchCq(topicConfig)) {
            LOGGER.error("[BUG]The message is an inner batch but cq type is not batch cq");
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }
    }

    // Current time
    long beginTime = this.getSystemClock().now();
    // Ask commitLog to write the message
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        // Log a warning if writing the message took more than 500 ms
        if (elapsedTime > 500) {
            LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
                elapsedTime, msg.getTopic(), msg.getBody().length);
        }
        // Update the max put-message time statistic
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        // If the write failed, bump the failure counter
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}

From the asyncPutMessage source we can see that the actual write is delegated to commitLog.asyncPutMessage(msg).
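The "sync wraps async" relationship between putMessage and asyncPutMessage can be sketched with a plain CompletableFuture. Everything here is a hypothetical stand-in (`SyncOverAsyncSketch`, the two-value `Status` enum, the string message); the timeout handling is one reasonable way to block on the future, not a copy of waitForPutResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncOverAsyncSketch {
    enum Status { PUT_OK, UNKNOWN_ERROR }

    // Asynchronous path: returns immediately with a future.
    static CompletableFuture<Status> asyncPut(String msg) {
        return CompletableFuture.supplyAsync(
            () -> msg.isEmpty() ? Status.UNKNOWN_ERROR : Status.PUT_OK);
    }

    // Synchronous path: the same work, but the caller blocks on the future.
    static Status put(String msg, long timeoutMillis) {
        try {
            return asyncPut(msg).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Status.UNKNOWN_ERROR;
        } catch (ExecutionException | TimeoutException e) {
            return Status.UNKNOWN_ERROR;
        }
    }

    public static void main(String[] args) {
        System.out.println(put("hello", 1_000)); // PUT_OK
        System.out.println(put("", 1_000));      // UNKNOWN_ERROR
    }
}
```

Keeping a single asynchronous implementation means hooks, validation, and statistics live in one place, and the synchronous entry point only adds the wait.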

Let's look at the concrete implementation in CommitLog.

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the store timestamp
    if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
        msg.setStoreTimestamp(System.currentTimeMillis());
    }
    // Set the message body CRC (consider the most appropriate setting on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    if (enabledAppendPropCRC) {
        // delete crc32 properties if exist
        msg.deleteProperty(MessageConst.PROPERTY_CRC32);
    }
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    msg.setVersion(MessageVersion.MESSAGE_VERSION_V1);
    boolean autoMessageVersionOnTopicLen =
        this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
    if (autoMessageVersionOnTopicLen && topic.length() > Byte.MAX_VALUE) {
        msg.setVersion(MessageVersion.MESSAGE_VERSION_V2);
    }

    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // Sync the thread-local encoder's max message size with the current config
    updateMaxMessageSize(putMessageThreadLocal);
    String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Get the mapped file at the tail of the commitLog, where appends go
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    long currOffset;
    if (mappedFile == null) {
        currOffset = 0;
    } else {
        currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }

    // How many replicas must acknowledge the write
    int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
    boolean needHandleHA = needHandleHA(msg);

    if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
        if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
        if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
            // -1 means all ack in SyncStateSet
            needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
        }
    } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
        int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
            this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
        needAckNums = calcNeedAckNums(inSyncReplicas);
        if (needAckNums > inSyncReplicas) {
            // Tell the producer, don't have enough slaves to handle the send request
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
    }

    // Lock this topic-queue key
    topicQueueLock.lock(topicQueueKey);
    try {

        boolean needAssignOffset = true;
        if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
            && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
            needAssignOffset = false;
        }
        if (needAssignOffset) {
            defaultMessageStore.assignOffset(msg);
        }

        // Encode the message and build the PutMessageContext
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
        PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);

        // Take the commitLog write lock
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                msg.setStoreTimestamp(beginLockTimestamp);
            }

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                if (isCloseReadAhead()) {
                    setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                }
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
            }

            // Append the message to the commitLog file
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            // Handle the append status
            switch (result.getStatus()) {
                case PUT_OK:
                    // Written successfully
                    onCommitLogAppend(msg, result, mappedFile);
                    break;
                case END_OF_FILE:
                    // The file has reached its size limit: create a new file and write again
                    onCommitLogAppend(msg, result, mappedFile);
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                    }
                    if (isCloseReadAhead()) {
                        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                        onCommitLogAppend(msg, result, mappedFile);
                    }
                    break;
                // The cases below are write failures; each returns the matching error
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
        // Increase queue offset when messages are successfully written
        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
            this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
        }
    } catch (RocksDBException e) {
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    } finally {
        topicQueueLock.unlock(topicQueueKey);
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

    return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}
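The END_OF_FILE branch above follows a "roll over and retry once" pattern: when the tail file cannot hold the message, a new file is created and the same message is appended again. The sketch below shows only that control flow with fixed-size in-memory segments. `RollOverSketch`, `Segment`, and the integer message size are hypothetical simplifications, and marking the old segment as full stands in for whatever the real append callback does at the end of a file.

```java
import java.util.ArrayList;
import java.util.List;

final class RollOverSketch {
    enum AppendStatus { PUT_OK, END_OF_FILE }

    static final class Segment {
        private final int capacity;
        private int wrotePosition;

        Segment(int capacity) {
            this.capacity = capacity;
        }

        AppendStatus append(int msgSize) {
            if (wrotePosition + msgSize > capacity) {
                wrotePosition = capacity; // treat the remaining tail as used up
                return AppendStatus.END_OF_FILE;
            }
            wrotePosition += msgSize;
            return AppendStatus.PUT_OK;
        }

        boolean isFull() {
            return wrotePosition >= capacity;
        }
    }

    private final List<Segment> segments = new ArrayList<>();
    private final int segmentSize;

    RollOverSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    private Segment newSegment() {
        Segment s = new Segment(segmentSize);
        segments.add(s);
        return s;
    }

    AppendStatus put(int msgSize) {
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.isFull()) {
            last = newSegment();
        }
        AppendStatus status = last.append(msgSize);
        if (status == AppendStatus.END_OF_FILE) {
            // Tail segment is too small for this message: roll over and retry once.
            last = newSegment();
            status = last.append(msgSize);
        }
        return status;
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        RollOverSketch log = new RollOverSketch(10);
        System.out.println(log.put(6) + " segments=" + log.segmentCount()); // PUT_OK segments=1
        System.out.println(log.put(6) + " segments=" + log.segmentCount()); // PUT_OK segments=2
    }
}
```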

Next, let's see what mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext) does.

image.png

The concrete implementation is AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext).

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
    PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;

    // Current write position
    int currentPos = WROTE_POSITION_UPDATER.get(this);

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = appendMessageBuffer().slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
            // traditional batch message
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBrokerInner) {
            // traditional single message or newly introduced inner-batch message
            // Single messages take this branch: the message is encoded into the commitLog format and put into byteBuffer
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt, putMessageContext);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

From the source we can see that cb.doAppend is what writes the commitLog; specifically, it writes into byteBuffer.
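The slice-then-position pattern in appendMessagesInner can be tried on a heap buffer. In the sketch below, `AppendPositionSketch` and its length-prefixed record layout are assumptions made for illustration (the real commitLog record format is much richer), and a plain `ByteBuffer.allocate` stands in for the mapped buffer. Like the original, it assumes the caller already holds the write lock, so it is not thread-safe on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

final class AppendPositionSketch {
    private final ByteBuffer file; // stands in for the mapped buffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendPositionSketch(int fileSize) {
        this.file = ByteBuffer.allocate(fileSize);
    }

    // Appends [length][payload]; returns bytes written, or -1 if the record does not fit.
    int append(byte[] payload) {
        int currentPos = wrotePosition.get();
        int needed = Integer.BYTES + payload.length;
        if (currentPos + needed > file.capacity()) {
            return -1;
        }
        ByteBuffer view = file.slice(); // own position and limit, shared content
        view.position(currentPos);
        view.putInt(payload.length);
        view.put(payload);
        wrotePosition.addAndGet(needed);
        return needed;
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    // Reads the length prefix of the record that starts at the given offset.
    int lengthAt(int offset) {
        return file.getInt(offset);
    }

    public static void main(String[] args) {
        AppendPositionSketch log = new AppendPositionSketch(64);
        System.out.println(log.append("hello".getBytes(StandardCharsets.UTF_8))); // 9
        System.out.println(log.append("ab".getBytes(StandardCharsets.UTF_8)));    // 6
        System.out.println(log.getWrotePosition());                               // 15
    }
}
```

Because each append works on a fresh slice, the shared buffer's own position never moves; the write offset lives only in the separate counter, which is what WROTE_POSITION_UPDATER tracks in the original.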

Let's see where byteBuffer comes from:

ByteBuffer byteBuffer = appendMessageBuffer().slice();


protected ByteBuffer appendMessageBuffer() {
    this.mappedByteBufferAccessCountSinceLastSwap++;
    return writeBuffer != null ? writeBuffer : this.mappedByteBuffer;
}

First, what is writeBuffer?

It is assigned in the init method of DefaultMappedFile:

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    // borrow a pre-allocated buffer from the pool (null if the pool has none left)
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}


//transientStorePool.borrowBuffer()
public ByteBuffer borrowBuffer() {
    ByteBuffer buffer = availableBuffers.pollFirst();
    if (availableBuffers.size() < poolSize * 0.4) {
        log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
    }
    return buffer;
}

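writeBuffer is a ByteBuffer that the TransientStorePool allocates in advance and that the mapped file borrows during initialization. When writeBuffer is null, appendMessageBuffer() returns mappedByteBuffer instead. How the two relate is described in the comment on the writeBuffer field: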

If writeBuffer is not null, messages are put here first and then re-put into the FileChannel.
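
The borrow-or-fall-back behaviour can be sketched as a small pool. This is a minimal sketch under stated assumptions: BufferPoolSketch, giveBack, and chooseAppendBuffer are hypothetical names, and the real TransientStorePool does more than shown here.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical pool of pre-allocated direct buffers.
class BufferPoolSketch {
    private final Deque<ByteBuffer> available = new ConcurrentLinkedDeque<>();

    BufferPoolSketch(int poolSize, int bufferSize) {
        // Allocate everything up front so the write path never allocates.
        for (int i = 0; i < poolSize; i++) {
            available.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Returns a pooled buffer, or null when the pool is exhausted. */
    ByteBuffer borrow() {
        return available.pollFirst();
    }

    /** Resets the buffer and puts it back at the head of the pool. */
    void giveBack(ByteBuffer buffer) {
        buffer.clear();
        available.offerFirst(buffer);
    }

    /** Same decision as appendMessageBuffer(): prefer the pooled buffer, else the mapped one. */
    static ByteBuffer chooseAppendBuffer(ByteBuffer writeBuffer, ByteBuffer mappedBuffer) {
        return writeBuffer != null ? writeBuffer : mappedBuffer;
    }
}
```

Because borrow() can return null, every caller needs the fallback, which is why the selection happens on each append rather than once at construction.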

mappedByteBuffer itself is initialized in the private init method:

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    UtilAll.ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // this is the key line: the file is mapped into memory, i.e. the mmap technique from "zero-copy"
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
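
To see the effect of FileChannel.map outside RocketMQ, here is a minimal sketch using only the JDK. The class MmapSketch and the method roundTrip are hypothetical names; the sketch writes through a mapping of a temporary file and then reads the file back with ordinary file I/O.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

class MmapSketch {
    /**
     * Maps a fresh temp file of fileSize bytes, writes data at offset 0 through
     * the mapping, and returns the file's bytes as read back from disk.
     * data.length must not exceed fileSize.
     */
    static byte[] roundTrip(int fileSize, byte[] data) {
        try {
            Path file = Files.createTempFile("mmap-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping READ_WRITE beyond the current length grows the file to fileSize.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                mapped.put(data);   // a plain memory write, no write() call on the channel
                mapped.force();     // ask the OS to flush the dirty pages to the file
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The bytes put into the mapping show up in the file itself, which is why appending to mappedByteBuffer is enough to write the CommitLog file; flushing decides when they become durable.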

Summary

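At this point we know that when the broker is created it builds the various services that maintain broker metadata (such as the relationships between queues and topics). After initialization it starts a series of heartbeat tasks that sync its information and state to the nameserver. Message storage lives in a separate messageStore module: a received message is written to the CommitLog by appending to the end of the current file, and once that file reaches its capacity (1 GB by default) the store switches to a new file. When writing to the CommitLog, writeBuffer is preferred; if writeBuffer is null, the message is written directly to mappedByteBuffer, the memory mapping of the file created with mmap.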
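
The file switch ties in with fileFromOffset = Long.parseLong(this.file.getName()) in the init method: each CommitLog file is named after the physical offset of its first byte. The sketch below illustrates that mapping. It is an assumption-laden sketch: CommitLogOffsetSketch and its methods are hypothetical, the 20-digit zero-padded name is the convention seen on disk rather than something shown in this section, and 1 GB is the default size mentioned above.

```java
import java.util.Locale;

class CommitLogOffsetSketch {
    // Assumed file size: 1 GB, the default mentioned in the summary.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    /** Name of the file that contains the given physical offset. */
    static String fileNameFor(long physicalOffset) {
        long fileFromOffset = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format(Locale.ROOT, "%020d", fileFromOffset);
    }

    /** Position of the given physical offset inside its file. */
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 5;
        System.out.println(fileNameFor(offset));
        System.out.println(positionInFile(offset));
    }
}
```

With this naming, parsing a file name gives back its starting offset, so locating a message by physical offset needs only a division and a remainder.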

The message-receiving processor: SendMessageProcessor

SendMessageProcessor implements NettyRequestProcessor. Let's look at its processRequest method:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext sendMessageContext;
    // dispatch on the request code
    switch (request.getCode()) {
        // a consumer sends a message back to the broker (for retry after a failed consume)
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        // default: receive a message sent by a producer
        default:
            
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            // build the send-message context
            sendMessageContext = buildMsgContext(ctx, requestHeader, request);
            
            // run the before-send hooks; a hook may abort the request
            try {
                this.executeSendMessageHookBefore(sendMessageContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }

            RemotingCommand response;
            clearReservedProperties(requestHeader);

            // batch message
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            } else {
                // single message
                response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }

            return response;
    }
}

After this pre-processing, sendMessage is called to handle the message:
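
The shape of processRequest (before-hooks that may abort, then the handler, then after-hooks) can be sketched without any RocketMQ types. Everything here is hypothetical: HookedProcessorSketch, SendHook, and AbortException merely mirror the roles of executeSendMessageHookBefore, executeSendMessageHookAfter, and AbortProcessException, with plain strings standing in for requests and responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class HookedProcessorSketch {
    /** Hypothetical hook with a before and an after phase. */
    interface SendHook {
        void before(String request);
        void after(String request, String response);
    }

    /** Thrown by a before-hook to reject the request. */
    static class AbortException extends RuntimeException {
        AbortException(String message) {
            super(message);
        }
    }

    private final List<SendHook> hooks = new ArrayList<>();

    void register(SendHook hook) {
        hooks.add(hook);
    }

    String process(String request, Function<String, String> handler) {
        try {
            for (SendHook hook : hooks) {
                hook.before(request);
            }
        } catch (AbortException e) {
            // An aborted request gets an error response and never reaches the handler.
            return "ERROR:" + e.getMessage();
        }
        String response = handler.apply(request);
        for (SendHook hook : hooks) {
            hook.after(request, response);
        }
        return response;
    }
}
```

In the real processor the after-hooks run through the callback passed to sendMessage, so they also fire when the response is produced asynchronously; the sketch runs them inline for brevity.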

public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {

    final RemotingCommand response = preSend(ctx, request, requestHeader);
    if (response.getCode() != -1) {
        return response;
    }

    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    
    // convert to the broker's internal message object
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());

    String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (uniqKey == null || uniqKey.length() <= 0) {
        uniqKey = MessageClientIDSetter.createUniqID();
        oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
    }

    MessageAccessor.setProperties(msgInner, oriProps);

    CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
    if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
        if (StringUtils.isBlank(msgInner.getKeys())) {
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark("Required message key is missing");
            return response;
        }
    }

    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);

    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

    // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    } else {
        sendTransactionPrepareMessage = false;
    }

    long beginTimeMillis = this.brokerController.getMessageStore().now();

    
    // whether the broker handles sends asynchronously
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        if (sendTransactionPrepareMessage) {
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

        final int finalQueueIdInt = queueIdInt;
        final MessageExtBrokerInner finalMsgInner = msgInner;
        asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
            RemotingCommand responseFuture =
                handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
                    ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
            if (responseFuture != null) {
                doResponse(ctx, request, responseFuture);
            }

            // record the transaction metrics, responseFuture == null means put successfully
            if (sendTransactionPrepareMessage && (responseFuture == null || responseFuture.getCode() == ResponseCode.SUCCESS)) {
                this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
            }

            sendMessageCallback.onComplete(sendMessageContext, response);
        }, this.brokerController.getPutMessageFutureExecutor());
        // Returns null to release the send message thread
        return null;
    } else {
        PutMessageResult putMessageResult = null;
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            // call the store to write the message
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
        // record the transaction metrics
        if (putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) {
            this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
        }
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}

Here the message parameters are converted and validated, and then the store is called to persist the message.
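
The two branches at the end of sendMessage differ in who sends the response. The sketch below shows that difference with JDK types only; AsyncPutSketch, put, asyncPut, and handle are hypothetical names, and a string stands in for PutMessageResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class AsyncPutSketch {
    /** Stand-in for a synchronous store write. */
    static String put(String msg) {
        return "PUT_OK:" + msg;
    }

    /** Stand-in for an asynchronous store write. */
    static CompletableFuture<String> asyncPut(String msg) {
        return CompletableFuture.supplyAsync(() -> put(msg));
    }

    /**
     * Async path: registers a callback that sends the response later and returns
     * null so the calling thread is released immediately.
     * Sync path: blocks on the store and returns the response to the caller.
     */
    static String handle(String msg, boolean asyncEnabled,
                         Executor callbackExecutor, Consumer<String> responder) {
        if (asyncEnabled) {
            asyncPut(msg).thenAcceptAsync(responder, callbackExecutor);
            return null;
        }
        return put(msg);
    }
}
```

Returning null in the async branch matches the comment in the listing ("Returns null to release the send message thread"): the response is written by the callback instead of by the request thread.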

Summary

From the source above, the broker's flow for receiving a message is:

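  • The message processor receives the message
  • The message is converted and validated
  • messageStore is called to write the message
  • The store encodes the message
  • If a write buffer is available, the message is written there first; otherwise it is written to the memory-mapped file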
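
The "converted and validated" step of the flow above includes two small defaults visible in sendMessage: a random queue is picked when the request carries a negative queue id, and a unique key is generated when the client did not supply one. A minimal sketch of both, in which NormalizeSketch, the "UNIQ_KEY" property name, and the UUID-based id are illustrative assumptions rather than RocketMQ's actual helpers:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

class NormalizeSketch {
    // Assumed property name, used here only for illustration.
    static final String UNIQ_KEY = "UNIQ_KEY";

    /** Keeps a non-negative queue id; otherwise picks one in [0, writeQueueNums). */
    static int resolveQueueId(int requestedQueueId, int writeQueueNums) {
        return requestedQueueId < 0
            ? ThreadLocalRandom.current().nextInt(writeQueueNums)
            : requestedQueueId;
    }

    /** Returns a copy of the properties that is guaranteed to carry a unique key. */
    static Map<String, String> ensureUniqueKey(Map<String, String> props) {
        Map<String, String> copy = new HashMap<>(props);
        String key = copy.get(UNIQ_KEY);
        if (key == null || key.isEmpty()) {
            copy.put(UNIQ_KEY, UUID.randomUUID().toString());
        }
        return copy;
    }
}
```

Filling these defaults before the store is called means the later steps (encoding and appending) can assume a valid queue id and a present key.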