kafka核心设计与实践原理

基本概念

img

"

主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力

Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。

分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR集合为空。

leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,图中offset为9的位置即为当前日志文件的LEO,LEO的大小相当于当前日志分区中最后一条消息的offset值加1。分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

img

"

生产者

客户端开发

一个正常的生产逻辑需要具备以下几个步骤:

(1)配置生产者客户端参数及创建相应的生产者实例。
(2)构建待发送的消息。
(3)发送消息。
(4)关闭生产者实例。

原理分析

整体架构

image-20220428115634836

"

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。
在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。

元数据更新

InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。如图:node1的负载最小。选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。

image-20220428135038871

"

bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。

重要的生产者参数

acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。

· acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。

· acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

· acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。

消费者

消费者与消费组

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

img

"

客户端开发

一个正常的消费逻辑需要具备以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例。
(2)订阅主题。
(3)拉取消息并消费。
(4)提交消费位移。
(5)关闭消费者实例。

消费消息

  • Kafka中的消费是基于拉模式的
  • Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集。
  • poll还涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容。

位移提交

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。

在每次调用poll()方法时,它返回的是还没有被消费过的消息集,要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。手动提交可以细分为同步提交和异步提交.

如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

指定位移消费

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。

再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应尽量避免不必要的再均衡的发生。

主题与分区

分区的管理

优先副本的选举

分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。

为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。

日志存储

文件目录布局

一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件

img

"

向Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。为了方便描述,我们将最后一个 LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。

为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为00000000000000000000.log。

从更加宏观的视角上看,Kafka 中的文件不只上面提及的这些文件,比如还有一些检查点文件,当一个Kafka服务第一次启动的时候,默认的根目录下就会创建以下5个文件:

img

"

日志格式的演变

每个分区由内部的每一条消息组成,如果消息格式设计得不够精炼,那么其功能和性能都会大打折扣。比如有冗余字段,势必会不必要地增加分区的占用空间,进而不仅使存储的开销变大、网络传输的开销变大,也会使Kafka的性能下降。反观如果缺少字段,比如在最初的Kafka消息版本中没有timestamp字段,对内部而言,其影响了日志保存、切分策略,对外部而言,其影响了消息审计、端到端延迟、大数据应用等功能的扩展。虽然可以在消息体内部添加一个时间戳,但解析变长的消息体会带来额外的开销,而存储在消息体(参中的value字段)前面可以通过指针偏移量获取其值而容易解析,进而减少了开销(可以查看v1版本),虽然相比于没有 timestamp 字段的开销会大一点。

v0版本

img

"

attributes(1B):消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。

v0版本中一个消息的最小长度(RECORD_OVERHEAD_V0)为crc32+magic+attributes+key length+value length=4B+1B+1B+4B+4B=14B。也就是说,v0版本中一条消息的最小长度为14B,如果小于这个值,那么这就是一条破损的消息而不被接收。

v1版本

Kafka从0.10.0版本开始到0.11.0版本之前所使用的消息格式版本为v1,比v0版本就多了一个timestamp字段,表示消息的时间戳。

img

"

消息压缩

常见的压缩算法是数据量越大压缩效果越好,一条消息通常不会太大,这就导致压缩效果并不是太好。而Kafka实现的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果。在一般情况下,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩。

img

"

v2版本

日志索引

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

偏移量索引

img

"

(1)relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用4 个字节,当前索引文件的文件名即为baseOffset的值。
(2)position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。

消息的偏移量(offset)占用8个字节,也可以称为绝对偏移量。索引项中没有直接使用绝对偏移量而改为只占用4个字节的相对偏移量(relativeOffset=offset-baseOffset),这样可以减小索引文件占用的空间。

示意图如图:

img

"

日志清理

  1. 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
  2. 日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本

磁盘存储

Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。

有关测试结果表明,一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性(顺序)写入速度可以达到600MB/s,而随机写入速度只有100KB/s,两者性能相差6000倍。操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式,否则页缓存很难被禁止。基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。

Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。

Linux系统会使用磁盘的一部分作为swap分区,这样可以进行进程的调度:把当前非活跃的进程调入 swap 分区,以此把内存空出来让给活跃的进程。对大量使用系统页缓存的 Kafka而言,应当尽量避免这种内存的交换,否则会对它各方面的性能产生很大的负面影响。我们可以通过修改vm.swappiness参数(Linux系统参数)来进行调节。vm.swappiness参数的上限为 100,它表示积极地使用 swap 分区,并把内存上的数据及时地搬运到 swap 分区中;vm.swappiness 参数的下限为 0,表示在任何情况下都不要发生交换(vm.swappiness=0的含义在不同版本的 Linux 内核中不太相同,这里采用的是变更后的最新解释),这样一来,当内存耗尽时会根据一定的规则突然中止某些进程。笔者建议将这个参数的值设置为 1,这样保留了swap的机制而又最大限度地限制了它对Kafka性能的影响。

