阅读 83

Kafka成长记4:Producer 元数据拉取源码原理(下)

上一节结尾,我们总结道: 初始化KafkaProducer时并没有去拉取元数据,但是创建了Selector组件,启动了Sender线程,select阻塞等待请求响应。由于还没有发送任何请求,所以初始化时并没有去真正拉取元数据。

真正拉取元数据是在第一次send方法调用时,会唤醒唤醒Selector之前阻塞的select(),进入第二次while循环,从而发送拉取元数据请求,并且通过Obejct.wait的机制等待60s,等到从Broker拉取元数据成功后,才会继续执行真正的生产消息的请求,否则会报拉取元数据超时异常。

如下图:

file

而唤醒Selector的select之后应该会进入第二次while循环,那第二次while循环如何发送请求拉取元数据请求,并且在成功后notifyAll()进行唤醒操作的呢?

我们今天来一起看一下。

第二次while循环-开始触发元数据拉取

唤醒了阻塞的select,你还记得阻塞后的逻辑么?

唤醒后会根据nioSelector.select()返回的readKeys这个int数字,如果大于0如执行pollSelectionKeys的一些操作,由于直接被wakeUp(),实际readKeys是0,所以poll方法直接就返回了,不会执行pollSelectionKeys的处理。

而且Selector的poll方法返回后,由于pollSelectionKeys没有执行,所以之后一系列方法handleCompletedSends、handleCompletedReceives、handleDisconnections、handleConnections、handleTimedOutRequests均没有执行。(你可以自己尝试断点下,就会发现。)

上面的逻辑执行完成,也就说第一次循环会结束,重新进行第二次循环。整体过程如下图所示:(主要执行了灰色的备注标注的流程)

file

第二次循环maybeUpdate执行的原因

既然进入第二次循环,就会重新执行将重新执行maybeUpdate()、poll()、handle开头的这些方法。

你还记得maybeUpdate的核心脉络么?它主要是根据3个时间决定了metadataTimeout是否为0,来决定是否执行。代码如下:

        @Override         public long maybeUpdate(long now) {             // should we update our metadata?             long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);             long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);             long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;             // if there is no node available to connect, back off refreshing metadata             long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),                     waitForMetadataFetch);             if (metadataTimeout == 0) {                 // Beware that the behavior of this method and the computation of timeouts for poll() are                 // highly dependent on the behavior of leastLoadedNode.                 Node node = leastLoadedNode(now);                 maybeUpdate(now, node);             }             return metadataTimeout;         }         public synchronized long timeToNextUpdate(long nowMs) {             long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);             long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;             return Math.max(timeToExpire, timeToAllowUpdate);         } 复制代码

第一次循环的时候metadataTimeout得到的是非0,而第二次循环这个值其实已经变成0了。

因为我们在上一节send的在sender.wakeyUp()前,曾经执行了metadata.requestUpdate();

这一行代码,它将needUpdate这个标记改为了true。会让决定metadataTimeout的3个时间值中的timeToNextMetadataUpdate也变为0,也就是说timeToNextMetadataUpdate、timeToNextReconnectAttempt、waitForMetadataFetch都变成了0,自然metadataTimeout也是0了。

如下图所示:

file

所以第二次循环的时候会真正执行maybeUpdae的逻辑。而不像之前第一次,什么都没做。

而如果metadataTimeout=0,主要执行了2个方法:

1)leastLoadedNode 这个其实从注释可以看出,是在选择一个Broker节点,从它那里拉取元数据。选择的标准肯定最好连接过的Broker,并且待发送数据少的节点,这些逻辑具体我们就仔细研究了。

2)maybeUpdate 这个方法其实非常关键,是主要建立连接或者发起拉取元数据请求的逻辑

所以这里我们主要看下这个mayBeUpdate的主要逻辑:

