Note基于开源 kafka 2.5 版本。
如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。
我们知道 Java NIO 中的三大组件分别是 Channel、Buffer 以及 Selector,而 kafka 在网络层中对它们进行了进一步的封装,以便向上层组件提供更加直观和方便的网络 I/O 操作,具体对应的封装类如下:
Tip
TransportLayer
:它是一个接口,封装了底层 NIO 的 SocketChannel。NetworkReceive
:封装了 NIO 的 ByteBuffer 中的读 Buffer,对网络编程中的粘包、拆包经典实现。NetworkSend
:封装了 NIO 的 ByteBuffer 中的写 Buffer。KafkaChannel
:对 TransportLayer、NetworkReceive、NetworkSend 进一步封装,屏蔽了底层的实现细节,对上层更友好。KSelector
:封装了 NIO 的 Selector 多路复用器组件。
这些类在上一篇对 Acceptor
和 Processor
中多少有接触过,但为了避免过于深入细节而失去对 Acceptor
和 Processor
全局的把握,因此并没有深入分析这些类。本篇自底向上地去分析对这些类展开深入分析,即先分析被依赖的类。
TransportLayer
TransportLayer
接口用于通信,底层封装了 SocketChannel
。这个接口继承了 Java NIO 中的 ScatteringByteChannel
和 GatheringByteChannel
这两个接口,这说明 TransportLayer
除了提供最基本的读写能力外,还支持高级 I/O 中的散布读(scatter read)以及聚集写(gather write)能力,仅通过一次(系统)调用就能读写多个非连续缓冲区,在处理网络协议时,就可以将 header 和 body 保存在不同缓冲区中集中地进行散布读和聚集写,提高效率。
TransportLayer
有两个实现类,分别是用于明文传输的 PlaintextTransportLayer
以及用于 SSL 加密传输的 SslTransportLayer
。由于本篇的目的不是身份认证,因此只分析明文传输实现类。看下类定义:
public class PlaintextTransportLayer implements TransportLayer {
// NIO中的事件key
private final SelectionKey key;
// key对应的socket
private final SocketChannel socketChannel;
// 客户端身份,由于明文传输不需身份认证,因此是匿名身份
private final Principal principal = KafkaPrincipal.ANONYMOUS;
}
这里 key
成员是在 SocketServer
的 configureNewConnections
方法调用的时候传入的,调用路径是:
SocketServer.configureNewConnections -> KSelector.register -> KSelector.registerChannel(注册SocketChannel到Selector上得到key) -> KSelector.buildAndAttachKafkaChannel(创建KafkaChannel,并将其作为key作为attachment,便于后续根据key反向获取KafkaChannel)
KafkaChannel
的方法基本上就只是简单调用了 SocketChannel
的同名方法,比如:
public int read(ByteBuffer dst) throws IOException {
return socketChannel.read(dst);
}
public long write(ByteBuffer[] srcs) throws IOException {
return socketChannel.write(srcs);
}
需要注意的只有这个 finishConnect
方法,由于 SocketChannel
被设置成非阻塞的,因此外界需要通过 finishConnect
方法检查连接是否已经成功,连接成功后,注册 OP_READ
事件等待从 socket 读取对端发来的数据:
public boolean finishConnect() throws IOException {
boolean connected = socketChannel.finishConnect();
if (connected)
// 注销OP_CONNECT,注册OP_READ
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
return connected;
}
相比之下,SslTransportLayer
的实现就复杂许多,因为涉及到数据加密操作,不再是简单地调用 SocketChannel
同名方法。比如在 transferFrom
方法的实现中,PlainTransportLayer
只是调用了 SocketChannel.transferTo
,让操作系统去优化写入,而 SslTransportLayer
还需要申请堆外内存缓冲来存放文件的数据,加密之后再写入对端网络节点,感兴趣可以的读者自行查看源码。
另外还有个 hasPendingWrites 方法,在 PlaintextTransportLayer 中固定返回 false,而 SslTranportLayer 中返回 netWriteBuffer.hasRemaining()。这同样是因为加密的原因,调用 SslTransportLayer.write 时,数据首先会加密并保存在 netWriteBuffer 中,然后再写入 SocketChannel,因此 SslTransportLayer 相当于多了个内部缓冲 netWriteBuffer, hasPendingWrites 就是用于检查内部缓冲的数据是否已经全部写入 SocketChannel。因此如下图所示, Ssl 和 Plaintext 的很大一个区别就是是否有内部缓冲区,这一点要记住,因为会影响到后面的源码分析。

