Note

基于开源 kafka 2.5 版本。

如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。

上一篇简单过了一下 kafka 网络层中的请求队列,本章开始上点干货,真正看一下 kafka 的网络层是如何运作的。本篇的解析重点在于 SocketServer 中的 Acceptor 以及 Processor 线程。

总览

从 broker 服务启动开始看起。从 Kafka.scala 的 main 函数开始,会调用到 KafkaServer.startup 函数,这个函数负责启动一堆的组件(比如 zk 客户端、log、group coordinator),其中 socketServer 这个组件的构造与启动如下:

socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

这个 SocketServer 是 kafka 网络通信中重中之重的组件,实现与客户端之间的高性能通信。

比如说,当Broker处理速度很慢、需要优化的时候,你只有明确知道SocketServer组件的工作原理,才能制定出恰当的解决方案,并有针对性地给出对应的调优参数。

结合上一篇对 RequestChannel 的分析,这里可以直接给出 SocketServer对一个网络请求及其响应的流转过程,为了便于新手理解全貌,这里给出的是简化版,如图所示:

image-20250613102901361

图中最左边的小人表示客户端,可以是 producer、consumer 或 broker。 KafkaRequestHandlerPool 是真正处理请求逻辑的线程池。具体地,它会调用 KafkaApis.handle 对不同 API 进行不同的处理逻辑。目前对这个组件有个印象即可不必深究,我们的重点还是在上面的 SocketServer 上。

图中的 Acceptor 线程用于接收 TCP 连接建立请求,是整个网络通信层的门面。Processor 线程之前说过,TCP 连接的建立、以及所有请求和响应由一个 Processor 线程全权负责接收和发送。可见,Acceptor 加上 Processor 就是网络编程中经典的 reactor 模式。下面将分析这两个类的源码。

Acceptor

Acceptor 作为 reactor 模式中 dispatcher 的角色,负责接收新的连接请求然后分发给 Processor 线程去处理。先看一下类定义:

private[kafka] class Acceptor(
  // Acceptor使用endpoint创建服务端socket,比如PLAINTEXT://127.0.0.1:9092
  val endPoint: EndPoint,
  // TCP发送缓冲区的大小(outbound)
  val sendBufferSize: Int,
  // TCP接收缓冲区的大小(inbound)
  val recvBufferSize: Int,
  // 当前broker的id
  brokerId: Int,
  // 控制最大连接数
  connectionQuotas: ConnectionQuotas,
  metricPrefix: String
) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
	// NIO中的Selector,负责监听连接创建、读写事件等
  private val nioSelector = NSelector.open()
  // NIO中的ServerSocketChannel,后续会注册到nioSelector
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  // Processor线程池
  private val processors = new ArrayBuffer[Processor]()
  private val processorsStarted = new AtomicBoolean
  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))

AcceptorProcessor 都继承自 AbstractServerThread,定义了一些公有方法比如 shutdown,这个类实际上是一个 Runnable 类。

Acceptor 成员中的的 sendBufferSizerecvBufferSize 用于设置 socket 的发送和接收缓冲区大小,可以分别通过 broker 端的 socket.send.buffer.bytessocket.receive.buffer.bytes 参数进行配置,默认是 100KB。另外说个题外话,这里引出了一个生产环境的调优点:在生产环境中,如果客户端和 broker 的网络通信延迟比较大(比如RTT>10ms),这样一来带宽延迟乘积(Bandwidth delay product, BDP)就比较大,那么可以将他们的两个缓冲区大小调高些,以提高数据吞吐量,因为 BDP 表示了网络承载能力(想象一下网络是一根水管,带宽是每秒的水量,延迟是端到端的延迟,那么 BDP 就是已发出但还未到达对端的总水量),最大接收窗口就表示了网络承载能力内可以不经确认发出的报文。

Acceptor 中的 connectionQuotas 用于管理连接配额,简单来说包括:限制每个 IP 地址的最大连接数、整个 broker 的最大连接数、为不同监听器(listener)设置不同的连接限制,并且支持动态更新连接配额配置。

