Kafka 源码学习:动态配置

Kafka 的动态配置基于 Zookeeper 实现,本文主要梳理了 Kafka(version:2.8) 中动态配置的实现逻辑。

背景信息

在 Kafka 中,Zookeeper 客户端没有使用常见的客户端工具(如:Curator),而是直接基于原生的客户端实现了自己的 KafkaZkClient,将一些通用或特有的 Zookeeper 操作封装在内。因此,关于 Zookeeper 的使用及回调等逻辑也完全是独立实现的。另外,由于 Zookeeper 中一个节点下的 Watcher 顺序触发,如果同一个节点下有大量的 Watcher,将会产生性能瓶颈。下面将基于这些背景信息来介绍 Kafka 是如何基于 Zookeeper 实现高效的动态配置管理的。

Kafka 动态配置结构

Kafka 动态配置 Zookeeper 目录结构

在当前版本的 Kafka 中,动态配置类型有 5 种:topic, client, user, broker, ip。Kafka 动态配置的目录结构为: /config/entityType/entityName ,entityType 代表配置的类型,entityName 代表具体某个实体,比如:Topic 类型对应的实体就是具体某个 Topic,某个配置类型下所有实体的默认配置的 entityName 为 <deafult> (topic 类型没有 <default> 配置,通过 broker 类型来修改 LogConfig 的默认配置)。具体来说,所有 Topic 的默认动态配置会放在 /config/topic/<default> 的节点信息中,Topic AAA 的所有动态配置会放在 /config/topic/AAA 的节点信息中。

Listener 实现

下面介绍 Kafka 中 Zookeeper Listener 的实现。既然是基于 Zookeeper 实现,必然少不了 Zookeeper 的 Watcher 通知机制,但是,如背景信息中所说,在 Watcher 数量过多的情况下,会存在性能瓶颈。以 Topic 配置变更为例,在生产环境中,一个 Topic 的 Partition 数量可能多达上千,如果每个 Partition Leader 都去监听这个 Topic 配置信息,那么在一个 Kafka 集群内,仅监听 Topic 配置的 Watcher 就会有上万个甚至更多。Kafka 通过独立的通知机制来避免了这一问题,即:每次 AdminClient 进行配置变更时,会在 /config/changes/ 目录下创建以 config_change_ 为前缀的顺序节点,Wather 只监听 /config/changes/ 目录的孩子节点变化,所以对于动态配置来说,所有 Broker 只监听 /config/changes/ 这一个目录,大大减少集群整体的 Watcher 数量。

Kafka 中动态配置的 Zookeeper Listener 的实现在 ZkNodeChangeNotificationListener 类中,该类监听指定目录下的顺序节点添加动作,在收到子节点变化通知后,ZkNodeChangeNotificationListener 一方面执行通知动作,通知对应的 Handler 处理配置变更,另一面会清除所有已经处理过的配置变更。

下面对 ZkNodeChangeNotificationListener 类的实现进行介绍,主要分为以下几个部分:
a. 初始化:注册 zk 连接状态变更的 Handler 和 zk 子节点变更的 Handler;调用一次 addChangeNotification() 触发一次配置变更的处理,用来初始化动态配置;启动用来处理配置变更事件的线程 ChangeEventProcessThread
b. 配置变更处理:每次 zk 状态变更或者动态配置变更都会向 queue 中放入一个处理事件,与此同时,ChangeEventProcessThread 会持续不断的从 queue 中取出事件,执行对应的处理动作,即:processNotifications()
c. 清除过期通知:每次执行完 processNotifications(),都会调用 purgeObsoleteNotifications 执行过期通知的清理动作,删除所有进行本次 processNotifications() 之前创建的所有变更通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val queue = new LinkedBlockingQueue[ChangeNotification]
private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
private val isClosed = new AtomicBoolean(false)

