Springboot集成Kafka进行批量消费及踩坑点
本文主要介绍了Springboot集成Kafka进行批量消费及踩坑点,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
目录
引入依赖
创建配置类
Kafka 消费者
引入依赖
1 2 3 4 5 | < dependency > < groupId >org.springframework.kafka</ groupId > < artifactId >spring-kafka</ artifactId > < version >1.3.11.RELEASE</ version > </ dependency > |
因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。
注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:
1 | java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor |
创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | /** * kafka 配置类 */ @Configuration @EnableKafka public class KafkaConsumerConfig { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig. class ); @Value ( "${kafka.bootstrap.servers}" ) private String kafkaBootstrapServers; @Value ( "${kafka.group.id}" ) private String kafkaGroupId; @Value ( "${kafka.topic}" ) private String kafkaTopic; public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf" ; public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks" ; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 设置并发量,小于或者等于 Topic 的分区数 factory.setConcurrency( 5 ); // 设置为批量监听 factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout( 30000 ); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); //设置接入点,请通过控制台获取对应Topic的接入点。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //设置SSL根证书的路径,请记得将XXX修改为自己的路径。 //与SASL路径类似,该文件也不能被打包到jar中。 System.setProperty( "java.security.auth.login.config" , CONFIG_PATH); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根证书存储的密码,保持不变。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient" ); //接入协议,目前支持使用SASL_SSL协议接入。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL" ); //SASL鉴权方式,保持不变。 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN" ); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //两次Poll之间的最大允许间隔。 //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000 ); //设置单次拉取的量,走公网访问时,该参数会有较大影响。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000 ); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000 ); //每次Poll的最大数量。 //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30 ); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" ); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" ); //当前消费实例所属的消费组,请在控制台申请之后填写。 //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校验改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "" ); return props; } } |
注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。
Kafka 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | /** * kafka 消息消费类 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener. class ); @KafkaListener (topics = { "${kafka.topic}" }) public void listen(List<ConsumerRecord<String, String>> recordList) { for (ConsumerRecord<String,String> record : recordList) { // 打印消息的分区以及偏移量 LOGGER.info( "Kafka Consume partition:{}, offset:{}" , record.partition(), record.offset()); String value = record.value(); System.out.println( "value = " + value); // 处理业务逻辑 ... } } } |
因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。
到此这篇关于Springboot集成Kafka进行批量消费及踩坑点的文章就介绍到这了
原文链接:https://blog.csdn.net/AlphaBr/article/details/121914316