Note

基于开源 kafka 2.5 版本。

如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。

本篇将针对 controller 的核心类——KafkaController 进行分析,包括 controller 最开始何被选举出来以及故障转移的、controler 如何管理集群成员、主题等。当然,controller 的作用不止这些,还有一些功能是没讲/没深入讲的,但读者可以借助本篇的内容,对 controller 在整个 kafka 中的作用做到举一反三。

zookeeper 知识

开始讲源码前先了解一下 zookeeper(以下简称 zk)的一些基本使用,因为 2.5 版本的 kafka 是依赖于 zk 对集群进行管理的,controller 作为集群的控制中枢更是离不开 zk。

zk 是一个分布式协调服务,在分布式系统中用于进行配置管理、分布式同步等,简单来说,你可以将它最核心的功能概括为提供轻量的、配置级别的存储组件。

znode 是 zk 的核心概念之一。在 zk 中,数据以一种类似文件系统的树状结构组织,树中的每个节点就称为一个 znode。你可以把它理解成 zk 中的「文件」或「目录」,比如://app/app/config/services/service1

  1. 每个 znode 可以存储数据:与传统的文件系统类似,但 znode 存储的数据大小通常不能超过 1MB(通常用于存储小型配置,而不是大块数据)。
  2. 每个 znode 可以有子节点:它类似于文件夹,可以嵌套其他 znode,构成树状结构。
  3. 临时节点(Ephemeral)会在客户端会话结束时被 zk 自动删除,持久节点(Persistent)创建后会一直存在,直到手动删除,一个节点要么是临时节点要么是持久节点。
  4. 顺序节点(Sequential)会自动在节点名后追加一个单调递增的数字,用于实现分布式锁、队列等场景。

watcher 机制

可以对某个 znode 设置观察者 watcher,一旦该节点发生变化(比如节点创建、内容变更、删除、子节点变化),客户端会收到通知。

kafka 对 zookeeper 的应用

kafka 在 zk 上使用 /controller 临时节点来表示当前集群哪个 broker 是 controller。下面这段代码展示的是一个双 broker 集群上 zk 的/controller 节点的信息:

{"version":1,"brokerid":0,"timestamp":"1585098432431"}
cZxid = 0x1a
ctime = Wed Mar 25 09:07:12 CST 2020
mZxid = 0x1a
mtime = Wed Mar 25 09:07:12 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002d3a1f0000
dataLength = 54
numChildren = 0

其中有两个需要注意的地方:

  • brokerid 是 0,表示序号为 0 的 broker 是集群 controller。
  • ephemeralOwner 字段不是0x0,说明这是一个临时节点

kafka 利用 zk 提供的 watcher 机制,每个 broker 都会监听 /controller 节点的变化随时准备成为 controller 角色,比如 controller 节点被删除,即当前集群没有 controller 了,各个 broker 就会去抢着当集群新的 controller。

总而言之,controller 的选举不需要 broker 之间相互通信,而是通过观测 zk 上的节点情况来做决定。

除了 /controller 节点外,还有存储其它元数据的节点,不再一一列举,下面遇到再说。

KafkaController 的字段

KafkaController 代表了 kafka 的 controller 实体,继承了 ControllerEventProcessor,负责各种事件的处理。下面列出了 KafkaController 一些较为重要的字段,看看就行,后面用到的话还会说,列出来这些字段只是想侧面说明 controller 负责干很多与“状态转换”有关的事情,是整个 kafka 集群的心脏。

class KafkaController(
  val config: KafkaConfig,
  // zk客户端
  zkClient: KafkaZkClient,
  // 初始broker的信息
  initialBrokerInfo: BrokerInfo,
  // 初始broker epoch
  initialBrokerEpoch: Long,
  // ...
) extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
  
  // controller元数据
  val controllerContext = new ControllerContext
  
  // controller到目标broker的请求发送类
  var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
    stateChangeLogger, threadNamePrefix)

  // 负责定期负载均衡分区leader
  private[controller] val kafkaScheduler = new KafkaScheduler(1)

  // 事件管理器
  private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
    controllerContext.stats.rateAndTimeMetrics)

  // 负责副本状态转换
  val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  
  // 负责分区状态转换
  val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  
  // 负责删除主题和日志
  val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
    partitionStateMachine, new ControllerDeletionClient(this, zkClient))
  
  // ...
}