def init(): Unit = {
// ZkStateChangeHandler 和 ChangeNotificationHandler 都是 addChangeNotification()
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
addChangeNotification()
thread.start()
}

def close() = {
···
}

/**
* Process notifications
*/
private def processNotifications(): Unit = {
try {
val notifications = zkClient.getChildren(seqNodeRoot).sorted
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
val now = time.milliseconds
for (notification <- notifications) {
val changeId = changeNumber(notification)
// 只处理更新的变更信息
if (changeId > lastExecutedChange) {
// 调用 notificationHandler.processNotification() 进行配置变更的处理
processNotification(notification)
lastExecutedChange = changeId
}
}
purgeObsoleteNotifications(now, notifications)
}
} catch {
···
}
}

···
···

private def addChangeNotification(): Unit = {
if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification)
}

class ChangeNotification {
def process(): Unit = processNotifications()
}

private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
val (data, stat) = zkClient.getDataAndStat(notificationNode)
if (data.isDefined) {
if (now - stat.getCtime > changeExpirationMs) {
debug(s"Purging change notification $notificationNode")
zkClient.deletePath(notificationNode)
}
}
}
}

/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = queue.take().process()
}
···
···
}

Handler 实现

上面介绍了配置变更通知是如何接收的,实际的处理在 NotificationHandler.processNotification() 中进行,对于动态配置来说, NotificationHandler 接口的实现是类 ConfigChangedNotificationHandler, ConfigChangedNotificationHandlerprocessNotification 会根据配置变更通知版本对配置变更通知内容进行解析,然后调用对应的类型的 ConfigHandler 进行配置更新。

  1. 在当前版本中的配置变更通知分为 version1 和 version2 两个版本,不同版本格式不同,以某个 topic 下的某个 clientId 的动态配置为例,version1 的内容为 {"version" : 1, "entity_type":"topic/client", "entity_name" : "<topic_name>/<client_id>"}, version2 的内容为 {"version" : 2, "entity_path":"topic/<topic_name>/client/<client_id>"}
  2. 所有动态配置有默认的 entity_name: <default>,当 entity_name 为 <default> ,表示所有 entity 的默认配置,例如: /config/topics/<default> 中的配置表示所有 topic 的默认动态配置。

下面对 ConfigHandler 的实现类进行简单介绍:

  1. TopicConfigHandler:
    主要处理 3 类配置:
    a. LogManager 中管理的 topic 配置
    b. 副本限流配置
    c. controller 中动态开关配置 "unclean.leader.election.enable"

  2. ClientIdConfigHandler 和 UserConfigHandler:
    都继承自 QuotaConfigHandler,用来更新客户端侧的限流配置,ClientIdConfigHandler 负责 client id 维度的限流配置更新, UserConfigHandler 用来负责用户维度的限流配置更新

  3. IpConfigHandler:
    负责连接维度 ConnectionQuotas 的限流配置更新

  4. BrokerConfigHandler:
    一方面负责 broker 相关的 quota 配置,另一方面负责 broker 动态配置的更新。broker 的动态配置逻辑在类 DynamicBrokerConfig 中实现,主要逻辑是根据以下优先级顺序进行 broker 配置的更新和覆盖:
    a. DYNAMIC_BROKER_CONFIG:存储在 ZK 中的 /configs/brokers/{brokerId}
    b. DYNAMIC_DEFAULT_BROKER_CONFIG:存储在 ZK 中的 /configs/brokers/<default>
    c. STATIC_BROKER_CONFIG:broker 启动配置,通常来自 server.properties 文件
    d. DEFAULT_CONFIG:KafkaConfig 中硬编码的默认配置

其它补充

在 broker 初始化 partition 的过程中,该 topic 的配置可能会发生变化,为了避免漏掉这部分配置的更新,会在 createLog 过程中记录配置变更的情况,在 createLog 结束后处理这部分配置的更新, 具体可以参考:https://issues.apache.org/jira/browse/KAFKA-8813