Kafka源码分析10-Producer终于和broker建立连接了
从 Kafka源码分析9-若网络没建立好会发送消息吗 中得知,如果网络没有建立完成,是不会发送消息的。本文就讲解producer与broker是如何建立连接的,直接看sender 线程的步骤八:
{ // 省略。。 /** * 步骤八: * 真正执行网络操作的都是这个NetWorkClient这个组件 * 包括:发送请求,接受响应(处理响应) */ // 我们猜这儿可能就是去建立连接。 this.client.poll(pollTimeout, now); }复制代码
接下来 NetworkClient 的poll() 方法:
public List<ClientResponse> poll(long timeout, long now) { /** * 在这个方法里面有涉及到kafka的网络的方法,但是 * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候 * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据 * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码 * 的时候,其实代码就比较简单了。 */ //步骤一:封装了一个要拉取元数据请求 long metadataTimeout = metadataUpdater.maybeUpdate(now); try { //步骤二: 发送请求,进行复杂的网络操作 //但是我们目前还没有学习到kafka的网络 //所以这儿大家就只需要知道这儿会发送网络请求。 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // 省略。。 } public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); //从Selector上找到有多少个key注册了 int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); //因为我们用场景驱动的方式 //我们刚刚确实是注册了一个key if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { //立马就要对这个Selector上面的key要进行处理。 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO 对stagedReceives里面的数据要进行处理 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }复制代码
Java NIO的Selector.select -> 他会负责去看看,注册到他这里的多个Channel,谁有响应过来可以接收,或者谁现在可以执行一个请求的发送,如果Channel可以准备执行IO读写操作,此时就把那个Channel的SelectionKey返回
接下来就会对获取到的一堆SelectionKeys进行处理,到这一步为止,我们就可以看到基于NIO来开发的很多企业级的一些功能,一个是SocketChannel如何构建,二个是一个客户端如何连接多个服务器,三个如何通过轮询调用Selector.select
select一般在这种场景里可以设置对应的超时时间,然后就可以获取到SelectionKeys
我们着重看一下pollSelectionKeys()方法:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } // 省略。。 } } private static class IdleExpiryManager { private final Map<String, Long> lruConnections; private final long connectionsMaxIdleNanos; private long nextIdleCloseCheckTime; }复制代码
idleExpiryManager 包含lruConnections 因为一般来说一个客户端不能放太多的Socket连接资源,否则会导致这个客户端的复杂过重,所以他需要采用lru的方式来不断的淘汰掉最近最少使用的一些连接,很多连接最近没怎么发送消息。
比如说有一个连接,最近一次使用是在1个小时之前了,还有一个连接,最近一次使用是在1分钟之前,此时如果要淘汰掉一个连接,你会选择谁?LRU算法,明显是淘汰掉那个1小时之前才使用的连接
如果发现SelectionKey当前处于的状态是可以建立连接,isConnectable方法是true,接着其实就是调用到KafkaChannel最底层的SocketChannel的finishConnect方法,等待这个连接必须执行完毕
同时接下来就不要关注OP_CONNECT事件了,对于这个Channel,接下来Selector就不要关注连接相关的事件了,也不是OP_READ读取事件,肯定selector要关注的是OP_WRITE事件,要针对这个连接写数据。
至此,Producer终于和broker建立连接了。
作者:hsfxuebao
链接:https://juejin.cn/post/7019503555312631845