Comments (10)
Hi, thanks for opening the interesting issue!
Kafka Listener never load the Kafka Consumer Config and never trigger the @kafkaListener
because @EnableKafka
and @KafkaListener
are registered differently in Spring
, processed by KafkaListenerAnnotationBeanPostProcessor:
Bean post-processor that registers methods annotated with {@link KafkaListener}
to be invoked by a Kafka message listener container created under the covers
by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
according to the parameters of the annotation.
Annotated methods can use flexible arguments as defined by {@link KafkaListener}.
This post-processor is automatically registered by Spring's {@link EnableKafka} annotation.
...
this repository contains an example in which controllers
created dynamically
, in UserDynamicControllerRegister. registerUserController()
method which annotated with @PostConstruct
, while the ApplicationContext has already been created, and this works because Spring
allows to register registerMapping
when ApplicationContext is already created, using RequestMappingHandlerMapping
to register dynamic config
and@KafkaListener
you can use BeanFactoryPostProcessor to register dynamic config
and@KafkaListener
before KafkaListenerAnnotationBeanPostProcessor
invocation, simple example:
MyBeanFactoryPostProcessor
implementation with dynamic config
and@KafkaListener
:
package com.example;
import com.example.MyPayload;
import java.lang.reflect.Modifier;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.MemberAttributeExtension;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.Argument;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
import static net.bytebuddy.matcher.ElementMatchers.named;
@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
try {
// registers dynamicKafkaListenerConfig bean
Class<?> dynamicKafkaListenerConfig = generateDynamicKafkaListenerConfig();
AbstractBeanDefinition dynamicKafkaListenerConfigBeanDefinition = BeanDefinitionBuilder
.rootBeanDefinition(dynamicKafkaListenerConfig)
.getBeanDefinition();
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
"dynamicKafkaListenerConfig", dynamicKafkaListenerConfigBeanDefinition);
// registers dynamicKafkaListener bean
Class<?> dynamicKafkaListener = generateDynamicKafkaListener();
AbstractBeanDefinition dynamicKafkaListenerBeanDefinition = BeanDefinitionBuilder
.rootBeanDefinition(dynamicKafkaListener)
.getBeanDefinition();
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
"dynamicKafkaListener", dynamicKafkaListenerBeanDefinition);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// generates dynamicKafkaListenerConfig class
private Class<?> generateDynamicKafkaListenerConfig() throws Exception {
return new ByteBuddy()
.subclass(Object.class)
.name("DynamicKafkaListenerConfig")
.annotateType(AnnotationDescription.Builder.ofType(Configuration.class)
.build(), AnnotationDescription.Builder.ofType(EnableKafka.class)
.build())
.make()
.load(getClass().getClassLoader())
.getLoaded();
}
// generates dynamicKafkaListener class
private Class<?> generateDynamicKafkaListener() throws Exception {
return new ByteBuddy()
.subclass(Object.class)
.name("DynamicKafkaListener")
.annotateType(AnnotationDescription.Builder.ofType(Component.class)
.build())
.defineMethod("consume", void.class, Modifier.PUBLIC)
.withParameter(MyPayload.class, "myPayload")
.annotateParameter(AnnotationDescription.Builder.ofType(Payload.class)
.build())
.intercept(MethodDelegation.to(DynamicKafkaConsumerIntercept.class))
.visit(new MemberAttributeExtension.ForMethod()
.annotateMethod(AnnotationDescription.Builder.ofType(KafkaListener.class)
.defineArray("topics", new String[]{"my-topic"})
.define("groupId", "my-groupId")
.build())
.on(named("consume")))
.make()
.load(getClass().getClassLoader())
.getLoaded();
}
// simple interceptor for DynamicKafkaConsumer
public static class DynamicKafkaConsumerIntercept {
public static void consume(@Argument(0) MyPayload myPayload) {
System.out.println("Received myPayload: " + myPayload);
}
}
}
generated DynamicKafkaListenerConfig.class
result:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration("")
@EnableKafka
public class DynamicKafkaListenerConfig {
public DynamicKafkaListenerConfig() {
}
}
generated DynamicKafkaListener.class
result:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import com.example.MyPayload;
import io.tpd.kafkaexample.MyBeanFactoryAware.DynamicKafkaConsumerIntercept;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component("")
public class DynamicKafkaListener {
@KafkaListener(
concurrency = "",
topics = {"my-topic"},
topicPartitions = {},
errorHandler = "",
autoStartup = "",
topicPattern = "",
id = "",
groupId = "my-groupId",
beanRef = "__listener",
clientIdPrefix = "",
containerGroup = "",
containerFactory = "",
idIsGroup = true
)
public void consume(@Payload(required = true,expression = "",value = "") MyPayload myPayload) {
DynamicKafkaConsumerIntercept.consume(var1);
}
public DynamicKafkaListener() {
}
}
in this case, dynamic config
and@KafkaListener
will work as it will be processed by KafkaListenerAnnotationBeanPostProcessor
does this solution solve your issue?
from poc-spring-boot-dynamic-controller.
MyPayload what does this class contains?
from poc-spring-boot-dynamic-controller.
MyPayload
is simple POJO, based on PracticalAdvice, tested with @Payload PracticalAdvice payload - I removed all listeners and generated the first one with the MyBeanFactoryPostProcessor
.
P.S. I've updated the previous answer - added @Configuration
to the DynamicKafkaListenerConfig.class
from poc-spring-boot-dynamic-controller.
Kindly share MyPayload pojo class if possible, This is great work for me now.
Thank you
from poc-spring-boot-dynamic-controller.
MyPayload
class:
import com.fasterxml.jackson.annotation.JsonProperty;
public class MyPayload {
private final String message;
private final int identifier;
public MyPayload(@JsonProperty("message") final String message,
@JsonProperty("identifier") final int identifier) {
this.message = message;
this.identifier = identifier;
}
public String getMessage() {
return message;
}
public int getIdentifier() {
return identifier;
}
@Override
public String toString() {
return "MyPayload{" +
"message='" + message + '\'' +
", identifier=" + identifier +
'}';
}
}
you are welcome
from poc-spring-boot-dynamic-controller.
This will not work our message of Type org.springframework.messagsing.Message @ payload Message message
message.getHeaders()
message.getPayload()
from poc-spring-boot-dynamic-controller.
I generated @KafkaListener
method with the following parameter and annotation:
public void consume(@Payload MyPayload myPayload){
...
}
in your case you have to generate method with Message parameter without @Payload
annotation:
public void consume(Message<?> message){
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
...
}
since payload is already in the Message
, this should work
from poc-spring-boot-dynamic-controller.
does it work in your case?
from poc-spring-boot-dynamic-controller.
How to deal with Database call FROM BeanFactoryPostProcessor . I want to read metadata from database and create many consumer base on desfine in database
from poc-spring-boot-dynamic-controller.
if you are using SQL
database then you can just use JdbcTemplate, and configure manually (postgresql
example with org.postgresql:postgresql:42.2.16
dependency):
DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
dataSourceBuilder.driverClassName("org.postgresql.Driver");
dataSourceBuilder.url("jdbc:postgresql://127.0.0.1:5432/myDb");
dataSourceBuilder.username("myUser");
dataSourceBuilder.password("MyPassword");
DataSource dataSource = dataSourceBuilder.build();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from my_table");
_... do something for your case..._
to get property values for DataSourceBuilder
you can use Environment
:
Environment environment = beanFactory.getBean(Environment.class);
String url = environment.getProperty("db-url");
String username = environment.getProperty("db-username");
String password = environment.getProperty("db-password");
...
from poc-spring-boot-dynamic-controller.
Related Issues (1)
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from poc-spring-boot-dynamic-controller.