Kafka源码分析18-如何处理长时间没有接收到响应的消息
我们直接看NetworkClient 的poll()方法:
public List<ClientResponse> poll(long timeout, long now) { //步骤三:处理响应,响应里面就会有我们需要的元数据。 handleAbortedSends(responses); handleCompletedSends(responses, updatedNow); /** * 这个地方是我们在看生产者是如何获取元数据的时候,看的。 * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。 * 获取元数据 -> 判断网络连接是否建立好 -> 建立网络连接 * -> 发送请求(获取元数据的请求) -> 服务端发送回来响应(带了集群的元数据信息) */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); //处理超时的请求 handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { try { //调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.onComplete(); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } return responses; }复制代码
我们来看处理超时的请求handleTimedOutRequests(responses, updatedNow)
private void handleTimedOutRequests(List<ClientResponse> responses, long now) { //获取到请求超时的主机。 List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs); for (String nodeId : nodeIds) { // close connection to the node //关闭请求超时的主机的连接 this.selector.close(nodeId); log.debug("Disconnecting from node {} due to request timeout.", nodeId); //我们猜应该是会去修改 连接的状态 processDisconnection(responses, nodeId, now); } // we disconnected, so we should probably refresh our metadata if (!nodeIds.isEmpty()) metadataUpdater.requestUpdate(); } private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) { //修改连接状态 connectionStates.disconnected(nodeId, now); nodeApiVersions.remove(nodeId); nodesNeedingApiVersionsFetch.remove(nodeId); for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId); if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id) metadataUpdater.handleDisconnection(request.destination); else //对这些请求进行处理 //大家会看到一个比较有意思的事 //自己封装了一个响应。这个响应里面没有服务端响应消息(服务端没给响应) //失去连接的状态表标识为true responses.add(request.disconnected(now)); } }复制代码
也就是对于长时间没有接收到响应的请求,Producer会自己封装一下响应消息返回。
作者:hsfxuebao
链接:https://juejin.cn/post/7019566875021426701