Spring Boot Kafka Configuration for Consumer
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_DOC,"latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,5000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,2000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_DOC,"latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,5000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,2000);
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(100L);
factory.getContainerProperties().setIdleEventInterval(6000L);
factory.getContainerProperties().setErrorHandler(errorHandler);
return factory;
}
public KafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(100L);
factory.getContainerProperties().setIdleEventInterval(6000L);
factory.getContainerProperties().setErrorHandler(errorHandler);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}