阅读 83

Kafka源码分析11-Producer终于发送请求了

上篇文章Kafka源码分析10-Producer终于和broker建立连接了 Producer和broker建立网络连接了,本文分析Producer发送网络请求。看看Sender线程run方法,主要分析步骤五、七、八。

{
    /**
     * 步骤五:
     *
     * 我们有可能要发送的partition有很多个,
     * 很有可能有一些partition的leader partition是在同一台服务器上面。
     * p0:leader:0
     * p1:leader: 0
     * p2:leader: 1
     * p3:leader: 2
     *      假设我们集群只有3台服务器
     * 当我们的分区的个数大于集群的节点的个数的时候,一定会有多个leader partition在同一台服务器上面。
     *
     * 按照broker进行分组,同一个broker的partition为同一组
     * 0:{p0,p1}
     * 1:{p2}
     * 2:{p3}
     */
    // 如果网络没有建立好,这的代码是不执行的
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    // 保证分区有序
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            //如果batches 空的话,这而的代码也就不执行了。
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    /**
     * 步骤六:
     *  对超时的批次是如何处理的?
     */
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    /**
     * 步骤七:
     *      创建发送消息的请求
     * 创建请求
     * 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面
     * ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁
     * 我们要知道,我们集群里面网络资源是非常珍贵的。
     * 会把发往同个broker上面partition的数据 组合成为一个请求。
     * 然后统一一次发送过去,这样子就减少了网络请求。
     */
    //如果网络连接没有建立好 batches其实是为空。
    //也就说其实这段代码也是不会执行。
    sendProduceRequests(batches, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    /**
     * 步骤八:
     * 真正执行网络操作的都是这个NetWorkClient这个组件
     * 包括:发送请求,接受响应(处理响应)
     */
    // 我们猜这儿可能就是去建立连接。
    this.client.poll(pollTimeout, now);
}复制代码

drain()

/**
 * 进行映射转换,将TopicPartition -> Deque<RecordBatch> 转换为 NodeId -> List<RecordBatch>
 */
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // 转换后的结果
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        // 获取Node上的分区集合
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        // 记录要发送的RecordBatch的集合
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            // 获取分区详细信息
            PartitionInfo part = parts.get(drainIndex);
            // 创建TopicPartition
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            // 判断需要发送到的分区是否不可用
            if (!muted.contains(tp)) {
                // 获取对应的RecordBatch队列
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        // 查看deque队列的队首RecordBatch
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            // 检查是否需要退避
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    // 数据量已满,结束循环
                                    break;
                                } else {
                                    // 从deque中获取第一个RecordBatch
                                    RecordBatch batch = deque.pollFirst();
                                    // 关闭Compressor及底层输出流,并将MemoryRecords设置为只读
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            // 更新drainIndex
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        // 记录NodeId与RecordBatch的对应关系
        batches.put(node.id(), ready);
    }
    return batches;
}复制代码

该方法主要是进行映射转换,将TopicPartition -> Deque 转换为 NodeId -> List。

sendProduceRequests()

private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    ProduceRequest.Builder requestBuilder =
            new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            // 回调函数被调用了,其实就是这个方法被执行了
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // 封装请求  参数包含一个回调函数
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);

    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
public class ProduceRequest extends AbstractRequest {
  
    public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
        private final short acks;
        private final int timeout;
        private final Map<TopicPartition, MemoryRecords> partitionRecords;

        public Builder(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
            super(ApiKeys.PRODUCE);
            this.acks = acks;
            this.timeout = timeout;
            this.partitionRecords = partitionRecords;
        }
    }
}复制代码

封装ClientRequest请求参数,他需要包含对应的请求头,api key,api version,acks,request timeout,接着才是请求体,里面就是包含了对应的多个batch的数据,最后的最后,一定是把刚才说的那些东西都给打成一个二进制的字节数组

ClientRequest里面就是封装了按照二进制协议的格式,放入了组装好的数据,发送到broker上去的有很多个Topic,每个Topic有很多Partition,每个Partitioin是对应就一个batch的数据发送过去

client.send(clientRequest, now)

public void send(ClientRequest request, long now) {
    //TODO 看上去就是关键的代码。
    doSend(request, false, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    //省略。。
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            send,
            now);

    //这儿往inFlightRequests 组件里存 Request请求。
    //存储的就是还没有收到响应的请求。
    //这个里面默认最多能存5个请求。如果保证分区有序性,只能设置为1
    //其实我们可以猜想一个事,如果我们的请求发送出去了
    //然后也成功的接受到了响应,后面就会到这儿把这个请求移除。
    this.inFlightRequests.add(inFlightRequest);
    // TODO
    selector.send(inFlightRequest.send);
}复制代码

把请求添加到inFlightRequests 中 ,存储的就是还没有收到响应的请求。 这个里面默认最多能存5个请求。如果保证分区有序性,只能设置为1 。其实我们可以猜想一个事,如果我们的请求发送出去了。  然后也成功的接受到了响应,后面就会到这儿把这个请求移除。

selector.send(inFlightRequest.send)

public void send(Send send) {
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        //获取到一个KafakChannel
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            /**
             * 将send对象缓存到KafkaChannel的send字段中,同时添加OP_WRITE事件的关注
             * send对象实际类型是RequestSend对象,其中封装了具体的请求数据,包括请求头和请求体
             * 这里只是将RequestSend对象用KafkaChannel的send字段记录下来
             * 具体的发送会在Selector.poll()方法中进行
             * KafkaChannel每次只会发送一个RequestSend对象
             */
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    //往KafkaChannel里面绑定一个发送出去的请求。
    this.send = send;
    //关键的代码来了
    //这儿绑定了一个OP_WRITE事件。
    //一旦绑定了这个事件以后,我们就可以往服务端发送请求了。
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}复制代码

也就是在setSend方法中注册了SelectionKey.OP_WRITE事件,然后在poll()方法中把消息发出去,接下来继续看pollSelectionKeys

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        //根据key找到对应的KafkaChannel
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

           // 省略。。
            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            //核心代码,处理发送请求的事件
            //selector 注册了一个OP_WRITE
            if (channel.ready() && key.isWritable()) {
                //获取到我们要发送的那个网络请求。
                //是这句代码就是要往服务端发送数据了。

                // todo 服务端
                // 里面我们发现如果消息被发送出去了,然后移除OP_WRITE
                Send send = channel.write();
                // 已经完成响应消息的发送
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}复制代码

接下来channel.write()

public Send write() throws IOException {
    Send result = null;
    //send方法就是发送网络请求的方法
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
private boolean send(Send send) throws IOException {
    //最终执行发送请求的代码是在这儿
    send.writeTo(transportLayer);
    //如果已经完成网络请求的发送。
    if (send.completed())
        //然后就移除OP_WRITE
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}复制代码

在上面把请求发送到服务端并取消了SelectionKey.OP_WRITE的监听。


作者:hsfxuebao
链接:https://juejin.cn/post/7019513198772486152


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