Acceptor 成员中的 nioSelectorserverChannel 就不在这儿介绍了,对 Java NIO 这块不熟悉的话建议阅读 Java NIO,大道至简,非常赞的教程。

Acceptor 中的 processors 在上一篇说过,网络线程池,在此也不过多介绍。

下面继续分析 Acceptor 的各个功能方法,先看下对 processor 的新增和移除操作:

private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
  // 将一批processor添加到线程池
  processors ++= newProcessors
  // 启动这批新添加的processor
  if (processorsStarted.get)
    startProcessors(newProcessors, processorThreadPrefix)
}

// 启动一批processor
private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
  processors.foreach { processor =>
    // 每个processor是一个runnable,开启一个新的thread去运行runnable
    KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor).start()
  }
}

可以看到添加 processor 到线程池其实就是启动这批 processor 线程。

private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
  val toRemove = processors.takeRight(removeCount)
  // 移除一批processor
  processors.remove(processors.size - removeCount, removeCount)
  // 关闭这批processor
  toRemove.foreach(_.shutdown())
  // requestChannel也引用了processor,也需要移除引用
  toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}

// Processor类
override def shutdown(): Unit = {
  super.shutdown()
  removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
  metrics.removeMetric(expiredConnectionsKilledCountMetricName)
}

// AbstractServerThread类
def shutdown(): Unit = {
  if (alive.getAndSet(false))
    wakeup() // 唤醒selector
  // ???
  shutdownLatch.await()
}

可以看到移除 processor 将会调用 shutdown 唤醒阻塞在 poll 上的 selector 关闭 processor。最后还有一个 shutdownLatch.await(),后面会说到。

上面简单分析了 Acceptor 对 processor 的管理相关方法。作为一个 Runnable,我们重点需要分析的其实是 run 方法,即线程执行的代码:

def run(): Unit = {
  // selector注册accept事件
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Acceptor优雅启动完毕
  startupComplete()
  try {
    var currentProcessorIndex = 0
    
    // 循环不断获取新连接
    while (isRunning) {
      try {
        // 获取就绪事件
        // 最多等500ms,以便当isRunning=false的时候及时结束循环
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          // 处理这些已就绪的事件
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                accept(key).foreach { socketChannel =>
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    // Acceptor使用round-robin轮询算法,选择下一个processor处理该连接
                    processor = synchronized {
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    currentProcessorIndex += 1
                    // 如果processor的内部队列已经满了,并且不阻塞的话,继续尝试下一个processor
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              } else
                // Acceptor只负责处理accept事件
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        // 对异常静默处理,继续接收新的请求
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    // 关闭selector和channel
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    // Acceptor优雅关闭完毕
    shutdownComplete()
  }
}

private def accept(key: SelectionKey): Option[SocketChannel] = {
  // 获取SocketChannel
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    // 如果是总连接数超过最大限制或者listener的连接数超过最大限制,一般是临时的资源紧张,此时阻塞等待
    // 如果是请求ip对应的连接数超过最大限制,为了防止客户端滥用,此时会抛异常
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    // 设置为非阻塞I/O
    socketChannel.configureBlocking(false)
    // 禁用TCP Nagle算法,提高数据传输实时性
    socketChannel.socket().setTcpNoDelay(true)
    // 启用TCP长连接
    socketChannel.socket().setKeepAlive(true)
    // 设置TCP发送缓冲区大小
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    // 返回该连接
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
      // 关闭连接
      close(endPoint.listenerName, socketChannel)
      None
  }
}

这段代码中的 nioSelector.selectedKeys() 还有 iter.remove() 等,这些都只是 NIO Selector 的模板代码,建议把上面推荐的 Java NIO 入门教程看一遍,就很好懂了。这段代码还有几个值得讲一下的点:

首先是 startupComplete 以及 shutdownComplete (关于 CountDownLatch 默认读者已经知道是什么东西,如不会请自行查阅用法),这两个方法实现线程的优雅启动与关闭,优雅启动和关闭在服务端开发中十分常见。所谓“优雅”其实很好理解,我们脱离代码,将“优雅”抽象成一种通用的解释:想象有两个线程,一个主线程一个子线程,主线程可能会与子线程通信交互。主线程启动子线程后,需要等待子线程初始化完成后才能与其通信,因此这里的“等待子线程完成初始化”就是一种“优雅启动”,想象一下如果没有优雅启动,万一子线程初始化资源时失败并退出了,而主线程毫无感知并且直接尝试与子线程进行通信,很有可能出现意想不到的 bug。同理,“优雅关闭”是指等待子线程完成资源的回收工作再进行关闭。另外,光从线程角度来看,说白了不过是一种线程同步方式而已。综上,代码非常简单就不多说了。

其次,assignNewConnection 是将连接放到 processor 中的内部队列等待处理,如果队列满了就尝试下一个 processor。如果前 n-1 个 processor 的队列都满了,就阻塞在最后一个 processor 上,直到能放进去为止。

最后,客户端 IP 对应的连接数超过最大限制时,为了防止客户端滥用,会直接抛异常,拒绝连接。

Processor

通过上面的分析,我们知道 Acceptor 用于接收连接建立请求。本节介绍的 Processor 则是负责创建连接、分发请求还有发送响应的组件,Processor 干了非常多事情。首先来看一下 Processor 的类定义:

private[kafka] class Processor(
  val id: Int,
  time: Time,
  // 单个请求的最大字节数
  maxRequestSize: Int,
  requestChannel: RequestChannel,
  // 控制最大连接数
  connectionQuotas: ConnectionQuotas,
  // 连接最大空闲时间
  connectionsMaxIdleMs: Long,
  // 认证失败后的延迟时间,防止暴力破解攻击
  failedAuthenticationDelayMs: Int,
  // 标识此处理器属于哪个网络监听器
  listenerName: ListenerName,
  // 安全协议类型
  securityProtocol: SecurityProtocol,
  // 包含broker的各种配置参数
  config: KafkaConfig,
  metrics: Metrics,
  // 管理认证相关的凭证信息
  credentialProvider: CredentialProvider,
  // 内存池,用于高效管理请求和响应的内存分配
  memoryPool: MemoryPool,
  // 提供统一的日志记录功能
  logContext: LogContext,
  connectionQueueSize: Int = ConnectionQueueSize
) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // 待处理连接队列
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  // 正在发送的响应
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  // 待处理响应队列
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
  
  // kafka对NIO Selector的封装
  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time,
      logContext))
}

