Kafka源码分析14-如何处理响应消息
本文分析Producer如何响应消息,NetworkClient的poll():
public List<ClientResponse> poll(long timeout, long now) { /** * 在这个方法里面有涉及到kafka的网络的方法,但是 * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候 * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据 * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码 * 的时候,其实代码就比较简单了。 */ //步骤一:封装了一个要拉取元数据请求 long metadataTimeout = metadataUpdater.maybeUpdate(now); try { //步骤二: 发送请求,进行复杂的网络操作 //但是我们目前还没有学习到kafka的网络 //所以这儿大家就只需要知道这儿会发送网络请求。 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); //步骤三:处理响应,响应里面就会有我们需要的元数据。 handleAbortedSends(responses); handleCompletedSends(responses, updatedNow); /** * 这个地方是我们在看生产者是如何获取元数据的时候,看的。 * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。 * 获取元数据 -> 判断网络连接是否建立好 -> 建立网络连接 * -> 发送请求(获取元数据的请求) -> 服务端发送回来响应(带了集群的元数据信息) */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); //处理超时的请求 handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { try { //调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.onComplete(); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } return responses; }复制代码
handleCompletedReceives(responses, updatedNow)
private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { //获取broker id String source = receive.source(); /** * kafka 有这样的一个机制:每个连接可以容忍5个发送出去了(参数配置),但是还没接收到响应的请求。 */ //从数据结构里面移除已经接收到响应的请求。 //把之前存入进去的请求也获取到了 InFlightRequest req = inFlightRequests.completeNext(source); //解析服务端发送回来的请求(里面有响应的结果数据) AbstractResponse body = parseResponse(receive.payload(), req.header); log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body); //TODO 如果是关于元数据信息的响应 if (req.isInternalRequest && body instanceof MetadataResponse) //解析完了以后就把封装成一个一个的clientResponse //body 存储的是响应的内容 //req 发送出去的那个请求信息 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); else responses.add(req.completed(body, now)); } }复制代码
然后response 就构建好了,后面执行回调函数response.onComplete(); 回调函数应该是我们在发送producer请求的时候带的,我们来看看代码:
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { // 省略。。 ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition); RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { // 回调函数被调用了,其实就是这个方法被执行了 handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; String nodeId = Integer.toString(destination); // 封装请求 参数包含一个回调函数 ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback); client.send(clientRequest, now); log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) { int correlationId = response.requestHeader().correlationId(); //这个地方就是就是一个特殊情况 //我们要发送请求了,但是发现 broker失去连接。 //不过这个是一个小概率事件。 if (response.wasDisconnected()) { log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination()); for (RecordBatch batch : batches.values()) completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now); } else if (response.versionMismatch() != null) { log.warn("Cancelled request {} due to a version mismatch with node {}", response, response.destination(), response.versionMismatch()); for (RecordBatch batch : batches.values()) completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now); } else { log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId); // if we have a response, parse it //所以我们正常情况下,走的都是这个分支 if (response.hasResponse()) { ProduceResponse produceResponse = (ProduceResponse) response.responseBody(); /** * 遍历每个分区的响应 */ for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) { TopicPartition tp = entry.getKey(); ProduceResponse.PartitionResponse partResp = entry.getValue(); //获取到当前分区的响应。 RecordBatch batch = batches.get(tp); //对响应进行处理 completeBatch(batch, partResp, correlationId, now); } this.sensors.recordLatency(response.destination(), response.requestLatencyMs()); this.sensors.recordThrottleTime(produceResponse.getThrottleTime()); } else { // this is the acks = 0 case, just complete all requests //acks=0意思就是不需要返回响应。 //1 p -> b leader partion //-1 p -> broker leader partition -> follower partition //在生产环境里面,我们一般是不会把acks 参数设置为0 for (RecordBatch batch : batches.values()) { completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now); } } } }复制代码
接下看对响应处理的方法completeBatch(batch, partResp, correlationId, now):
private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) { //如果处理成功那就是成功了,但是如果服务端那儿处理失败了,是不是也要给我们发送回来异常的信息。 //error 这个里面存储的就是服务端发送回来的异常码 Errors error = response.error; //如果响应里面带有异常 并且 这个请求是可以重试的 if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts - 1, error); //重新把发送失败等着批次 加入到队列里面。 this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { //这儿过来的数据:带有异常,但是不可以重试(1:压根就不让重试2:重试次数超了) //其余的都走这个分支。 RuntimeException exception; //如果响应里面带有 没有权限的异常 if (error == Errors.TOPIC_AUTHORIZATION_FAILED) //自己封装一个异常信息(自定义了异常) exception = new TopicAuthorizationException(batch.topicPartition.topic()); else exception = error.exception(); // tell the user the result of their request //TODO 核心代码 把异常的信息也给带过去了 //我们刚刚看的就是这儿的代码 //里面调用了用户传进来的回调函数 //回调函数调用了以后 //说明我们的一个完整的消息的发送流程就结束了。 batch.done(response.baseOffset, response.logAppendTime, exception); //看起来这个代码就是要回收资源的。 this.accumulator.deallocate(batch); if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } if (error.exception() instanceof InvalidMetadataException) { if (error.exception() instanceof UnknownTopicOrPartitionException) log.warn("Received unknown topic or partition error in produce request on partition {}. The " + "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition); metadata.requestUpdate(); } // Unmute the completed partition. if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition); }复制代码
现在我们只考虑生产者成功发送消息的情况(异常情况我们后面分析),直接进入batch.done(response.baseOffset, response.logAppendTime, exception)
public void done(long baseOffset, long logAppendTime, RuntimeException exception) { log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); if (completed.getAndSet(true)) throw new IllegalStateException("Batch has already been completed"); // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call produceFuture.set(baseOffset, logAppendTime, exception); // execute callbacks /** * * 我们发送数据的时候,一条消息就代表一个thunk * 遍历所有我们当时发送出去的消息。 */ for (Thunk thunk : thunks) { try { if (exception == null) { RecordMetadata metadata = thunk.future.value(); //调用我们发送的消息的回调函数 //大家还记不记得我们在发送数据的时候 //还不是绑定了一个回调函数。 //这儿说的调用的回调函数 //就是我们开发,生产者代码的时候,我们用户传进去的那个回调函数。 thunk.callback.onCompletion(metadata, null); //带过去的就是没有异常 //也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。 } else { //如果有异常就会把异常传给回调函数。 //由我们用户自己去捕获这个异常。 //然后对这个异常进行处理 //大家根据自己公司的业务规则进行处理就可以了。 //如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception //这个异常,如果用户捕获到了,做对应的处理就可以了。 thunk.callback.onCompletion(null, exception); } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e); } } produceFuture.done(); }复制代码
thunk.callback.onCompletion(metadata, null);
回调生产者发送消息的回调函数,如下方法:
producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr))复制代码
至此,生产者就把消息成功发送出去了
作者:hsfxuebao
链接:https://juejin.cn/post/7019556999985627144