NetworkReceive
NetworkReceive
封装了 NIO 中的读 Buffer,核心是解决 TCP 的粘包拆包问题。我们在应用层协议上大多以“包”为单位进行传输,而面向字节流的 TCP 并不知道“包”的存在。数据通过 TCP 发送时,如果数据量没达到 TCP 缓冲区大小,TCP 可能会将多个请求合并成同一个请求进行发送,这就形成了所谓的粘包问题。而如果一个包过大的话,TCP 又会将其拆分为多次发送,形成拆包问题。说白了其实就是 TCP 是面向字节流的,每次发送或者接收的到的 TCP 报文并不与应用层的包一一对应,因此我们需要在应用层从字节流中识别并组装成一个完整的包,而 TCP 的字节流承诺有序和可靠性,因此应用层只需聚焦于如何识别出一个包的边界。
常见的定义边界方法有固定包大小、特殊分隔符、消息头定义包大小。kafka 使用的是第三种,即包由 4 个字节的消息体大小以及消息体组成,如图所示:

了解完背景知识后,回到我们的代码中,先看一下 NetworkReceive
类的定义:
public class NetworkReceive implements Receive {
// 连接id,即下一小节中的KafkaChannel.id
private final String source;
// 用于接收消息体大小的缓冲区,固定4字节
private final ByteBuffer size;
// 消息最大大小
private final int maxSize;
// 内存池
private final MemoryPool memoryPool;
// 记录从size缓冲区中解析出的消息体大小
private int requestedBufferSize = -1;
// 用于接收消息体的缓冲区
private ByteBuffer buffer;
}
这个类的成员比较简单,不再解释。核心方法只有 readFrom
,从 Channel
中读取一条消息并存到 Buffer
中,我们之前说的消息边界识别就是在这里进行的:
public long readFrom(ScatteringByteChannel channel) throws IOException {
int read = 0;
if (size.hasRemaining()) {
// 读取size
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
if (!size.hasRemaining()) {
// size读取完毕,将其转换成整数
size.rewind();
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
// 根据size分配消息体的缓冲区
if (buffer == null && requestedBufferSize != -1) {
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
// 从Channel读入消息体
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
}
return read;
}
需要注意的是,由于拆包问题,可能需要多次 readFrom
的调用才能读完整个包。最后外界通过 complete
方法轮询是否已经完成接收,payload
方法获取接收的消息缓冲:
public boolean complete() {
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
public ByteBuffer payload() {
return this.buffer;
}
NetworkSend
NetworkSend
也很简单,这个类继承自 ByteBufferSend
,实现了 Send
接口。主要就是通过 writeTo
方法将 Buffer
中的数据写入 Channel
,并通过 completed
方法检查是否已经全部写入完毕,这两个方法都是 ByteBufferSend
中的。
public class ByteBufferSend implements Send {
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
// 只有SslTransportLayer(加密传输)才可能pending
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
public boolean completed() {
return remaining <= 0 && !pending;
}
}
可以看到 ByteBufferSend
包装的是若干个 Buffer
。NetworkSend
只需要两个 Buffer
,分别对应之前说的 size 以及消息体:
public class NetworkSend extends ByteBufferSend {
public NetworkSend(String destination, ByteBuffer buffer) {
super(destination, sizeBuffer(buffer.remaining()), buffer);
}
private static ByteBuffer sizeBuffer(int size) {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(size);
sizeBuffer.rewind();
return sizeBuffer;
}
}
KafkaChannel
KafkaChannel 用于表示一条客户端与服务端之间的连接,提供了底层 I/O 读写、身份认证、通道状态管理等功能。KafkaChannel 既服务于客户端代码也服务于服务端代码,即如图所示:

先看一下类定义:
public class KafkaChannel implements AutoCloseable {
// 唯一标识整个系统中的网络连接,如localAddr:localPort-remoteAddr:remotePort-index
private final String id;
// 数据传输相关
private final TransportLayer transportLayer; // 传输层对象,底层使用SocketChannel进行读写
private SocketAddress remoteAddress; // 客户端网络地址
private NetworkReceive receive; // 当前正在处理的接收操作,包含从客户端接收的数据
private Send send; // 当前正在处理的发送操作,包含要发送给客户端的数据
private final int maxReceiveSize; // 允许接收的最大消息字节数
private long networkThreadTimeNanos; // 指标监控,网络线程处理该通道的累计时间
// 认证相关,处理SASL等认证机制
private final Supplier<Authenticator> authenticatorCreator;
private Authenticator authenticator; // Authenticator底层使用TransportLayer进行读写,
private int successfulAuthentications;
private long lastReauthenticationStartNanos;
// 状态相关
private ChannelState state; // 通道当前状态,如认证中、关闭中等状态
private boolean disconnected; // 通道是否已断开连接
private ChannelMuteState muteState; // 静默状态,控制是否接收新请求,以及连接断开后是否有待处理的请求
private boolean midWrite; // 是否正在进行写操作,防止并发写入导致的数据损坏
// 资源管理相关
private final MemoryPool memoryPool; // 内存池,用于复用内存,避免频繁分配和释放
private final ChannelMetadataRegistry metadataRegistry; // 记录通道相关的元数据,例如KIP-511引入的客户端软件名称、版本等
}
数据传输相关的成员中,之前介绍了核心的 TransportLayer
、NetworkReceive
以及 NetworkSend
,可以将他们三个分别对应于 NIO 中的 Channel
以及读写 Buffer
,但是在实际进行读写的时候,NIO 的做法是调用 Channel
的读写方法并传入 Buffer
,而 kafka 这里是调用 NetworkReceive
/ NetworkSend
的读写方法并传入 TransportLayer
,这里注意一下即可。
下面我们从 KafkaChannel
生命周期中被调用的方法逐个讲解分析。但因为 KafkaChannel
与最后一节要讲的 KSelector
息息相关,因此期间可能会涉及到 KSelector
,读者可以对比观看甚至自行查阅源码,以便心中有数。
首先是用于检查连接是否已建立的 finishConnect
方法:
public boolean finishConnect() throws IOException {
SocketChannel socketChannel = transportLayer.socketChannel();
if (socketChannel != null) {
remoteAddress = socketChannel.getRemoteAddress();
}
// TransportLayer是否已经建立连接
boolean connected = transportLayer.finishConnect();
if (connected) {
if (ready()) {
state = ChannelState.READY;
} else if (remoteAddress != null) {
state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
} else {
state = ChannelState.AUTHENTICATE;
}
}
return connected;
}
TCP 连接建立完成后,接下来是用于 SSL 握手和 SASL 认证操作的 prepare
方法:
public void prepare() throws AuthenticationException, IOException {
boolean authenticating = false;
try {
if (!transportLayer.ready())
transportLayer.handshake();
if (transportLayer.ready() && !authenticator.complete()) {
authenticating = true;
authenticator.authenticate();
}
} catch (AuthenticationException e) {
// 握手或认证失败,state更新为AUTHENTICATION_FAILED
String remoteDesc = remoteAddress != null ? remoteAddress.toString() : null;
state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e, remoteDesc);
if (authenticating) {
// 如果是认证失败,要延迟关闭连接,防止暴力破解
// 延迟时间由参数connection.failed.authentication.delay.ms决定
delayCloseOnAuthenticationFailure();
throw new DelayedResponseAuthenticationException(e);
}
throw e;
}
if (ready()) {
// 握手与认证完成后,state转到READY
++successfulAuthentications;
state = ChannelState.READY;
}
}
// 延迟关闭期间,禁止OP_WRITE
private void delayCloseOnAuthenticationFailure() {
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
}
下面就是网络读写操作相关方法,首先是用于预发送的 setSend
,这个方法将待发送数据保存到 send
中,并注册 OP_WRITE
监听写事件的发生:
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
// 监听是否可写
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
write
方法将 send
中的数据真正发送出去:
public long write() throws IOException {
if (send == null)
return 0;
// 标识正在处于写入状态中
midWrite = true;
// 数据写入TransportLayer
return send.writeTo(transportLayer);
}
read
方法用于从网络接收数据保存到 NetworkReceive
中:
public long read() throws IOException {
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
// 读取数据到receive中
long bytesReceived = receive(this.receive);
if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) {
// 已经从消息头读取到了消息体大小,但由于内存池内存不足分配内存给消息体
// 因此由于内存压力,要静默通道
mute();
}
return bytesReceived;
}
// 从网络接收数据
private long receive(NetworkReceive receive) throws IOException {
try {
// 从TransportLayer读取数据
return receive.readFrom(transportLayer);
} catch (SslAuthenticationException e) {
String remoteDesc = remoteAddress != null ? remoteAddress.toString() : null;
state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e, remoteDesc);
throw e;
}
}
// 通道静默
void mute() {
if (muteState == ChannelMuteState.NOT_MUTED) {
// 注销OP_READ事件
if (!disconnected) transportLayer.removeInterestOps(SelectionKey.OP_READ);
muteState = ChannelMuteState.MUTED;
}
}
最后,由于 socket 是非阻塞模式,读写可能都需要多次调用,因此提供了 maybeCompleteSend
和 maybeCompleteReceive
返回发送完毕后的 NetworkSend
以及接收完毕后的 NetworkReceive
,如果还未完成则返回 null:
public Send maybeCompleteSend() {
if (send != null && send.completed()) {
midWrite = false;
// 注销OP_WRITE
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
// 返回NetworkSend
Send result = send;
send = null;
return result;
}
return null;
}
public NetworkReceive maybeCompleteReceive() {
if (receive != null && receive.complete()) {
// 将NetworkReceive的缓冲区指针置0,供稍后外界读取数据
receive.payload().rewind();
// 返回NetworkReceive
NetworkReceive result = receive;
receive = null;
return result;
}
return null;
}
总结一下,在 KafkaChannel
中的核心操作也就上面这些连接、读、写操作,除了这些外还有几个 authentication 相关的方法,本篇不关注。另外注意到还有个 ChannelMuteState
用于标识通道的静默状态:
public enum ChannelMuteState {
NOT_MUTED,
MUTED,
MUTED_AND_RESPONSE_PENDING,
MUTED_AND_THROTTLED,
MUTED_AND_THROTTLED_AND_RESPONSE_PENDING
}
是否静默对应的只是是否注册了 OP_READ,静默则注销 OP_READ 表示不接受新请求。但我发现其实后面三种状态在代码中并没有用上,这几个状态可能是为了测试/调试,或者用作将来的扩展,我们只关心是否静默即可。
总结
本篇介绍了开篇提及的在 kafka 中对应 Java NIO 所封装的几个类,包括:
- 封装
SocketChannel
的TransportLayer
- 封装
Buffer
的NetworkSend
和NetworkRecieve
- 将上面三个类统一封装的
KafkaChannel
,提供更友好的接口供上层使用
由于篇幅原因,将封装 Selector
的 KafkaSelector
(KSelector)放在了下一篇。