How to configure multiple Kafka consumers in a Spring Boot project

 

Preface

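Have you ever run into a scenario where a single project has to consume messages from several Kafka sources, with each consumer handling its own designated messages? This can be configured directly through the API that Kafka provides. More often, though, we use spring-kafka to simplify development, and spring-kafka's built-in configuration properties do not support multiple Kafka configurations. This article therefore looks at how to adapt spring-kafka so that it does.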

Main text

1. Use @ConfigurationProperties to give each KafkaProperties bean its own prefix:

@Primary
@ConfigurationProperties(prefix = "lybgeek.kafka.one")
@Bean
public KafkaProperties oneKafkaProperties() {
    return new KafkaProperties();
}

If there are several Kafka configurations, declare one bean per configuration, for example:

@ConfigurationProperties(prefix = "lybgeek.kafka.two")
@Bean
public KafkaProperties twoKafkaProperties() {
    return new KafkaProperties();
}

@ConfigurationProperties(prefix = "lybgeek.kafka.three")
@Bean
public KafkaProperties threeKafkaProperties() {
    return new KafkaProperties();
}
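To make the effect of the prefixes concrete, here is a minimal plain-JDK sketch of prefix-scoped configuration. It is only an illustration and does not use Spring's binder: the class name, the helper methods, the host names, and the group ids are all made up for this example. The key names after the prefix mirror the usual spring.kafka.* property names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-JDK illustration of prefix-scoped settings; this is NOT how Spring binds internally. */
public class PrefixScopedConfigSketch {

    /** Returns the entries whose key starts with prefix + ".", with that prefix stripped. */
    public static Map<String, String> scope(Map<String, String> flat, String prefix) {
        String head = prefix + ".";
        Map<String, String> scoped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            if (entry.getKey().startsWith(head)) {
                scoped.put(entry.getKey().substring(head.length()), entry.getValue());
            }
        }
        return scoped;
    }

    /** Hypothetical sample settings; hosts and group ids are invented for illustration. */
    public static Map<String, String> sampleSettings() {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("lybgeek.kafka.one.bootstrap-servers", "kafka-one:9092");
        flat.put("lybgeek.kafka.one.consumer.group-id", "group-one");
        flat.put("lybgeek.kafka.two.bootstrap-servers", "kafka-two:9092");
        flat.put("lybgeek.kafka.two.consumer.group-id", "group-two");
        return flat;
    }

    public static void main(String[] args) {
        Map<String, String> flat = sampleSettings();
        // Each "KafkaProperties" view only sees the keys under its own prefix.
        System.out.println(scope(flat, "lybgeek.kafka.one"));
        System.out.println(scope(flat, "lybgeek.kafka.two"));
    }
}
```

The point of the sketch is simply that settings under lybgeek.kafka.one never leak into the bean bound to lybgeek.kafka.two, which is what lets each consumer talk to its own Kafka.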

2. Configure a consumer factory and bind it to the corresponding KafkaProperties:

@Bean
public ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties) {
    return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());
}

3. Configure a listener container factory and bind it to the matching consumer factory and consumer settings:

@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(
        @Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties,
        @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(twoConsumerFactory);
    factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency())
            ? Runtime.getRuntime().availableProcessors()
            : twoKafkaProperties.getListener().getConcurrency());
    factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode())
            ? ContainerProperties.AckMode.MANUAL
            : twoKafkaProperties.getListener().getAckMode());
    return factory;
}
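The two ternary expressions in the listener factory follow one simple rule: use the configured value if there is one, otherwise fall back to a default (the CPU count for concurrency, MANUAL for the ack mode). Below is a plain-JDK sketch of that rule. The class, the method names, and the two-value AckMode enum are stand-ins invented for this example, not spring-kafka types.

```java
import java.util.Optional;

/** Plain-JDK sketch of the "configured value or default" rule used by the listener factory. */
public class ListenerDefaultsSketch {

    /** Stand-in for ContainerProperties.AckMode, reduced to two values for this sketch. */
    public enum AckMode { BATCH, MANUAL }

    /** Returns the configured concurrency, or the number of available processors if unset. */
    public static int resolveConcurrency(Integer configured) {
        return Optional.ofNullable(configured)
                .orElseGet(() -> Runtime.getRuntime().availableProcessors());
    }

    /** Returns the configured ack mode, or MANUAL if unset. */
    public static AckMode resolveAckMode(AckMode configured) {
        return configured != null ? configured : AckMode.MANUAL;
    }

    public static void main(String[] args) {
        System.out.println(resolveConcurrency(3));        // explicit setting wins
        System.out.println(resolveConcurrency(null));     // falls back to the CPU count
        System.out.println(resolveAckMode(null));         // falls back to MANUAL
        System.out.println(resolveAckMode(AckMode.BATCH)); // explicit setting wins
    }
}
```

One thing to keep in mind with the CPU-count default: in a consumer group, consumers beyond the number of partitions receive no partitions, so it is usually worth setting the concurrency explicitly to fit the topic.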

A complete configuration example follows:

@Configuration
@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)
public class OneKafkaComsumeAutoConfiguration {

    @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
    public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(
            @Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties,
            @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(oneConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency())
                ? Runtime.getRuntime().availableProcessors()
                : oneKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode())
                ? ContainerProperties.AckMode.MANUAL
                : oneKafkaProperties.getListener().getAckMode());
        return factory;
    }

    @Primary
    @Bean
    public ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties) {
        return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());
    }

    @Primary
    @ConfigurationProperties(prefix = "lybgeek.kafka.one")
    @Bean
    public KafkaProperties oneKafkaProperties() {
        return new KafkaProperties();
    }
}

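The @Primary annotation must be specified. Otherwise startup fails: several KafkaProperties beans exist, and Kafka's auto-configuration cannot tell which one to inject. The relevant part of Spring Boot's KafkaAutoConfiguration looks like this: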

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    private final RecordMessageConverter messageConverter;

    public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider
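The constructor above asks for a single KafkaProperties by type, which is why one bean has to be marked as primary once several exist. The toy resolver below imitates that selection rule in plain Java. It is a simplified model written for this article, not Spring's actual resolution code, and the Candidate class and resolve method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of by-type injection with an optional "primary" flag; not Spring's real algorithm. */
public class PrimaryResolutionSketch {

    /** Stand-in for a bean definition: a name plus a primary flag. */
    public static final class Candidate {
        final String name;
        final boolean primary;

        public Candidate(String name, boolean primary) {
            this.name = name;
            this.primary = primary;
        }
    }

    /** Picks the one candidate to inject, or fails if the choice is ambiguous. */
    public static String resolve(List<Candidate> candidates) {
        if (candidates.size() == 1) {
            return candidates.get(0).name;
        }
        List<String> primaries = new ArrayList<>();
        for (Candidate candidate : candidates) {
            if (candidate.primary) {
                primaries.add(candidate.name);
            }
        }
        if (primaries.size() == 1) {
            return primaries.get(0);
        }
        throw new IllegalStateException(
                "need exactly one candidate or exactly one primary, found " + candidates.size() + " candidates");
    }

    public static void main(String[] args) {
        // With one primary among several candidates, the primary is chosen.
        System.out.println(resolve(Arrays.asList(
                new Candidate("oneKafkaProperties", true),
                new Candidate("twoKafkaProperties", false))));
        // Without a primary, the same lookup is ambiguous and fails.
        try {
            resolve(Arrays.asList(
                    new Candidate("oneKafkaProperties", false),
                    new Candidate("twoKafkaProperties", false)));
        } catch (IllegalStateException e) {
            System.out.println("ambiguous: " + e.getMessage());
        }
    }
}
```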

Copyright © 2024 亿华云