可以看到 Processor 大部分的成员都比较好理解,但是需要注意以下几个成员:

  • selector,这个是 kafka 对 Java NIO Selector 的二次封装,主要是封装了网络 I/O 事件的处理、安全协议支持等。为了与 Java NIO Selector 进行区分,我们称其为 KSelector。由于 KSelector 比较复杂,另起了一篇文章进行剖析,详见kafka源码阅读(7)-KSelector。总之,我们知道,Acceptor 和 Processor 都会拥有自己的 selector。
  • inflightResponses 不同于 responseQueue,后者是为了排队这些待发送的响应,而有些 Response 回调函数需要在响应发送给客户端之后才触发,因此额外需要一个 inflightResponses 来暂存正在发送的响应,以便发送成功后执行回调逻辑。

接下来我们继续分析 Processor 线程的方法。Processor 的众多方法都是由 run 方法展开调用的,因此我们直接分析 run 方法:

override def run(): Unit = {
  // Processor优雅启动完毕
  startupComplete()
  try {
    while (isRunning) {
      try {
        // 处理newConnection队列中的新连接
        configureNewConnections()
        // 处理responseQueue队列中的响应,发送给客户端
        processNewResponses()
        // 基于I/O多路复用,获取并处理就绪的I/O事件
        poll()
        // 处理客户端请求
        processCompletedReceives()
        // 处理已完成的响应
        processCompletedSends()
        // 处理因发送失败导致的客户端连接断开
        processDisconnected()
        // 检查并关闭超出配额的连接
        closeExcessConnections()
      } catch {
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    // 关闭底层资源
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    // Processor优雅关闭完毕
    shutdownComplete()
  }
}

configureNewConnections

Processorrun 定义了一套网络 I/O 的处理流程,流程中的各个步骤封装在相应方法中,首先是 configureNewConnections,这个方法主要是通过 kselector 注册连接的读 I/O 事件:

private def configureNewConnections(): Unit = {
  var connectionsProcessed = 0
  // 由于Acceptor会一直accept新连接,因此需要限制每次处理连接的数量为connectionQueueSize
  while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      // 将该新连接交由kselector处理,实际上是注册OP_READ事件
      // 每个连接在kselector中使用ID唯一标识(TCP四元组+唯一自增序号)
      selector.register(connectionId(channel.socket), channel)
      connectionsProcessed += 1
    } catch {
      case e: Throwable =>
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
        close(listenerName, channel)
        processException(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

processNewResponses

processNewResponses 用于处理响应队列中的响应:

private def processNewResponses(): Unit = {
  var currentResponse: RequestChannel.Response = null
  while ({currentResponse = dequeueResponse(); currentResponse != null}) {
    val channelId = currentResponse.request.context.connectionId
    try {
      currentResponse match {
        case response: NoOpResponse =>
          updateRequestMetrics(response)
          trace(s"Socket server received empty response to send, registering for read: $response")
          // 无需发送响应给客户端,因此需要取消通道静默,读取更多请求到socket buffer中
          // 实际上会注册OP_READ事件
          handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
          tryUnmuteChannel(channelId)

        case response: SendResponse =>
          // 发送响应,实际上是注册OP_WRITE事件
          sendResponse(response, response.responseSend)
        case response: CloseConnectionResponse =>
          // 关闭连接
          updateRequestMetrics(response)
          trace("Closing socket connection actively according to the response code.")
          close(channelId)
        case _: StartThrottlingResponse =>
          // 开始对连接限流
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
        case _: EndThrottlingResponse =>
          // 结束对连接限流
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
          tryUnmuteChannel(channelId)
        case _ =>
          throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
      }
    } catch {
      case e: Throwable =>
        processChannelException(channelId, s"Exception while processing response for $channelId", e)
    }
  }
}

可见 processNewResponses 只是处理了暂存在队列中的响应,这些响应最初其实是由 KafkaApi 完成对请求的处理之后生成响应存到队列中的。在 processNewResponses 中的几种响应类型在上一篇博客中已经说过,不再多说。另外这里出现了一些陌生的概念,比如 通道 KafkaChannel、静默 mute 等,通道用来封装一个网络连接,隐藏了底层的网络通信细节,而静默和限流都是通道提供的功能,静默期间,服务器不会从该通道读取更多请求,但会继续处理已接收的请求。这些概念目前知道大概的意思就够了。(实际上只有静默与非静默会实际产生作用,并没有单独的限流功能,这点在下一篇中会提及)

poll

poll 用于获取就绪的 I/O 事件并处理。这个方法只有两行,复杂度都集中在 kselector 的 poll 方法上,暂时先跳过:

private def poll(): Unit = {
  val pollTimeout = if (newConnections.isEmpty) 300 else 0
  try selector.poll(pollTimeout)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      error(s"Processor $id poll failed", e)
  }
}

processCompletedReceives

processCompletedReceives 用于处理经过 poll 读取到的网络请求,将其包装成 Request 发送给 RequestChannel 等待处理:

private def processCompletedReceives(): Unit = {
  // 遍历kselector中已完成的接收,这些接收是在poll中读取的
  selector.completedReceives.asScala.foreach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          // 获取header
          val header = RequestHeader.parse(receive.payload)
          if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
        		// 刷新身份认证
            trace(s"Begin re-authentication: $channel")
          else {
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // 如果会话的身份认证已经过期,则断开连接
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              val connectionId = receive.source
              // 构建Request
              val context = new RequestContext(header, connectionId, channel.socketAddress,
                channel.principal, listenerName, securityProtocol,
                channel.channelMetadataRegistry.clientInformation)
              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
              // KIP-511: 收集ApiVersionsRequest中的客户端名称以及版本,以增强运维对客户端的洞察
              if (header.apiKey == ApiKeys.API_VERSIONS) {
                val apiVersionsRequest = req.body[ApiVersionsRequest]
                if (apiVersionsRequest.isValid) {
                  channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                    apiVersionsRequest.data.clientSoftwareName,
                    apiVersionsRequest.data.clientSoftwareVersion))
                }
              }
              // 将Request发送到RequestChannel待处理
              requestChannel.sendRequest(req)
              // 将通道静默,等待请求处理完毕后再继续接收新的请求,底层实现是注销OP_READ
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        case None =>
          throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      }
    } catch {
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }
  // 清除这些已经处理的请求
  selector.clearCompletedReceives()
}