磁盘I/O流程

从编程角度而言,一般磁盘I/O的场景有以下四种。

(1)用户调用标准C库进行I/O操作,数据流为:应用程序buffer→C库标准IObuffer→文件系统页缓存→通过具体文件系统到磁盘。
(2)用户调用文件 I/O,数据流为:应用程序 buffer→文件系统页缓存→通过具体文件系统到磁盘。
(3)用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘。
(4)用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接写磁盘。

img

"

针对不同的应用场景,I/O调度策略也会影响I/O的读写性能,目前Linux系统中的I/O调度策略有4种,分别为NOOP、CFQ、DEADLINE和ANTICIPATORY,默认为CFQ。

NOOP

NOOP算法的全写为No Operation。该算法实现了最简单的FIFO队列,所有I/O请求大致按照先来后到的顺序进行操作。之所以说“大致”,原因是NOOP在FIFO的基础上还做了相邻I/O请求的合并,并不是完全按照先进先出的规则满足I/O请求。

CFQ

CFQ算法的全写为Completely Fair Queuing。该算法的特点是按照I/O请求的地址进行排序,而不是按照先来后到的顺序进行响应。

CFQ是默认的磁盘调度算法,对于通用服务器来说是最好的选择。它试图均匀地分布对/IO带宽的访问。CFQ为每个进程单独创建一个队列来管理该进程所产生的请求,也就是说,每个进程一个队列,各队列之间的调度使用时间片进行调度,以此来保证每个进程都能被很好地分配到I/O带宽。I/O调度器每次执行一个进程的4次请求。缺点是,先来的I/O请求并不一定能被满足,可能会出现“饿死”的情况。

DEADLINE

DEADLINE在CFQ的基础上,解决了I/O请求“饿死”的极端情况。除了CFQ本身具有的I/O排序队列,DEADLINE额外分别为读I/O和写I/O提供了FIFO队列。读FIFO队列的最大等待时间为500ms,写FIFO队列的最大等待时间为5s。FIFO队列内的I/O请求优先级要比CFQ队列中的高,而读FIFO队列的优先级又比写FIFO队列的优先级高。

ANTICIPATORY

CFQ和DEADLINE考虑的焦点在于满足零散I/O请求上。对于连续的I/O请求,比如顺序读,并没有做优化。在DEADLINE的基础上,为每个读I/O都设置了6ms的等待时间窗口。如果在6ms内OS收到了相邻位置的读I/O请求,就可以立即满足。

零拷贝

所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于 Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。

单纯从概念上理解“零拷贝”比较抽象,这里简单地介绍一下它。考虑这样一种常用的情形:你需要将静态内容(类似图片、文件)展示给用户。这个情形就意味着需要先将静态内容从磁盘中复制出来放到一个内存buf中,然后将这个buf通过套接字(Socket)传输给用户,进而用户获得静态内容。这看起来再正常不过了,但实际上这是很低效的流程,我们把上面的这种情形抽象成下面的过程:

img

"

首先调用read()将静态内容(这里假设为文件A)读取到tmp_buf,然后调用write()将tmp_buf写入Socket,如图5-23所示。
在这个过程中,文件A经历了4次复制的过程:
(1)调用read()时,文件A中的内容被复制到了内核模式下的Read Buffer中。
(2)CPU控制将内核模式数据复制到用户模式下。
(3)调用write()时,将用户模式下的内容复制到内核模式下的Socket Buffer中。
(4)将内核模式下的Socket Buffer的数据复制到网卡设备中传送。

img

"

从上面的过程可以看出,数据平白无故地从内核模式到用户模式“走了一圈”,浪费了 2次复制过程:第一次是从内核模式复制到用户模式;第二次是从用户模式再复制回内核模式,即上面4次过程中的第2步和第3步。而且在上面的过程中,内核和用户模式的上下文的切换也是4次。

如果采用了零拷贝技术,那么应用程序可以直接请求内核把磁盘中的数据传输给 Socket,如图

img

"