/**  * Add a metadata request to the list of sends if we can make one  */ private void maybeUpdate(long now, Node node) {     if (node == null) {         log.debug("Give up sending metadata request since no node is available");         // mark the timestamp for no node available to connect         this.lastNoNodeAvailableMs = now;         return;     }     String nodeConnectionId = node.idString();     if (canSendRequest(nodeConnectionId)) {         this.metadataFetchInProgress = true;         MetadataRequest metadataRequest;         if (metadata.needMetadataForAllTopics())             metadataRequest = MetadataRequest.allTopics();         else             metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));         ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);         log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());         doSend(clientRequest, now);     } else if (connectionStates.canConnect(nodeConnectionId, now)) {         // we don't have a connection to this node right now, make one         log.debug("Initialize connection to node {} for sending metadata request", node.id());         initiateConnect(node, now);         // If initiateConnect failed immediately, this node will be put into blackout and we         // should allow immediately retrying in case there is another candidate node. If it         // is still connecting, the worst case is that we end up setting a longer timeout         // on the next round and then wait for the response.     } else { // connected, but can't send more OR connecting         // In either case, we just need to wait for a network event to let us know the selected         // connection might be usable again.         this.lastNoNodeAvailableMs = now;     } } 复制代码

上面的脉络比较简单,主要就是一个if-else。

if 是否可以发送拉取元数据请求,可以就调用doSend()方法

else 如果不可以发送请求,说明连接还未建立,需要初始化连接,调用initateConnection()方法

整个过程如下图所示:

file

拉取元数据前,是如何基于NIO建立连接的?

maybeUpdae会根据canSendRequest、canConnect方法使用ClusterConnectionStates这个组件,判断是否和Broker建立过连接,这个组件之前第二节我们提到过,是NetworklClient记录和Broker连接情况的组件。代码主要如下:

NetworklClient.java; private boolean canSendRequest(String node) {     return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } ClusterConnectionStates public boolean canConnect(String id, long now) {     NodeConnectionState state = nodeState.get(id);     if (state == null)         return true;     else         return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs; } 复制代码

除了连接状态,还做了其他额外逻辑判断,是很细节的判断了,我们抓大放小,在这里不用深究。

主要知道,目前是没有和Broker建立过任何连接的,所以一定会走到initiateConnect()这个方法,来建立连接。让我们一起来看下吧。

    /**      * Initiate a connection to the given node      */     private void initiateConnect(Node node, long now) {         String nodeConnectionId = node.idString();         try {             log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());             this.connectionStates.connecting(nodeConnectionId, now);             selector.connect(nodeConnectionId,                              new InetSocketAddress(node.host(), node.port()),                              this.socketSendBuffer,                              this.socketReceiveBuffer);         } catch (IOException e) {             /* attempt failed, we'll try again after the backoff */             connectionStates.disconnected(nodeConnectionId, now);             /* maybe the problem is our metadata, update it */             metadataUpdater.requestUpdate();             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);         }     }     public void connecting(String id, long now) {         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));     } 复制代码

核心脉络非常简单,就两句话:

1)connectionStates.connecting() 记录状态为连接中,这个没啥好说的。

2)selector.connect() 通过Kafka封装的Selector执行connect方法,这个就是建立连接的关键了。

file

Selector的connect方法就十分关键,我们看下它的代码在做什么:

org.apache.kafka.common.network.Selector.java @Override public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {     if (this.channels.containsKey(id))         throw new IllegalStateException("There is already a connection for id " + id);     SocketChannel socketChannel = SocketChannel.open();     socketChannel.configureBlocking(false);     Socket socket = socketChannel.socket();     socket.setKeepAlive(true);     if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)         socket.setSendBufferSize(sendBufferSize);     if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)         socket.setReceiveBufferSize(receiveBufferSize);     socket.setTcpNoDelay(true);     boolean connected;     try {         connected = socketChannel.connect(address);     } catch (UnresolvedAddressException e) {         socketChannel.close();         throw new IOException("Can't resolve address: " + address, e);     } catch (IOException e) {         socketChannel.close();         throw e;     }     SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);     KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);     key.attach(channel);     this.channels.put(id, channel);     if (connected) {         // OP_CONNECT won't trigger for immediately connected channels         log.debug("Immediately connected to node {}", channel.id());         immediatelyConnectedKeys.add(key);         key.interestOps(0);     } } PlaintextChannelBuilder.java     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {         KafkaChannel channel = null;         try {             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);             Authenticator authenticator = new DefaultAuthenticator();             authenticator.configure(transportLayer, this.principalBuilder, this.configs);             channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);         } catch (Exception e) {             log.warn("Failed to create channel due to ", e);             throw new KafkaException(e);         }         return channel;     } 复制代码

上面connect()方法的核心脉络主要就是:

1)SocketChannel.open()创建了NIO的SocketChannel

