阅读 87

Kafka源码分析10-Producer终于和broker建立连接了

从 Kafka源码分析9-若网络没建立好会发送消息吗 中得知,如果网络没有建立完成,是不会发送消息的。本文就讲解producer与broker是如何建立连接的,直接看sender 线程的步骤八:

{
    // 省略。。
    /**
     * 步骤八:
     * 真正执行网络操作的都是这个NetWorkClient这个组件
     * 包括:发送请求,接受响应(处理响应)
     */
    // 我们猜这儿可能就是去建立连接。
    this.client.poll(pollTimeout, now);
}复制代码

接下来 NetworkClient 的poll() 方法:

public List<ClientResponse> poll(long timeout, long now) {
    /**
     * 在这个方法里面有涉及到kafka的网络的方法,但是
     * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候
     * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据
     * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码
     * 的时候,其实代码就比较简单了。
     */
    //步骤一:封装了一个要拉取元数据请求
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {

        //步骤二: 发送请求,进行复杂的网络操作
        //但是我们目前还没有学习到kafka的网络
        //所以这儿大家就只需要知道这儿会发送网络请求。
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // 省略。。
}

public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    //从Selector上找到有多少个key注册了
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();

    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    //因为我们用场景驱动的方式
    //我们刚刚确实是注册了一个key
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        //立马就要对这个Selector上面的key要进行处理。
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    //TODO 对stagedReceives里面的数据要进行处理
    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}复制代码

Java NIO的Selector.select -> 他会负责去看看,注册到他这里的多个Channel,谁有响应过来可以接收,或者谁现在可以执行一个请求的发送,如果Channel可以准备执行IO读写操作,此时就把那个Channel的SelectionKey返回

接下来就会对获取到的一堆SelectionKeys进行处理,到这一步为止,我们就可以看到基于NIO来开发的很多企业级的一些功能,一个是SocketChannel如何构建,二个是一个客户端如何连接多个服务器,三个如何通过轮询调用Selector.select

select一般在这种场景里可以设置对应的超时时间,然后就可以获取到SelectionKeys

我们着重看一下pollSelectionKeys()方法:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        //根据key找到对应的KafkaChannel
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            /**
             * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是
             * SelectionKey key = socketChannel.register(nioSelector,
             * SelectionKey.OP_CONNECT);
             */
            if (isImmediatelyConnected || key.isConnectable()) {
                //TODO 核心的代码来了
                //去最后完成网络的连接
                //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你完成网络的连接。
                if (channel.finishConnect()) {
                    //网络连接已经完成了以后,就把这个channel存储到
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }
            // 省略。。
    }
}

private static class IdleExpiryManager {
    private final Map<String, Long> lruConnections;
    private final long connectionsMaxIdleNanos;
    private long nextIdleCloseCheckTime;
}复制代码

idleExpiryManager 包含lruConnections 因为一般来说一个客户端不能放太多的Socket连接资源,否则会导致这个客户端的复杂过重,所以他需要采用lru的方式来不断的淘汰掉最近最少使用的一些连接,很多连接最近没怎么发送消息。

  • 比如说有一个连接,最近一次使用是在1个小时之前了,还有一个连接,最近一次使用是在1分钟之前,此时如果要淘汰掉一个连接,你会选择谁?LRU算法,明显是淘汰掉那个1小时之前才使用的连接

如果发现SelectionKey当前处于的状态是可以建立连接,isConnectable方法是true,接着其实就是调用到KafkaChannel最底层的SocketChannel的finishConnect方法,等待这个连接必须执行完毕

同时接下来就不要关注OP_CONNECT事件了,对于这个Channel,接下来Selector就不要关注连接相关的事件了,也不是OP_READ读取事件,肯定selector要关注的是OP_WRITE事件,要针对这个连接写数据。

至此,Producer终于和broker建立连接了。


作者:hsfxuebao
链接:https://juejin.cn/post/7019503555312631845




文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