需要注意的是,每个请求发送到 RequestChannel 之后,会 mute 请求所在的通道,以保证同一客户端的请求之间的处理顺序,如果不 mute 的话,后续请求也会被放到请求队列中,最终被 KafkaRequestHandler 并行处理,这将破坏请求的处理顺序。

processCompletedSends

processCompletedSends 这个方法用于处理已经完成发送的响应:

private def processCompletedSends(): Unit = {
  // 遍历kselector中已经完成的发送,
  selector.completedSends.asScala.foreach { send =>
    try {
      // 因为响应已经发送完毕,因此将该响应从inflightResponses中移除
      val response = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      updateRequestMetrics(response)

      // 执行响应完成后的回调逻辑
      response.onComplete.foreach(onComplete => onComplete(send))

      // 通道取消静默
      handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
      tryUnmuteChannel(send.destination)
    } catch {
      case e: Throwable => processChannelException(send.destination,
        s"Exception while processing completed send to ${send.destination}", e)
    }
  }
  selector.clearCompletedSends()
}

可以看到,与 processCompletedReceives 中收到请求时对通道静默相对应,响应完成发送之后需要取消对通道的静默。

processDisconnected

processDisconnected 方法用于处理断开的连接:

private def processDisconnected(): Unit = {
  // 遍历kselector中所有已经断开的连接
  selector.disconnected.keySet.asScala.foreach { connectionId =>
    try {
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      // 断开连接中正在发送的响应也从inflightResponses移除
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      // 更新连接配额
      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
}

closeExcessConnections

closeExcessConnections 方法用于当总连接数超出配额时强行关闭其中一个连接。优先关闭正处于关闭状态的连接,其次是空闲时间最长的连接,如果都没有的话,关闭任意一个连接。所谓“正处于关闭状态的连接”,是一种优雅关闭,目的是处理该连接上已接收但还未处理完毕的请求,而强行关闭就是将这些连接直接关闭,不再等这些请求处理完毕。

private def closeExcessConnections(): Unit = {
  // 检查总连接数是否超出配额
  if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
    // 获取应该被关闭的连接
    val channel = selector.lowestPriorityChannel()
    if (channel != null)
    	// 关闭连接
      close(channel.id)
  }
}

private def close(connectionId: String): Unit = {
  openOrClosingChannel(connectionId).foreach { channel =>
    debug(s"Closing selector connection $connectionId")
    val address = channel.socketAddress
    // 更新连接配额
    if (address != null)
      connectionQuotas.dec(listenerName, address)
    // 强行关闭连接
    selector.close(connectionId)

    // 将连接上发送中的响应从inflightResponses移除
    inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
  }
}

总结

本篇我们深入解析了 Kafka 网络通信的关键组件 —— SocketServer,重点关注了其中的 Acceptor 与 Processor 两大核心线程。可以看到,Kafka 采用了典型的 Reactor 模式,通过 Acceptor 线程专职接收连接请求、将连接分发至 Processor,而 Processor 再负责网络读写、请求转发、响应发送等复杂逻辑。

值得关注的是,为了简化 Processor 对连接的处理逻辑,kafka 将 NIO Selector 抽取出来封装到 KSelector 以提供复杂的 I/O 操作、将 SocketChannel 封装到 KafkaChannel 中实现底层传输逻辑以及维护通道的状态,这些内容我们在下一篇kafka源码阅读(7)-KSelector中进行分析。

参考

Kafka源码阅读01: 网络层阅读之服务器的启动 - BewareMyPower的博客

极客时间《Kafka核心源码解读》——胡夕

TCP选项之SO_RCVBUF和SO_SNDBUF

Java NIO