0%

Kafka

1 Kafka介绍

  • 作为消息队列:Kafka 是一个分布式、支持分区的、多副本的分布式消息系统,可以建立实时流数据管道,以可靠地在系统或应用程序之间获取数据,相比较于其他消息队列,其设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息
  • 作为流式处理平台: Kafka 为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作;在大数据和流计算领域,Kafka 与周边生态系统的兼容性是最好的没有之一

1.1 应用场景

Kafka 被广泛应用于两大场景:

  • 消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据
  • 数据处理:构建实时的流数据处理程序来转换或处理数据流

消息队列

传统的消息队列应用场景无非就那几种:缓存/削峰、解耦和异步通信。

  • 缓存/削峰:有些场景如秒杀、放号,都有一个明显的特点,就是在短时间内,会有大量的数据流冲向系统,有可能超过系统的处理极限,从而导致系统崩溃,不可用。引入 Kafka 作为消息队列,可以将消息先写入 Kafka,控制和优化数据流入系统的速度。

  • 解耦:在使用 RPC 来进行上下游系统调用的情况下,如订单系统和库存系统,如果库存系统挂掉,订单系统会跟着一起躺,如果需要增加一个下游系统,那么订单系统需要跟着改核心逻辑。引入 Kafka 作为消息队列后,订单系统不再直接通知下游系统,而是将消息发往队列,由下游系统自行拉取消息,从而实现系统之间的解耦,,解耦后允许你独立的修改生产和消费的逻辑,互不影响。

  • 异步通信:将一些与核心业务逻辑关系不大的模块抽出来,放入消息队列,这样能保证核心业务不受影响,减少等待时间,提示响应速度。

流处理

Kafka 保存收集流数据,以提供之后对接的流式计算框架进行处理。

  • 大数据实时处理: 由于其高吞吐量和低延迟的特性,Kafka常被用于实时数据流处理场景,作为数据源接入层,对接各种数据处理框架如Apache Storm、Spark Streaming或Flink,用于实时分析和处理海量数据流。
  • 日志聚合与传输: Kafka能够高效收集应用程序日志,作为集中式日志系统,为日志分析和监控提供数据源,支持如ELK Stack(Elasticsearch, Logstash, Kibana)的日志处理流程。
  • 消息队列与微服务集成: 在分布式系统和微服务架构中,Kafka作为消息中间件,实现服务间的异步解耦通信,提高系统的可扩展性和容错性。
  • 网站活动追踪与用户行为分析: 通过捕获并处理用户在网站上的点击流数据,为个性化推荐、用户行为分析等提供数据支持。
  • 数据集成: Kafka可以作为不同数据存储和处理系统之间的桥梁,实现数据的实时同步和迁移,支持数据湖、数据仓库的构建与维护。

Kafka的核心价值在于提供了一种高效、可扩展、耐用的实时数据流处理基础设施,广泛应用于大数据、实时分析、日志处理、消息传递等众多现代数据密集型应用领域。

2 kafka的基础架构

消息队列有两种模型:

  • 点对点模型,消费者主动拉取消息,收到消息后清除队列里的消息
  • 发布订阅模型,使用主题作为消息的通信载体,类似于广播,消费者收到消息后不删除数据,每个消费者之间互相独立,都可以消费到数据

