Spring Boot Application with Event Registry

Hi All,

I am using Spring Boot 2.2.6.RELEASE with flowable version 6.5.0. My main objective for using this version is to use Kafka as we have an event driven architecture.

This is the process that I am using to understand how I can work with event registry.

  <?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Copyright (c) 2020 nexiles GmbH.  All rights reserved.
  -->

<definitions
        xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
        xmlns:flowable="http://flowable.org/bpmn"
        targetNamespace="Examples">

    <process id="eventTest" name="Process to test events">
        <startEvent id="theStart" >
            <extensionElements>
                <flowable:eventType xmlns:flowable="http://flowable.org/bpmn">carparkEnter</flowable:eventType>
                <flowable:eventOutParameter xmlns:flowable="http://flowable.org/bpmn"
                                            source="vehicleNumber"
                                            sourceType="string"
                                            target="vehicleNumber"/>
                <flowable:eventOutParameter xmlns:flowable="http://flowable.org/bpmn"
                                            source="carpark"
                                            sourceType="string"
                                            target="carpark"/>
            </extensionElements>
        </startEvent>
        <sequenceFlow id="flow1" sourceRef="theStart" targetRef="theTask" />
        <serviceTask id="theTask" name="event test task" flowable:delegateExpression="${testTask}" flowable:async="true"
        />
        <sequenceFlow id="flow2" sourceRef="theTask" targetRef="theEnd" />
        <endEvent id="theEnd" />
    </process>

</definitions>

The xml has been placed under resources/processes.

This is my .event file

{
  "key": "carparkEnter",
  "name": "Carpark Enter",
  "inboundChannelKeys": [
    "carparkenterChannel"
  ],
  "correlationParameters": [],
  "payload": [
    {
      "name": "vehicleNumber",
      "type": "string"
    },
    {
      "name": "carpark",
      "type": "string"
    }
  ]
}

And this is my .channel file

{
  "key": "carparkenterChannel",
  "name": "Carpark Enter channel",
  "description": "Carpark Enter Channel",
  "channelType": "inbound",
  "type": "kafka",
  "deserializerType": "json",
  "channelEventKeyDetection": {
    "fixedValue": "carparkEnterEvent"
  },
  "topics": ["LPRS_CARPARK_ENTER_JSON"]
}

The event and channel file are placed under resources/eventregistry folder

My application.properties file has the following entries

Enable and configure Kafka

flowable.task.app.kafka-enabled=true
spring.kafka.consumer.group-id=marvel-heimdall-carpark-enter
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.bootstrap-servers=${BOOTSTRAP_SERVER}

When I start my spring boot application and produce a json in the topic, I dont see my process getting started. Do I need to write any other configuration?

I have the following dependency in pom.xml

<dependency>
            <groupId>org.flowable</groupId>
            <artifactId>flowable-spring-boot-starter-process</artifactId>
            <version>${flowable.version}</version>
        </dependency>

Please help and thanks in advance!

Hi All,
I have crossed this initial hurdle. I forgot to add the kafka dependency

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

So now I can see the application is detecting event registry changes.

But now I have a different ptoblem.

When the application starts up I can see the below error

Failed to start bean 'eventEngineConfiguration'; nested exception is java.lang.AbstractMethodError: org.flowable.eventregistry.spring.kafka.SimpleKafkaListenerEndpoint.getTopicPartitions()

And later I can see this

Caused by: java.lang.IllegalStateException: EntityManagerFactory is closed

And when an event comes I see this exception

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed

Hey @sukalpo,

Have you looked at the flowable-kafka example?

When you are creating your own application then you need to add the appropriate dependencies to activate Kafka.

The exception you are seeing seems like you have some JPA dependency on the classpath, but it is not properly configured. Can you please share your entire pom?

For the AbstractMethodError can you please share the entire stacktrace. Can you verify which spring-kafka version you have on your classpath? Is it 2.3.x or is it newer?

Cheers,
Filip

Hi @filiphr,

Thanks for the response. My spring boot starter dependency was 2.1.1. After upgrading it to 2.2.6 Release now the Entitymanager exception is solved.

Now I am getting the following when i push message in kafka:-

2020-04-29 08:32:54 [org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#carparkenterChannel-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = LPRS_CARPARK_ENTER_JSON, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1588149174831, serialized key size = -1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"vehicleNumber":"123","carpark":"3A"})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.flowable.common.engine.api.FlowableObjectNotFoundException: no event definitions deployed with key 'carparkEnterEvent'

I am using this in the channel file

"channelEventKeyDetection": {
    "fixedValue": "carparkEnterEvent"
  },

The problem is that the event you are deploying has the following:

  "key": "carparkEnter",

and your channel has carparkEnterEvent align those and it should work.

And indeed it works like a charm. Thanks @filiphr

Hi @filiphr , I am trying the same as above but not in spring framework. I am trying to implement same in Micronaut and its not not working. I do not get any error but the control is not coming to my delegates. Although , I am able to do the same with spring boot. Below are the configurations I have added:

 implementation 'io.micronaut.kafka:micronaut-kafka:3.3.3'
    implementation "org.flowable:flowable-engine:6.6.0"

My Bean:

@Bean
    @Singleton
    public ProcessEngine processEngine() {

        ProcessEngineConfiguration cfg = ProcessEngineConfiguration.createStandaloneProcessEngineConfiguration()
                .setJdbcUrl("dbUrl")
                .setJdbcUsername("username")
                .setJdbcPassword("password")
                .setJdbcDriver("driver class name")
                .setAsyncExecutorActivate(true)                .setDatabaseSchemaUpdate(AbstractEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);

        List<VariableType> customTypes = new ArrayList<>();
        ((StandaloneProcessEngineConfiguration) cfg).setCustomPreVariableTypes(customTypes);

        ProcessEngine processEngine = cfg.buildProcessEngine();

         processEngine.getRepositoryService().createDeployment()
                    .addClasspathResource("test.bpmn20.xml")
                    .addClasspathResource("eventregistry/event-idvCompleteEvent.event")
                    .addClasspathResource("eventregistry/channel-idvCompleteChannel.channel")
                    .deploy();

        return processEngine;
    }

My delegate:

public class MyDelegate implements JavaDelegate {

    @Override
    public void execute(DelegateExecution execution) {

        String field = execution.getVariable("fieldNamePassingFromKafkaTopicMessage", String.class);
    }
}

Could you please suggest me what’s wrong in my implementation?

Thanks
Harish