除了这些字段外,还有一类字段单独拎出来介绍一下,那就是 zk 的各种监听器,借助这些监听器,controller 便能以观察者的方式轻松应对集群的各种状态变更处理。我将它们监听的东西及其主要功能都写在了字段的注释中:

// 监听controller节点的创建/变更/删除
// 处理控制器选举和切换,确保集群中只有一个活跃的控制器
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
// 监听broker数量变更
// 更新控制器中的Broker列表
private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
// 监听broker信息变更
// 当broker的配置或状态发生变化时,通知控制器更新相关信息
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
// 监听topic数量变更
// 更新控制器中的主题列表和分区分配信息
private val topicChangeHandler = new TopicChangeHandler(eventManager)
// 监听topic删除
// 启动topic删除流程
private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
// 监听partition变更
// 处理partition数量变更、副本分配变更等操作
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
// 监听partition重分配请求
// 重新分配分区的副本到不同的Broker上
private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
// 监听首选副本的选举
// 将分区leader恢复到首选副本,目的是尽可能保障broker间负载均衡
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
// 监听ISR变更
// 更新元数据并通知相关broker
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
// 监听日志路径变更
// 处理broker日志目录失败事件,如磁盘故障等,触发相应的副本状态变更
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

需要注意的是,有些监听器是上任 controller 后才会注册,一旦卸任就会注销,比如 brokerChangeHandler,表示只有 controller 才能处理接收到的事件。但是为了防止非 controler 由于分布式竞态问题,也去处理这些事件,在大多数 processXXX 方法中都加一个 isActive 判断自己是不是 controller,不是的话就直接返回

controller 选举

接下来,我以 controller 的选举流程为例,引出 KafkaController 的一些方法的实现原理。选举与 ContorollerChangeHandler 监听器息息相关,这个监听器会触发选举流程:

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  // 节点路径字符串,即"/controller"
  override val path: String = ControllerZNode.path

  // /controller节点创建
  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  // /controller节点删除
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  // /controller节点数据变更
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

可以看到节点创建和变更发送的都是 ControllerChange 事件,而节点删除则是 Reelect 事件:

private def processControllerChange(): Unit = {
  maybeResign()
}

private def processReelect(): Unit = {
  maybeResign()
  elect()
}

maybeResign 是用于卸任当前 broker 的 controller 身份,因为当前 broker 不一定是 controller,所以是"maybe"。

可以看到,processReelect 的处理多了一个 elect 的调用,意思是要重新选举出新的 controller,本 broker 作为候选人当然要 elect 参与竞选。除了 /controller 节点清空会导致竞选外,在 KafkaController 启动的 startup 方法中也会发起竞选:

def startup() = {
  // ...
  eventManager.put(Startup)
  eventManager.start()
}

private def processStartup(): Unit = {
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  // 发起竞选
  elect()
}

ok,下面先看看卸任的逻辑:

// 根据上次从zk读取的controllerId,判断当前broker是否controller
def isActive: Boolean = activeControllerId == config.brokerId

private def maybeResign(): Unit = {
  val wasActiveBeforeChange = isActive
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // 如果之前是controller,但现在不是了,就执行卸任逻辑
  if (wasActiveBeforeChange && !isActive) {
    onControllerResignation()
  }
}

private def onControllerResignation(): Unit = {
  debug("Resigning")
  // 注销一些zk监听器
  zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)

  // 关闭负责定期重新平衡分区leader的调度器
  kafkaScheduler.shutdown()
  // 重置统计字段
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0

  // 关闭负责定期清理过期委托令牌的调度器
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()

  // 注销zk监听器
  unregisterPartitionReassignmentIsrChangeHandlers()
  // 关闭分区状态机
  partitionStateMachine.shutdown()
  // 注销zk监听器
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
  zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
  // shutdown replica state machine
  // 关闭副本状态机
  replicaStateMachine.shutdown()
  // 注销zk监听器
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)

  // 关闭与broker间的所有连接
  controllerChannelManager.shutdown()
  // 清空元数据
  controllerContext.resetContext()

  info("Resigned")
}

可以看到,卸任的条件为该 broker 之前是 controller 而现在不是。所谓卸任就是做了一些清理工作,包括注销监听器、重置统计字段、关闭相关组件。

继续看如何竞选,我们重点关注的是如何保证只有一个 broker 可以竞选成功,避免出现脑裂的情况(出现多个 controller):

