阅读 129

Kafka源码分析17-如何处理超时的批次

上文Kafka源码分析16-消息有异常如何处理? 分析了消息异常了Producer如何处理,本文将分析Producer如何处理超时的批次?

继续看Sender 的poll()方法:

void run(long now) {

    /**
     * 步骤六:
     *  对超时的批次是如何处理的?
     */
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
}

public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
    List<RecordBatch> expiredBatches = new ArrayList<>();
    int count = 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        //获取到每个分区的队列 -> 队列里面对应的批次
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        // We only check if the batch should be expired if the partition does not have a batch in flight.
        // This is to prevent later batches from being expired while an earlier batch is still in progress.
        // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
        // is only active in this case. Otherwise the expiration order is not guaranteed.
        if (!muted.contains(tp)) {
            synchronized (dq) {
                // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
                RecordBatch lastBatch = dq.peekLast();
                Iterator<RecordBatch> batchIterator = dq.iterator();
                //迭代的看每个分区里面的每个批次
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    boolean isFull = batch != lastBatch || batch.isFull();
                    //判断一下是否超时
                    // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
                    // are invoked after completing the iterations, since sends invoked from callbacks
                    // may append more batches to the deque being iterated. The batch is deallocated after
                    // callbacks are invoked.
                    if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                        //增加到超时的数据结构里面
                        expiredBatches.add(batch);
                        count++;
                        //从数据结构里面移除
                        batchIterator.remove();
                    } else {
                        // Stop at the first batch that has not expired.
                        break;
                    }
                }
            }
        }
    }
    if (!expiredBatches.isEmpty()) {
        log.trace("Expired {} batches in accumulator", count);
        for (RecordBatch batch : expiredBatches) {
            //调用done方法
            //方法里面传过去了一个TimeoutException的异常。(超时了)
            batch.expirationDone();
            // 释放资源
            deallocate(batch);
        }
    }

    return expiredBatches;
}复制代码

maybeExpire() 方法来判断什么样的批次才算超时?注释中写的已经很清楚了,不再赘述

public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {

    /**
     * requestTimeoutMs:代表的是请求发送的超时的时间。默认值是30.
     * now:当前时间
     * lastAppendTime:批次的创建的时间(上一次重试的时间)
     * now - this.lastAppendTime 大于30秒,说明批次超时了 还没发送出去。
     */
    if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
        // 记录异常信息
        expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
    /**
     * lingerMs: 默认0  我们 一般都会设置100ms,无论如何都要把消息发送出去的时间
     * createdMs:批次创建的时间
     * 已经大于30秒了。 说明也是超时了。
     */
    else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
        expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
    /**
     * 针对重试
     * lastAttemptMs: 上一次重试的时间(批次创建的时间)
     * retryBackoffMs: 重试的时间间隔
     * 说明也是超时了。
     */
    else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
        expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";

    boolean expired = expiryErrorMessage != null;
    if (expired)
        close();
    return expired;
}复制代码

对超时的批次会放入expiredBatches中,对这些数据怎么处理呢?

if (!expiredBatches.isEmpty()) {
    log.trace("Expired {} batches in accumulator", count);
    for (RecordBatch batch : expiredBatches) {
        //调用done方法
        //方法里面传过去了一个TimeoutException的异常。(超时了)
        batch.expirationDone();
        // 释放资源
        deallocate(batch);
    }
}复制代码

首先会返回给Producer一个超时的异常:

void expirationDone() {
    if (expiryErrorMessage == null)
        throw new IllegalStateException("Batch has not expired");
    //调用done方法
    //方法里面传过去了一个TimeoutException的异常。(超时了
    this.done(-1L, Record.NO_TIMESTAMP,
              new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}复制代码

然后释放掉内存deallocate(batch)。


作者:hsfxuebao
链接:https://juejin.cn/post/7019565374039392270


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