2)设置了一些Sokect参数,通过SocketChannel发起了connect连接(这个是NIO常见的操作,大家可以自己取搜一个Java原生NIO的HelloWorld,或者之后关注NIO成长记,就会对这个肯定不会陌生了)

3)socketChannel向Selector注册register,并且指明关注建立连接请求SelectionKey.OP_CONNECT,通过SelectionKey关联对应的SocketChannel

4)buildChannel将上面SocketChannel、Selector、SelectionKey整个关系封装到了KafkaChannel中,这里比较坑的是,它还二次封装了一个对象叫做TransportLayer。并且通过 key.attach(channel);将KafkaChannel绑定了到了SelcetionKey上去

5) 通过Map<String, KafkaChannel> channels,缓存了KafkaChannel

整个逻辑如下图所示:

file

到这里initateConnect()方法就执行完成了,maybeupdate方法返回,接着进入第二次while循环的下一步,Selector.poll();

如下粉红线条所示:

file

Selector.poll();之前我们就知道它底层会调用nioSelector的select()阻塞等待是否有关心的请求。

如果你熟悉NIO的话,就知道,如果之前发送的connect连接建立成功,那注册的Selectionkey有对应关心的事件SelectionKey.OP_CONNECT,就会跳出阻塞。

这个过程如下图所示:

file

从上图来看,接着就一定会执行pollSelectionKeys()方法了:

  private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {         Iterator<SelectionKey> iterator = selectionKeys.iterator();         while (iterator.hasNext()) {             SelectionKey key = iterator.next();             iterator.remove();             KafkaChannel channel = channel(key);             // register all per-connection metrics at once             sensors.maybeRegisterConnectionMetrics(channel.id());             lruConnections.put(channel.id(), currentTimeNanos);             try {                 /* complete any connections that have finished their handshake (either normally or immediately) */                 if (isImmediatelyConnected || key.isConnectable()) {                     if (channel.finishConnect()) {                         this.connected.add(channel.id());                         this.sensors.connectionCreated.record();                     } else                         continue;                 }                 /* if channel is not ready finish prepare */                 if (channel.isConnected() && !channel.ready())                     channel.prepare();                 /* if channel is ready read from any connections that have readable data */                 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {                     NetworkReceive networkReceive;                     while ((networkReceive = channel.read()) != null)                         addToStagedReceives(channel, networkReceive);                 }                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */                 if (channel.ready() && key.isWritable()) {                     Send send = channel.write();                     if (send != null) {                         this.completedSends.add(send);                         this.sensors.recordBytesSent(channel.id(), send.size());                     }                 }                 /* cancel any defunct sockets */                 if (!key.isValid()) {                     close(channel);                     this.disconnected.add(channel.id());                 }             } catch (Exception e) {                 String desc = channel.socketDescription();                 if (e instanceof IOException)                     log.debug("Connection with {} disconnected", desc, e);                 else                     log.warn("Unexpected error from {}; closing connection", desc, e);                 close(channel);                 this.disconnected.add(channel.id());             }         }     } 复制代码

这个方法的逻辑看上去不太清晰,没关系,我们可以debug看下:

file

你会发现这个方法主要就是在遍历有响应的SelectionKeys集合,由于之前只注册了一个SelectioinKey,关系Connect类型的请求,所以这里我们只遍历到了一个。

接着你一路断点就会发现,这个while循环核心执行如下一句话:

private final List<String> connected; if (channel.finishConnect()) {      this.connected.add(channel.id());      this.sensors.connectionCreated.record();  } else   continue;  } 复制代码

KafkaChannel.java public boolean finishConnect() throws IOException {     return transportLayer.finishConnect(); } PlaintextTransportLayer.java public boolean finishConnect() throws IOException {     boolean connected = socketChannel.finishConnect();     if (connected)     key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);     return connected; } 复制代码

上面if-else这段代码的核心脉络就是:

首先通过channel.finishConnect() 判断了连接是否建立,底层本质就是NIO的socketChannel.finishConnect();,如果连接建立,修改了SelectionKey关心的操作主要是SelectionKey.OP_READ类型,不再是OP_CONNECT类型了。之后将建立连接的ChannelId缓存了起来,在一个List connected集合中。

