Kafka源码分析11-Producer终于发送请求了
上篇文章《Kafka源码分析10-Producer终于和broker建立连接了》中,Producer已经和broker建立了网络连接,本文接着分析Producer如何发送网络请求。先看Sender线程的run方法,这里主要分析其中的步骤五、七、八。
{
    /**
     * Step 5:
     *
     * There may be many partitions to send to, and several of them may have their
     * leader partition on the same broker, e.g.
     * p0: leader: 0
     * p1: leader: 0
     * p2: leader: 1
     * p3: leader: 2
     * Assume the cluster has only 3 brokers. Whenever there are more partitions than
     * brokers, at least one broker must host more than one leader partition.
     *
     * Batches are therefore grouped by broker; partitions led by the same broker
     * form one group:
     * 0:{p0,p1}
     * 1:{p2}
     * 2:{p3}
     */
    // If no node is ready (e.g. the connection is not established yet), drain() returns
    // an empty map and the code below has nothing to send.
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // Preserve per-partition message ordering.
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained: drain() skips muted partitions, so no further
        // batch is taken for them (presumably until the in-flight request completes and
        // they are unmuted; not shown here).
        for (List<RecordBatch> batchList : batches.values()) {
            // If batches is empty, this loop body never runs.
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    /**
     * Step 6:
     * How are expired batches handled?
     */
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    sensors.updateProduceRequestMetrics(batches);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }
    /**
     * Step 7:
     * Build the produce requests.
     * Some of the partitions being sent to have their leader on the same broker.
     * Sending one network request per partition would make requests very frequent,
     * and network resources in a cluster are precious. So the data for all partitions
     * on the same broker is combined into a single request and sent in one go, which
     * reduces the number of network requests.
     */
    // If the connections are not established, batches is empty,
    // so this call iterates over nothing and sends nothing.
    sendProduceRequests(batches, now);
    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    /**
     * Step 8:
     * NetworkClient is the component that performs the actual network operations,
     * including sending requests and receiving (and handling) responses.
     */
    // The requests queued by sendProduceRequests() above are actually written during this
    // poll: Selector.send()/setSend() only register OP_WRITE, and the bytes go out when
    // pollSelectionKeys() sees the key as writable.
    this.client.poll(pollTimeout, now);
}复制代码
drain()
/** * 进行映射转换,将TopicPartition -> Deque<RecordBatch> 转换为 NodeId -> List<RecordBatch> */ public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); // 转换后的结果 Map<Integer, List<RecordBatch>> batches = new HashMap<>(); for (Node node : nodes) { int size = 0; // 获取Node上的分区集合 List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // 记录要发送的RecordBatch的集合 List<RecordBatch> ready = new ArrayList<>(); /* to make starvation less likely this loop doesn't start at 0 */ int start = drainIndex = drainIndex % parts.size(); do { // 获取分区详细信息 PartitionInfo part = parts.get(drainIndex); // 创建TopicPartition TopicPartition tp = new TopicPartition(part.topic(), part.partition()); // Only proceed if the partition has no in-flight batches. // 判断需要发送到的分区是否不可用 if (!muted.contains(tp)) { // 获取对应的RecordBatch队列 Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null) { synchronized (deque) { // 查看deque队列的队首RecordBatch RecordBatch first = deque.peekFirst(); if (first != null) { // 检查是否需要退避 boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; // Only drain the batch if it is not during backoff period. if (!backoff) { if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single // request // 数据量已满,结束循环 break; } else { // 从deque中获取第一个RecordBatch RecordBatch batch = deque.pollFirst(); // 关闭Compressor及底层输出流,并将MemoryRecords设置为只读 batch.close(); size += batch.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } } } } } } // 更新drainIndex this.drainIndex = (this.drainIndex + 1) % parts.size(); } while (start != drainIndex); // 记录NodeId与RecordBatch的对应关系 batches.put(node.id(), ready); } return batches; }复制代码
该方法主要是进行映射转换,将 TopicPartition -> Deque<RecordBatch> 转换为 NodeId -> List<RecordBatch>,也就是把发往同一个broker的batch归为一组。
sendProduceRequests()
private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) { for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet()) sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()); } private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size()); final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; produceRecordsByPartition.put(tp, batch.records()); recordsByPartition.put(tp, batch); } ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition); RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { // 回调函数被调用了,其实就是这个方法被执行了 handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; String nodeId = Integer.toString(destination); // 封装请求 参数包含一个回调函数 ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback); client.send(clientRequest, now); log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } public class ProduceRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder<ProduceRequest> { private final short acks; private final int timeout; private final Map<TopicPartition, MemoryRecords> partitionRecords; public Builder(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) { super(ApiKeys.PRODUCE); this.acks = acks; this.timeout = timeout; this.partitionRecords = partitionRecords; } } }复制代码
封装ClientRequest请求参数:它需要包含对应的请求头、api key、api version、acks、request timeout,接着才是请求体,请求体里包含了对应的多个batch的数据。最后,这些内容都会被序列化成一个二进制的字节数组。
ClientRequest就是按照二进制协议的格式封装了组装好的数据。发往一个broker的请求中可能包含多个Topic,每个Topic又有多个Partition,而每个Partition对应一个batch的数据。
client.send(clientRequest, now)
/**
 * Sends a client request as a non-internal request by delegating to doSend().
 */
public void send(ClientRequest request, long now) {
    // This delegation is the key path for sending a request.
    doSend(request, false, now);
}

/**
 * Wraps the request in an InFlightRequest, stores it in inFlightRequests and hands its
 * Send to the Selector.
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    // ... (code omitted in this excerpt; the omitted part defines `request` and `header` used below)
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            send,
            now);
    // Store the request in the inFlightRequests component, which holds requests that have
    // not received a response yet.
    // Per the article: by default at most 5 such requests per connection (presumably the
    // max.in.flight.requests.per.connection setting); it must be 1 to guarantee
    // per-partition ordering - TODO confirm for this Kafka version.
    // Presumably, once the request has been sent and its response received, it is removed
    // from inFlightRequests again (not shown here).
    this.inFlightRequests.add(inFlightRequest);
    // Hand the Send to the Selector. Selector.send() only attaches it to the channel and
    // registers OP_WRITE; the bytes are written later, during poll().
    selector.send(inFlightRequest.send);
}复制代码
把请求添加到inFlightRequests中,inFlightRequests存储的就是还没有收到响应的请求。每个连接默认最多能存5个这样的请求(由max.in.flight.requests.per.connection配置);如果要保证分区内消息有序,只能设置为1。我们可以猜想:如果请求发送出去了,并且成功接收到了响应,之后就会把这个请求从inFlightRequests中移除。
selector.send(inFlightRequest.send)
public void send(Send send) { String connectionId = send.destination(); if (closingChannels.containsKey(connectionId)) this.failedSends.add(connectionId); else { //获取到一个KafakChannel KafkaChannel channel = channelOrFail(connectionId, false); try { /** * 将send对象缓存到KafkaChannel的send字段中,同时添加OP_WRITE事件的关注 * send对象实际类型是RequestSend对象,其中封装了具体的请求数据,包括请求头和请求体 * 这里只是将RequestSend对象用KafkaChannel的send字段记录下来 * 具体的发送会在Selector.poll()方法中进行 * KafkaChannel每次只会发送一个RequestSend对象 */ channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(connectionId); close(channel, false); } } } public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); //往KafkaChannel里面绑定一个发送出去的请求。 this.send = send; //关键的代码来了 //这儿绑定了一个OP_WRITE事件。 //一旦绑定了这个事件以后,我们就可以往服务端发送请求了。 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }复制代码
也就是说,setSend方法中注册了SelectionKey.OP_WRITE事件,之后在Selector的poll()方法中才会真正把消息发送出去。接下来继续看pollSelectionKeys:
/**
 * Handles the selected keys of one poll. Only the write path is shown in this excerpt:
 * for a ready, writable channel, the pending Send is written, and a completed Send is
 * recorded in completedSends.
 * NOTE(review): isImmediatelyConnected is not used in the visible code; presumably it is
 * used in the omitted part.
 */
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the key from the selected-key set as it is handled.
        iterator.remove();
        // Find the KafkaChannel that belongs to this key.
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {
            // ... (code omitted in this excerpt)

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            // Core code: handle the write event. It fires because setSend() registered
            // OP_WRITE for this channel.
            if (channel.ready() && key.isWritable()) {
                // This is where data is actually sent to the server: write() writes the
                // pending request, and once it is sent completely, OP_WRITE is removed.
                Send send = channel.write();
                // A non-null result means the whole Send has been written.
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}复制代码
接下来看channel.write():
public Send write() throws IOException { Send result = null; //send方法就是发送网络请求的方法 if (send != null && send(send)) { result = send; send = null; } return result; } private boolean send(Send send) throws IOException { //最终执行发送请求的代码是在这儿 send.writeTo(transportLayer); //如果已经完成网络请求的发送。 if (send.completed()) //然后就移除OP_WRITE transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }复制代码
上面的代码把请求写到服务端;只有当整个请求都发送完成后,才会取消对SelectionKey.OP_WRITE的监听。
作者:hsfxuebao
链接:https://juejin.cn/post/7019513198772486152