Note基于开源 kafka 2.5 版本。
如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。
我们知道 Java NIO 中的三大组件分别是 Channel、Buffer 以及 Selector,而 kafka 在网络层中对它们进行了进一步的封装,以便向上层组件提供更加直观和方便的网络 I/O 操作,具体对应的封装类如下:
Tip
TransportLayer
:它是一个接口,封装了底层 NIO 的 SocketChannel。NetworkReceive
:封装了 NIO 的 ByteBuffer 中的读 Buffer,对网络编程中的粘包、拆包经典实现。NetworkSend
:封装了 NIO 的 ByteBuffer 中的写 Buffer。KafkaChannel
:对 TransportLayer、NetworkReceive、NetworkSend 进一步封装,屏蔽了底层的实现细节,对上层更友好。KSelector
:封装了 NIO 的 Selector 多路复用器组件。
总览
在上篇中我们分析了前四个类,他们代表了 Java NIO 中的 Channel 和 Buffer,这篇我们继续分析 KSelector
(类名实际上是与 NIO 同名的 Selector
,为了进行区分我们将其称为 KSelector
),其代表了 NIO 中的 Selector。这个类和 KafkaChannel 一样,也是服务端和客户端共用的类:客户端通过 connect
方法连接服务端,服务端用 register
方法注册接收到的服务端 socket。
我们接下来重点还是以服务端的视角来进行讲述,最后一小节再简单过下客户端相关的代码。
类定义
按照惯例先看下类的一些核心成员:
public class Selector implements Selectable, AutoCloseable {
// 基础组件
private final java.nio.channels.Selector nioSelector; // Selector
private final ChannelBuilder channelBuilder; // KafkaChannel Builder
// 通道管理相关
private final Map<String, KafkaChannel> channels; // 所有注册到本Selector的通道
private final Set<KafkaChannel> explicitlyMutedChannels; // 被显式静默的通道
private final Map<String, KafkaChannel> closingChannels; // 正在关闭的通道
// 事件和状态追踪相关
private final List<Send> completedSends; // 已完成的发送
private final LinkedHashMap<String, NetworkReceive> completedReceives; // 已完成的接收
private final Set<SelectionKey> immediatelyConnectedKeys; // 立即连接成功的key
private final List<String> connected; // 已建立连接的通道
private final List<String> failedSends; // 发送失败的通道
private final Map<String, ChannelState> disconnected; // 已断开连接的通道
private boolean madeReadProgressLastPoll = true; // 上次poll之后是否在读数据方面有进展
// 内存管理相关
private final MemoryPool memoryPool; // 内存池
private final long lowMemThreshold; // 内存不足阈值(固定为10% * 内存池大小)
private boolean outOfMemory; // 是否内存不足
private Set<SelectionKey> keysWithBufferedRead; // 有缓冲数据待读的通道
// 参数配置相关
private final int maxReceiveSize; // 允许接收的最大消息字节数
private final boolean recordTimePerConnection; // 是否记录每个连接的时间
private final int failedAuthenticationDelayMs; // 认证失败后的延迟关闭连接时间
// 其它
private final SelectorMetrics sensors; // 监控指标
private final IdleExpiryManager idleExpiryManager; // // 空闲连接管理器
private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels; // 延迟关闭连接
}
对其中几个成员稍微解释下,有些太细节的东西可以直接忽略:
disconnected
:帮助上层应用感知连接的发送失败,以便采取相应的错误处理措施immediatelyConnectedKeys
:SocketChannel 被配置为非阻塞的,一般 connect 后不会马上返回成功,只能等后续 OP_CONNECT 触发才是连接成功并进行下一步处理。但在某些情况下(特别是本地连接)connect 可能会直接返回成功,这时候不会触发 OP_CONNECT,因此需要记录这些 SocketChannel 便于后续处理。madeReadProgressLastPoll
:在 poll 方法中,当madeReadProgressLastCall && dataInBuffers
满足时,timeout 设置为 0,即 poll 不会阻塞等待事件的发生。综合来看,应该是因为如果上次 poll 没有进展的话,可能是因为内存不足等原因,这次 poll 很可能也没进展,因此就没必要 timeout=0 导致频繁的无效 poll;反之如果上次 poll 有进展,并且 dataInBuffers 满足的话,就让 poll 快速返回然后去处理缓冲区的数据以降低内存占用,但 dataInBuffers 不满足的话,也就没必要设置 timeout=0 急着让 poll 快速返回。keysWithBufferedRead
:上一篇有提到,SslTransportLayer
有内部缓冲区,因此可能出现这样的情况:一次性从 socket 读取了多条消息到内部缓冲区,并且后续 socket 没有更多数据可读,即 OP_READ 不再触发,因此要记录这些通道并主动地去读取缓冲区剩余的消息。
下面来解析各个服务端相关的核心方法。
register
register
在之前解析 SocketServer 的时候就有见过,Processor 会将新建立的连接注册到 KSelector 上:
public void register(String id, SocketChannel socketChannel) throws IOException {
ensureNotRegistered(id);
// 注册OP_READ,创建KafkaChannel
registerChannel(id, socketChannel, SelectionKey.OP_READ);
// ...
}
protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
// SocketChannel注册到Selector
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
// 创建KafkaChannel
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
// 保存KafkaChannel
this.channels.put(id, channel);
// // 更新连接的访问时间
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), time.nanoseconds());
return key;
}
// 省略try-catch
private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
// 创建KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool,
new SelectorChannelMetadataRegistry());
// KafkaChannel绑定到key上,方便后续根据key取出KafkaChannel
key.attach(channel);
return channel;
}
send
send
方法在 SocketServer 的 Processor 中,当 Response 类型是 SendResponse 的时候,将会调用该方法,将待发送数据保存到 KafkaChannel 并注册 OP_WRITE 等待可写事件发生时再进行写入。
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// 如果通道已关闭,记录为一次失败的发送
// 根据failedSends,后续会将这些通道的关闭原因设置为FAILED_SEND,交给上层采取相应的错误处理措施
this.failedSends.add(connectionId);
} else {
try {
// 将send保存到KafkaChannel,并注册OP_WRITE
channel.setSend(send);
} catch (Exception e) {
// 发生异常,强行关闭通道
channel.state(ChannelState.FAILED_SEND);
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", connectionId, e);
throw e;
}
}
}
}
小细节:这里关闭时传入的 DISCARD_NO_NOTIFY 是指不把断连的通道保存到 disconnected
中,因为 failedSends
已经记录了该通道,在下一次 poll 的时候会根据 failedSends
将这些通道保存到 disconnected
中,避免重复保存。
poll
poll
方法是 KSelector 最核心的方法了。它会调用 Selector 的 poll 方法等待 I/O 事件就绪。当 OP_CONNECT 就绪,会将记录相关通道;当 OP_WRITE 就绪,会调用 KafkaChannel.write 将之前保存的 send 发送到网络;当 OP_READ 就绪,会调用 KafkaChannel.read 从网络读取数据。这个方法返回后,上层应用可以进一步通过 completedSends(), completedReceives(), connected(), disconnected() 这四个方法来处理结果。
另外这个方法源码中的注释也提到了之前说的请求处理顺序,poll 保证了在每次调用后,每个通道只有一个请求保存在 completedReceives
待处理,保证请求的发送的顺序与处理顺序相同。
public void poll(long timeout) throws IOException {
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
// 清除上次poll的结果
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
// 如果有客户端立刻连接成功的socket,或者存在缓冲区有未读数据的通道
// 则设置poll的timeout为0,即不阻塞等待I/O事件触发
if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
// 将之前由于内存压力而静默的通道取消静默
if (!memoryPool.isOutOfMemory() && outOfMemory) {
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.maybeUnmute();
}
}
outOfMemory = false;
}
long startSelect = time.nanoseconds();
// 这里真正select
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
// 检查本次poll是否有东西要处理,包括
// 1. 就绪的I/O事件
// 2. 建立连接直接成功的通道
// 3. 缓冲区非空的通道
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
// 获取就绪的I/O事件
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// 处理缓冲区非空的通道
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys);
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>();
pollSelectionKeys(toPoll, false, endSelect);
}
// 处理就绪的I/O事件
pollSelectionKeys(readyKeys, false, endSelect);
readyKeys.clear();
// 处理直接建立成功的连接
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// 处理延迟关闭的连接
completeDelayedChannelClose(endIo);
// 处理超时的空闲连接
maybeCloseOldestConnection(endSelect);
}
再重复提一嘴,所谓“缓冲区非空的通道”是因为启用了 SSL 的 socket 数据要加密传输,因此通道的缓冲区中可能有未完全加密/解密到的数据。而 I/O 事件准备就绪的通道则可以看成是数据在 socket 的操作系统缓冲区中。前者不会触发 I/O 事件因此需要单独处理。同样地,在 connect 时就直接建立成功的连接也不会触发 I/O 事件,因此也需要单独处理。尽管存在单独处理的通道,但实际上都是调用 pollSelectionKeys
来处理,这体现了将 Channel 和 Buffer 的底层读写等操作统一封装在 KafkaChannel
的好处。
最后,延迟关闭以及超时空闲这些并不核心的操作都放在了每次 poll 调用的最后才去处理。
pollSelectionKeys
这个方法用来处理上述的三种就绪通道,进行真正的 socket 级别的读写操作。传入的是 SelectionKey,通过这个 key 可以拿到对应的 SocketChannel 以及 KafkaChannel。
void pollSelectionKeys(
// 就绪的key集合,通过key可以拿到就绪的I/O事件、SocketChannel以及KafkaChannel
Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos
) {
// 遍历keys时可能会打乱遍历的顺序
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
boolean sendFailed = false;
String nodeId = channel.id();
sensors.maybeRegisterConnectionMetrics(nodeId);
// 更新连接的访问时间
if (idleExpiryManager != null)
idleExpiryManager.update(nodeId, currentTimeNanos);
try {
// 处理OP_CONNECT(只是将连接记录到connected)
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(nodeId);
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
nodeId);
} else {
continue;
}
}
// 处理已建立但未ready的连接(即未握手和认证)
if (channel.isConnected() && !channel.ready()) {
// 进行握手和认证
channel.prepare();
// 记录一些监控指标
if (channel.ready()) {
long readyTimeMs = time.milliseconds();
boolean isReauthentication = channel.successfulAuthentications() > 1;
if (isReauthentication) {
sensors.successfulReauthentication.record(1.0, readyTimeMs);
if (channel.reauthenticationLatencyMs() == null)
log.warn(
"Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
else
sensors.reauthenticationLatency
.record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
} else {
sensors.successfulAuthentication.record(1.0, readyTimeMs);
if (!channel.connectedClientSupportsReauthentication())
sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
}
log.debug("Successfully {}authenticated with {}", isReauthentication ?
"re-" : "", channel.socketDescription());
}
}
if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
channel.state(ChannelState.READY);
// 对于需要认证的连接,一些客户端请求可能在认证完成后发送,但是在响应发送前认证失效并且需要重新认证
// 在重新认证的过程中可能会有一些服务端响应到达,会先将这些响应缓存起来
// 再用这个方法取出,添加到complectedReceives
Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
responseReceivedDuringReauthentication.ifPresent(receive -> {
long currentTimeMs = time.milliseconds();
addToCompletedReceives(channel, receive, currentTimeMs);
});
// 处理OP_READ,从socket读取数据,或者从内部缓冲区读取数据
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
attemptRead(channel);
}
if (channel.hasBytesBuffered()) {
// 将内部缓冲区还有数据的通道加入keysWithBufferedRead,下次poll时处理
keysWithBufferedRead.add(key);
}
long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
// 处理OP_WRITE
try {
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (!key.isValid())
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
// 发生异常,打日志、监控指标,最后关闭channel
String desc = channel.socketDescription();
if (e instanceof IOException) {
log.debug("Connection with {} disconnected", desc, e);
} else if (e instanceof AuthenticationException) {
boolean isReauthentication = channel.successfulAuthentications() > 0;
if (isReauthentication)
sensors.failedReauthentication.record();
else
sensors.failedAuthentication.record();
String exceptionMessage = e.getMessage();
if (e instanceof DelayedResponseAuthenticationException)
exceptionMessage = e.getCause().getMessage();
log.info("Failed {}authentication with {} ({})", isReauthentication ? "re-" : "",
desc, exceptionMessage);
} else {
log.warn("Unexpected error from {}; closing connection", desc, e);
}
if (e instanceof DelayedResponseAuthenticationException)
// 延迟关闭
maybeDelayCloseOnAuthenticationFailure(channel);
else
// 直接关闭/优雅关闭
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
有了前面细节知识的铺垫,比如握手/认证、内部缓冲区等,这块代码虽然看着很长但是并不难理解。需要注意的是,传进来的 key 可能每次都是一样顺序,为了保证在内存池内存较低时后面的 key 在读操作方面不被饿死(因为从网络读数据是需要从内存池分配内存的),将会打乱这些 key 的遍历顺序。
attemptWrite / attemptRead
讲解完这套核心的 poll 流程后,我们再来补充下 poll 中调用的读写方法,把我们上一篇的知识串了起来。首先是写操作:
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
write(channel);
}
}
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
// 写操作
long bytesSent = channel.write();
Send send = channel.maybeCompleteSend();
// 这里的判断条件要注意,由于加密通道有内部缓冲区,因此bytesSent只是说从send发了多少数据到内部缓冲区中,并不是真正发送到socket的数据量
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
// 因此send中的所有数据真正发到了socket只能由channel.maybeCompleteSend()来判断
if (send != null) {
// 已加入完成发送列表
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
}
前面也数次提到过,注意一下加密通道的内部缓冲区就行。
然后是读操作:
private void attemptRead(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
// 读操作
long bytesReceived = channel.read();
if (bytesReceived != 0) {
long currentTimeMs = time.milliseconds();
sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
madeReadProgressLastPoll = true;
NetworkReceive receive = channel.maybeCompleteReceive();
if (receive != null) {
// 加入已完成接收列表
addToCompletedReceives(channel, receive, currentTimeMs);
}
}
if (channel.isMuted()) {
// 这里channel由于内存池内存不足而主动静默
outOfMemory = true;
} else {
madeReadProgressLastPoll = true;
}
}
补充
上面是以服务端的视角去讲解的 KSelector 中几个核心方法,即多路复用以及具体的网络读写。本小节再来补充一下其它次重要的方法。
connect
显而易见 connect
用于客户端向服务端发起网络连接。然后服务端这边就是之前分析过的 Acceptor 检测到后会交给 Processor,随后注册到 KSelector 上…
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
// 创建SocketChannel
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
// 配置SocketChannel
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
// 发起对服务端的连接
boolean connected = doConnect(socketChannel, address);
// 创建KafkaChannel
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// 直接建立成功的连接不会触发OP_CONNECT,需要主动保存下来后续处理
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}
private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize) throws IOException {
// 设置SocketChannel为非阻塞(NIO)
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
// TCP Keepalive
socket.setKeepAlive(true);
// TCP发送缓冲区的大小(outbound)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
// TCP接收缓冲区的大小(inbound)
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
// 禁用TCP Nagle算法,提高数据传输实时性
socket.setTcpNoDelay(true);
}
close
KSelector 有好几个 close 方法,这里说的是以特定模式关闭通道的 close 方法,其它 close 方法最终都会调用这个 close:
private void close(KafkaChannel channel, CloseMode closeMode) {
// 底层会注销key,相当于selector不会再触发该key的任何事件
channel.disconnect();
connected.remove(channel.id());
if (closeMode == CloseMode.GRACEFUL && maybeReadFromClosingChannel(channel)) {
// 优雅关闭连接
closingChannels.put(channel.id(), channel);
log.debug("Tracking closing connection {} to process outstanding requests", channel.id());
} else {
// 强行关闭连接
doClose(channel, closeMode.notifyDisconnect);
}
this.channels.remove(channel.id());
if (delayedClosingChannels != null)
delayedClosingChannels.remove(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.remove(channel.id());
}
// 从优雅关闭的通道中读取请求,并告知上层该通道是否有待处理的请求
private boolean maybeReadFromClosingChannel(KafkaChannel channel) {
boolean hasPending;
if (channel.state().state() != ChannelState.State.READY)
hasPending = false;
else if (explicitlyMutedChannels.contains(channel) || hasCompletedReceive(channel))
// 有待处理的请求(如果通道静默的话,可能只是因为内存不足而静默,要等其解除静默后再次检查)
hasPending = true;
else {
// 没有待处理的请求,那么尝试读取
try {
attemptRead(channel);
// 注意这里的hasPending的值
hasPending = hasCompletedReceive(channel);
} catch (Exception e) {
log.trace("Read from closing channel failed, ignoring exception", e);
hasPending = false;
}
}
return hasPending;
}
所谓优雅关闭的概念之前已经说过很多次,但不同的优雅关闭又可能会有着具体的差别。这里的优雅关闭通道指的是先处理完已经接收到的请求再关闭连接,但是在这个过程中,不会再触发任何的 OP_READ(因为key已经被取消)。
另外注意到一个细节,在从优雅关闭连接中读取更多更多请求的过程中,如果读不到一个完整的请求的话,也会被视为后续没有更多的请求,从而被直接关闭连接,在网络的对方节点看来,就是数据发一半然后发现连接被关闭了。
在 close 方法中如果 maybeReadFromClosingChannel 认为通道后续还有待接收/处理的请求,将优雅关闭的通道记录到 closingChannels
。随后在每次 poll 开头调用的 clear 方法中,除了清理上一次 poll 的结果外,还会处理这些通道,继续尝试读取更多的请求或者关闭连接:
private void clear() {
// ...
// 继续处理优雅关闭的连接
for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
KafkaChannel channel = it.next().getValue();
boolean sendFailed = failedSends.remove(channel.id());
boolean hasPending = false;
if (!sendFailed)
hasPending = maybeReadFromClosingChannel(channel);
if (!hasPending || sendFailed) {
doClose(channel, true);
it.remove();
}
}
// ...
}
总结
本篇续上篇介绍了在 kafka 中封装了 Java NIO Selector 的 KSelector 类,包括 socket 注册与通道创建、多路复用轮询、I/O 事件处理以及优雅关闭连接等内容,并且针对 Java NIO、TCP 加密传输的特点进行了一些特殊处理,在此就不再赘述,都在上面文中体现了。
总而言之,这个类为服务端和客户端的上层组件屏蔽了许多 NIO 麻烦的处理细节,在 ServerSocket 中我们可以看出,上层组件基本上只需要「注册、轮询、处理封装好的结果」三部曲,十分方便。