0%

Kafka 手记

Kafka 架构

  • Topic 消息主题,用来业务上区分不同类型的消息。
  • Partition 物理分区,是一个有消息组成的队列,消息的 offset 记录着它在某个 Partition 下的位置;每个 Topic 下都有若干 PartitionPartitio 上消息被消费 Kafka 并不关心,保留指定时间后删除或者 Partition 文件过大时删除
    • Leader Kafka 这里 Leader 是针对某个 Partition 而言,负责和 Producer、Consumer 交互
    • FollowerLeader 一样针对 Partitio 的概念,只负责同步备份数据,做 Leader 的备胎
  • Broker Kafka 服务端,即一个 Kafka 实例,会将自己的信息、Topic 信息注册到 ZK 上。
  • Producer Consumer 消息生产者和消费者,没什么好多说的。Consumer 也会向 ZK 注册自己的信息用来负载均衡.
    • ConsumerGroup 消费者组,一条消息只会被同一组内的一个 Consumer 消费,可以被不同组的 Consumer 消费;这个配置决定了 Kafka 表现为广播模式还是单播模式。

Kafka 重要术语

  • ISR In-Sync Replicas 同步中的 Partition 集合,其中包括 Leader 本身,刨除了落后过多的副本。
  • HW HighWatermark 对于 Leader 的概念,Consumer 可以看到在 Partition 上的最新消息 offset。
  • LEO logEndOffset,是每个 Partition 的 log 最后一条 Message 的位置。Leader 上 LEO >= HW。

Kafka 技术细节

Kafka 速度快

  • 零拷贝技术。避免用户态和内核态的多次切换,Java 使用 TransferTo() 来直接从操作系统输入端 Buffer 中转移到输出端
  • 顺序写入磁盘 WAL,顺序写入可以显著地增加磁盘千倍的性能,尤其对于机械硬盘来说。
  • 应用层面,支持数据的批量发送和批量获取

消息发送

Producer 发送:

  1. 消息发送时会触发一个线程拉取集群元数据,相当于一个懒初始化的过程
  2. 消息发送是同步/异步取决于 producer.type=sync/async
  3. Producer 现将消息进行序列化处理
  4. 选择消息对应投递的 Partition
    1. 如果消息指定了投递的 Partition 则直接选择该 Partition
    2. 如果消息存在 Key 值,则 Hash(Key)%Partitions 选出 Partition
    3. 轮询选择 Partition
  5. 检查消息大小限制
  6. 异步发送的情况,对消息绑定对调函数
  7. 加入缓存等待发送。异步模式采用批次发送,当批次超过 batch.size(默认 16KB) 或是消息滞留时间超过 linger.time 就会发送出去。单条消息超过 batch.size 会单独做一个批次处理

Broker 收到后:

  1. Leader Partition 会写入本地 Log
  2. 响应 Follower,Follower 也会把消息进行持久化,然后给 Leader ACK
  3. Leader 收到 ISR (In-Sync Replicas) 的 ACK 后增加 HW
  4. 返回 Producer ACK。取决于 Producer request.required.acks 配置是否需要 ISR 全部同步、还是 Leader 持久化完成、还是不关心,分别配置为 -1, 1, 0

消息消费

  • 对于超过 Partition 数量的 Consumer 会无法进行消息消费
  • 如果 Consumer 少于 Partition,部分 Consumer 会消费多个 Partition
  • 如果一个 Consumer 同时消费多个 Partion,无法保证单个 Consumer 消费消息的顺序;如果 Topic 整体存在多个 Partition,则无法保证全局的消息消费顺序

消息存储

  • 消息存储在 -.log 和 .index 文件中,index 用来快速索引消息在 log 文件中的位置
  • Kafka 由于每个 Partition 都会产生日志文件,当 Partition 过多时候批量写入会退化得不再那么顺序…

刷盘策略

三个参数控制 Kafka 刷盘行为:

1
2
3
log.flush.interval.messages 
log.flush.interval.ms
log.flush.scheduler.interval.ms // 默认 3000 ms

默认 3s 定时刷盘,在 Kafka 架构下多副本基本保障了数据不丢失,特别是 request.required.acks=-1 配置下。所以是否需要实时刷盘 log.flush.interval.messages=1 需要结合业务慎重决定。毕竟实时刷盘性能降低太多,同时 Kafka 也不是那么适合在线业务。

谢谢支持!