This post covers how to upgrade Kafka and the on-disk log formats of each Kafka version.
Kafka upgrade steps
- Check the installed version under the Kafka root directory on a production machine; the old version turns out to be 1.0.1:
$ find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.11-1.0.1-javadoc.jar
- Add the following settings to server.properties, then start Kafka broker processes on new machines with this configuration, using the new version's (2.3.0) binaries:
inter.broker.protocol.version=1.0.1
log.message.format.version=1.0.1
- Migrate some of the topics to the new machines and observe the impact for a week.
- If the new Kafka version behaved well with no anomalies in step 3, migrate all topics to the new machines and stop the brokers on all the old machines.
- Update the settings in server.properties and restart the brokers one at a time with this configuration:
inter.broker.protocol.version=2.3.0
log.message.format.version=2.3.0
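Migrating topics in step 3 above is typically done with Kafka's `kafka-reassign-partitions.sh` tool, which consumes a JSON plan. Below is a minimal Python sketch that builds such a plan; the topic names, broker IDs, and replication factor of 2 are made-up assumptions.

```python
import json

def build_reassignment_plan(topics, new_broker_ids, partitions_per_topic):
    """Build a JSON plan for kafka-reassign-partitions.sh --execute.

    Assigns every partition of every topic to the new brokers,
    rotating the replica list so leadership spreads out.
    """
    partitions = []
    for topic in topics:
        for p in range(partitions_per_topic[topic]):
            # Rotate the broker list per partition to balance leaders.
            replicas = [new_broker_ids[(p + i) % len(new_broker_ids)]
                        for i in range(2)]  # replication factor 2 (assumed)
            partitions.append({"topic": topic, "partition": p,
                               "replicas": replicas})
    return {"version": 1, "partitions": partitions}

# Hypothetical example: move two topics onto new brokers 4 and 5.
plan = build_reassignment_plan(["orders", "clicks"], [4, 5],
                               {"orders": 2, "clicks": 1})
plan_json = json.dumps(plan)
```

The resulting JSON would be saved to a file and passed to the tool via `--reassignment-json-file plan.json --execute`.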
Kafka new feature: static membership
Group membership in older Kafka versions can be thought of as dynamic membership; the new static membership essentially aims to reduce consumer group rebalances. (To be expanded.)
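Concretely, static membership (KIP-345, available from version 2.3, the upgrade target above) is enabled by giving each consumer instance a stable `group.instance.id`: as long as a restarted instance rejoins within `session.timeout.ms` under the same ID, the broker skips the rebalance. A sketch of the client configuration, in the dict style used by confluent-kafka; all values here are illustrative assumptions.

```python
# Illustrative consumer settings for static membership (values are assumptions).
consumer_conf = {
    "bootstrap.servers": "broker1:9092",
    "group.id": "payments-service",
    # A stable, unique ID per consumer instance enables static membership;
    # restarting the process under the same ID avoids a group rebalance.
    "group.instance.id": "payments-service-pod-0",
    # The broker waits this long for a static member to come back
    # before evicting it and rebalancing.
    "session.timeout.ms": 30000,
}

# With confluent-kafka this dict would be passed straight to the consumer:
#   from confluent_kafka import Consumer
#   consumer = Consumer(consumer_conf)
```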
Kafka Manager monitoring metrics explained
- Brokers Skewed %: percentage of brokers having more partitions than the average. A broker is skewed when it holds more partitions of a given topic than the per-broker average for that topic. E.g. with 2 brokers and 4 partitions the average is 4 / 2 = 2, so a broker holding 3 partitions (3 > 2) is skewed.
- Brokers Spread %: percentage of cluster brokers having partitions from the topic. E.g. when 3 brokers share 2 partitions, 2 of the 3 brokers hold a partition of the topic, so the spread is 66%.
- Under Replicated %: percentage of partitions having a missing (out-of-sync) replica.
- Brokers Leader Skew %: percentage of brokers acting as leader for more partitions than the average.
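These percentages can be computed directly from a topic's partition assignment. A sketch, following the definitions above with a made-up assignment (Kafka Manager's exact averaging and rounding may differ):

```python
from collections import Counter

def topic_metrics(assignment, all_brokers):
    """Compute Kafka Manager style metrics for one topic.

    assignment: {partition: [replica broker ids]}, first replica = leader.
    """
    replica_count = Counter()   # partitions per broker (any replica)
    leader_count = Counter()    # partitions per broker as leader
    for replicas in assignment.values():
        leader_count[replicas[0]] += 1
        for b in replicas:
            replica_count[b] += 1

    avg = sum(replica_count.values()) / len(all_brokers)
    avg_leader = len(assignment) / len(all_brokers)

    skewed = sum(1 for b in all_brokers if replica_count[b] > avg)
    leader_skewed = sum(1 for b in all_brokers if leader_count[b] > avg_leader)
    spread = sum(1 for b in all_brokers if replica_count[b] > 0)

    n = len(all_brokers)
    return {
        "skewed_pct": 100 * skewed // n,
        "leader_skew_pct": 100 * leader_skewed // n,
        "spread_pct": 100 * spread // n,
    }

# The example from the text: 2 brokers, 4 single-replica partitions,
# broker 1 holds 3 of them and broker 2 holds 1.
m = topic_metrics({0: [1], 1: [1], 2: [1], 3: [2]}, [1, 2])
```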
Kafka's message format has gone through three versions so far:
- v0 (before 0.10.0)
- v1 (0.10.0~0.11.0): added Timestamp and the lz4 compression type
- v2 (0.11.0~2.2.3): the message format changed substantially, adding fields such as ProducerId, ProducerEpoch, and Headers, and (partly) switching the encoding from fixed-width int to varint
- Our current log format version is 0.10.1. If we upgrade the brokers to 2.2.3, each broker has to down-convert messages to the old format before sending responses to old-version consumers, which increases broker load; to avoid that conversion, keep the log format at the old version.
- To adopt the new message format, first upgrade the brokers, then promptly upgrade producers and consumers to send and consume messages in the new format.
On-disk format of a record batch (before 0.10.0)
MessageSet (Version: 0) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
bit 3~7: unused
key => BYTES
value => BYTES
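The v0 layout above can be exercised in a few lines of Python; this sketch packs and re-parses a single v0 message, computing the CRC-32 over everything after the crc field as Kafka does:

```python
import struct
import zlib

def pack_message_v0(key: bytes, value: bytes) -> bytes:
    """Serialize one v0 message: crc, magic, attributes, key, value."""
    magic, attributes = 0, 0  # magic 0 = v0, attributes 0 = no compression
    # BYTES fields are length-prefixed with an int32 (-1 would mean null).
    body = struct.pack(">bb", magic, attributes)
    body += struct.pack(">i", len(key)) + key
    body += struct.pack(">i", len(value)) + value
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body

def unpack_message_v0(buf: bytes):
    """Parse one v0 message and verify its CRC."""
    (crc,) = struct.unpack_from(">I", buf, 0)
    assert zlib.crc32(buf[4:]) & 0xFFFFFFFF == crc, "corrupt message"
    magic, attributes = struct.unpack_from(">bb", buf, 4)
    (key_len,) = struct.unpack_from(">i", buf, 6)
    key = buf[10:10 + key_len]
    (value_len,) = struct.unpack_from(">i", buf, 10 + key_len)
    value = buf[14 + key_len:14 + key_len + value_len]
    return magic, attributes, key, value

msg = pack_message_v0(b"k", b"hello")
assert unpack_message_v0(msg) == (0, 0, b"k", b"hello")
```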
On-disk format of a record batch (0.10.0~0.11.x)
MessageSet (Version: 1) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes timestamp key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
0: create time
1: log append time
bit 4~7: unused
timestamp => INT64
key => BYTES
value => BYTES
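In v1 the low three bits of the attributes byte select the compression codec and bit 3 the timestamp type, which can be sketched as:

```python
# Codec ids from the v1 attributes layout above.
CODECS = {"none": 0, "gzip": 1, "snappy": 2, "lz4": 3}

def pack_attributes_v1(codec: str, log_append_time: bool) -> int:
    """Encode the v1 attributes byte: bits 0-2 codec, bit 3 timestamp type."""
    return CODECS[codec] | (0x08 if log_append_time else 0)

def unpack_attributes_v1(attrs: int):
    codec = {v: k for k, v in CODECS.items()}[attrs & 0x07]
    timestamp_type = "log_append" if attrs & 0x08 else "create"
    return codec, timestamp_type

assert pack_attributes_v1("lz4", True) == 0x0B
assert unpack_attributes_v1(0x0B) == ("lz4", "log_append")
```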
On-disk format of a record batch (0.11 and later)
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
Record:
length: varint
attributes: int8
bit 0~7: unused
timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
header:
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
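The varint fields in the v2 layout use zigzag variable-length encoding (as in Protocol Buffers); a sketch of encoding and decoding a signed varint:

```python
def encode_varint(n: int) -> bytes:
    """Zigzag-encode a signed 64-bit int, then emit 7 bits per byte, LSB first."""
    z = (n << 1) ^ (n >> 63)  # zigzag: 0,-1,1,-2,... -> 0,1,2,3,...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # set continuation bit
        z >>= 7
    out.append(z)
    return bytes(out)

def decode_varint(buf: bytes):
    """Return (value, bytes consumed)."""
    z, shift = 0, 0
    for i, b in enumerate(buf):
        z |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), i + 1  # un-zigzag
    raise ValueError("truncated varint")

assert encode_varint(-1) == b"\x01"  # zigzag maps -1 -> 1
assert decode_varint(encode_varint(300)) == (300, 2)
```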
Document info
- Author: Xinlong Zhu
- Link: https://zhuxinlong.github.io/2020/08/06/kafka-logformat-upgrade/
- License: free to repost, non-commercial, no derivatives, keep attribution (Creative Commons 3.0 license)