Kafka Source Code Notes: Log Loading and Recovery
This post walks through the Kafka source code for log loading and recovery. (Kafka version: 2.8)
Log Management: LogManager
LogManager is the entry point of Kafka's log management subsystem. It is responsible for creating, retrieving, and cleaning up logs; all read and write operations are delegated to the individual log instances. The LogManager maintains logs in one or more directories. New logs are created in the data directory that currently holds the fewest logs; no attempt is made afterwards to move partitions or to rebalance by size or I/O rate. A background thread handles log retention by periodically truncating excess log segments.
LogManager startup consists of three parts:
- Log loading and recovery, i.e. loadLogs
- Starting the periodic background tasks (a sketch of this scheduling pattern follows the list), mainly:
a. cleanupLogs: deletes old segments according to the retention time and retention size
b. flushDirtyLogs: periodically flushes logs that have not yet been written to disk
c. checkpointLogRecoveryOffsets: periodically writes the recovery checkpoints of all logs in every data directory to the checkpoint file
d. checkpointLogStartOffsets: writes the current log start offset of every log to a text file in the log directory, so that data already removed by a DeleteRecordsRequest is not exposed again
e. deleteLogs: periodically deletes log files marked for deletion
- Starting the LogCleaner, which performs log compaction
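All of these periodic tasks are registered with Kafka's scheduler at startup, each with its own interval. As a rough, self-contained sketch of that pattern (using a plain ScheduledExecutorService instead of Kafka's own KafkaScheduler, and with made-up task bodies):

import java.util.concurrent.{Executors, TimeUnit}

object PeriodicLogTasksSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for Kafka's scheduler; the real LogManager uses its KafkaScheduler instead.
    val scheduler = Executors.newScheduledThreadPool(1)

    // Hypothetical task bodies; the real ones are methods of LogManager.
    def cleanupLogs(): Unit = println("delete segments that exceed retention time or size")
    def flushDirtyLogs(): Unit = println("flush logs whose data has been unflushed for too long")
    def checkpointLogRecoveryOffsets(): Unit = println("write recovery points to recovery-point-offset-checkpoint")

    // Each housekeeping task runs on its own fixed period, independently of the others.
    scheduler.scheduleAtFixedRate(() => cleanupLogs(), 30, 30, TimeUnit.SECONDS)
    scheduler.scheduleAtFixedRate(() => flushDirtyLogs(), 10, 10, TimeUnit.SECONDS)
    scheduler.scheduleAtFixedRate(() => checkpointLogRecoveryOffsets(), 60, 60, TimeUnit.SECONDS)
  }
}

The intervals above are illustrative only; in a real broker the periods come from configuration.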
This post focuses on the first part: log loading and recovery.
Loading and Recovering All Logs: loadLogs
Loading and recovering all logs involves the following main steps:
- Load the files in each log directory that record its state (.kafka_cleanshutdown, recovery-point-offset-checkpoint, and log-start-offset-checkpoint)
- Load and recover the log of each topic partition concurrently (covered in detail in the next section)
- Record problematic log directories and handle them asynchronously
/**
* Recover and load all logs in the given data directories
*/
private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  // Load all available log directories (liveLogDirs). A Kafka server may be configured with
  // multiple disk directories for log files, but not all of them are necessarily available.
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0

  // Iterate over all log directories and perform loading and recovery. If an IOException
  // occurs, record the directory in offlineDirs for later handling.
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      threadPools.append(pool)

      // If the .kafka_cleanshutdown file exists, delete it and record the hadCleanShutdown flag;
      // the recovery workflow can then be skipped later.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
        // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
        // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
        Files.deleteIfExists(cleanShutdownFile.toPath)
        hadCleanShutdown = true
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
      }

      // Read the recoveryPoint of every topic-partition directory from the recovery-point-offset-checkpoint file
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      // Read the logStartOffset of every topic-partition directory from the log-start-offset-checkpoint file
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // Main loading and recovery flow: run loadLog concurrently for every topic partition
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length

      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")
            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()
            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }
      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }

  try {
    // Wait for all concurrent log-loading jobs to complete
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }

    // Record all problematic directories; they will later be taken offline by the ReplicaManager
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}
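Both recovery-point-offset-checkpoint and log-start-offset-checkpoint use the same simple text format: a version line, a line with the number of entries, then one "topic partition offset" line per entry (handled by OffsetCheckpointFile in the Kafka code). A minimal sketch of reading such a file, assuming that version-0 format:

import scala.io.Source

object CheckpointFileSketch {
  // Parses a checkpoint file of the form:
  //   0              <- format version
  //   2              <- number of entries
  //   topicA 0 42    <- topic, partition, offset
  //   topicA 1 17
  def read(path: String): Map[(String, Int), Long] = {
    val source = Source.fromFile(path)
    try {
      val lines = source.getLines().toList
      val version = lines.head.trim.toInt
      require(version == 0, s"unsupported checkpoint version $version")
      val expected = lines(1).trim.toInt
      val entries = lines.drop(2).filter(_.nonEmpty).map { line =>
        val Array(topic, partition, offset) = line.split("\\s+")
        (topic, partition.toInt) -> offset.toLong
      }
      require(entries.size == expected, s"expected $expected entries, found ${entries.size}")
      entries.toMap
    } finally source.close()
  }
}

If a checkpoint file is unreadable or corrupted, loadLogs only logs a warning and falls back to a recovery point of 0 (or, for the log start offset, to the base offset of the first segment), as the catch blocks above show.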
Loading and Recovering a Single Topic Partition's Log
The log of a single topic partition is loaded and recovered in the initialization block of the Log class. If the partition's directory name ends with the -delete suffix, the partition is considered pending deletion and is added to the logsToBeDeleted collection, where the periodic deletion task will clean it up later.
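A partition directory name encodes the topic and partition (for example my-topic-0); directories scheduled for deletion additionally carry a unique id and the -delete suffix (roughly my-topic-0.<uniqueId>-delete). A rough sketch of that parsing, simplified from Log.parseTopicPartitionName and with the unique-id handling only approximated:

import java.io.File

object PartitionDirNameSketch {
  val DeleteDirSuffix = "-delete" // marks a partition directory scheduled for deletion

  // "my-topic-0"                    -> ("my-topic", 0, pendingDelete = false)
  // "my-topic-0.<uniqueId>-delete"  -> ("my-topic", 0, pendingDelete = true)
  def parse(dir: File): (String, Int, Boolean) = {
    val name = dir.getName
    val pendingDelete = name.endsWith(DeleteDirSuffix)
    // Strip the "-delete" suffix and the ".<uniqueId>" part (the last dot-separated piece).
    val base =
      if (pendingDelete) {
        val noSuffix = name.stripSuffix(DeleteDirSuffix)
        noSuffix.substring(0, noSuffix.lastIndexOf('.'))
      } else name
    val idx = base.lastIndexOf('-') // the partition id follows the last '-'
    (base.substring(0, idx), base.substring(idx + 1).toInt, pendingDelete)
  }
}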
The initialization block of the Log class loads the segments via loadSegments:
private def loadSegments(): Long = { /* ... */ }
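loadSegments walks the partition directory, first cleaning up temporary files left behind by compaction or an interrupted swap (suffixes such as .cleaned, .swap, and .deleted), and then builds a segment for each .log file, whose file name is the segment's base offset. A rough, self-contained sketch of that scan, with the suffix handling heavily simplified from the real code:

import java.io.File

object SegmentScanSketch {
  val CleanedSuffix = ".cleaned" // partially compacted segment, safe to delete
  val DeletedSuffix = ".deleted" // segment scheduled for deletion
  val SwapSuffix    = ".swap"    // compacted segment in the middle of being swapped in
  val LogSuffix     = ".log"

  // Returns the base offsets of the segment files found in a partition directory.
  def scan(partitionDir: File): Seq[Long] = {
    val files = Option(partitionDir.listFiles).getOrElse(Array.empty[File]).toSeq

    // First pass (simplified): temporary files from interrupted operations would be
    // deleted or completed here; this sketch just filters them out.
    val regularFiles = files.filterNot { f =>
      val n = f.getName
      n.endsWith(CleanedSuffix) || n.endsWith(DeletedSuffix) || n.endsWith(SwapSuffix)
    }

    // Second pass: every ".log" file is a segment whose name is its base offset,
    // e.g. "00000000000000368769.log" -> base offset 368769.
    regularFiles
      .filter(_.getName.endsWith(LogSuffix))
      .map(f => f.getName.stripSuffix(LogSuffix).toLong)
      .sorted
  }
}

The real loadSegments also recovers the loaded segments and returns an offset used to initialize the log end offset; this sketch only covers the directory scan.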
The core of recoverLog is as follows:
// if we have the clean shutdown marker, skip recovery
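Conceptually, recoverLog skips recovery entirely when the clean shutdown marker was present; otherwise it walks the segments beyond the recovery point, re-validates each one (rebuilding its indexes and truncating at the first invalid batch), and deletes every later segment once a truncation has happened. A toy sketch of that control flow, with a made-up Segment type standing in for Kafka's LogSegment:

object RecoverLogSketch {
  // Made-up stand-in for Kafka's LogSegment: a base offset plus a recover() that
  // returns the number of invalid bytes truncated from the tail (0 if the segment is intact).
  final case class Segment(baseOffset: Long, invalidTailBytes: Long = 0) {
    def recover(): Long = invalidTailBytes
  }

  // Returns the segments kept after recovery.
  def recoverLog(segments: List[Segment], recoveryPoint: Long, hadCleanShutdown: Boolean): List[Segment] = {
    if (hadCleanShutdown) return segments // clean shutdown marker found: skip recovery

    // Only segments with data at or beyond the recovery point are unflushed and need recovery.
    val (flushed, unflushed) = segments.partition(_.baseOffset < recoveryPoint)

    val kept = scala.collection.mutable.ListBuffer(flushed: _*)
    var truncated = false
    for (segment <- unflushed) {
      if (truncated) {
        // Everything after a corrupted segment cannot be trusted and is deleted.
        println(s"deleting segment with base offset ${segment.baseOffset}")
      } else {
        val invalidBytes = segment.recover() // rebuild indexes, truncate the invalid tail
        kept += segment
        if (invalidBytes > 0) truncated = true // later segments are no longer trusted
      }
    }
    kept.toList
  }
}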