掘金 后端 ( ) • 2024-04-06 16:59

引言


Apache Kafka无疑是一个伟大的发明,可以进行消息削峰、异步处理、大规模流式处理等,如果应用得当,还可以进行高可用的可靠消息投递,进行电商秒杀、金融级别的异步扣库存、异步生成订单、履约等等。Kafka exactly once语义是一个很大的话题,在生产环境如何保证exactly once语义的消息投递?或者是面试官问你这个问题,如果你只是说把生产者设置成acks=all,开启幂等enable.idempotence=true,然后客户端进行去重,那大概率凉凉,下面我们来详细剖析一下Kafka exacly once语义是如何保证的。

1、幂等生产者


首先生产者得保证向broker仅投递一次消息,消息不能丢失,也不能重复。消息不丢失如何保证?Kafka有TCP一样的重传机制,如果producer在规定时间内未收到brocker的ack确认号,则会进行重传,这里涉及producer的三个关键参数:

1.1 acks

acks指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。在默认情况下,Kafka会在首领副本收到消息后向客户端回应消息写入成功。这个参数对写入消息的持久性有重大影响,对于不同的场景,使用默认值可能不是最好的选择。

  • acks=0,则生产者不会等待任何来自broker的响应。 也就是说,如果broker因为某些问题没有收到消息,那么生产者便无从得知,消息也就丢失了。不过,因为生产者不需要等待broker返回响应,所以它们能够以网络可支持的最大速度发送消息,从而达到很高的吞吐量。
  • acks=1,那么只要集群的首领副本收到消息,生产者就会收到消息成功写入的响应。 如果消息无法到达首领副本(比如首领副本发生崩溃,新首领还未选举出来),那么生产者会收到一个错误响应。为了避免数据丢失,生产者会尝试重发消息。不过,在首领副本发生崩溃的情况下,如果消息还没有被复制到新的首领副本,则消息还是有可能丢失。
  • acks=all,那么只有当所有副本全部收到消息时,生产者才会收到消息成功写入的响应。 这种模式是最安全的,它可以保证不止一个broker收到消息,就算有个别broker发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1高,因为生产者需要等待不止一个broker确认收到消息。

你会发现,为acks设置的值越小,生产者发送消息的速度就越快。也就是说,我们通过牺牲可靠性来换取较低的生产者延迟。不过,端到端延迟是指从消息生成到可供消费者读取的时间,这对3种配置来说都是一样的。为什么?

1.2 idempotence

开启acs=all之后,可以保证消息不丢失,也就是at least once语义,如果brocker把消息写入日志,但是由于网络原因,producer未收到ack,此时发生重传,broducer就会持久化两条消息,concumer对应的也会消费到两次,如何避免这种情况?

设置enable.idempotence=true

如果启用了幂等生产者,那么每条消息都将包含生产者ID(PID)和序列号。我们将它们与目标主题和分区组合在一起,用于唯一标识一条消息。broker会用这些唯一标识符跟踪写入每个分区的最后5条消息。 为了减少每个分区需要跟踪的序列号数量,生产者需要将max.inflight.requests设置成5或更小的值(默认值是5), 这个参数指定了生产者在收到服务器响应之前可以发送多少个消息批次。如果broker收到之前已经收到过的消息,那么它将拒绝这条消息,并返回错误。

1.3 幂等生产者的局限性

  • 幂等生产者只能防止因生产者自身的重试机制而导致的消息重复,不管这种重试是由生产者、网络还是broker错误所导致。
  • 如果producer在发出消息后宕机,但未收到ack确认,此时broker已将消息写入日志,producer恢复后又重新发送一次消息,此时生产了新的生产者PID,broker无法去重,造成消息重复。

对于使用同一条消息调用两次producer.send()就会导致消息重复的情况,即使使用幂等生产者也无法避免。这是因为生产者无法知道这两条消息实际上是一样的。对于第二个情况,我们可以通过事务解决,除此之外,事务还能确保事务生产者发送多条消息的原子性(要么都成功,要么都失败)。

2、事务


为保证Streams应用程序的正确性,Kafka中加入了事务机制。为了让流式处理应用程序生成正确的结果,要保证每个输入的消息都被精确处理一次,即使是在发生故障的情况下。一些流式处理应用程序对准确性要求较高,特别是如果处理过程包含了聚合或连接操作,那么事务对它们来说就会非常有用。

如果broker期望消息2后面跟着消息3,但收到了消息27,那么这个时候该怎么办?在这种情况下,broker将返回“乱序”错误。如果使用了不带事务的幂等生产者,则这个错误可能会被忽略。

事务也支持为Consumer设置不同的隔离级别。

2.1 事务解决的问题

