kafka 消息丢失和重复消费
在使用mq中间件时,如果消费数据处理不当可能会引起消息丢失和重复消费的问题。本文针对kafka的这两个问题进行场景分析和可行性方案说明。
消息丢失
消息丢失一般指消息发送方producer和消息消费方consumer两方面。
producer
消息发送方可以通过配置request.required.acks属性来保证消息的安全发送,值包括:
0:表示不进行消息接收是否成功的确认
1:表示当Leader接收成功时确认
-1:表示Leader和Follower都接收成功时确认
设置为0,producer不进行消息发送的确认,kafka server可能由于一些原因并没有收到对应消息,从而引起消息丢失。
设置为1,producer在确认到 topic leader 已经接收到消息后,完成发送,此时有可能 follower 并没有接收到对应消息。此时如果 leader 突然宕机,在经过选举之后,没有接到消息的 follower 晋升为 leader,从而引起消息丢失。
设置为-1可能很好的确认kafka server是否已经完成消息的接收和本地化存储,并且可以在producer发送失败时进行重试。
consumer
消费方丢失的情况,是因为在消费过程中出现了异常,但是 对应消息的 offset 已经提交,那么消费异常的消息 将会丢失。 offset的提交包括手动提交和自动提交,可通过kafka.consumer.enable-auto-commit进行配置。 手动提交可以灵活的确认是否将本次消费数据的offset进行提交,可以很好的避免消息丢失的情况 自动提交是引起数据丢失的主要诱因。因为消息的消费并不会影响到offset的提交。 自动提交的触发事件:复制代码
通过 KafkaConsumer.assign()订阅分区
ConsumerCoordinator.poll()方法处理时(maybeAutoCommitOffsetsAsync方法)
.在消费者进行平衡操作前
ConsumerCoordinator 关闭操作
如在消息消费中出现异常,在下次拉取数据之前,会执行ConsumerCoordinator.poll()方法,从而将当前消息的offset进行提交,导致消息丢失。
为了尽可能的保证数据的完整性,尽量选用手动提交的方式。
重复消费
以手动提交offset举例,如果我们在消费中将消息数据处理入库,但是在执行offset提交时,kafka宕机或者网络原
因无法提交offset,当我们重启服务或者rebalance过程触发,consumer将再次消费此消息数据。
重复消费问题的处理主要集中在consumer的编码层面,需要我们在设计代码时以幂等性的角度进行开发设计,保证
同一数据无论多少次执行消费,所造成的结果都一样。处理方式可以在消息体中添加唯一标识,consumer进行确认
此唯一标识是否已经消费过,如果消费过,则不进行之后处理。从而尽可能的避免了重复消费。
作者:昂贵的背影
链接:https://juejin.cn/post/7025858315225661453