整体如下图所示:

file

poll方法就执行完成了,第二次while循环的第二步也就执行完成了,最后while循环还会执行一堆handle方法:

handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleTimedOutRequests(responses, updatedNow); 复制代码

其实你都可以猜出来,建立连接后会执行哪一个方法?没错,会进行handleConnections()的执行,其他方法压根执行不到,都是直接返回的。

handleConnections执行什么逻辑呢?

NetWorkClient.java    private void handleConnections() {         for (String node : this.selector.connected()) {             log.debug("Completed connection to node {}", node);             this.connectionStates.connected(node);         }     } Selector.java public List<String> connected() {     return this.connected; }      ClusterConnectionStates.java     public void connected(String id) {         NodeConnectionState nodeState = nodeState(id);         nodeState.state = ConnectionState.CONNECTED;     } 复制代码

其实就是遍历了建立了Channel的Node(Broker),记录了这个Node的连接状态为CONNECTED。(你还记得之前maybeUpdate的执行initiateConnect()时候是状态是CONNECTING么?)

到这里其实第二次while循环就执行完成了,第二次循环也是一样核心执行这三大步的,maybeUpdate()->poll()->handle开头方法。主要做的事情就是和Broker通过NIO的方式建立了连接。

而之前的第一次循环,maybeUpdate()->poll()->handle开头方法,中主要就是poll()方法阻塞了下,其余什么都没有干。

第二次循环的整体过程,总结如下的大图:

file

经过这第二次循环逻辑,是不是你对Producer有了更熟悉的认识了呢?

之后还有会再次执行第三次while循环甚至更多,都是一样的再次执行maybeUpdate()->poll()->handle开头方法的逻辑。

发送元数据的拉取请求

Sender的再次执行第三次循环,第一步肯定还是执行maybeUpdate(),而这次执行maybeUpdate(),连接已经建立,会执行另一段逻辑,doSend()方法,真正进行元数据的拉取。让我们快来一起看下吧!

  /**          * Add a metadata request to the list of sends if we can make one          */         private void maybeUpdate(long now, Node node) {             if (node == null) {                 log.debug("Give up sending metadata request since no node is available");                 // mark the timestamp for no node available to connect                 this.lastNoNodeAvailableMs = now;                 return;             }             String nodeConnectionId = node.idString();             if (canSendRequest(nodeConnectionId)) {                 this.metadataFetchInProgress = true;                 MetadataRequest metadataRequest;                 if (metadata.needMetadataForAllTopics())                     metadataRequest = MetadataRequest.allTopics();                 else                     metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));                 ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());                 doSend(clientRequest, now);             } else if (connectionStates.canConnect(nodeConnectionId, now)) {                 // we don't have a connection to this node right now, make one                 log.debug("Initialize connection to node {} for sending metadata request", node.id());                 initiateConnect(node, now);                 // If initiateConnect failed immediately, this node will be put into blackout and we                 // should allow immediately retrying in case there is another candidate node. If it                 // is still connecting, the worst case is that we end up setting a longer timeout                 // on the next round and then wait for the response.             } else { // connected, but can't send more OR connecting                 // In either case, we just need to wait for a network event to let us know the selected                 // connection might be usable again.                 this.lastNoNodeAvailableMs = now;             } 复制代码

这次执行到maybeUpdate的时候,会执行

//NetworkClient.java private boolean canSendRequest(String node) {     return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } //ClusterConnectionStates.java public boolean isConnected(String id) {     NodeConnectionState state = nodeState.get(id);     return state != null && state.state == ConnectionState.CONNECTED; } //Selector.java public boolean isChannelReady(String id) {     KafkaChannel channel = this.channels.get(id);     return channel != null && channel.ready(); } //PlaintextTransportLayer.java public boolean ready() {     return true; } //InFlightRequests.java private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>(); public boolean canSendMore(String node) {     Deque<ClientRequest> queue = requests.get(node);     return queue == null || queue.isEmpty() ||         (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); } 复制代码

上面通过一堆组件,当三个条件都是true才会执行,doSend方法。

connectionStates.isConnected(node):肯定是ture了,因为连接状态已经记录为Connected了。

selector.isChannelReady(node) :之前建立的Kafkachannel缓存在了map中,channel.ready()默认永远返回ture,

inFlightRequests.canSendMore(node):requests队列非空并且队列元素数量小于maxInFlightRequestsPerConnection 默认5,这个配置即可。

第二次循环的时候,队列压根是空的,所以这个条件也是ture了。

/** *这里涉及了一个很关键的内存结构InFlightRequests 中的Map<String, Deque<ClientRequest>> requests,一个Map和双向队列组成的内存结构。之前分析*Network的时候我们提到过这个组件,那个时候只是通过注释知道:InFlightRequests ,是表示已发送或正在发送但尚未收到响应的请求集合。具体做什么的并不知道。 *但是,这里我们就可以看到,在发送请求前,请求request会进入这个内存结构进行暂存,和注释表达的很接近了,经常会用来判断有没有待发送请求。 */ 复制代码

也就是说当连接已建立后,第三次循环就会执行到doSend方法逻辑了。

如下图所示:

file

接着if如果通过就是执行了如下逻辑了:

if (canSendRequest(nodeConnectionId)) {     this.metadataFetchInProgress = true;     MetadataRequest metadataRequest;     if (metadata.needMetadataForAllTopics())         metadataRequest = MetadataRequest.allTopics();     else         metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));     ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);     log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());     doSend(clientRequest, now); } public RequestSend(String destination, RequestHeader header, Struct body) {     super(destination, serialize(header, body));     this.header = header;     this.body = body; } public static ByteBuffer serialize(RequestHeader header, Struct body) {     ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());     header.writeTo(buffer);     body.writeTo(buffer);     buffer.rewind();     return buffer; } 复制代码

