Kafka Source Code Analysis 17 - How Expired Batches Are Handled
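The previous article, Kafka Source Code Analysis 16 - How Are Message Exceptions Handled?, looked at how the Producer deals with messages that fail with an exception. This article looks at how the Producer handles batches that have timed out.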
Let's continue with the Sender's run(long now) method:
```java
// Sender
void run(long now) {
    // ... (steps 1-5 omitted)

    /*
     * Step 6:
     * How are expired batches handled?
     */
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    // ...
}

// RecordAccumulator
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
    List<RecordBatch> expiredBatches = new ArrayList<>();
    int count = 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        // Get each partition's queue, which holds that partition's batches
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        // We only check if the batch should be expired if the partition does not have a batch in flight.
        // This is to prevent later batches from being expired while an earlier batch is still in progress.
        // Note that `muted` is only ever populated if `max.in.flight.requests.per.connection=1` so this protection
        // is only active in this case. Otherwise the expiration order is not guaranteed.
        if (!muted.contains(tp)) {
            synchronized (dq) {
                // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
                RecordBatch lastBatch = dq.peekLast();
                Iterator<RecordBatch> batchIterator = dq.iterator();
                // Walk through every batch in this partition's queue
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    boolean isFull = batch != lastBatch || batch.isFull();
                    // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
                    // are invoked after completing the iterations, since sends invoked from callbacks
                    // may append more batches to the deque being iterated. The batch is deallocated after
                    // callbacks are invoked.
                    if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                        // Add it to the list of expired batches
                        expiredBatches.add(batch);
                        count++;
                        // Remove it from the partition's queue
                        batchIterator.remove();
                    } else {
                        // Stop at the first batch that has not expired.
                        break;
                    }
                }
            }
        }
    }
    if (!expiredBatches.isEmpty()) {
        log.trace("Expired {} batches in accumulator", count);
        for (RecordBatch batch : expiredBatches) {
            // Calls done() with a TimeoutException (the batch has timed out)
            batch.expirationDone();
            // Release the batch's memory
            deallocate(batch);
        }
    }
    return expiredBatches;
}
```
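To see the shape of this scan on its own, here is a minimal sketch under simplifying assumptions. Partitions are plain strings. `Batch` is a hypothetical stand-in for RecordBatch, and its only expiry rule is "older than timeoutMs since creation". `onExpired` is a hypothetical hook that plays the role of expirationDone(). The sketch keeps three behaviors of the real code: muted partitions are skipped, each deque is scanned from the head and the scan stops at the first batch that has not expired, and callbacks run only after all iteration has finished.

```java
import java.util.*;
import java.util.function.Consumer;

public class ExpiryScanSketch {
    // Hypothetical stand-in for RecordBatch, keeping only what the scan needs.
    static final class Batch {
        final String partition;
        final long createdMs;
        Batch(String partition, long createdMs) { this.partition = partition; this.createdMs = createdMs; }
        // Simplified expiry rule (assumption): expired once older than timeoutMs.
        boolean expired(long timeoutMs, long now) { return now - createdMs > timeoutMs; }
        @Override public String toString() { return partition + "@" + createdMs; }
    }

    static List<Batch> abortExpired(Map<String, Deque<Batch>> batches, Set<String> muted,
                                    long timeoutMs, long now, Consumer<Batch> onExpired) {
        List<Batch> expired = new ArrayList<>();
        for (Map.Entry<String, Deque<Batch>> entry : batches.entrySet()) {
            // Skip partitions that have a batch in flight, like the muted check above.
            if (muted.contains(entry.getKey())) continue;
            Deque<Batch> dq = entry.getValue();
            synchronized (dq) {
                Iterator<Batch> it = dq.iterator();
                while (it.hasNext()) {
                    Batch b = it.next();
                    if (!b.expired(timeoutMs, now)) break; // stop at the first batch that has not expired
                    expired.add(b);
                    it.remove();
                }
            }
        }
        // Phase 2: run callbacks only now, when no deque is being iterated.
        for (Batch b : expired) onExpired.accept(b);
        return expired;
    }

    public static void main(String[] args) {
        Map<String, Deque<Batch>> batches = new LinkedHashMap<>();
        batches.put("t-0", new ArrayDeque<>(List.of(new Batch("t-0", 0), new Batch("t-0", 10), new Batch("t-0", 35_000))));
        batches.put("t-1", new ArrayDeque<>(List.of(new Batch("t-1", 0))));
        List<String> notified = new ArrayList<>();
        List<Batch> expired = abortExpired(batches, Set.of("t-1"), 30_000, 40_000,
                b -> notified.add(b.toString()));
        System.out.println(expired);                    // [t-0@0, t-0@10]
        System.out.println(batches.get("t-0").size());  // 1
        System.out.println(batches.get("t-1").size());  // 1: muted, so untouched
    }
}
```

Partition t-1 also holds a 40-second-old batch, but it survives because the partition is muted.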
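The maybeExpire() method decides what makes a batch count as expired. It checks three cases, and the comments in the code already explain each one clearly, so I won't repeat them here.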
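```java
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
    /*
     * requestTimeoutMs: the timeout for sending a request (request.timeout.ms), 30000 ms (30 s) by default.
     * now: the current time.
     * lastAppendTime: when records were last appended to this batch (initially its creation time;
     *                 also reset when the batch is re-enqueued for a retry).
     * If now - lastAppendTime exceeds requestTimeoutMs, the batch has timed out without being sent.
     */
    if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
        // Record why the batch expired
        expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
    /*
     * lingerMs: linger.ms, 0 by default (we usually set it to around 100 ms); once this much time has
     *           passed, the batch is sent whether or not it is full.
     * createdMs: when the batch was created.
     * If now - (createdMs + lingerMs) exceeds requestTimeoutMs, the batch has also timed out.
     */
    else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
        expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
    /*
     * For batches being retried:
     * lastAttemptMs: the time of the last send attempt (initially the batch creation time).
     * retryBackoffMs: the wait between retries (retry.backoff.ms).
     * If now - (lastAttemptMs + retryBackoffMs) exceeds requestTimeoutMs, the batch has timed out too.
     */
    else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
        expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
    boolean expired = expiryErrorMessage != null;
    if (expired)
        close();
    return expired;
}
```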
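The three branches are easy to check by hand with concrete numbers. The sketch below is a hypothetical pure-function rewrite of maybeExpire(). `expiryReason` is my own name, and the batch's fields (createdMs, lastAppendTime, lastAttemptMs, inRetry) are passed in as parameters. It returns the expiry message, or null if the batch has not expired, and it does not close anything.

```java
public class MaybeExpireSketch {
    // Same three conditions and messages as maybeExpire(), evaluated in the same order.
    static String expiryReason(boolean inRetry, boolean isFull, int requestTimeoutMs,
                               long retryBackoffMs, long lingerMs, long now,
                               long createdMs, long lastAppendTime, long lastAttemptMs) {
        if (!inRetry && isFull && requestTimeoutMs < (now - lastAppendTime))
            return (now - lastAppendTime) + " ms has passed since last append";
        else if (!inRetry && requestTimeoutMs < (now - (createdMs + lingerMs)))
            return (now - (createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
        else if (inRetry && requestTimeoutMs < (now - (lastAttemptMs + retryBackoffMs)))
            return (now - (lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
        return null; // not expired
    }

    public static void main(String[] args) {
        // Full batch, last append 31 s ago, 30 s request timeout: the first rule fires.
        System.out.println(expiryReason(false, true, 30_000, 100, 100, 31_000, 0, 0, 0));
        // -> 31000 ms has passed since last append
        // Not full, created at 0 with linger.ms = 100, now 30_200: the second rule fires.
        System.out.println(expiryReason(false, false, 30_000, 100, 100, 30_200, 0, 0, 0));
        // -> 30100 ms has passed since batch creation plus linger time
        // In retry, last attempt at 0 with 100 ms backoff, now 30_050: 29_950 ms is not over 30_000.
        System.out.println(expiryReason(true, true, 30_000, 100, 100, 30_050, 0, 0, 0));
        // -> null
    }
}
```

The second rule does not require the batch to be full. A batch that never fills up therefore still expires once requestTimeoutMs has elapsed beyond createdMs + lingerMs.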
Expired batches are collected into expiredBatches. So what happens to them next?
```java
if (!expiredBatches.isEmpty()) {
    log.trace("Expired {} batches in accumulator", count);
    for (RecordBatch batch : expiredBatches) {
        // Calls done() with a TimeoutException (the batch has timed out)
        batch.expirationDone();
        // Release the batch's memory
        deallocate(batch);
    }
}
```
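First, expirationDone() completes the batch through done() with a TimeoutException, which is how the timeout is reported back to the Producer: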
```java
void expirationDone() {
    if (expiryErrorMessage == null)
        throw new IllegalStateException("Batch has not expired");
    // Complete the batch via done(), passing a TimeoutException (the batch has timed out)
    this.done(-1L, Record.NO_TIMESTAMP,
              new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}
```
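Here is a minimal sketch of what completing a batch with an exception means for the records inside it. It assumes each record is represented by a CompletableFuture, whereas Kafka uses its own FutureRecordMetadata plus user callbacks. `BatchTimeoutException` is a hypothetical stand-in for Kafka's org.apache.kafka.common.errors.TimeoutException, and the `done()` here drops the timestamp argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ExpirationDoneSketch {
    // Hypothetical stand-in for Kafka's TimeoutException.
    static final class BatchTimeoutException extends RuntimeException {
        BatchTimeoutException(String msg) { super(msg); }
    }

    // Hypothetical simplified batch: one future per appended record.
    static final class Batch {
        final String topicPartition;
        final List<CompletableFuture<Long>> futures = new ArrayList<>();
        String expiryErrorMessage; // set when the batch is judged expired

        Batch(String topicPartition) { this.topicPartition = topicPartition; }

        CompletableFuture<Long> append() {
            CompletableFuture<Long> f = new CompletableFuture<>();
            futures.add(f);
            return f;
        }

        // Like done(): complete every record's future, exceptionally if an exception is given.
        void done(long baseOffset, RuntimeException exception) {
            for (CompletableFuture<Long> f : futures) {
                if (exception == null) f.complete(baseOffset);
                else f.completeExceptionally(exception);
            }
        }

        void expirationDone() {
            if (expiryErrorMessage == null)
                throw new IllegalStateException("Batch has not expired");
            done(-1L, new BatchTimeoutException("Expiring " + futures.size()
                    + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch("orders-0");
        CompletableFuture<Long> first = batch.append();
        batch.append();
        batch.expiryErrorMessage = "31000 ms has passed since last append";
        batch.expirationDone();
        try {
            first.join();
        } catch (java.util.concurrent.CompletionException e) {
            // Expiring 2 record(s) for orders-0: 31000 ms has passed since last append
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

Every record in the batch fails with the same exception, since the whole batch expired together.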
Then deallocate(batch) releases the batch's memory back to the accumulator's buffer pool.
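deallocate(batch) returns the batch's ByteBuffer to the RecordAccumulator's BufferPool. The simplified sketch below is modeled on that pool's design as I understand it. Buffers of exactly the poolable size (batch.size) go onto a free list for reuse, and buffers of any other size are added back to a counter of unpooled memory. The class and method names are hypothetical. Unlike the real pool, this sketch throws instead of blocking when memory runs out, and it does not wake waiting threads.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class BufferPoolSketch {
    private final int poolableSize;                       // size of buffers worth caching (like batch.size)
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;                         // memory not held in the free list or in use

    BufferPoolSketch(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty()) return free.pollFirst(); // reuse a cached buffer
        if (availableMemory < size) throw new IllegalStateException("out of memory"); // the real pool blocks here
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer, int size) {
        if (size == poolableSize && size == buffer.capacity()) {
            buffer.clear();
            free.add(buffer);        // keep it for the next batch
        } else {
            availableMemory += size; // return the bytes to the unpooled budget
        }
    }

    synchronized long availableMemory() { return availableMemory; }
    synchronized int freeBuffers() { return free.size(); }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(64 * 1024, 16 * 1024);
        ByteBuffer batchBuf = pool.allocate(16 * 1024);   // a normal-sized batch
        ByteBuffer bigBuf = pool.allocate(20 * 1024);     // a batch for an oversized record
        pool.deallocate(batchBuf, 16 * 1024);
        pool.deallocate(bigBuf, 20 * 1024);
        System.out.println(pool.freeBuffers());           // 1
        System.out.println(pool.availableMemory());       // 49152
    }
}
```

Caching only one buffer size keeps reuse simple: most batches have the same size, so an expired batch's buffer can go straight to the next batch without another allocation.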
Author: hsfxuebao
Link: https://juejin.cn/post/7019565374039392270