private def elect(): Unit = {
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // 如果当前已经选出了controller(/controller节点不为空),就不再参与竞选
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }

  try {
    // 尝试注册为controller
    // 如果注册失败(比如其它broker已经抢先注册),则会抛出ControllerMovedException异常
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    // 更新epoch
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    // 更新activeControllerId为当前brokerId
    activeControllerId = config.brokerId

    // 执行上任后的一些操作
    onControllerFailover()
  } catch {
    case e: ControllerMovedException =>
      // 已经有controller,本broker卸任
      maybeResign()

    case t: Throwable =>
      // 遇到未知异常,卸任当前controller角色,重新选一个controller
      triggerControllerMove()
  }
}

所以选举就是两步:

  1. 将自身注册到 zk 上,包括更新 controller epoch
  2. 执行上任后的一些操作(onControllerFailover)

我们再看看 onControllerFailover 做了什么:

private def onControllerFailover(): Unit = {
  info("Registering handlers")

  // 注册监听器
  val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
    isrChangeNotificationHandler)
  childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

  // 清理之前controller遗留的日志目录事件、ISR变更事件
  zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
  zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
  // 初始化元数据
  initializeControllerContext()
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  // TopicDeletionManager负责管理主题删除的整个生命周期,包括排队、执行、失败重试等
  // 新控制器上任时,需要用最新的主题删除状态初始化它
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  // 向broker集群广播最新的元数据
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

  // 启动副本状态机和分区状态机
  replicaStateMachine.startup()
  partitionStateMachine.startup()

  // 此时,本broker作为controller已经准备好

  // 继续分区重分配、主题删除、首选分区副本的选举
  initializePartitionReassignments()
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
  
  // 启动scheudler(用于定期负载均衡分区leader)
  kafkaScheduler.startup()
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }

  // 启动tokenCleanScheduler(用于定期清理过期的委托令牌)
  if (config.tokenAuthEnabled) {
    info("starting the token expiry check scheduler")
    tokenCleanScheduler.startup()
    tokenCleanScheduler.schedule(name = "delete-expired-tokens",
      fun = () => tokenManager.expireTokens,
      period = config.delegationTokenExpiryCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
  }
}

可以看到,broker 通过向 zk 注册上任 controller 之后,还要做一堆的事情,都写在了注释里。

controller 集群成员管理

官方文档说,当我们要扩展集群的大小时,只需要给该 broker 分配唯一 brokerId,然后启动 kafka 服务端程序就 OK 了:

Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers.

其实这就是 controller 对集群的管理功能起作用了。controller 对集群的管理包括对集群成员的加入与退出、对单个成员的数据改变。对于上面说的扩展集群原理如下:

每个 broker 在启动的时候,会在 zk 的 /brokers/ids 节点下创建一个名为 broker.id 参数值的临时节点。比如 broker 的 broker.id 参数值设置为 1001,那么,当 broker 启动后,你会在 zk 的 /brokers/ids 下观测到一个名为 1001 的子节点,该节点的内容包括了 broker 配置的主机名、端口号以及所用监听器的信息。当 broker 关闭或与 zk 意外断连,该节点又会自动删除。

因此,结合 zk 的监听器,controller 就能监听 /brokers/ids 下的 broker 数量变化:

class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(BrokerChange)
  }
}

controller 对 BrokerChange 的处理如下:

private def processBrokerChange(): Unit = {
  if (!isActive) return
  // 从zk获取最新的broker元数据,从controllerContex获取本地缓存的broker元数据
  // 通过将两者比对,得知:
  // 1. 当前存活的所有broker
  // 2. 加入集群的broker
  // 3. 退出集群的broker
  // 4. 重启过的broker
  val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
  val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
  val curBrokerIds = curBrokerIdAndEpochs.keySet
  val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
  val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
  val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
    .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
  val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
  val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
  val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
  val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
  val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
  val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted

  // 连接新broker
  newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
  // 重新连接重启的broker
  bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
  bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
  // 断开已退出的broker
  deadBrokerIds.foreach(controllerChannelManager.removeBroker)
  
  // 记录新broker到缓存的元数据中,回调onBrokerStartup
  if (newBrokerIds.nonEmpty) {
    controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
    onBrokerStartup(newBrokerIdsSorted)
  }
  // 重新记录重启的broker到缓存的元数据中,回调onBrokerFailure与onBrokerStartup
  if (bouncedBrokerIds.nonEmpty) {
    controllerContext.removeLiveBrokers(bouncedBrokerIds)
    onBrokerFailure(bouncedBrokerIdsSorted)
    controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
    onBrokerStartup(bouncedBrokerIdsSorted)
  }
  // 从缓存的元数据中移除旧broker,回调onBrokerFailure
  if (deadBrokerIds.nonEmpty) {
    controllerContext.removeLiveBrokers(deadBrokerIds)
    onBrokerFailure(deadBrokerIdsSorted)
  }
}