首先可以看到,doSend前对请求参数做了各种层次的包装,最终对象序列化成ByteBuffer。(这里按照什么格式序列化成ByteBuffer的此时我们暂时不做研究,之后研究Kafka解决粘包和拆包问题的时候我们会再次提到的)

具体细节我就不带大家看了,简单概括下就是:MetadataRequest->RequestHeader+Struct-=RequestSend(serialize方法转为ByteBuffer)->ClientRequest

file

包装好请求之后,调用了doSend方法:

private void doSend(ClientRequest request, long now) {     request.setSendTimeMs(now);     this.inFlightRequests.add(request);     selector.send(request.request()); } //ClientRequest.java public RequestSend request() {     return request; } // Selector.java   public void send(Send send) {         KafkaChannel channel = channelOrFail(send.destination());         try {             channel.setSend(send);         } catch (CancelledKeyException e) {             this.failedSends.add(send.destination());             close(channel);         }     }          private KafkaChannel channelOrFail(String id) {         KafkaChannel channel = this.channels.get(id);         if (channel == null)             throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());         return channel;     } // KafkaChannel.java public void setSend(Send send) {     if (this.send != null)         throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");     this.send = send;     this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); } // PlaintextTransportLayer.java public void addInterestOps(int ops) {     key.interestOps(key.interestOps() | ops); } 复制代码

这段方法就比较有意思了,你会发现doSend主要脉络如下:

1)将请求暂存到了inFlightRequests内存结构中

2)Selector从map中获取到之前缓存的KafkaChannel

3)  KafkaChannel记录了发送的请求数据RequestSend,并且补充了对写请求的关注(在之前连接建立后,取消了OP_CONNECT关注,增加关注OP_READ,你还记得么?)

上面的操作基本就是NIO的常规操作了,获取Channel,设置关注事件。但是...

channel.write操作呢?这里并没有写数据出去呀?所以KafkaChannel这个方法叫setSend(Send send),只是设置了待发送的对象,和关心的OP_WRITE而已。

整个过程如下图所示:

file

doSend方法执行后,metadataUpdater.maybeUpdate的方法也就返回了,接着就会进入第三次循环的第二大步,selector.poll()方法,最后会执行handle开头的方法。这个相信你已经不陌生了。

而selector.poll()核心就两步:

1) selector.select() 阻塞等待服务端返回关心的事件

2)执行pollSelectionKeys(),遍历所有SelectionKeys,根据SelectionKey关心的事件,执行不同的处理(之前建立连接的时候,只是根据OP_CONNECT,记录了连接成功的ChannelId。)

