阅读 259

Springboot集成Kafka进行批量消费及踩坑点

本文主要介绍了Springboot集成Kafka进行批量消费及踩坑点,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

目录
  • 引入依赖

  • 创建配置类

  • Kafka 消费者

引入依赖

1
2
3
4
5
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.11.RELEASE</version>
</dependency>

因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

在这里插入图片描述

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:

1
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor


创建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * kafka 配置类
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
 
    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);
 
    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;
 
    @Value("${kafka.group.id}")
    private String kafkaGroupId;
 
    @Value("${kafka.topic}")
    private String kafkaTopic;
 
    public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";
 
    public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";
  
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 设置并发量,小于或者等于 Topic 的分区数
        factory.setConcurrency(5);
        // 设置为批量监听
        factory.setBatchListener(Boolean.TRUE);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;
    }
 
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        //设置接入点,请通过控制台获取对应Topic的接入点。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        //设置SSL根证书的路径,请记得将XXX修改为自己的路径。
        //与SASL路径类似,该文件也不能被打包到jar中。
        System.setProperty("java.security.auth.login.config", CONFIG_PATH);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);
 
        //根证书存储的密码,保持不变。
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        //接入协议,目前支持使用SASL_SSL协议接入。
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        //SASL鉴权方式,保持不变。
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // 自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //设置单次拉取的量,走公网访问时,该参数会有较大影响。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        //每次Poll的最大数量。
        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        //Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        return props;
    }
}

注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。

Kafka 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * kafka 消息消费类
 */
@Component
public class KafkaMessageListener {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
 
    @KafkaListener(topics = {"${kafka.topic}"})
    public void listen(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String,String> record : recordList) {
            // 打印消息的分区以及偏移量
            LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
            String value = record.value();
            System.out.println("value = " + value);
            // 处理业务逻辑 ...
        }
    }
}

因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。

到此这篇关于Springboot集成Kafka进行批量消费及踩坑点的文章就介绍到这了

    原文链接:https://blog.csdn.net/AlphaBr/article/details/121914316

    文章分类
    百科问答
    版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
    相关推荐