零拷贝技术通过DMA(Direct Memory Access)技术将文件内容复制到内核模式下的Read Buffer 中。不过没有数据被复制到 Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。

深入服务端

时间轮

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

img

"

第一层的时间轮tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。
对于350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的TimerTaskList。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的TimerTaskList。注意到在到期时间为[400ms,800ms)区间内的多个任务(比如446ms、455ms和473ms的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的TimerTaskList的超时时间为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历40ms之后,此时这个任务又被“察觉”,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。

img

"

延时操作

img

"

控制器

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。

控制器的选举及异常恢复

Kafka中的控制器选举工作依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

img

"

其中version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 ZooKeeper 中不存在/controller节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

ZooKeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。
controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。由此可见,Kafka 通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。

控制器在选举成功之后会读取 ZooKeeper 中各个节点的数据来初始化上下文信息(ControllerContext),并且需要管理这些上下文信息。比如为某个主题增加了若干分区,控制器在负责创建这些分区的同时要更新上下文信息,并且需要将这些变更信息同步到其他普通的broker 节点中。不管是监听器触发的事件,还是定时任务触发的事件,或者是其他事件,都会读取或更新控制器中的上下文信息,那么这样就会涉及多线程间的同步。如果单纯使用锁机制来实现,那么整体的性能会大打折扣。针对这一现象,Kafka 的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到 LinkedBlockingQueue 中,最后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output,先入先出)的原则顺序处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全,具体可以参考图6-14。

img

"

如果broker 在数据变更前是控制器,在数据变更后自身的 brokerid 值与新的 activeControllerId 值不一致,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。

分区leader的选举

分区leader副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为OfflinePartitionLeaderElectionStrategy。这种策略的基本思路是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。

举个例子,集群中有3个节点:broker0、broker1和broker2,在某一时刻具有3个分区且副本因子为3的主题topic-leader的具体信息如下:

img

"

此时关闭broker0,那么对于分区2而言,存活的AR就变为[1,2],同时ISR变为[2,1]。此时查看主题topic-leader的具体信息(参考如下),分区2的leader就变为了1而不是2。

img

"

深入客户端

分区分配策略

Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。

RangeAssignor分配策略

RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

假设上面例子中2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

img

"

RoundRobinAssignor分配策略

RoundRobinAssignor分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor分配策略对应的 partition.assignment.strategy 参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor。

如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。