这里由于客户端关心的OP_READ和OP_WRITE事件,所以第三次执行循环的时候,这里selector.select() 阻塞会跳出,执行后面pollSelectionKeys()的逻辑。

这里我直接截取了关键逻辑出来,Selector.java 第三次while循环执行时,pollSelectionKeys方法 遍历SelectionKeys的核心逻辑如下:

//Selector.java 第三次while循环执行时,pollSelectionKeys方法 遍历SelectionKeys的核心逻辑 if (channel.ready() && key.isWritable()) {     Send send = channel.write();     if (send != null) {     this.completedSends.add(send);     this.sensors.recordBytesSent(channel.id(), send.size());     } } 复制代码

上面的核心主要就是:

1)通过channel.write()将拉取元数据的请求发送出去!

2)发送完成后,记录已经发送成功的请求到List completedSends;中

在这里我们终于看到了channel.write() 而且最终底层是通过的nio的socketChannel.write,将之前序列化好的ByteBuffer写出去的。而且发送完成会移除SelectionKey.OP_WRITE的关注,不再写出数据了。

//KafkaChannel.java public Send write() throws IOException {     Send result = null;     if (send != null && send(send)) {         result = send;         send = null;     }     return result; } private boolean send(Send send) throws IOException {     send.writeTo(transportLayer);     if (send.completed())         transportLayer.removeInterestOps(SelectionKey.OP_WRITE);     return send.completed(); } //PlaintextTransportLayer.java     public long write(ByteBuffer[] srcs) throws IOException {         return socketChannel.write(srcs);     } 复制代码

数据终于发送完成了,整个过程可以总结如下图所示:

file

while循环的第三次执行,已经执行了maybeUpdat()和poll()方法了,最后就是执行handle开头的方法了。

   handleCompletedSends(responses, updatedNow);    handleCompletedReceives(responses, updatedNow);    handleDisconnections(responses, updatedNow);    handleConnections();    handleTimedOutRequests(responses, updatedNow); 复制代码

这些方法中肯定的执行是handleCompletedSends方法了。

    private void handleCompletedSends(List<ClientResponse> responses, long now) {         // if no response is expected then when the send is completed, return it         for (Send send : this.selector.completedSends()) {             ClientRequest request = this.inFlightRequests.lastSent(send.destination());             if (!request.expectResponse()) {                 this.inFlightRequests.completeLastSent(send.destination());                 responses.add(new ClientResponse(request, now, false, null));             }         }     } 复制代码

这里会从之前暂存的inFlightRequests取出来发送的请求,request.expectResponse()默认是true,所以if条件不会成立,handleCompletedSends相当于什么都没做。从注释看这个方法是为了处理:"如果没有响应,那么当发送完成时,返回它。" 也就是说这个逻辑不是关键逻辑,我们抓大放小,跳过就行了。

随着你阅读源码的经验提升, 你会经常发现这种不是核心的逻辑。此时你一定要学会取舍,学会抓大放小的思想。

既然如此,handle开头的方法其实就执行完成了。该进入第四次while循环了....

接收拉取的元数据,唤醒KafkaProduer.Send方法

其实你可以想到,第四次while循环会做些什么。当然是接收服务端返回的元数据,唤醒之前wait的KafkaProduer.Send方法了。有了之前3次while循环的经验,这次让我们直接找到核心逻辑,看看它是如何做的,一起快速的看一下吧!

1)首先执行的maybeUpdate:

第四次while循环,maybeUpdate中waitForMetadataFetch会计算出一个非0的值,导致maybeUpdate和第一次循环一样,什么都不会执行

long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0; 复制代码

**2)接着执行selector.poll(),**会阻塞在select()方法,但是当服务器返回数据,由于我们SelectionKey上目前只关注了OP_READ,所以会此时会跳出阻塞执行对应的pollSelectionKeys中的逻辑

/* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {     NetworkReceive networkReceive;     while ((networkReceive = channel.read()) != null)     addToStagedReceives(channel, networkReceive); } Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; /**    * adds a receive to staged receives   */ private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {     if (!stagedReceives.containsKey(channel))         stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());     Deque<NetworkReceive> deque = stagedReceives.get(channel);     deque.add(receive); } 复制代码

