掘金 后端 ( ) • 2024-04-24 10:40

pulsar consumer重新消费注意事项和规范

Consumer重新消费注意事项和规范

  • Consumer没有开启重试队列情况:创建consumer需要设置ackTimeout,并且业务需要严格处理消费成功ACK和异常ACK,成功consumer.acknowledge(msg),异常consumer.negativeAcknowledge(msg)。

  • Consumer开启重试队列情况:需要设置ackTimeout,如果没有设置ackTimeout,模式使用UnAckedMessageTrackerDisabled里面全是空实现,如果没有设置ackTimeout,并且异常没有使用consumer.negativeAcknowledge(msg),那么这条消息会一直等到当前消费端重新才能重新消费。

消费端流程

image.png

消息重新消费源码分析

Consumer ACK分为三种情况,正常ACK处理逻辑、否定应答ACK处理、未确认消息处理,Consumer对这三种ACK的处理分别在各个Tracker中,管理ACK、重投递、否定应答。

  • 正常ACK处理逻辑

1、使用:consumer.acknowledge(msg)

2、分析:

和其他消息系统不一样,pulsar支持一个Partition被多个消费者消费。

确认消息的模式:

1、单条消息确认

2、累计消息确认

3、批消息中的单个消息确认:开启acknowledgmentAtBatchIndexLevelEnabled=true可以确认一批消息中某几个,如果不开启只能消费完整个Batch中的消息再确认消息,否则Broker会把Batch中的消息重新投递一次。

这三种都属于正常的消息确认模式,正常的确认不会直接发送给Broker,是把请求转交给Consumer中的AcknowledgmentsGroupingTracker对象来处理,它只是一个接口,包含两个实现,一个是持久化订阅的,另一个是非持久化订阅的,非持久化订阅的Tracker实现为空。持久化订阅实现PersistentAcknowledgmentsGroupingTracker。

为了保证确认的性能,并避免Broker收到非常高的确认请求,Tracker中默认支持批量确认,即使单条消息的确认,也会先进入队列,然后一批发完Broker,可以通过参数acknowledgementGroupTimeMicros设置为0,Consumer每次都会立即发送确认请求。单条确认请求会放入一个Set中,默认每次100ms或者堆积超过1000就发送一批确认。消息确认请求都异步发送。

PersistentAcknowledgmentsGroupingTracker

addAcknowledgment添加确认消息,会判断是不是批量,这里只分析单条消息,最终会经过一系列方法到达addAcknowledgent的重载方法

里面也是会判断单条和批量来回掉不通的方法,这里单条doIndividualAck里面只是将消息放入到本地pendingIndividualAcks队列中,然后通过grpc写入到服务端。Broker端暂时没分析,会在后续分析。

注意事项:这里因为默认也是批量确认,上述两个条件之内,客户端发生异常,内存中的确认消息丢失,会导致消费端重新启动时重复消费。

  • 否定应答ACK处理

1、使用:consumer.negativeAcknowledge(msg)

2、分析:

NegativeAcksTracker

调用consumer.negativeAcknowledge时,会讲消息通过add添加到NegativeAcksTracker的 Timer.newTimeout Netty实现的时间轮中。

请求转交给NegativeAcksTracker,不会立即请求Broker,Tracker中记录每条消息及其延迟的时间,复用了PulsarClient的时间轮,一个时间刻度模式是33ms,默认延迟1分钟,到达延迟时间Tracker会抽取到期的消息,并发请求给Broker要求重新投递这些消息。消息存在内存,如果有大量消息需要延迟建议使用reconsumeLater。

时间轮每隔一分钟会调用triggerRedelivery,里面redeliverUnacknowledgedMessages方法就是通过Netty向Broker发送请求

  • 未确认消息处理

第一种场景:Consumer获取消息后一直不确认会怎样?这分两种情况,第一种业务侧已经调用receive方法,或者已经回调了正在异步等待的接受者,此时消息会进入unAckedMessageTracker。里面维护了一个时间轮,时间轮总刻度是固定的,每次调度都会移除队列中的第一个刻度,并新增一个刻度放入队列尾,队列第一个刻度的消息会被清理掉,重新投递给Broker发送指定命令,在Broker侧使用pendingAck集合记录每个推送给消费者但是未确认的消息,避免多次投递消息。

注意:当重新投递消息,如果消费者不使用Shared订阅类型,那么是无法重新投递单条消息的,只能把这个消费者所有已接受但未确认的消息全部重新投递。

第二种场景:消费者做了预拉取,但是还没有调用任何receive方法,此时消息会一直堆积在Consumer的预拉取队列中,Broker侧会把这些消息标记为pengdingAck状态。

注意:从第二种场景处理方式,消费者设置ReceiveQueueSize的时候要慎重,避免大量消息堆积在某一个Consumer的预拉取队列中,其他消费者没有消息可消费。

第一种场景未确认消息处理:在ConsumerBase实例化的时候会有上面一段代码,里面会根据ackTimeout是否为空来实例化不通的Tracker来处理不同配置下的ACK。

分为三种Tracker,UnAckedTopicMessageRedeliveryTracker、UnAckedTopicMessageTracker、UnAckedMessageTrackerDisabled。

UnAckedMessageTrackerDisabled:没有任何实现,当我们创建消费端时候没有设置ACK超时时间ackTimeoutMillis,那么就使用当前tracker,消费端不存在消费超时,并且消费异常如果没有设置否定应答ACK,那么当前消息会一直存在Consumer本地消费端内存,和Broker端的会将消息标记为pendingAck状态,等下下一次Consumer客户端重启,那么Broker会将这部分消息重新投递。

UnAckedTopicMessageTracker继承UnAckedMessageTracker在ConsumerBase实例化的时候会实例化UnAckedMessageTracker

在UnAckedMessageTracker实例化时候会判断是否开启重试队列策略,没有进行下面操作

UnAckedMessageTracker会根据设置的ackTimeoutMillis创建一个重试队列,这也是一个时间轮,根据超时时间来对未正常ACK的消息进行重试,重试方式和未确认消息处理一样向Broker发送重新发送消息。

UnAckedTopicMessageRedeliveryTracker:当前是根据既设置了超时时间和设置了重试队列的情况,会使用重试队列进行重试,会在重试队列中分析。所以即使设置重试队列也要消费端也要设置消费超时时间,重试会在后续分享