Kafka 采用的是发布 - 订阅模型。下图可以很好的表述 Kafka 的整个基础架构

  • Producer(生产者) :消息的生产方
  • Consumer(消费者) :消息的消费方
  • Group(消费组) :每个消费者都会有一个 GroupId ,GroupId 相等的话代表他们属于同一个 Group,一个 GroupId 可以看做是一个消费者,它们不会重复的消费同一条消息
  • Broker(代理) :对于 Broker 可以简单理解为一个独立的 Kafka 实例 , 通过分布式架构,Kafka 集群可以横向扩容很多的 Broker ,以增加自己的并发处理能力。以broker.id来区分不同的服务器
  • Topic(主题 : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息 Partition(分区) :Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker。
  • Offset(消息在 Partition 的偏移量) :消息被追加到 Partition 时都会被分配一个 Offset ,Partition 会维护每个 GroupId 的 Offset ,也就是记录不同消费组消费的位置,尽管一个消费组有多个消费者,但它们只能按顺序的消费这个 Partition 中的消息,而不会重复消费
  • Replica(副本) :每个 Partition 都有若干个副本,一个 Leader 和若干个 Follower,主从机制,平时只跟 Leader 打交道,Follower 只负责备份

2.1 Broker的角色

kafka副本机制中,Broker的角色有三种:Controller、Leader(相对分片而言)、Fllower(对于分片而言),作为Controller的Broker会维护一个全局ISR(其他Broker的则维护自己副本的ISR),举个例子:

假设一个3副本的分区 P0,分布在 Broker1(Leader)、Broker2(Follower)、Broker3(Follower)上,Broker0(controller)

  • 局部视角(普通Broker):
    • Broker2 只知道自己作为 P0 的Follower,当前是否成功从Leader拉取数据,落后了多少条。它把这个“自我感觉”报告给Controller。
    • Broker1 作为Leader,也知道 Broker2 和 Broker3 这两个Follower的同步进度(因为它们会来拉数据),它也会将这些“观察结果”报告给Controller。
  • 全局视角(Controller):
    • Controller 同时收到来自 Broker1、Broker2、Broker3 关于分区 P0 的状态报告。
    • 它综合判断:如果 Broker3 的报告显示它已经10秒没有同步数据了(超过 replica.lag.time.max.ms),那么Controller就权威地决定将 Broker3 从 P0 的ISR中移除。
    • 然后,Controller将这个新ISR [1, 2] 写入ZooKeeper,并告诉 Broker1 和 Broker2:“现在 P0 的ISR只有你俩了。”

kafka2当中zk的作用

kafka的zookeeper节点元数据

  • /admin:主要保存kafka当中的核心的重要信息,包括类似于已经删除的topic就会保存在这个路径下面。
  • /brokers:主要用于保存kafka集群当中的broker信息,以及没被删除的topic信息。
  • /cluster: 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号。
  • /config: 集群配置信息。
  • /controller:kafka集群当中的控制器信息,控制器组件(Controller),是Apache Kafka的核心组件。它的作用是所有Broker进行选举时,在该路径下尝试创建一个临时节点。ZooKeeper的保证在该目录下只有一个Broker能创建成功,那么该broker成为controller
  • /controller_epoch:主要用于保存记录controller的选举的次数。
  • /isr_change_notification:isr列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。
  • /latest_producer_id_block:使用/latest_producer_id_block节点来保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。
  • /log_dir_event_notification:主要用于保存当broker当中某些LogDir出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZK当中增加一个通知序号,controller监听到这个节点的变化之后,就会做出对应的处理操作。

以上就是kafka在zk当中保留的所有的所有的相关的元数据信息,这些元数据信息保证了kafka集群的正常运行。

不管是kafka2还是kafka3当中,controller都是必不可少的,通过controller来维护kafka集群的正常运行,例如ISR列表的变更,broker的上线或者下线,topic的创建,分区的指定等等各种操作都需要依赖于Controller。

在kafka2当中,controller的选举需要通过zk来实现,我们没法控制哪些机器选举成为Controller。

而在kafka3当中,我们可以通过配置文件来自己指定哪些机器成为Controller,这样做的好处就是我们可以指定一些配置比较高的机器作为Controller节点,从而保证controller节点的稳健性。使用KRaft模式的话需要配置Process.Roles,否则默认使用Zookeeper

kafka依赖zk所引发的问题

使用了zk的强一致性来选举集群的controller,controller对整个集群的管理至关重要,包括分区的新增,ISR列表的维护,等等很多功能都需要靠controller来实现,然后使用zk来维护kafka的元数据也存在很多的问题以及存在性能瓶颈。

  • 元数据存取困难:每次重新选举的controller需要把整个集群的元数据重新restore,非常的耗时且影响集群的可用性。
  • 元数据更新网络开销大:在ZooKeeper架构中,元数据更新通过Watch机制通知所有Broker,每个Broker都需要拉取全量元数据,导致网络开销随Broker数量线性增长。
  • 网络分区复杂度高:在ZooKeeper架构中,ZooKeeper只管理Controller选举和元数据存储,而不关心Broker之间的数据同步状态,导致在网络分区时,元数据和数据同步状态可能不一致,处理复杂。
  • 脑裂:虽然ZooKeeper能保证所有Watch按顺序触发,但是网络延迟,并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致,可能选出多个领导“大脑”,导致“脑裂”
  • 惊群效应:在ZooKeeper架构中,一个Broker宕机会导致大量Watch事件被触发,每个Watch事件都会导致相应的操作(如分区重选举),这些操作可能同时发生,导致集群资源竞争和性能下降。

kraft

使用kraft能够有效解决上述出现的问题

  • 针对脑裂:Raft通过任期和多数派原则以及两阶段切换成员配置确保了同一时间最多只有一个领导者,从而避免了脑裂。
  • Raft如何解决元数据恢复慢:所有元数据变更都作为日志条目,通过Raft协议复制到所有节点(包括Controller节点),每个节点都维护一个完整的日志序列。同时为了避免日志无限增长,Raft支持快照。节点可以定期将当前状态(即元数据)保存为快照,并截断之前的日志。因此,raft通过日志复制和快照机制,使得元数据恢复只需要增量同步,大大加快了恢复速度。
  • Raft如何解决元数据更新网络开销大的问题: 元数据变更以日志条目的形式在Controller节点之间复制(通常Controller节点数量远小于Broker数量)。只有日志条目(即变更内容)被复制,而不是全量元数据。另外只有领导者负责接收元数据变更请求,并将日志条目复制到其他Controller节点。一旦日志被提交(即被大多数节点复制),领导者就将变更应用到自己的状态机,并将结果返回给客户端。因此,Raft将元数据更新的传播从全量广播改为增量复制,并且只在Controller节点之间进行强一致复制,大大减少了网络开销。

3 Kafka生产者详解

在发送消息的过程中,有两个线程协同工作 —— Main 线程和 Sender 线程。Main 线程负责处理数据,指定发送的位置,然后暂存起来,Sender 线程负责把暂存的数据传输给 Kafka Broker。

3.1 Main 线程详解

Main 线程会创建一个容器,我们暂且称之为记录累加器(RecordAccumulator) ,其默认大小为 32m,这是一个缓冲区,缓冲区内有一个容器ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,用于存放要被发送的数据,其中 Key 为指向分区, Value 存放将要被发送的数据,ProducerBatch 是一个双端队列。

数据在存入缓冲区前,需要经过拦截器、序列化器和分区器的处理:

  • 拦截器在生产端用的较少,我们可以自定义拦截规则拦截数据
  • 序列化器很好理解,我们可以通过指定 key.serializervalue.serializer 参数,来指定如何序列化数据
  • 分区器通过解析参数,确定数据需要发送的分区,发往对应的 ProducerBatch 中
  • 消息累加器(RecordAccumulator):将消息按照分区进行积累,放入批记录(RecordBatch)中,等待发送。

消息累加器:消息累加器是生产者用来缓存消息的内存缓冲区。它的主要作用是将消息按照分区进行分组,每个分区的消息被组织成多个批记录(RecordBatch),以便进行批量发送,提高吞吐量。

每个分区对应一个双端队列(Deque),队列中存放着多个RecordBatch。当主线程发送消息时,会先找到对应分区的队列,然后尝试将消息添加到队列的最后一个RecordBatch中。如果队列为空或者最后一个RecordBatch已经满了,则会创建一个新的RecordBatch,并将消息添加进去。

3.2 sender线程

Sender线程会不断地从消息累加器中获取已经准备好的批记录(RecordBatch),然后将这些批记录通过HTTP请求的方式发送到Kafka集群的broker中。这里会涉及几个问题:

  • 何时拉取数据?
  • 如何确认消息发送成功?
  • 可以同时进行多少个请求?

何时拉取 ProducerBatch 中的数据

在生产者参数中,有一个 batch.size 项,默认是 16k,这个配置项就是控制 ProducerBatch 这个双端队列的大小,当数据累计到配置的值时,Sender 线程就会将里面的数据拉走。 但如果数据一直达不到配置的大小呢?总不能一直不拉取数据吧,这样在使用者看起来,消费者迟迟收不到生产的数据,这是不合理的,因此有另一个配置项 linger.ms,当数据迟迟达不到 batch.size 时,Sender 线程等待了超过 linger.ms 设置的时间,也会拉取数据,linger.ms 的默认值是 0ms,也就是说有数据就会被立即拉走。

如何确认消息发送成功

在生产环境下,消息的发送往往都不是一帆风顺,如网络波动、Kafka Broker 挂掉,等情况都有可能导致消息持久化失败,这就涉及一个问题,在什么情况下 Producer 会认为消息已经发送成功了呢?这里引入一个参数 acks,它有三个可配置的值:

  • acks=0:生产者将不会等待来自服务器的任何确认,该记录将立即添加到缓冲区并视为已发送
  • acks=1(默认值):Leader 会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果 Leader 在确认记录后立即失败,则记录将会丢失
  • acks=all:相当于 acks=-1,Leader 将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证

可以同时进行多少个请求?

这主要取决于生产者的关键配置 max.in.flight.requests.per.connection,它定义了生产者到每个Broker的连接上,最多可以同时有多少个未收到响应的请求。这个参数的默认值是5,也是开启幂等生产功能(enable.idempotence=true)时的允许最大值

生产端配置对消息顺序的影响:

  • 保证顺序:如果你需要确保发送到同一分区的消息严格按照发送顺序写入Broker,必须将此参数设为 1 (启用幂等性后enable.idempotence=true,即使设为5也能保证分区内消息顺序。此时是最大值)。

  • 提升吞吐:增大此值可以进行请求的管道化传输(pipelining),从而提升吞吐量,但可能牺牲顺序。

以下生产者的参数配置

参数名 作用
key.serializervalue.serializer 指定发送消息的 key 和 value 的序列化类型,一定要写类的全限定名
buffer.memory 缓冲区 RecordAccumulator 总大小,默认 32m
batch.size 缓冲区内的批次队列 ProducerBatch 大小,默认 16k
linger.ms 如果数据迟迟未达到 batch.size,Sender 等待 linger.time 之后就会发送数据。默认值是 0ms,表示没 有延迟。生产环境一般设置为 50ms
acks 0:生产者发送过来的数据,不需要等数据落盘应答 1:生产者发送过来的数据,Leader 收到数据后应答 -1(默认值):生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收齐数据后应答
max.in.flight.requests.per.connectionSender 线程缓存的请求数,也是就是允许没有 ack 的请求次数,默认为 5
retries 消息发送失败后重试的次数,如果需要保证数据的顺序性 应该把 Sender 线程缓存的请求数设置为1,否则其他消息可能先发送成功
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms
compression.type 生者者发送数据的时候是否压缩,默认是 none,支持 gzip、snappy、lz4 和 zstd,生产环境一般使用 snappy。

Broker端的配置:请求队列容量 在Broker服务器内部,有一个请求队列(requestQueue)来临时存放待处理的客户端请

  • 核心参数:队列的长度由Broker配置参数 queued.max.requests 控制。
  • 默认值:此参数的默认值为500。当瞬时请求量超过此值,新请求将被阻塞,直到队列中有空位。

如何监控自己的配置合不合适 - 关键监控指标:监控Broker的 RequestQueueTimeMs (请求在队列中平均等待时间)至关重要。如果该值持续很高,说明请求积压,可能需要: - 增加处理线程:调高 num.network.threads 参数,让Broker更快地从队列中取请求处理。 - 优化后端性能:检查 LocalTimeMs(请求实际处理时间)和 RemoteTimeMs(等待其他Broker的时间,例如在 acks=all 时),定位处理瓶颈。

3.3 生成者调优

上面讲了生产者 Producer 在发送消息时的工作流程和原理,也列举了一些生产端高频使用的参数,但这都是「纸上谈兵」,这些参数具体怎么应用呢?所以下面就来聊聊 Kafka 生产者的常用调优手段,主要内容包含:

  • 调优:提高生产者的吞吐量 ✅
  • 调优:保障数据的可靠性 ✅
  • 调优:保证数据的唯一性 ✅
  • 调优:保证数据的有序性 ✅

提高生产者的吞吐量

优化点是send能够批次发送消息给broker。因此重点优化的地方是缓冲区,控制缓冲区的参数有下面这些,我们只需要适当配置下面的参数,就能极大的提高生产者的吞吐量:1buffer.memorybatch.sizelinger.mscompression.type

给出一个我在生产环境用过的配置示例,具体怎么配置还得根据你业务数据大小和量级来尝试:

1
2
3
4
buffer.memory = 128m
batch.size = 64k
linger.ms = 50ms
compression.type = snappy

保障数据的可靠性

保障数据的可靠性是指,我们如何能确保消息一定到达 Kafka 的 Broker,并持久化落盘

Kafka 为了保障数据的安全,做了「冗余」,也就是多副本机制,Leader 负责数据的交互,Follower 负责从 Leader 拉取数据备份,这就涉及到一个同步的问题——Kafka 如何认定副本是保持同步的?

  1. 这就要提到kafka的ISP同步副本集合,这个集合里面记录了所有与 Leader 保持同步的副本集合,也包括 Leader 自己,举个例子:ISR(leader: 0,isr: [0, 1, 2]),代表着有两个副本 1、2Leader 0 保持着同步,一旦 Leader 挂掉,会在 ISR 中重新挑选 Leader。 处。

  2. 生产者会根据 ACKS 的配置来确定数据是否成功落盘,在默认配置 ACKS=1 下,只要 Leader 成功落盘并响应,生产者便视为发送成功,假如 Leader 在响应后,Follower 还没来得及同步,Leader 挂掉,在 Follower 称为新的 Leader 之后,这条消息便丢失了。 保障数据可靠性的前提,就是要设置 ACKS=all,等全部副本分区落盘应答后,再告诉生产者,我已经成功收到消息!当然,分区副本也要设置大于 2 的数量,否则 ACKS=all 和 ACKS=1 没有任何区别。

保证数据发送不重复

kafka使用幂等性和事务保证数据发送不重复:

  • 幂等性指 Producer 不论向 Broker 发送多少次重复的数据,Broker端都只会持久化一条,保证数据不会重复
  • 在启用幂等性的前提下,生产者可以开启事务,在一个事务内发送的数据,要么全部成功,要么全部失败

幂等性是如何判断数据重复的? 具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker 只会持久化一 条。PID是生产者的唯一 ID,Kafka 每次重启都会重新分配;Partition 表示分区号;Sequence Number 是单调自增的消息号。这三个参数组合形成一个主键,相同主键的消息 Kafka 只会持久化一次,这就是幂等性的原理。

根据幂等性的原理可知,它只能保证单分区单会话内数据不重复,假如跨分区了,那就不好使了,如果涉及到这种情况,就可以使用事务

  • 可以通过配置生产者的 enable.idempotence 项来开启幂等性,默认开启。

  • 代码中启用事务,如果中间有某条消息发送失败,所有消息会一起发送失败:

保证数据持久化的有序性

于 Kafka 的架构决定,跨分区是无法保持数据的有序性的,我们只能保证单分区的有序性。

Broker 能缓存最近 5 个来自生产者的请求信息,假如超过五个,前面的请求会被刷掉;而 Sender 线程默认的情况下,为每个 Broker 缓存了 5 个请求,正正好!但是 Sender 线程的缓存请求数是可以通过参数 max.in.flight.requests.per.connection 来改变的。

Kafka 1.X 以后,默认开启幂等性,Sender 线程和 Broker 缓存的请求数也正好,是能保证在单分区下消息的顺序性的

4 分区机制

4.1 为什么要分区

Kafka 为什么要分区? 分区有两个好处:

  • 便于合理使用存储资源:一个主题可以有多个分区,分区可以分布在不同的 Broker 上,海量的数据被分成一块一块存储在不同的服务器,合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度:生产者可以指定分区发送数据,消费者可以指定分区进行消费,达到了类似多线程

4.2 kafka的controller和leader切换(选举)

2.1章节就介绍了kafka中角色和ISR的作用,那么对于controller,以及controller确定好后,分片leader的选举是这样的

  • 在版本3.0前,kafka使用Zookeeper模型进行选举出Controller(Controller通过ZooKeeper的临时节点和Watch机制实现选举),Partition Leader选举由Controller基于ZooKeeper中的ISR列表触发

  • 版本3.0及之后,kafka使用了KRaft模式进行选举,直接使用Raft协议在Controller节点间达成一致性,Controller Leader通过Raft选举产生,所有Partition Leader选举仍由ISR列表选举

为什么raft不用于分区leader的选举?

  • 性能灾难
    • Raft是一种强一致性共识算法,每一次状态变更(比如提议“分区0的新Leader是Broker3”)都需要在多数节点间进行网络往返、日志复制和提交。这个过程延迟很高(通常几十到几百毫秒)。
    • Partition Leader的切换需要极快(目标在秒级甚至毫秒级),以最小化对生产者和消费者的影响。如果每次切换都要走一遍Raft共识流程,集群在故障期间的不可用时间会大大延长。
  • 数据基础不匹配
    • Raft擅长对一份确定的数据达成共识(比如元数据日志)。而Partition Leader选举的依据,哪个副本的数据最新,这是一个持续变化、需要高频采样的指标。
    • 一个Follower可能这一秒落后100条消息,下一秒就追上了。如果每次状态变化都触发一次Raft提案,系统会被海量的、大部分无意义的共识请求淹没。

ISR的选举机制

ISR的集合维护依靠Broker中的一个参数replica.lag.time.max.ms,用于控制 Follower 能落后 Leader 多少时间,落后不超过这个时间的副本 Kafka 就认为它是同步的,落后超过这个时间该 Follower 会从 ISR 中踢出。

若是 Leader 挂了,集合也空了,这个分区就不可用了,因此,生产环境中,合理的配置副本也是非常重要的一项。

  • ISR 不为空,直接从 ISR 中选举
  • ISR 为空,Kafka 也可以从不在 ISR中的存活副本中选举,这个过程称为 Unclean领导者选举

对于第二个选项,可以通过 Broker 端的 unclean.leader.election.enable 参数来控制。 开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性,不推荐,毕竟可用性可以通过其他手段来提升。

4.3 分区的工作流程

生产者分区器根据参数来区分数据要发往的分区:

  • Partition:直接将数据存入对应的分区
  • 没有 PartitionKey:将通过 Key的Hash值 % 主题的分区数 来得到一个 Partition 值
  • 没有 Partition 没有 Key:采用 Sticky Partition(粘性分区器),会随机选择一个分区,并尽可能的一直使用这个分区,待该分区的 ProducerBatch 满了或者已完成,再随机选择其他的分区(不会重复使用上一次的分区)

消费者可以用两种方式消费分区里面的数据:

  • 指定主题,不指定分区:该主题下的所有分区的数据,订阅该主题的消费者都能消费到,但需要注意的是,不同分区之间的数据是不能保证消费顺序的
  • 指定主题,指定分区:可以指定任意分区进行消费

4.4 kafka副本机制

由于Producer和Consumer都只会与Leader角色的分区相连,所以kafka需要以集群的组织形式提供主题下的消息高可用。kafka支持主备复制,所以消息具备高可用和持久性。

高可用保证:

  • 一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。
  • 另一方面,kafka还动态维护一个同步状态的副本集合,只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失

kafka副本数据同步-同步复制和异步复制

Kafka动态维护了一个同步状态的副本的集合(a set of In-Sync Replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息只有被这个集合中的每个节点读取并追加到日志中,才会向外部通知说“这个消息已经被提交”。

消息从leader复制到follower, 我们可以通过决定Producer是否等待消息被提交的通知(ack)来区分同步复制和异步复制。

  • 同步复制流程:
    • 1.producer联系zk识别leader
    • 2.向leader发送消息
    • 3.leadr收到消息写入到本地log
    • 4.follower从leader pull消息
    • 5.follower向本地写入log
    • 6.follower向leader发送ack消息
    • 7.leader收到所有follower的ack消息
    • 8.leader向producer回传ack
  • 异步复制流程: 和同步复制的区别在于,leader写入本地log之后, 直接向client回传ack消息,不需要等待所有follower复制完成。因此异步复制是无法保证leader down掉后消息不会丢失

5 kafka的核心监控指标

监控维度 关键指标 说明与告警阈值参考
消息流量 BytesInPerSec / BytesOutPerSec Broker/主题/分区级的每秒传入/传出字节数。 用途:最直观的流量指标,用于发现瞬时激增。告警:超过历史基线(如2个标准差)或设定阈值。
消息积压 ConsumerLag消费者组的滞后消息数(未消费的消息数)。 用途:监控积压的核心指标。Lag持续增长或超过阈值即表示消费能力不足。告警:根据业务SLA设定,如 > 1000 条或 > 10分钟 的延迟。
消息积压-2 RequestQueueTimeMs (请求在队列中平均等待时间) 至关重要。如果该值持续很高,说明请求积压,可能需要①增加处理线程:调高 num.network.threads 参数,让Broker更快地从队列中取请求处理。②优化后端性能:检查 LocalTimeMs(请求实际处理时间)和 RemoteTimeMs(等待其他Broker的时间,例如在 acks=all 时),定位处理瓶颈。
Broker健康 ActiveControllerCount 活跃Controller数量。 用途:应为1。如果不是1,说明发生了Controller切换(可能原节点故障)。
UnderReplicatedPartitions 未完全同步的分区数。 用途:监控副本健康的关键。理想值为0。大于0表示有副本同步落后,数据可靠性风险升高。告警:> 0 并持续一段时间。
OfflinePartitionsCount 无可用Leader的离线分区数。 用途:理想值为0。大于0表示该分区完全不可读写,是严重故障。告警:> 0。
生产/消费健康 RequestHandlerAvgIdlePercent Broker请求处理线程空闲百分比。 用途:反映Broker的CPU压力。过低(如 < 20%)表示Broker可能成为瓶颈。
Produce/ConsumeRequestTimeMs 生产/消费请求处理时间(P99)。 用途:延迟指标。突增通常意味着磁盘IO、网络或CPU出现问题。