这段逻辑其实就是就是接受ByteBuffer为放入NetworkReceive对象中,底层本质调用的是SocketChannel的read()方法,就是常见的NIO操作而已。和发送数据的是类似的。底层这里就不带大家看了,相信你自己可以看明白的。

除了接受数据到NetworkReceive对象中,还会将接受的数据暂存到一个双端队列Deque中。Map<KafkaChannel, Deque> stagedReceives;

3) 执行完poll方法后,就该执行handle开头的方法了,此次执行的是handleCompletedReceives()方法:

    /**      * Handle any completed receives and update the response list with the responses received.      *      * @param responses The list of responses to update      * @param now The current time      */     private void handleCompletedReceives(List<ClientResponse> responses, long now) {         for (NetworkReceive receive : this.selector.completedReceives()) {             String source = receive.source();             ClientRequest req = inFlightRequests.completeNext(source);             Struct body = parseResponse(receive.payload(), req.request().header());             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))                 responses.add(new ClientResponse(req, now, false, body));         }     }        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {             short apiKey = req.request().header().apiKey();             if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {                 handleResponse(req.request().header(), body, now);                 return true;             }             return false;         } 复制代码

和这个方法脉络很简单:

1)根据之前暂存的请求ClientRequest,从NetworkReceive找到对应的响应,接着进行一系列的解析ButeBuffer为一个Struct对象。

2)执行DefaultMetadataUpdater的maybeHandleCompletedReceive方法

之后的DefaultMetadataUpdater的maybeHandleCompletedReceive这个方法有做什么的?

         private void handleResponse(RequestHeader header, Struct body, long now) {             this.metadataFetchInProgress = false;             MetadataResponse response = new MetadataResponse(body);             Cluster cluster = response.cluster();             // check if any topics metadata failed to get updated             Map<String, Errors> errors = response.errors();             if (!errors.isEmpty())                 log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being             // created which means we will get errors and no nodes until it exists             if (cluster.nodes().size() > 0) {                 this.metadata.update(cluster, now);             } else {                 log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());                 this.metadata.failedUpdate(now);             }   }     public synchronized void update(Cluster cluster, long now) {         this.needUpdate = false;         this.lastRefreshMs = now;         this.lastSuccessfulRefreshMs = now;         this.version += 1;         for (Listener listener: listeners)             listener.onMetadataUpdate(cluster);         // Do this after notifying listeners as subscribed topics' list can be changed by listeners         this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;         notifyAll();         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);     } 复制代码

上面的代码核心脉络如下:

1)  maybeHandleCompletedReceive会将Strut对象转为MetadataResponse之后转为Cluster对象

2)最后根据Cluster中的Nodes信息,如果大于0,执行metadata.update()方法,执行一些Listener回调,最后关键的是metadata.notifyAll() 唤醒了之前阻塞等待的KafkaProducer.send()

整个过程总结如下图所示:

file

总结

到此元数据的拉取源码原理我们就研究的完了。其实当你研究完成之后,你会发现,我们执行了核心while循环4次,随着重复重复的过程,好像源码原理并没有多难了。

其实就是这样的,很多时候,简单的事情重复做,只要多思考多琢磨,就会发现规律,就会慢慢理解事情的本质。 这个思想比我们研究清楚Kafka拉取元数据的源码原理重要的多。

另外就是元数据拉取说白了其实并不复杂,无非都是连接建立,请求发送,请求响应。是Kafka使用了一些有意思的机制,wait+notifyAll机制和NIO的方式而已。

之前我一直给你们画的是详细的逻辑图,你们可以自己画了一个简图,总结下它的逻辑。如果自己能画图,给别人解释明白,就说明你真正理解了。

不过其实在这个过程中Kafka还是做了很多思考的,你可以思考下,它的一些亮点和优势,就像之前ZK选举原理研究后一样。你思考出的思路和想法,远远大于知识本身。你可以留言在评论去给我我们一起讨论!

Kafka成长记定位虽然偏向于提升技术深度,如果你熟练的使用过NIO,当然很好理解元数据拉取过程中的NIO知识。

如果不太了解NIO的,可以自己百度下NIO的基本知识。或者关注我之后出的《NIO小白起步营》

我们下一节再见!


作者:繁茂
链接:https://juejin.cn/post/7015760802166620191


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