阅读 94

Kafka多生产者消费者自动配置

背景

项目中不同的业务可能会使用多个kafka,按默认的Kafka配置,最多是支持消费者和生产者使用不同的Kafka,如果两个生产者使用不同的Kafka则需要自定义配置,生成对应的bean。

解决方案

多生产者,多消费者,使用不同的前缀来区分,根据前缀来区分配置,加载配置,实例化对应前缀的KafkaProperties kafkaListenerContainerFactory KafkaTemplate ,每个bean的名称都是带前缀的,使用的时候,按照需要注入对应的bean。

YML配置

spring:   kafka:     product:       bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091       properties:         sasl:           mechanism: PLAIN           jaas:             config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx";         security:           protocol: SASL_PLAINTEXT       producer:         retries: 0         acks: -1         batch-size: 16384         linger-ms: 0         buffer-memory: 33554432       consumer:         group-id: consumer-group-id         enable-auto-commit: true         auto-commit-interval-ms: 1000         auto-offset-reset: latest         session-timeout-ms: 120000          request-timeout-ms: 180000     order:       bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100       properties:         sasl:           mechanism: PLAIN           jaas:             config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx";         security:           protocol: SASL_PLAINTEXT       producer:         retries: 3         acks: -1         batch-size: 16384         linger-ms: 0         buffer-memory: 33554432       consumer:         group-id: order-migration         enable-auto-commit: true         auto-commit-interval-ms: 1000         auto-offset-reset: latest         session-timeout-ms: 120000         request-timeout-ms: 180000 复制代码

自定义KafkaProperties

使用KafkaProperties接收配置,但是需要修改下前缀,但是KafkaProperties源码改不了,新写一个类继承KafkaProperties

@Component @Primary @ConfigurationProperties(prefix = "spring.kafka.order") public class OrderKafkaProperties extends KafkaProperties{ } 复制代码

如果没有Kafka默认配置,Kafka会自动实例化默认的KafkaProperties,如果有多个KafkaProperties实例,需要指定一个首选的bean,否则KafkaAnnotationDrivenConfiguration类中构造函数会报错。

所以在其中一个加上@Primary注解

KafkaTemplate和KafkaListenerContainerFactory配置

有了KafkaProperties就可以生成KafkaTemplateKafkaListenerContainerFactory实例

@Configuration public class KafkaConfig {     @Autowired     private OrderKafkaProperties orderKafkaProperties;     @Bean("orderKafkaTemplate")     public KafkaTemplate<String, String> kafkaTemplate() {         return new KafkaTemplate<>(producerFactory());     }     private ProducerFactory<String, String> producerFactory() {         return new DefaultKafkaProducerFactory<>(producerConfigs());     }     private Map<String, Object> producerConfigs() {         return contractKafkaProperties.buildProducerProperties();     }     @Bean("orderKafkaListenerContainerFactory")     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {         ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(consumerFactory());         factory.setConcurrency(10);         factory.getContainerProperties().setPollTimeout(3000);         return factory;     }     private ConsumerFactory<Integer, String> consumerFactory() {         return new DefaultKafkaConsumerFactory<>(consumerConfigs());     }     private Map<String, Object> consumerConfigs() {         return contractKafkaProperties.buildConsumerProperties();     } } 复制代码

这样就可以在其他地方直接使用了,生产者就直接@Autowired orderKafkaTemplate,如果是消费者,直接在@KafkaListenercontainerFactory属性指定orderKafkaListenerContainerFactory

如果有多个生产者消费者,就增加对应的配置即可。这样简化了配置的读取,除了加了前缀,其他的配置都是和Kafka默认配置一样的,复用Springboot的属性绑定,后续如果有其他配置,加上后能直接生效,无需修改代码。如果修改配置的结构需要代码中读取,然后手动设置,后期修改YML配置和代码都需要修改,比较麻烦。

方案演进

上述方案,如果需要新增一个Kafka的配置,需要新增一个前缀,然后新增对应配置代码,来生成KafkaPropertiesKafkaTemplateKafkaListenerContainerFactory实例,但是不同的前缀生成不同的实例代码都是重复的,而且所有的前缀、属性值都由YML配置可以得到,所以代码中生成带前缀的bean可以由代码自动生成,并注册到spring容器中。根据这个思路,写一个BeanFactoryAware的实现类。(Aware接口是框架提供给用户用户获取框架中一些对象的接口,比如BeanFactoryAware就是获取BeanFactory,框架会调用重写的setBeanFactory方法,将BeanFactory传给我们的实现类)