也就是说,当 broker 集群成员发生变动,controller 将会:

  1. 建立/断开与 broker 的连接
  2. 更新本地元数据缓存
  3. 回调 onBrokerFailure/onBrokerStartup

下面继续追踪回调函数干了什么:

// onBrokerStartup主要干了三件事情:
// 1. 通知broker更新元数据
// 2. 通知分区和副本状态机有副本/分区上线,触发状态变更
// 3. 恢复分区重分配以及主题删除流程
private def onBrokerStartup(newBrokers: Seq[Int]): Unit = {
  // 新broker不应该有离线目录,清除这些离线目录
  newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  val newBrokersSet = newBrokers.toSet
  val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
  // 通知已有的broker更新broker列表
  sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
  // 通知新broker更新broker列表,以及所有主题分区信息
  sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
  // 通知副本状态机这些新broker上的分区处于在线状态,这会启动这些副本的高水位线线程
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
  // 触发分区状态机,尝试将处于新建或离线状态的分区转为在线状态。这可能会导致新broker成为某些分区的leader
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // 恢复分区重分配
  maybeResumeReassignments { (_, assignment) =>
    assignment.targetReplicas.exists(newBrokersSet.contains)
  }
  // 恢复主题删除
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
  // 为新broker注册BrokerModifications监听器
  registerBrokerModificationsHandler(newBrokers)
}

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {
  // 不再记录下线broker的离线目录信息
  deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  if (deadBrokersThatWereShuttingDown.nonEmpty)
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
  // 处理受控关闭并最终下线的broker上的分区副本
  onReplicasBecomeOffline(allReplicasOnDeadBrokers)
	// 注销下线broker的BrokerModifications监听器
  unregisterBrokerModificationsHandler(deadBrokers)
}

我们继续跟踪对那些离线副本的处理:

private def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
  // 离线副本分为两类,处理方式不同
  // 1. 属于待删除主题
  // 2. 属于正常主题
  val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
    newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

  // 找出当前「leader所在broker已经下线并且其所属主题不是待删除」的分区,然后:
  // 1. 将这些分区标记为离线状态(OfflinePartition),表示当前分区没有可用leader
  // 2. 触发分区状态机,尝试将这些离线分区转为在线状态,实际上就是为这些分区选举新leader以恢复可用性
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    !controllerContext.isReplicaOnline(partitionAndLeader._2.leaderAndIsr.leader, partitionAndLeader._1) &&
      !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  
  // 将属于正常主题的副本从ISR中移除,并将它们状态变更为OfflineReplica
  replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)

  // 将属于待删除主题的副本标记为删除失败状态,避免无限重试删除
  if (newOfflineReplicasForDeletion.nonEmpty) {
    topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
  }

  // 如果没有分区需要进行leader重新选举,更新其它broker的元数据(更新broker列表)
  // 否则不用发,因为在leader选举过程中已经发送了元数据更新
  if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
  }
}

上面介绍的是 broker 的增删,而下面来说说 broker 本身内容变更的处理。细心的读者会注意到,当 broker 新增的时候会注册 BrokerModificationsHandler 这个监听器,下线的时候又会给注销掉。没错,这个监听器就是用来监听 broker 自身信息的变更:

class BrokerModificationsHandler(eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
  override val path: String = BrokerIdZNode.path(brokerId)

  override def handleDataChange(): Unit = {
    eventManager.put(BrokerModifications(brokerId))
  }
}

本来 ZNodeChangeHandler 中有三种事件可以处理,分别是节点创建、删除以及变更。但 BrokerModificationsHandler 只关心节点内容变更(因为创建和删除事件其实就是 broker 的增删,在 BrokerChangeHandler 中已经统一进行处理了)。再重申一遍,这里说的节点就是 zk 上 /broker/ids/{brokerId} 数据节点。

private def processBrokerModification(brokerId: Int): Unit = {
  if (!isActive) return
  // 分别获取新旧broker的连接信息
  val newMetadataOpt = zkClient.getBroker(brokerId)
  val oldMetadataOpt = controllerContext.liveOrShuttingDownBroker(brokerId)
  if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
    val oldMetadata = oldMetadataOpt.get
    val newMetadata = newMetadataOpt.get
    // 如果连接信息发生变更,则更新元数据,并且回调onBrokerUpdate进行处理
    if (newMetadata.endPoints != oldMetadata.endPoints) {
      controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
      onBrokerUpdate(brokerId)
    }
  }
}

