
Comments (10)

tsarenkotxt commented on June 24, 2024

Hi, thanks for opening the interesting issue!

Kafka Listener never load the Kafka Consumer Config and never trigger the @kafkaListener

This happens because @EnableKafka and @KafkaListener are registered differently in Spring: they are processed by KafkaListenerAnnotationBeanPostProcessor, whose Javadoc says:

Bean post-processor that registers methods annotated with {@link KafkaListener}
to be invoked by a Kafka message listener container created under the covers
by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
according to the parameters of the annotation.
Annotated methods can use flexible arguments as defined by {@link KafkaListener}.
This post-processor is automatically registered by Spring's {@link EnableKafka} annotation.
...
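
As a rough illustration of what "registers methods annotated with @KafkaListener" means, here is a simplified, plain-JDK model of annotation scanning. It is only a sketch and not Spring's actual implementation: the @Listener annotation, the OrderHandler class and the scan method are hypothetical stand-ins invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ListenerScanSketch {

    // Hypothetical stand-in for @KafkaListener; this is NOT the Spring annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Listener {
        String topic();
    }

    // Hypothetical bean with one annotated method.
    public static class OrderHandler {
        @Listener(topic = "my-topic")
        public void consume(String payload) {
            // a real handler would process the payload here
        }
    }

    // Inspects the public methods of a bean and returns one
    // "topic -> Class.method" entry per annotated method.
    public static List<String> scan(Object bean) {
        List<String> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                endpoints.add(listener.topic() + " -> "
                    + bean.getClass().getSimpleName() + "." + method.getName());
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(scan(new OrderHandler()));
    }
}
```

The point of the model is that the annotation is only found on beans the scanner actually gets to see, which is why a dynamically generated listener class has to be registered as a bean definition in the first place.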

This repository contains an example in which controllers are created dynamically in the UserDynamicControllerRegister.registerUserController() method. That method is annotated with @PostConstruct, so it runs when the ApplicationContext has already been created. This works because Spring allows mappings to be registered through RequestMappingHandlerMapping.registerMapping even after the ApplicationContext has been created.

To register a dynamic config and @KafkaListener, you can use a BeanFactoryPostProcessor, which registers them before KafkaListenerAnnotationBeanPostProcessor is invoked. A simple example:

MyBeanFactoryPostProcessor implementation with a dynamic config and @KafkaListener:

package com.example;

import com.example.MyPayload;
import java.lang.reflect.Modifier;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.MemberAttributeExtension;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.Argument;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;

import static net.bytebuddy.matcher.ElementMatchers.named;

@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        try {
            // registers dynamicKafkaListenerConfig bean
            Class<?> dynamicKafkaListenerConfig = generateDynamicKafkaListenerConfig();
            AbstractBeanDefinition dynamicKafkaListenerConfigBeanDefinition = BeanDefinitionBuilder
                .rootBeanDefinition(dynamicKafkaListenerConfig)
                .getBeanDefinition();
            ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
                "dynamicKafkaListenerConfig", dynamicKafkaListenerConfigBeanDefinition);

            // registers dynamicKafkaListener bean
            Class<?> dynamicKafkaListener = generateDynamicKafkaListener();
            AbstractBeanDefinition dynamicKafkaListenerBeanDefinition = BeanDefinitionBuilder
                .rootBeanDefinition(dynamicKafkaListener)
                .getBeanDefinition();

            ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
                "dynamicKafkaListener", dynamicKafkaListenerBeanDefinition);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // generates dynamicKafkaListenerConfig class
    private Class<?> generateDynamicKafkaListenerConfig() throws Exception {
        return new ByteBuddy()

            .subclass(Object.class)
            .name("DynamicKafkaListenerConfig")
            .annotateType(AnnotationDescription.Builder.ofType(Configuration.class)
                .build(), AnnotationDescription.Builder.ofType(EnableKafka.class)
                .build())

            .make()
            .load(getClass().getClassLoader())
            .getLoaded();
    }

    // generates dynamicKafkaListener class
    private Class<?> generateDynamicKafkaListener() throws Exception {
        return new ByteBuddy()

            .subclass(Object.class)
            .name("DynamicKafkaListener")
            .annotateType(AnnotationDescription.Builder.ofType(Component.class)
                .build())

            .defineMethod("consume", void.class, Modifier.PUBLIC)
            .withParameter(MyPayload.class, "myPayload")
            .annotateParameter(AnnotationDescription.Builder.ofType(Payload.class)
                .build())
            .intercept(MethodDelegation.to(DynamicKafkaConsumerIntercept.class))

            .visit(new MemberAttributeExtension.ForMethod()
                .annotateMethod(AnnotationDescription.Builder.ofType(KafkaListener.class)
                    .defineArray("topics", new String[]{"my-topic"})
                    .define("groupId", "my-groupId")
                    .build())
                .on(named("consume")))

            .make()
            .load(getClass().getClassLoader())
            .getLoaded();
    }

    // simple interceptor that the generated DynamicKafkaListener.consume method delegates to
    public static class DynamicKafkaConsumerIntercept {

        public static void consume(@Argument(0) MyPayload myPayload) {
            System.out.println("Received myPayload: " + myPayload);
        }

    }

}

Decompiled result of the generated DynamicKafkaListenerConfig.class:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration("")
@EnableKafka
public class DynamicKafkaListenerConfig {
    public DynamicKafkaListenerConfig() {
    }
}

Decompiled result of the generated DynamicKafkaListener.class:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

import com.example.MyPayload;
import io.tpd.kafkaexample.MyBeanFactoryAware.DynamicKafkaConsumerIntercept;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component("")
public class DynamicKafkaListener {
    @KafkaListener(
        concurrency = "",
        topics = {"my-topic"},
        topicPartitions = {},
        errorHandler = "",
        autoStartup = "",
        topicPattern = "",
        id = "",
        groupId = "my-groupId",
        beanRef = "__listener",
        clientIdPrefix = "",
        containerGroup = "",
        containerFactory = "",
        idIsGroup = true
    )
    public void consume(@Payload(required = true,expression = "",value = "") MyPayload myPayload) {
        DynamicKafkaConsumerIntercept.consume(myPayload);
    }

    public DynamicKafkaListener() {
    }
}

In this case, the dynamic config and @KafkaListener will work, because they will be processed by KafkaListenerAnnotationBeanPostProcessor.

Does this solution solve your issue?

from poc-spring-boot-dynamic-controller.

sunil31925 commented on June 24, 2024

What does the MyPayload class contain?


tsarenkotxt commented on June 24, 2024

MyPayload is a simple POJO based on PracticalAdvice. I tested it with @Payload PracticalAdvice payload: I removed all existing listeners and generated the first one with MyBeanFactoryPostProcessor.

P.S. I've updated the previous answer: I added @Configuration to DynamicKafkaListenerConfig.class.


sunil31925 commented on June 24, 2024

Kindly share the MyPayload POJO class if possible. This works great for me now.

Thank you


tsarenkotxt commented on June 24, 2024

MyPayload class:

import com.fasterxml.jackson.annotation.JsonProperty;

public class MyPayload {

    private final String message;
    private final int identifier;

    public MyPayload(@JsonProperty("message") final String message,
                     @JsonProperty("identifier") final int identifier) {
        this.message = message;
        this.identifier = identifier;
    }

    public String getMessage() {
        return message;
    }

    public int getIdentifier() {
        return identifier;
    }

    @Override
    public String toString() {
        return "MyPayload{" +
            "message='" + message + '\'' +
            ", identifier=" + identifier +
            '}';
    }

}

you are welcome


sunil31925 commented on June 24, 2024

This will not work for us. Our message is of type org.springframework.messaging.Message (@Payload Message message), and we need to call:
message.getHeaders()
message.getPayload()


tsarenkotxt commented on June 24, 2024

I generated the @KafkaListener method with the following parameter and annotation:

  public void consume(@Payload MyPayload myPayload){
      ...
  }

In your case, you have to generate the method with a Message parameter and without the @Payload annotation:

  public void consume(Message<?> message){
     Object payload = message.getPayload();
     MessageHeaders headers = message.getHeaders();
     ...
  }

Since the payload is already inside the Message, this should work.


tsarenkotxt commented on June 24, 2024

Does it work in your case?


sunil31925 commented on June 24, 2024

How do I make a database call from the BeanFactoryPostProcessor? I want to read metadata from the database and create many consumers based on what is defined there.


tsarenkotxt commented on June 24, 2024

If you are using an SQL database, you can just use JdbcTemplate and configure it manually (PostgreSQL example with the org.postgresql:postgresql:42.2.16 dependency):

        DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
        dataSourceBuilder.driverClassName("org.postgresql.Driver");
        dataSourceBuilder.url("jdbc:postgresql://127.0.0.1:5432/myDb");
        dataSourceBuilder.username("myUser");
        dataSourceBuilder.password("MyPassword");
        DataSource dataSource = dataSourceBuilder.build();

        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from my_table");
        // ... do something for your case ...

To get the property values for DataSourceBuilder, you can use Environment:

       Environment environment = beanFactory.getBean(Environment.class);
       String url = environment.getProperty("db-url");
       String username = environment.getProperty("db-username");
       String password = environment.getProperty("db-password");
...
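
To connect the query result to the "many consumers" goal, one possible next step is to turn each row into a small description of the listener to generate. The following is only a sketch under assumptions: the column names "topic" and "group_id", the ListenerSpec class and the "DynamicKafkaListener" + index naming scheme are all hypothetical and need to be adapted to your metadata table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ListenerSpecSketch {

    // Describes one listener to generate: topic, consumer group,
    // and a unique name usable for both the generated class and the bean.
    public static final class ListenerSpec {
        public final String topic;
        public final String groupId;
        public final String className;

        public ListenerSpec(String topic, String groupId, String className) {
            this.topic = topic;
            this.groupId = groupId;
            this.className = className;
        }
    }

    // Converts rows (the shape returned by JdbcTemplate.queryForList) into specs.
    // ASSUMPTION: the table has "topic" and "group_id" columns.
    public static List<ListenerSpec> toSpecs(List<Map<String, Object>> rows) {
        List<ListenerSpec> specs = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            Map<String, Object> row = rows.get(i);
            String topic = Objects.requireNonNull(row.get("topic"), "topic").toString();
            String groupId = Objects.requireNonNull(row.get("group_id"), "group_id").toString();
            // The index suffix keeps generated class and bean names unique.
            specs.add(new ListenerSpec(topic, groupId, "DynamicKafkaListener" + i));
        }
        return specs;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.<String, Object>of("topic", "orders", "group_id", "orders-group"),
            Map.<String, Object>of("topic", "payments", "group_id", "payments-group"));
        for (ListenerSpec spec : toSpecs(rows)) {
            System.out.println(spec.className + ": " + spec.topic + " / " + spec.groupId);
        }
    }
}
```

Each spec could then be passed to a parameterized version of generateDynamicKafkaListener (topic, groupId and class name as arguments instead of the hard-coded "my-topic", "my-groupId" and "DynamicKafkaListener"), with the bean definition registered under the same unique name.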