@Component @Slf4j public class EmallBeanFactoryAware implements BeanFactoryAware {     @Autowired     private Environment environment;     private static final String SPRING_KAFKA_PREFIX = "spring.kafka";     @Override     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {         if (beanFactory instanceof DefaultListableBeanFactory) {             DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;             Binder binder = Binder.get(environment);             //将YML中属性值映射到MAP中,后面根据配置前缀生成bean并注册到容器中,TODO 绑定可能有异常,加try catch稳一点             BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));             if (!bindResultWithPrefix.isBound()) {                 return;             }             Map map = bindResultWithPrefix.get();             Set set = map.keySet();             Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();             //如果配置多个primary, 只设置第一个,TODO项目启动过程中,这个变量是否有并发问题             boolean hasSetPrimary = false;             //实例化每个带前缀的KafkaProperties、KafkaTemplate、             for (Object object : set) {                 String prefix = object.toString();                 if (kafkaPropertyFiledNames.contains(prefix)) {                     //不带前缀的正常配置忽略                     continue;                 }                 String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;                 BindResult<KafkaProperties> kafkaPropertiesBindResult;                 try {                     kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));                     if (!kafkaPropertiesBindResult.isBound()) {                         continue;                     }                 } catch (Exception e) {                     //一些配置不是在KafkaProperties属性,但是也不是前缀配置,在这一步会绑定失败,比如spring.kafka.topics配置,                     //一些配置的名称是带-,KafkaProperties属性是驼峰,绑定是会出异常的,异常忽略                     log.error("auto register kafka properties error, prefix is: {}", configPrefix);                     continue;                 }                 //注册生产者(TODO 没配置生产者是否会报错)                 KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();                 String propertiesBeanName = prefix + "KafkaProperties";                 boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);                 if (!isBeanExist) {                     String primaryConfig = configPrefix + ".primary";                     //没有默认的kafka配置,需要设置下primary                     BindResult<Boolean> primaryBindResult = binder.bind(primaryConfig, Bindable.of(Boolean.class));                     if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {                         BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);                         defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());                         defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);                         defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);                         hasSetPrimary = true;                     } else {                         defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);                     }                 } //注册生产者KafkaTemplate                 String templateBeanName = prefix + "KafkaTemplate";                 if (!defaultListableBeanFactory.containsBean(templateBeanName)) {                     KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(                             new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));                     defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);                 }                 String beanName = prefix + "KafkaListenerContainerFactory";                 if (!defaultListableBeanFactory.containsBean(beanName)) {                     //注册消费者listener(TODO 没配置消费者是否会报错)                     ConcurrentKafkaListenerContainerFactory<Integer, String> factory =                             new ConcurrentKafkaListenerContainerFactory<>();                     factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));                     factory.setConcurrency(10);                     factory.getContainerProperties().setPollTimeout(3000);                     defaultListableBeanFactory.registerSingleton(beanName, factory);                 }             }         }     }     private static Set<String> getKafkaPropertyFiledNames () {         Set<String> names = new HashSet<>();         Field[] declaredFields = KafkaProperties.class.getDeclaredFields();         if (declaredFields.length == 0) {             return names;         }         for (Field declaredField : declaredFields) {             names.add(declaredField.getName());         }         return names;     } } 复制代码

遇到的问题

手动注册的bean代码中@Autowire无法注入

手动注册的无法@Autowire,直接加@Lazy注解,先忽略bean注册的先后顺序

多个KafkaProperties实例,无法确定使用哪一个

因为使用前缀的配置方式,bean名称也是带前缀的,没有默认的Kafka配置,框架会自动生成对应的bean,KafkaAnnotationDrivenConfiguration中的KafkaProperties 属性是根据类型注入的,如果配置有多个前缀,注入的时候无法确定使用哪一个,所以增加一个primary配置,自动生成的时候设置下。

既有带前缀,又有不带前缀使用默认配置的

自动配置代码中有一段是根据yml中配置的key,判断是否是KafkaProperties类中的字段,如果是就忽略,让框架自动按默认配置,有些字段yml中是带-,如bootstrap-serversKafkaProperties中是驼峰命名bootstrapServers,绑定的时候会抛异常,影响应用启动,这种异常可以忽略,直接用try catch捕获。

设置Bean为Primary

第二个问题中,多个相同类型的Bean如何设置其中一个bean为Primary,手动注册bean,如果有实例对象,可以直接使用BeanFactoryregisterSingleton(beanName, object),如果没有实例对象,可以直接使用类名,通过BeanFactoryregisterBeanDefinition(beanName, beanDefinition)来注册,如果要设置bean为Primary,必须通过BeanDefinition来设置,但是通过框架的绑定是直接生成实例对象的,如果通过registerSingleton来注册,通过beanName获取BeanDefinition是会抛异常的,因为没有BeanDefinition,所以需要将对象实例和BeanDefinition关联起来,就是上面这段代码

//注册BeanDefinition defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition()); //注册对象实例,使用相同的bean名称 defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties); //再获取BeanDefinition就能获取到,而且这个bean就是上面注册的实例对象 defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);


作者:AE86_Jay
链接:https://juejin.cn/post/7169910747810496543


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