private def onBrokerUpdate(updatedBrokerId: Int): Unit = {
  // 通知其它broker更新元数据
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}

比较简单,所谓成员信息管理只是更新一下元数据。

Warning

但是有个问题!这里 broker 的 endpoints 变了,我觉得也应该去更新一下 ControllerChannelManager 中维护的该 broker 的连接信息,否则 ControllerChannelManager 不就会一直重试往旧 endpoints 发消息了吗?进一步地说,如果真存在这个设计缺陷,是不是就得重启 controller 来解决了。蹲个好心人解答一下

controller 主题管理

除了维护集群成员之外,controller 还有一个重要的任务,那就是对所有主题进行管理,主要包括主题的创建、变更与删除。

一共涉及到两个监听器:

// 负责主题的创建和变更
class TopicChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // 节点/brokers/topics
  override val path: String = TopicsZNode.path

  override def handleChildChange(): Unit = eventManager.put(TopicChange)
}

// 负责主题的删除
class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // 节点/admin/delete_topics
  override val path: String = DeleteTopicsZNode.path

  override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
}

先来看下主题的创建和变更:

private def processTopicChange(): Unit = {
  if (!isActive) return
  val topics = zkClient.getAllTopicsInCluster
  // 新增的topic
  val newTopics = topics -- controllerContext.allTopics
  // 删除的topic
  val deletedTopics = controllerContext.allTopics -- topics
  controllerContext.allTopics = topics

  // 为新topic注册PartitionModifications监听器
  registerPartitionModificationsHandlers(newTopics.toSeq)
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
  deletedTopics.foreach(controllerContext.removeTopic)
  // 更新缓存元数据
  addedPartitionReplicaAssignment.foreach {
    case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  // 如果有新增的副本,回调onNewPartitionCreation
  if (addedPartitionReplicaAssignment.nonEmpty)
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}

继续看看 onNewPartitionCreation:

private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
  // 通知分区状态机有新分区加入
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
  // 通知副本状态机有新副本加入
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
  // 通知分区状态机让新分区上线,并选举leader
  partitionStateMachine.handleStateChanges(
    newPartitions.toSeq,
    OnlinePartition,
    Some(OfflinePartitionLeaderElectionStrategy(false))
  )
  // 通知副本状态机让新副本上线
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}

可以看到,TopicChange 主要是处理 topic 数量的变更,具体包括更新元数据以及通知分区/副本状态机。

状态机这个东西之前也出现了好几次,目前只需要知道,当分区和副本只要处于“上线”状态就能正常工作即可。

下面再看看 topic 删除的处理:

private def processTopicDeletion(): Unit = {
  if (!isActive) return
  // 获取被删除的topic
  var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  // 忽略不存在的topic
  val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
  if (nonExistentTopics.nonEmpty) {
    zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
  }
  topicsToBeDeleted --= nonExistentTopics
  if (config.deleteTopicEnable) {
    if (topicsToBeDeleted.nonEmpty) {
      // 遍历并删除topic
      topicsToBeDeleted.foreach { topic =>
        // 将正在重分配的topic标记为"ineligible for deletion"
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
        if (partitionReassignmentInProgress)
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
            reason = "topic reassignment in progress")
      }
      // 将待删除topic入队,交由TopicDeletionManager处理
      topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  } else {
    // 用户已经配置了禁止删除topic
    info(s"Removing $topicsToBeDeleted since delete topic is disabled")
    zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
  }
}

总结

总的来说,controller 利用了 zk 的观察者模式,通过设置节点监听器,来观测集群中各种事件的发生,然后将他们抽象成一个个的事件(ControllerEvent),放到事件队列中最终单线程地一个个去处理。

在 controller 工作过程中(这里特指 KafkaController 这个类的工作过程中),用到了很多的组件,比如之前介绍到的 ControllerChannelManager、ControllerContext、ControllerEventManager,这些组件必须与 controller 组件紧耦合在一起才能实现各自的功能。

但 controller 也使用到了一些功能相对独立的组件,比如 TopicDeletionManager、ReplicaStateMachine、PartitionStateMachine,后续将会一一深入分析。

参考

极客时间《Kafka核心源码解读》——胡夕