阅读 129

Kafka-Producer的实现细节

背景

遇到一些线上问题,发现对一些kakfa的配置细节不太了解,通过本文梳理一下认知,一些核心目标:

  • 梳理一下Kafka-Client的主要流程

  • 梳理一下Kafka-Client的配置,以及他们如何影响kafka的客户端发送

  • 梳理一下kafka-Client的metric指标有哪些,如何配置和获取这些指标,以及如何利用这些指标进行观测和排查

主要流程

作为一个Producer来说其实核心是梳理2个东西:Sender和RecordAccumulator

  • Sender: 是kafka发送流程的主要服务,负责接收数据并将其放置到RecordAccumulator,或者从RecordAccumulator中取出数据发送到Kafka的服务端,或者负责更新一些meta服务等情况。

  • RecordAccumulator:kafka的整个发送流程是异步的,主要目的是为了batch一些数据以增大吞吐,而RecordAccumulator则是主要负责进行对数据缓存进行管理的主要对象

作为Sender单次循环体内的核心的流程大如上图所示,我们可以按照图中的流程自顶向下拆解出各个步骤的细。上述流程在Sender#sendProducerData中

如何判断和获取可以发送的kafka节点

首先在RecordAccumulator内部,数据是以Map<TopicPartition, Deque>的形式缓存的:

TopicPartition是很显然指topic-partion

ProducerBatch则是需要同一批发送的Record请求,ProducerBatch本身不是线程安全的,实际操作时会以所在的Deque粒度进行上锁。在ProducerBatch内,实际的recrod以MemoryRecordsBuilder的形式维护,同时ProducerBatch也会为何很多其他数据,比如一些request的数据回调等等,如果后面我们可以继续聊,现阶段还是先回归主流程的分析

final long createdMs;
final TopicPartition topicPartition;
final ProduceRequestResult produceFuture;

private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

int recordCount;
int maxRecordSize;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private boolean retry;
private boolean reopened;复制代码

判断这哪些数据是ready的核心代码在kafka的RecordAccumulator的ready的部分:

首先需要服务端满足一定的条件:

  • 需要被发送的partion的leader是已知的,如果包含未知的leader则需要访问kafka的服务端查询元数据,不过这部分内容会block整体流程,因此实际上会做成异步的

  • 当前待发送的partion并且没有被muted,也就是没有被设置为阻塞状态。

  • 当前partion不处于backoff状态,这里主要指当前的partion有正在触发重试的状态。

其次则是当前partion的batch的需要满足一定条件

  • 当前batch的距离上一次发送过去的时间的等待时间> 允许等待的时延(如果是首次尝试则使用lingerMs,如果是重试逻辑则使用retryBackoffMs)

  • 当前双端队列是否存在已满的batch,比如队列中的原始的数量大于1,或者仅有一个元素但是size满足发送条件

  • 当前Producer已经处于close状态

  • 总体内存已满:我们已经知道Producer的数据是需要缓存一段时间的,Producer内部有一个控制内存的内存池即BufferPool,如果内存不够用了则会排队申请,如果这对队列不为空则说明总内存不够了

  • 存在正在刷新的线程:这里稍微难理解一点,等我比较确定了再补充。

  • 事务完成,(高版本kakfa支持的事务模型,暂不赘述)

如何获取待发送的Batch数据

主要逻辑概括的说:

遍历RecordAccumulator中的ConcurrentMap<TopicPartition, Deque>,针对每个TopicPartition尝试获取不高于maxRequestSize的batch列表,将这些Batch关闭并放入待发送列表中。

但是在实现中还是有一些逻辑需要注意。

我们都知道基本的kafka的broker和kafka的topic-partion的概念,不同的partion可能分配到同一个broker上。在kafka的实现中,每次drain的过程只会从当前的node节点中调出一个partion进行发送消息。

为了避免每次投递的时候都从0开始投递从而导致序列化较大的partion会饥饿,客户端虚拟出了一个drainIndex,在每次drain的过程中会递增,实际的其实节点从start开始。

int start = drainIndex = drainIndex % parts.size();复制代码

不过这里有一点我没太看懂,为什么drainInde是全局的,如果是我做可能就做nodeId维度的了,不太清楚这里考虑的点是什么?如果是全局的drainIndex,其实还存在如果单个Node的partion太多远远多余其他的Node从而导致饥饿?

另一个有意思的问题是当有一些比较极端的case,比如单个Batch里面只有一个message,但是这个message的size已经大于request size的限制了,这时候就会尝试将这条消息单独作为一个batch发送,为了实现在这一点,kafka的client只在待发送列表不为空时检测当前待发送+nextbatch的size之和是否大于request size

上述代在:RecordAccumulator#drainBatchesForOneNode中

如何检测过期数据

检测其实分成2个部分,一部分是RecordAccumulator中的buffer的数据ConcurrentMap<TopicPartition, Deque>中的过期数据,一部分是Sender中的待发送的数据Map<TopicPartition, List> inFlightBatches

比较的时间是deliveryTimeoutMs,和当前时间-创建时间的差值。

对于失效的数据会调用失败的callBack。

数据发送和打包

完成了上述数据过滤的数据会打包成ProduceRequest交给client进行发送。

配置和限制条件

梳理完上述的条件之后我们来一起看下有哪些配置控制了上述的一些流程:

  • batch.size:这里指的是每个双端队列的ProducerBatch的size大小

  • buffer.memory:这里指的是RecordAccumulator的Buffer的size大小

  • max.request.size:这里指的是在drain的过程中发给每个Node的size大小,如果是单个messge大于这个值是会跳过检测的,但是会影响打包的方式。

  • linger.ms:在非重试的场景下数据从ProducerBatch开始创建到drain到待发送区域之间,在buffer中驻留的时间

  • retry.backoff.ms:基本同上,区别在于是在重试场景下允许驻留在buffer和待发送区域的最大时间,这个配置实际上是为了避免一些极端的场景,比如在重试的场景下,可能是由于服务端有问题,如果我们不增加client在内存驻留的时间,则可能在非常短的时间内把重试次数耗尽。

  • delivery.timeout.ms:这个配置指的的是从数据从add到kafka的客户端到client开始处理发送流程的总耗时,包括驻留在buffer中的时间和待发送列表中的时间。

  • request.timeout.ms:这部分时间实际上指的是client开始发送request到收到response之间的时间。

比如我所遇到的线上问题 :e.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

从代码来看是卡在了申请内存的阶段,实际上就是buffer的size不够了。对照了一下Producer的配置发现了batch.size设置过大导致了下游的topicpartion的数量batch.size之后远超过buffer.memory,也就是buffer最多只能放部分partion的数据,进而导致整个Producer的生产流程阻塞。

其他

在观测过程中还看到了一些有意思的点,后续可以研究一下

  1. client内部其实提供了一些metric打点来观测这些client端的一些指标,通过配置metric.reporters,并且实现MetricsReporter接口可以将这部分数据暴露出来

  1. 针对ProducerBatch其实还有些其他有意思的地方,比如ProducerBatch本身其实带有一定的状态信息,比如当client被添加一条数据的时候如何保证不影响 整个send过程的数据流转,这里面有些线程安全的问题需要解决,另外就是内存数据和client的物理数据之间的转换,以及压缩的实现方式和时间节点之间的关系等待。


作者:脱缰哒哈士奇
链接:https://juejin.cn/post/7056359399421378596


文章分类
百科问答
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