举个例子,假设消费组内有3个消费者(C0、C1和C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

img

"

StickyAssignor分配策略

我们再来看一下StickyAssignor分配策略,“sticky”这个单词可以翻译为“黏性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

(1)分区的分配要尽可能均匀。
(2)分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor分配策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂得多。

消费者协调器和组协调器

了解了Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。

旧版消费者客户端的问题

消费者协调器和组协调器的概念是针对新版的消费者客户端而言的,Kafka 建立之初并没有它们。旧版的消费者客户端是使用ZooKeeper的监听器(Watcher)来实现这些功能的。

每个消费组(<group>)在ZooKeeper中都维护了一个/consumers/<group>/ids路径,在此路径下使用临时节点记录隶属于此消费组的消费者的唯一标识(consumerIdString),consumerIdString由消费者启动时创建。
与/consumers/<group>/ids同级的还有两个节点:owners和offsets,/consumers/<group>/owner 路径下记录了分区和消费者的对应关系,/consumers/<group>/offsets路径下记录了此消费组在分区中对应的消费位移。

img

"

每个消费者在启动时都会在/consumers/<group>/ids 和/brokers/ids 路径上注册一个监听器。当/consumers/<group>/ids路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当/brokers/ids路径下的子节点发生变化时,表示broker出现了增减。这样通过ZooKeeper所提供的Watcher,每个消费者就可以监听消费组和Kafka集群的状态了。

这种方式下每个消费者对ZooKeeper的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致Kafka工作在一个不正确的状态。与此同时,这种严重依赖于ZooKeeper集群的做法还有两个比较严重的问题。

(1)羊群效应(Herd Effect):所谓的羊群效应是指ZooKeeper中一个被监听的节点变化,大量的 Watcher 通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
(2)脑裂问题(Split Brain):消费者进行再均衡操作时每个消费者都与ZooKeeper进行通信以判断消费者或broker变化的情况,由于ZooKeeper本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。

再均衡的原理

新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。

ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:
· 有新的消费者加入消费组。
· 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
· 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
· 消费组所对应的GroupCoorinator节点发生了变更。
· 消费组内所订阅的任一主题或者主题的分区数量发生变化。

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。

第一阶段(FIND_COORDINATOR)

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点

Kafka 在收到 FindCoordinatorRequest 请求之后,会根据 coordinator_key(也就是groupId)查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回其相对应的node_id、host和port信息。

img

"
  1. 先根据消费组groupId的哈希值计算__consumer_offsets中的分区编号
  2. 找到对应的__consumer_offsets中的分区之后,再寻找此分区leader副本所在的broker节点,该broker节点即为这个groupId所对应的GroupCoordinator节点

第二阶段(JOIN_GROUP)

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

img

"

JoinGroupRequest中的group_protocols域为数组类型,其中可以囊括多个分区分配策略

选举消费组的leader

GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有 leader,那么第一个加入消费组的消费者即为消费组的 leader。如果某一时刻 leader 消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了,相关代码如下:

img

"
选举分区分配策略

每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定,而是根据各个消费者呈报的分配策略来实施。最终的分配策略基本上可以看作被各个消费者支持的最多的策略

第三阶段(SYNC_GROUP)

leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案,如图:

img

"

服务端在收到消费者发送的SyncGroupRequest请求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator会将从 leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka的__consumer_offsets主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。

当消费者收到所属的分配方案之后开启心跳任务,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。

第四阶段(HEARTBEAT)

进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的__consumer_offsets主题中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。

消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。

__consumer_offsets剖析

一般情况下,当集群中第一次有消费者消费消息时会自动创建主题__consumer_offsets,不过它的副本因子还受offsets.topic.replication.factor参数的约束。客户端提交消费位移是使用 OffsetCommitRequest 请求实现的,OffsetCommitRequest 的结构如图

img

"

请求体第一层中的group_id、generation_id和member_id在前面的内容中已经介绍过多次了,retention_time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1。也就是说,按照 broker 端的配置 offsets.retention.minutes 来确定保留时长。offsets.retention.minutes的默认值为10080,即7天,超过这个时间后消费位移的信息就会被删除(使用墓碑消息和日志压缩策略)。注意这个参数在2.0.0版本之前的默认值为1440,即1天,很多关于消费位移的异常也是由这个参数的值配置不当造成的。有些定时消费的任务在执行完某次消费任务之后保存了消费位移,之后隔了一段时间再次执行消费任务,如果这个间隔时间超过offsets.retention.minutes的配置值,那么原先的位移信息就会丢失,最后只能根据客户端参数 auto.offset.reset 来决定开始消费的位置,遇到这种情况时就需要根据实际情况来调配offsets.retention.minutes参数的值。

同消费组的元数据信息一样,最终提交的消费位移也会以消息的形式发送至主题__consumer_offsets,与消费位移对应的消息也只定义了 key 和 value 字段的具体内容,它不依赖于具体版本的消息格式,以此做到与具体的消息格式无关。

img

"

事务

Kafka从0.11.0.0版本开始引入了幂等和事务这两个特性,以此来实现EOS(exactly once semantics,精确一次处理语义)。

幂等

生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。

开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence设置为true即可(这个参数的默认值为false)

为了实现生产者的幂等性,Kafka为此引入了producer id(以下简称PID)和序列号(sequence number)这两个概念,分别对应 v2 版的日志格式中RecordBatch的producer id和first seqence这两个字段。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将<PID,分区>对应的序列号的值加1。

broker端会在内存中为每一对<PID,分区>维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_new<SN_old+1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new>SN_old+1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction()等方法的调用都会抛出IllegalStateException的异常。

引入序列号来实现幂等也只是针对每一对<PID,分区>而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。

事务

幂等性并不能跨多个分区运作,而事务[1];)可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

为了实现事务,应用程序必须提供唯一的 transactionalId,这个 transactionalId 通过客户端参数transactional.id来显式设置,参考如下:

img

"

事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将 enable.idempotence 设置为 true

transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。另外,为了保证新的生产者启动后具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch

从生产者的角度分析,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactionalId 的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。
而从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:消息归档,通过seek()方法消费导致消息遗漏等

在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行 commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了 abortTransaction()方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。

日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT和ABORT,分别用来表征事务已经成功提交或已经被成功中止。KafkaConsumer 可以通过这个控制消息来判断对应的事务是被提交了还是被中止了,然后结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端应用

img

"

为了实现事务的功能,Kafka还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的TransactionCoordinator,所有的事务逻辑包括分派 PID 等都是由 TransactionCoordinator 来负责实施的。TransactionCoordinator 会将事务状态持久化到内部主题__transaction_state 中。下面就以最复杂的consume-transform-produce的流程为例来分析Kafka事务的实现原理。

