阅读 87

Kafka源码分析14-如何处理响应消息

本文分析Producer如何响应消息,NetworkClient的poll():

public List<ClientResponse> poll(long timeout, long now) {
    /**
     * 在这个方法里面有涉及到kafka的网络的方法,但是
     * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候
     * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据
     * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码
     * 的时候,其实代码就比较简单了。
     */
    //步骤一:封装了一个要拉取元数据请求
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {

        //步骤二: 发送请求,进行复杂的网络操作
        //但是我们目前还没有学习到kafka的网络
        //所以这儿大家就只需要知道这儿会发送网络请求。
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();

    //步骤三:处理响应,响应里面就会有我们需要的元数据。
    handleAbortedSends(responses);
    handleCompletedSends(responses, updatedNow);
    /**
     * 这个地方是我们在看生产者是如何获取元数据的时候,看的。
     * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。
     * 获取元数据 -> 判断网络连接是否建立好 -> 建立网络连接
     * -> 发送请求(获取元数据的请求) -> 服务端发送回来响应(带了集群的元数据信息)
     */
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    //处理超时的请求
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        try {

            //调用的响应的里面的我们之前发送出去的请求的回调函数
            //看到了这儿,我们回头再去看一下
            //我们当时发送请求的时候,是如何封装这个请求。
            //不过虽然目前我们还没看到,但是我们可以大胆猜一下。
            //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }

    return responses;
}复制代码

handleCompletedReceives(responses, updatedNow)

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        //获取broker id
        String source = receive.source();
        /**
         * kafka 有这样的一个机制:每个连接可以容忍5个发送出去了(参数配置),但是还没接收到响应的请求。
         */
        //从数据结构里面移除已经接收到响应的请求。
        //把之前存入进去的请求也获取到了
        InFlightRequest req = inFlightRequests.completeNext(source);
        //解析服务端发送回来的请求(里面有响应的结果数据)
        AbstractResponse body = parseResponse(receive.payload(), req.header);
        log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
        //TODO 如果是关于元数据信息的响应
        if (req.isInternalRequest && body instanceof MetadataResponse)
            //解析完了以后就把封装成一个一个的clientResponse
            //body 存储的是响应的内容
            //req 发送出去的那个请求信息
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}复制代码

然后response 就构建好了,后面执行回调函数response.onComplete(); 回调函数应该是我们在发送producer请求的时候带的,我们来看看代码:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
   // 省略。。
    ProduceRequest.Builder requestBuilder =
            new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            // 回调函数被调用了,其实就是这个方法被执行了
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // 封装请求  参数包含一个回调函数
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);

    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.requestHeader().correlationId();
    //这个地方就是就是一个特殊情况
    //我们要发送请求了,但是发现 broker失去连接。
    //不过这个是一个小概率事件。
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
    } else if (response.versionMismatch() != null) {
        log.warn("Cancelled request {} due to a version mismatch with node {}",
                response, response.destination(), response.versionMismatch());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now);
    } else {
        log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
        // if we have a response, parse it
        //所以我们正常情况下,走的都是这个分支
        if (response.hasResponse()) {
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            /**
             * 遍历每个分区的响应
             */
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();

                //获取到当前分区的响应。
                RecordBatch batch = batches.get(tp);
                //对响应进行处理
                completeBatch(batch, partResp, correlationId, now);
            }
            this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
        } else {
            // this is the acks = 0 case, just complete all requests
            //acks=0意思就是不需要返回响应。
            //1 p -> b  leader partion
            //-1  p -> broker leader partition -> follower partition
            //在生产环境里面,我们一般是不会把acks 参数设置为0
            for (RecordBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
            }
        }
    }
}复制代码

接下看对响应处理的方法completeBatch(batch, partResp, correlationId, now):

private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now) {
    //如果处理成功那就是成功了,但是如果服务端那儿处理失败了,是不是也要给我们发送回来异常的信息。
    //error 这个里面存储的就是服务端发送回来的异常码
    Errors error = response.error;
    //如果响应里面带有异常 并且 这个请求是可以重试的
    if (error != Errors.NONE && canRetry(batch, error)) {
        // retry
        log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                 correlationId,
                 batch.topicPartition,
                 this.retries - batch.attempts - 1,
                 error);
        //重新把发送失败等着批次 加入到队列里面。
        this.accumulator.reenqueue(batch, now);
        this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
    } else {
        //这儿过来的数据:带有异常,但是不可以重试(1:压根就不让重试2:重试次数超了)

        //其余的都走这个分支。
        RuntimeException exception;
        //如果响应里面带有 没有权限的异常
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            //自己封装一个异常信息(自定义了异常)
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        else
            exception = error.exception();
        // tell the user the result of their request
        //TODO 核心代码 把异常的信息也给带过去了
        //我们刚刚看的就是这儿的代码
        //里面调用了用户传进来的回调函数
        //回调函数调用了以后
        //说明我们的一个完整的消息的发送流程就结束了。
        batch.done(response.baseOffset, response.logAppendTime, exception);
        //看起来这个代码就是要回收资源的。
        this.accumulator.deallocate(batch);
        if (error != Errors.NONE)
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    }
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException)
            log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
        metadata.requestUpdate();
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition);
}复制代码

现在我们只考虑生产者成功发送消息的情况(异常情况我们后面分析),直接进入batch.done(response.baseOffset, response.logAppendTime, exception)

public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
              topicPartition, baseOffset, exception);

    if (completed.getAndSet(true))
        throw new IllegalStateException("Batch has already been completed");

    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);

    // execute callbacks
    /**
     *
     * 我们发送数据的时候,一条消息就代表一个thunk
     * 遍历所有我们当时发送出去的消息。
     */
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                //调用我们发送的消息的回调函数
                //大家还记不记得我们在发送数据的时候
                //还不是绑定了一个回调函数。
                //这儿说的调用的回调函数
                //就是我们开发,生产者代码的时候,我们用户传进去的那个回调函数。

                thunk.callback.onCompletion(metadata, null);
                //带过去的就是没有异常
                //也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。
            } else {
                //如果有异常就会把异常传给回调函数。
                //由我们用户自己去捕获这个异常。
                //然后对这个异常进行处理
                //大家根据自己公司的业务规则进行处理就可以了。

                //如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception
                //这个异常,如果用户捕获到了,做对应的处理就可以了。
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }

    produceFuture.done();
}复制代码

thunk.callback.onCompletion(metadata, null); 回调生产者发送消息的回调函数,如下方法:

producer.send(new ProducerRecord<>(topic,    messageNo,    messageStr), new DemoCallBack(startTime, messageNo, messageStr))复制代码

至此,生产者就把消息成功发送出去了


作者:hsfxuebao
链接:https://juejin.cn/post/7019556999985627144

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