Kafka
Kafka
Kafka 是否会弄丢数据?
主要取决于我们如何使用。
消费端弄丢了数据?
- 只要关闭自动提交 offset ,在处理完之后自己手动提交 offset ,就可以保证数据不会丢。可能会有重复消费,需要保证幂等性
Broker 弄丢了数据?
- replication.factor 参数:这个值必须大于 1要求每个 partition 必须有至少 2 个副本。
- min.insync.replicas 参数:这个值必须大于 1 ,要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队
- 在 Producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
- 在 Producer 端设置 retries=MAX,这个是要求一旦写入失败,就无限重试,卡在这里了
- 生产环境就是按照上述要求配置的,可以保证在 leader 所在 Broker 发生故障,进行 leader 切换时,数据不会丢失。
生产者会不会弄丢数据?
- 按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
Kafka 如何保证消息的顺序性?
方式一,Consumer ,对每个 Partition 内部单线程消费,单线程吞吐量太低,一般不会用这个。
方式二,Consumer ,拉取到消息后,写到 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue 。然后,对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
实际情况也不太需要考虑消息的顺序性,基本没有业务需要。
ZooKeeper 在 Kafka 中起到什么作用?
Broker、Producer、Consumer 和 Zookeeper 的交互。
- Broker 在 ZooKeeper 中的注册。
- Topic 在 ZooKeeper 中的注册。
- Consumer 在 ZooKeeper 中的注册。
- Consumer 负载均衡。
相应的状态存储到 Zookeeper 中。
-
Producer 负载均衡。
- Producer 从 Zookeeper 拉取 Topic 元数据,从而能够将消息发送负载均衡到对应 Topic 的分区中
-
记录消费进度 Offset 。
-
记录 Partition 与 Consumer 的关系。
数据存储模型
Topic 为 test ,Partition 为 0 ,所以文件目录是 test-0 ,目录下面会对应多个日志分段(LogSegment)
LogSegment 文件由两部分组成,分别为 .index 文件和 .log 文件,分别表示为 segment 索引文件和数据文件
为什么不能以 Partition 作为存储单位?
- 如果就以 Partition 为最小存储单位,可以想象,当 Kafka Producer 不断发送消息,必然会引起 Partition 文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响
网络模型
Kafka 基于高吞吐率和效率考虑,并没有使用第三方网络框架,而且自己基于 Java NIO 封装的
1)KafkaClient ,单线程 Selector 模型。
2)KafkaServer(Kafka Broker ),多线程 Selector 模型。
基于发布与订阅的消息系统
特点
1、同时为发布和订阅提供高吞吐量
- 每秒可以生产约 25 万消息(50MB),每秒处理 55 万消息(110MB)。
2、可进行持久化操作
- ETL
- replication
3、分布式系统,易于向外扩展。
4、消息被处理的状态是在 Consumer 端维护,而不是由 Broker 端维护。当失败时,能自动平衡
- 消息是否被处理完成,是通过 Consumer 提交消费进度给 Broker ,而不是 Broker 消息被 Consumer 拉取后,就标记为已消费。
- 当 Consumer 异常崩溃时,可以重新分配消息分区到其它的 Consumer 们,然后继续消费。
支持 online 和 offline 的场景。
设计要点
1)吞吐量
-
1、数据磁盘持久化:消息不在内存中 Cache ,直接写入到磁盘,充分利用磁盘的顺序读写性能。
-
2、zero-copy:减少 IO 操作步骤
-
3、数据批量发送
-
4、数据压缩
-
5、Topic 划分为多个 Partition ,提高并行度。
- Kafka 以 Topic 来进行消息管理
- 每个 Topic 包含多个 Partition
- 每个 Partition 对应一个逻辑 log
- log有多个 segment 文件组成。
- 每个 segment 中存储多条消息,消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射。
- 每个 Partition 在内存中对应一个 index ,记录每个 segment 中的第一条消息偏移。
2)负载均衡
- 1、Producer 根据用户指定的算法,将消息发送到指定的 Partition 中
- 2、Topic 存在多个 Partition ,每个 Partition 有自己的replica ,每个 replica 分布在不同的 Broker 节点上。多个Partition 需要选取出 Leader partition ,Leader Partition 负责读写,并由 Zookeeper 负责 fail over 。
- 3、相同 Topic 的多个 Partition 会分配给不同的 Consumer 进行拉取消息,进行消费。
3)拉取系统
-
Consumer 非常适合采取 pull 的方式消费数据
- 简化 Kafka 设计。
- Consumer 根据消费能力自主控制消息拉取速度。
- Consumer 根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等。
4)可扩展性
- 通过 Zookeeper 管理 Broker 与 Consumer 的动态加入与离开。
几个重要的基本概念
RocketMQ 从 Kafka 演化而来。
命名服务
- Kafka 使用 Zookeeper 作为命名服务
- RocketMQ 自己实现了一个轻量级的 Namesrv
首领分区
- Kafka Broker 的每个分区都有一个首领分区,RocketMQ 没有首领分区一说
拉取消息方式
- Kafka Consumer 使用 poll 的方式拉取消息
- RocketMQ Consumer 提供 poll 的方式的同时,封装了一个 push 的方式,也是基于 poll 的方式的封装。
为什么要分区
为了负载均衡,从而能够水平拓展
Topic 只是逻辑概念,面向的是 Producer 和 Consume,而 Partition 则是物理概念
有了 Partition 概念以后,Kafka 会根据一定的算法将 多个Partition 尽可能均匀的分布到不同的 Broker(服务器)上
发布与拉取
- 当 Producer 发布消息时,Producer 客户端可以采用 random、key-hash 及轮询等算法选定目标 Partition
- 当 Consumer 拉取消息时,Consumer 客户端可以采用 Range、轮询 等算法分配 Partition ,从而从不同的 Broker 拉取对应的 Partition 的 leader 分区。
Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力
应用场景
消息队列
行为跟踪
元信息监控
日志收集
流处理
事件源
持久性日志(Commit Log)
Kafka 消息发送和消费的简化流程是什么
Producer ,根据指定的 partition 方法(round-robin、hash等),将消息发布到指定 Topic 的 Partition 里面
- Producer 只和 Partition 的 leader 进行交互。
Kafka 集群,接收到 Producer 发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
Consumer ,从 Kafka 集群 pull 数据,并控制获取消息的 offset 。至于消费的进度,可手动或者自动提交给 Kafka 集群
- 最后一个 offset 由 ZooKeeper 保存
发送模式
-
同步,默认是同步的方式,如果需要确保消息的可靠性,必须设置为同步
-
异步
- 可以是 Producer 以 batch 的形式 push 数据,这样会极大的提高 Broker的性能,但是这样会增加丢失数据的风险。
Consumer 消费消息时,向 Broker 发出“fetch”请求去消费特定分区的消息
API
-
high-level API
- 屏蔽了每个 Topic 的每个 Partition 的 offset 管理,Broker 失败转移、以及增减 Partition 时 Consumer 时的负载均衡
-
low-level API (Simple Consumer API )
- API 控制更灵活,例如消息重复读取,消息 offset 跳读,Exactly Once 原语
- API 更复杂,offset 不再透明,需要自己管理,Broker 自动失败转移需要处理,增加 Consumer、Partition、Broker 需要自己做负载均衡。
消息格式
message
-
offset
-
message size
-
CRC32
- 用crc32校验message
-
“magic”
- 表示本次发布Kafka服务程序协议版本号
-
“attributes”
- 表示为独立版本、或标识压缩类型、或编码类型
-
length
- 表示key的长度,当key为-1时,K byte key字段不填
-
K byte key
- 可选
-
value bytes payload
- 表示实际消息数据
副本机制
Kafka 的副本机制,是多个 Broker 节点对其他节点的 Topic 分区的日志进行复制
在 Kafka 中并不是所有的副本都能被拿来替代主副本,所以在 Kafka 的Leader 节点中维护着一个 ISR
- 节点必须和 Zookeeper 保持连接。
- 在同步的过程中这个副本不能落后主副本太多。
AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合
- ISR = Leader + 没有落后太多的副本。
- AR = OSR + ISR 。
HW 和 LEO 。
- HW(高水位 HighWatermark),是 Consumer 能够看到的此 Partition 的位置。
- LEO(logEndOffset),是每个 Partition 的 log 最后一条 Message 的位置。
- HW 能保证 Leader 所在的 Broker 失效,该消息仍然可以从新选举的Leader 中获取,不会造成消息丢失
当 Producer 向 Leader 发送数据时,可以通过request.required.acks 参数来设置数据可靠性的级别
-
1
- 意味着 Producer 在 ISR 中的 Leader 已成功收到的数据并得到确认后发送下一条 message 。如果 Leader 宕机了,则会丢失数据。
-
0
- 意味着 Producer 无需等待来自 Broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-
-1
- Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 Leader 时(其他节点都和 Zookeeper 断开连接,或者都没追上),这样就变成了 acks=1 的情况。