2.3 日志删除
与其他消息中间件不同的是,Kafka集群中的消息不会因为消费与否而删除,跟日志一样消息最终会落盘,并提供对应的策略周期性(通过参数log.retention.check.interval.ms来设置,默认为5分钟)执行删除或者压缩操作(broker配置文件log.cleanup.policy
参数如果为“delete”则执行删除操作,如果为“compact”则执行压缩操作,默认为“delete”)。
2.3.1 基于时间的日志删除
参数 | 默认值 | 说明 |
---|---|---|
log.retention.hours | 168 | 日志保留时间(小时) |
log.retention.minutes | 无 | 日志保留时间(分钟),优先级大于小时 |
log.retention.ms | 无 | 日志保留时间(毫秒),优先级大于分钟 |
当消息在集群保留时间超过设定阈值(log.retention.hours,默认为168小时,即七天),则需要进行删除。这里会根据分片日志的最大时间戳来判断该分片的时间是否满足删除条件,最大时间戳首先会选取时间戳索引文件中的最后一条索引记录,如果对应的时间戳值大于0则取该值,否则为最近一次修改时间。
这里不直接选取最后修改时间的原因是避免分片日志的文件被无意篡改而导致其时间不准。
如果恰好该分区下的所有日志分片均已过期,那么会先生成一个新的日志分片作为新消息的写入文件,然后再执行删除参数。
2.3.2 基于空间的日志删除
参数 | 默认值 | 说明 |
---|---|---|
log.retention.bytes | 1073741824(即1G),默认未开启,即无穷大 | 日志文件总大小,并不是指单个分片的大小 |
log.segment.bytes | 1073741824(即1G) | 单个日志分片大小 |
首先会计算待删除的日志大小diff
(totalSize-log.rentention.bytes),然后从最旧的一个分片开始查看可以执行删除操作的文件集合(如果diff-segment.size>=0
,则满足删除条件),最后执行删除操作。
2.3.3 基于日志起始偏移量的日志删除
一般情况下,日志文件的起始偏移量(logStartOffset)会等于第一个日志分段的baseOffset,但是其值会因为删除消息请求而增长,logStartOffset的值实际上是日志集合中的最小消息,而小于这个值的消息都会被清理掉。如上图所示,我们假设logStartOffset=7421048,日志删除流程如下:
-
从最旧的日志分片开始遍历,判断其下一个分片的baseOffset是否小于或等于logStartOffset值,如果满足,则需要删除,因此第一个分片会被删除。
-
分片二的下一个分片baseOffset=6506251<7421048,所以分片二也需要删除。
-
分片三的下一个分片baseOffset=9751854>7421048,所以分片三不会被删除。