image-20220503100242094

"

可靠性探究

副本剖析

Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。

失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即under-replicated分区。

img

"

一般有两种情况会导致副本失效:

· follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。
· follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。

ISR的伸缩

Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为“isr-expiration”和“isr-change-propagation”。

isr-expiration任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的/brokers/topics/<topic>/partition/<parititon>/state节点中。

当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation任务会周期性(固定值为 2500ms)地检查 isrChangeSet,如果发现isrChangeSet中有ISR集合的变更记录,那么它会在ZooKeeper的/isr_change_notification路径下创建一个以 isr_change_开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000),并将isrChangeSet中的信息保存到这个节点中。

随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW,注意这里并不是和leader副本的LEO相比。ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/<topic>/partition/<parititon>/state节点和isrChangeSet,之后的步骤就和ISR收缩时的相同。

Leader Epoch的介入

如果leader副本发生切换,那么同步过程又该如何处理呢?在0.11.0.0版本之前,Kafka使用的是基于HW的同步机制,但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题。

在某一时刻,B中有2条消息m1和m2,A从B(leader副本)中同步了这两条消息,此时A和B的LEO都为2,同时HW都为1;之后A再向B中发送请求以拉取消息,FetchRequest请求中带上了A的LEO信息,B在收到请求之后更新了自己的HW为2;B中虽然没有更多的消息,但还是在延时一段时间之后(延时拉取)返回FetchResponse,并在其中包含了HW信息;最后A根据FetchResponse中的HW信息更新自己的HW为2。

img

"

可以看到整个过程中两者之间的HW同步有一个间隙,在A写入消息m2之后(LEO更新为2)需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为2。如果在这个时候A宕机了,那么在A重启之后会根据之前HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会将m2这条消息删除,此时A只剩下m1这一条消息,之后A再向B发送FetchRequest请求拉取消息。

img

"

此时若B 再宕机,那么 A 就会被选举为新的leader。B 恢复之后会成为follower,由于follower副本HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为1。这样一来m2这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。

img

"

Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leader epoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leader epoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch=>StartOffset>,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leader epoch变更时,会将对应的矢量对追加到这个文件中

下面我们再来看一下引入 leader epoch 之后如何应付前面所说的数据丢失和数据不一致的场景。(LE是LeaderEpoch的缩写,当前A和B中的LE都为0)

img

"

同样 A 发生重启,之后 A 不是先忙着截断日志而是先发送 OffsetsForLeaderEpochRequest请求给 B(OffsetsForLeaderEpochRequest ,其中包含 A 当前的LeaderEpoch值),B作为目前的leader在收到请求之后会返回当前的LEO(LogEndOffset,注意图中LE0和LEO的不同),与请求对应的响应为OffsetsForLeaderEpochResponse

img

"

A在收到2之后发现和目前的LEO相同,也就不需要截断日志了。之后B发生了宕机,A成为新的leader,那么对应的LE=0也变成了LE=1,对应的消息m2此时就得到了保留。之后不管B有没有恢复,后续的消息都可以以LE1为LeaderEpoch陆续追加到A中。

img

"

再来看一下leader epoch如何应对数据不一致的场景。如图所示,当前A为leader,B为follower,A中有2条消息m1和m2,而B中有1条消息m1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为新的leader。

img

"

之后B写入消息m3,并将LEO和HW更新至2,如图所示。注意此时的LeaderEpoch已经从LE0增至LE1了。

img

"

紧接着A也恢复过来成为follower并向B发送OffsetsForLeaderEpochRequest请求,此时A的LeaderEpoch为LE0。B根据LE0查询到对应的offset为1并返回给A,A就截断日志并删除了消息m2,如图所示。之后A发送FetchRequest至B请求来同步数据,最终A和B中都有两条消息m1和m3,HW和LEO都为2,并且LeaderEpoch都为LE1,如此便解决了数据不一致的问题。

img

"

为什么不支持读写分离

主读从写可以均摊一定的负载却不能做到完全的负载均衡,比如对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力,而绝大多数压力还是在主节点上。而在Kafka中却可以达到很大程度上的负载均衡,而且这种均衡是在主写主读的架构上实现的。我们来看一下Kafka的生产消费模型,如图所示。

img

"

在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下Kafka都能做到很大程度上的负载均衡。总的来说,Kafka 只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。为此,Kafka 又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于 Kafka 优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宜之计。