Event Registry with My Own Kafka Config

Hello all,

I want to use event registry feature of flowable only with workflow related logic and I want to use my own kafka config with my other business logic. That’s why I already have kafka config in my microservice. That causes this error

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaChannelDefinitionProcessor in org.flowable.spring.boot.eventregistry.EventRegistryAutoConfiguration$EventRegistryKafkaConfiguration required a bean of type 'org.springframework.kafka.core.KafkaOperations' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.kafka.core.KafkaOperations' in your configuration.


Process finished with exit code 1

Is there a better way to do this than just implementing KafkaOperations bean in my config class?

   @Bean
    public KafkaOperations kafkaOperations(){
        return new KafkaOperations() {
            @Override
            public ListenableFuture<SendResult> sendDefault(Object o) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> sendDefault(Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> sendDefault(Integer integer, Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> sendDefault(Integer integer, Long aLong, Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(String s, Object o) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(String s, Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(String s, Integer integer, Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(String s, Integer integer, Long aLong, Object o, Object o2) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(ProducerRecord producerRecord) {
                return null;
            }

            @Override
            public ListenableFuture<SendResult> send(Message message) {
                return null;
            }

            @Override
            public List<PartitionInfo> partitionsFor(String s) {
                return null;
            }

            @Override
            public Map<MetricName, ? extends Metric> metrics() {
                return null;
            }

            @Override
            public Object execute(ProducerCallback producerCallback) {
                return null;
            }

            @Override
            public Object executeInTransaction(OperationsCallback operationsCallback) {
                return null;
            }

            @Override
            public void flush() {

            }

            @Override
            public void sendOffsetsToTransaction(Map map) {

            }

            @Override
            public void sendOffsetsToTransaction(Map map, String s) {

            }

            @Override
            public boolean isTransactional() {
                return false;
            }

            @Override
            public ConsumerRecord receive(String s, int i, long l, Duration duration) {
                return null;
            }

            @Override
            public ConsumerRecords receive(Collection collection, Duration duration) {
                return null;
            }
        };
    }

I don’t want to override all these methods which I don’t know much about. I would appreciate your help

Umut

Hey @umutkazan,

The Flowable Kafka integration is implemented using Spring Kafka. The KafkaOperations bean is a bean that is configured by Spring Boot when Spring Kafka is available.

It is strange that you are getting that error, because the org.flowable.spring.boot.eventregistry.EventRegistryAutoConfiguration$EventRegistryKafkaConfiguration class is only activated when there is a KafkaOperations bean available. Therefore, it is strange that you are even getting the error.

You can run your application with the debug=true enable and share the logs with us.

Cheers,
Filip

How did you solved this error? @umutkazan , @filiphr ??
Could you please share any input, link or configuration used to fix this issue

debug and make sure that your existing Kafka configuration creates a proper KafkaTemplate (a KafkaOperations bean) so that Flowable can use it

Hi we have the same issue. The kafka template is available the autoconfigure accept the kafkaTemplate that I have:
EventRegistryAutoConfiguration.EventRegistryKafkaConfiguration matched:

  • @ConditionalOnBean (types: org.springframework.kafka.core.KafkaOperations; SearchStrategy: all) found bean ‘kafkaTemplate’ (OnBeanCondition)

Then when the beans are trying to create and then that breaks. We don’t want to use kafka with flowable. We want to disable kafka for flowable. Can someone help me with that?

After doing some tests with the generics I have found a solution for my problem. I have to create a kafkaTemplate<Object, Object> in stead off kafkaTemplate<String, Event>. The kafkaTemplate<String, Event> does not matched the KafkaOperations<Object, Object>. I have this problem in spring boot 3.4.6 and flowable 7.1.0. We had this problem in other projects for the kafka consumer side and we changed it from specific generics to Object, Object.