Kafka Source Code Study: Log Loading and Recovery

This article walks through the Kafka source code for log loading and recovery. (Kafka version: 2.8)

Log management: LogManager

LogManager is the entry point of Kafka's log management subsystem. It is responsible for creating, retrieving and cleaning up logs; all read and write operations are delegated to the individual log instances. The LogManager maintains logs in one or more directories. New logs are created in the data directory that currently holds the fewest logs; no attempt is made afterwards to move partitions or to rebalance them by size or I/O rate. A background thread handles log retention by periodically truncating expired log segments.
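The "fewest logs wins" placement policy can be pictured with a short sketch. This is a simplified, hypothetical illustration (it counts partition directories on disk), not the actual LogManager code:

import java.io.File

// Minimal sketch (not the real LogManager code): pick the data directory that
// currently holds the fewest partition directories, which is where a newly
// created log would be placed.
object LeastLoadedDirExample {
  def pickLogDir(liveLogDirs: Seq[File]): File =
    liveLogDirs.minBy { dir =>
      // Count existing partition directories; an empty or unreadable dir counts as 0.
      Option(dir.listFiles).getOrElse(Array.empty[File]).count(_.isDirectory)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical data dirs, e.g. configured via log.dirs=/data/kafka-1,/data/kafka-2
    val dirs = Seq(new File("/data/kafka-1"), new File("/data/kafka-2"))
    println(s"New log would be created under: ${pickLogDir(dirs)}")
  }
}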

The startup of LogManager consists of three main parts:

  1. Log loading and recovery, i.e. loadLogs
  2. Starting the periodic background tasks, mainly:
    a. cleanupLogs: clean up old segments according to the retention time and retention size
    b. flushDirtyLogs: periodically flush logs that have not yet been written to disk
    c. checkpointLogRecoveryOffsets: periodically write the recovery points of all logs in every data directory to the checkpoint file
    d. checkpointLogStartOffsets: write the current log start offset of every log to a text file in its log directory, to avoid exposing data that has already been deleted by a DeleteRecordsRequest
    e. deleteLogs: periodically delete log files that have been marked for deletion
  3. Starting the LogCleaner, which performs log compaction

This article focuses on the first part: log loading and recovery.

// visible for testing
private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean

  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner) {
    _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    _cleaner.startup()
  }
}
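The scheduling periods above come from broker configurations: retentionCheckMs maps to log.retention.check.interval.ms, flushCheckMs to log.flush.scheduler.interval.ms, flushRecoveryOffsetCheckpointMs to log.flush.offset.checkpoint.interval.ms, and flushStartOffsetCheckpointMs to log.flush.start.offset.checkpoint.interval.ms.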

Loading and recovering all logs: loadLogs

Loading and recovering all logs involves the following steps:

  1. Load and record the files in each log directory that capture its state (.kafka_cleanshutdown, recovery-point-offset-checkpoint, log-start-offset-checkpoint); a parsing sketch for the checkpoint files follows this list
  2. Concurrently load and recover the log of every topic-partition (detailed in the next section)
  3. Record problematic log directories and handle them asynchronously
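Both recovery-point-offset-checkpoint and log-start-offset-checkpoint are small text files mapping each topic-partition to an offset. The sketch below shows one way to parse such a file; it only mirrors the on-disk layout (a version line, an entry count, then one "topic partition offset" entry per line) and is not Kafka's OffsetCheckpointFile implementation:

import java.io.File
import scala.io.Source

// Minimal parsing sketch for checkpoint files such as recovery-point-offset-checkpoint.
// Assumed layout: line 1 = format version, line 2 = number of entries,
// then one "topic partition offset" entry per line.
object CheckpointFileSketch {
  def read(file: File): Map[(String, Int), Long] = {
    val source = Source.fromFile(file)
    try {
      source.getLines().toList match {
        case _version :: count :: entries =>
          require(entries.size == count.trim.toInt, "entry count mismatch")
          entries.map { line =>
            val Array(topic, partition, offset) = line.trim.split("\\s+")
            (topic, partition.toInt) -> offset.toLong
          }.toMap
        case _ => Map.empty
      }
    } finally {
      source.close()
    }
  }
}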
/**
 * Recover and load all logs in the given data directories
 */
private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  // Load every live log directory (liveLogDirs). The broker may be configured with
  // several data directories for log files, but not all of them are necessarily usable.
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0

  // Iterate over all data directories and load/recover their logs. If an IOException
  // occurs, record the directory in offlineDirs for later handling.
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      threadPools.append(pool)

      // If the .kafka_cleanshutdown file exists, delete it and remember the
      // hadCleanShutdown flag so that the recovery flow can be skipped later.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
        // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
        // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
        Files.deleteIfExists(cleanShutdownFile.toPath)
        hadCleanShutdown = true
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
      }

      // Read the recoveryPoint of every topic-partition directory from the
      // recovery-point-offset-checkpoint file
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      // Read the logStartOffset of every topic-partition directory from the
      // log-start-offset-checkpoint file
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // Main loading/recovery flow: run loadLog for every topic-partition concurrently
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length

      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")

            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()

            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }

      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }

  try {
    // Wait for all concurrently running load jobs to finish
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }
    // Record every problematic directory; ReplicaManager will later take it offline
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}
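One detail worth noting: the size of each per-directory thread pool, numRecoveryThreadsPerDataDir, is backed by the broker configuration num.recovery.threads.per.data.dir (default 1). Raising it can substantially shorten startup after an unclean shutdown on brokers that host many partitions, since segment recovery is both CPU- and I/O-bound.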

Loading and recovering a single topic-partition's log

Loading and recovering a single topic-partition's log happens in the initialization block of the Log class. If the partition's directory name ends with the -delete suffix, the partition is considered scheduled for deletion and is added to the logsToBeDeleted collection, where a periodic task will eventually clean it up (a minimal sketch of the directory-name convention is shown below).
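As a rough illustration of the directory-name convention involved here, the sketch below parses a "<topic>-<partition>" directory name and checks for the -delete suffix. It is a simplified stand-in for Log.parseTopicPartitionName, not the real implementation:

// Minimal sketch: a partition log directory is named "<topic>-<partition>",
// and a directory scheduled for deletion carries the "-delete" suffix
// (Log.DeleteDirSuffix). The name is split on the last '-' because topic
// names may themselves contain dashes.
object TopicPartitionDirSketch {
  val DeleteDirSuffix = "-delete"

  def isMarkedForDeletion(dirName: String): Boolean =
    dirName.endsWith(DeleteDirSuffix)

  def parseTopicPartition(dirName: String): (String, Int) = {
    val idx = dirName.lastIndexOf('-')
    require(idx > 0, s"not a partition directory name: $dirName")
    (dirName.substring(0, idx), dirName.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    println(parseTopicPartition("my-topic-3"))             // (my-topic,3)
    println(isMarkedForDeletion("my-topic-3.uuid-delete")) // true
  }
}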
The segments themselves are loaded by loadSegments, called from the Log class's initialization block:

private def loadSegments(): Long = {
  // Remove temporary files (.deleted and .cleaned suffixes) and collect the usable .swap files
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow catches a possible LogSegmentOffsetOverflowException
  // and splits the offending segment before retrying.
  retryOnOffsetOverflow {
    // Load all segment files in the directory and run the necessary sanity checks
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()
  }

  // Complete any interrupted operations using the collected swap files
  completeSwapOperations(swapFiles)

  // If this topic-partition's log is not scheduled for deletion, run the recovery flow
  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {
      recoverLog()
    }

    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {
    if (logSegments.isEmpty) {
      addSegment(LogSegment.open(dir = dir,
        baseOffset = 0,
        config,
        time = time,
        initFileSize = this.initFileSize))
    }
    0
  }
}
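For context on what loadSegmentFiles and removeTempFilesAndCollectSwapFiles iterate over, the sketch below shows the segment file naming convention (a 20-digit zero-padded base offset) and the temporary suffixes involved; it is illustrative only, not the Log code itself:

// Sketch of the on-disk layout that segment loading walks over: each segment is
// named by its zero-padded base offset, with companion offset and time index
// files, plus temporary suffixes used during cleaning/swapping.
object SegmentFileNamesSketch {
  def filenamePrefixFromOffset(offset: Long): String = f"$offset%020d"

  def main(args: Array[String]): Unit = {
    val base = 523456L
    println(filenamePrefixFromOffset(base) + ".log")       // segment data
    println(filenamePrefixFromOffset(base) + ".index")     // offset index
    println(filenamePrefixFromOffset(base) + ".timeindex") // time index
    // Temporary/marker suffixes handled during loading:
    //   .cleaned - intermediate output of log compaction, discarded on startup
    //   .swap    - completed compaction/split result waiting to be swapped in
    //   .deleted - segment scheduled for asynchronous deletion
  }
}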

The core of recoverLog looks like this:

// if we have the clean shutdown marker, skip recovery
// Recovery is only needed when the broker did not shut down cleanly
if (!hadCleanShutdown) {
  // Take all segments after the recoveryPoint (normally only one)
  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {
    val segment = unflushed.next()
    info(s"Recovering unflushed segment ${segment.baseOffset}")
    val truncatedBytes =
      try {
        // Clear the segment's indexes, read and validate the data batch by batch,
        // and rebuild the indexes along the way
        recoverSegment(segment, leaderEpochCache)
      } catch {
        case _: InvalidOffsetException =>
          val startOffset = segment.baseOffset
          warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
            s"creating an empty one with starting offset $startOffset")
          segment.truncateTo(startOffset)
      }
    if (truncatedBytes > 0) {
      // If this segment was truncated, all following segments are deleted outright.
      // unflushed is an iterator, so unflushed.toList contains only the segments
      // not yet visited, not all segments.
      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
      removeAndDeleteSegments(unflushed.toList,
        asyncDelete = true,
        reason = LogRecovery)
      truncated = true
    }
  }
}
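recoverSegment itself truncates the segment's indexes and then rescans the log file batch by batch, validating each batch and re-inserting sparse index entries; everything beyond the last valid batch is truncated away. The sketch below captures that loop with hypothetical types (Batch, appendIndexEntry); it is not the real LogSegment API:

// Conceptual sketch of segment recovery: scan batches sequentially, validate
// each one, rebuild sparse index entries, and return the number of bytes
// truncated past the last valid batch.
case class Batch(baseOffset: Long, sizeInBytes: Int, crcValid: Boolean)

object RecoverSegmentSketch {
  def recover(batches: Iterator[Batch],
              segmentSizeInBytes: Int,
              indexIntervalBytes: Int,
              appendIndexEntry: (Long, Int) => Unit): Int = {
    var validBytes = 0
    var bytesSinceLastIndexEntry = 0
    var corrupted = false
    while (batches.hasNext && !corrupted) {
      val batch = batches.next()
      if (!batch.crcValid) {
        corrupted = true // stop at the first invalid batch
      } else {
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
          appendIndexEntry(batch.baseOffset, validBytes) // offset -> physical position of this batch
          bytesSinceLastIndexEntry = 0
        }
        validBytes += batch.sizeInBytes
        bytesSinceLastIndexEntry += batch.sizeInBytes
      }
    }
    // everything after the last valid batch is truncated away
    segmentSizeInBytes - validBytes
  }
}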