假设有一个简单的流式处理应用程序:它从源主题读取消息,然后可能会对消息做一些处理,再将结果写入另一个主题。我们想要确保处理的每一条消息的结果只被写入一次。那么,哪些地方有可能出错呢?

  1. 应用程序崩溃导致的重复处理。 在从源集群读取并处理了消息之后,应用程序必须做两件事:一是将结果写入输出主题,二是提交已处理的消息的偏移量。假设这两个动作就按照这个顺序发生。如果应用程序在发送结果之后发生崩溃,但偏移量还没有提交,该怎么办?当消费者崩溃时会发生什么。几秒之后,因为没有心跳,所以将触发再均衡,消费者读取的分区将被重新分配给其他消费者。新消费者将从最后提交的偏移量的位置开始读取这些分区的消息。在最后一个提交的偏移量和应用程序发生崩溃那个位置之间的消息将被再次处理,结果也将被再次写入输出主题——这就出现了重复。
  2. producer重启导致的重复投递。 prodercer启用了幂等生产者,在投递完一条消息后还未收到broker的ack就发生了宕机,此时producer重启,然后重新投递这条消息,由于重启后生成了一个随机的PID,导致broker以为是另一个producer发送的消息,无法根据PID去重,造成消息重复投递。
  3. “僵尸”应用程序导致的重复处理。 如果应用程序从Kafka读取了一个消息批次,但还没有开始处理它们就被挂起或与Kafka断开了连接,那么这个时候会发生什么?就像前面的场景一样,在停止发送心跳一段时间之后,应用程序将被认为已经“死亡”,它的分区将被重新分配给消费者群组里的其他消费者。新消费者将重新读取这个消息批次,对其进行处理,并将结果写入输出主题,然后继续。这个时候,之前的应用程序实例(被挂起的那个)可能又恢复过来了:继续处理它最近读取的消息批次,并将结果写入输出主题——这就发生了重复。

事务是如何保证精确一次性的?

  1. 对于第一种应用程序奔溃导致重复处理的情况,Kafka事务引入了原子多分区写入的概念。 它会从一个主题读取数据,对数据进行处理,再将结果写入另一个主题。精确一次处理意味着消费、处理和生产都是原子操作,要么提交偏移量和生成结果这两个操作都成功,要么都不成功。为了支持这种行为,Kafka事务引入了原子多分区写入的概念。我们知道,提交偏移量和生成结果都涉及向分区写入数据,结果会被写入输出主题,偏移量会被写入consumer_offsets主题。如果可以打开一个事务,向这两个主题写入消息,如果两个写入操作都成功就提交事务,如果不成功就中止,并进行重试,那么就会实现我们所追求的精确一次性语义。
  2. 对于第二种producer重启的情况,broker维护了transactional.id和producer.id之间的映射关系。 事务性生产者实际上就是一个配置了transactional.id并用initTransactions()方法初始化的Kafka生产者。与producer.id(由broker自动生成)不同,transactional.id是一个生产者配置参数,在生产者重启之后仍然存在。实际上,transactional.id主要用于在重启之后识别同一个生产者。broker维护了transactional.id和producer.id之间的映射关系,如果对一个已有的transactional.id再次调用initTransactions()方法,则生产者将分配到与之前一样的producer.id,而不是一个新的随机数,这就避免了producer重启导致的重复投递。
  3. 对于第二种僵尸进程的情况,Kafka使用一个递增的epoch进行隔离。 在调用initTransaction()方法初始化事务性生产者时,Kafka会增加与transactional.id相关的epoch。带有相同transactional.id但epoch较小的发送请求、提交请求和中止请求将被拒绝,并返回FencedProducer错误。旧生产者将无法写入输出流,并被强制close(),以防止“僵尸”引入重复记录。

2.2 原子多分区写入的原理

事务引入了事务协调器和两阶段提交来完成原子多分区写入。 当生产者要提交一个事务时,它会发送“提交”消息给事务协调器,事务协调器会将提交标记写入所有涉及这个事务的分区。如果生产者在向部分分区写入提交消息后发生崩溃,该怎么办?Kafka事务使用两阶段提交和事务日志来解决这个问题。

在开始第一个事务之前,生产者需要通过调用initTransaction()来注册自己。这个请求会被发送给一个broker,它将成为这个事务性生产者的事务协调器。就像每一个broker都是部分消费者群组的消费者群组协调器一样,每一个broker都是部分生产者的事务协调器。Kafka需要一个事务日志,这里使用了一个叫作 __transaction_state的内部主题,来记录事务的状态,这个算法会执行如下步骤。

  1. 记录正在执行中的事务,包括所涉及的分区。
  2. 记录提交或中止事务的意图——一旦被记录下来,到最后要么被提交,要么被中止。
  3. 将所有事务标记写入所有分区。
  4. 记录事务的完成情况。

initTransaction()API注册了一个带有新事务ID的协调器或者增加现有事务ID的epoch,用以隔离变成“僵尸”的旧生产者。当epoch增加时,挂起的事务将被中止。

2.3 事务隔离级别

我们通过设置isolation.level参数来控制消费者如何读取以事务方式写入的消息。 如果设置为read_committed,那么调用consumer.poll()将返回属于已成功提交的事务或以非事务方式写入的消息,它不会返回属于已中止或执行中的事务的消息。默认的隔离级别是read_uncommitted,它将返回所有记录,包括属于执行中或已中止的事务的记录。配置成read_committed并不能保证应用程序可以读取到特定事务的所有消息。也可以只订阅属于某个事务的部分主题,这样就可以只读取部分消息。此外,应用程序无法知道事务何时开始或结束,或者哪些消息是哪个事务的一部分。

因此Kafka事务也是支持设置隔离级别,下次用到聊到别再两眼一抹黑了!