【深入理解Kafka系列】第六章 __consumer_offsets(位移主题)
1、__consumer_offsets
消费者位移提交的内容最终会保存到Kafka的内部主题__consumer_offsets中,和你创建的其他主题一样,位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它;当集群中第一次有消费者消费消息时会自动创建主题__consumer_offsets ,它的副本因子还受 offsets. topic.replication. factor 参数的约束,这个参数的默认值为3,分区数可以通过 offsets .topic. num.partitions 参数设置,默认为 50 。
与消费位移对应的消息只定义了key和value字段的具体内容,它不依赖于具体版本的消息格式,做到了与具体的消息格式无关。
key 和 value 中都包含了 version字段,这个用来标识具体的 key 和 value 的版本信息,不同的版本对应的内容格式可能并不相同 。就目前版本而言 , key 和 value 的 version 值都为1。
- key:除了 version 字段还有 group 、 topic 、 partition 字段,分别表示消费组的 groupId 、 主题名称和分区编号。虽然 key 中包含了4个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的 。
- value:除 version 宇段外,其余的offset 、metadata、commit_timestamp、expire_timestamp 宇段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳 。其 中 offset 和 metadata 与OffsetCommitRequest 请求体中的 offset 和 metadata 对应,而 expiretimestamp 和OffsetCommitRequest 请求体中的 retention_time 也有关联, commit_timestamp 值与offsets.retention.minutes 参数值之和即为 expire_timestamp (默认情况下)。
2、OffsetCommitRequest 和OffsetCommitResponse
客户端提交消费位移是使用OffsetCommitRequest 请求实现的, OffsetCommitRequest 结构如图:
请求体第一层中的 group id 、 generation_id 和 member_id 表示消费者的一些信息, retention_time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1 。也就是说,按照 broker 端的配置 offsets.retention.minutes 来确定保留时长。offsets .retention minutes 的默认值为 10080,即 7 天,超过这个时间后消费位移的信息就会被删除(使用墓碑消息和日志压缩策略) 。
在处理完消费位移之后, Kafka 返回 OffsetCommitResponse 给客户端,各个域的字段含义可以根据前面内容推断出,所以不在赘述。
3、删除位移主题中的过期消息
Kafka 使用Compact 策略(压缩)来删除位移主题中的过期消息。对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。
Kafka 提供后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。如果位移主题无限膨胀占用过多磁盘空间的问题,可以去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。