[Event Registry] Cannot start process from Kafka event

Hi everyone,

I’m trying to build a simple demo using Spring Boot + Event Registry + Kafka: whenever an event arrives, the Flowable engine should start a new process instance.

This is my startEvent element:

<startEvent id="requestVideoConferenceEventTask">
  <extensionElements>
    <flowable:eventType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceEvent]]></flowable:eventType>
    <flowable:eventName xmlns:flowable="http://flowable.org/bpmn"><![CDATA[Request Video Conference Event]]></flowable:eventName>
    <flowable:eventOutParameter xmlns:flowable="http://flowable.org/bpmn" source="correlationId" sourceType="string" target="correlationId"></flowable:eventOutParameter>
    <flowable:eventCorrelationParameter xmlns:flowable="http://flowable.org/bpmn" name="correlationId" type="string" value="${correlationId}"></flowable:eventCorrelationParameter>
    <flowable:channelKey xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceChannel]]></flowable:channelKey>
    <flowable:channelName xmlns:flowable="http://flowable.org/bpmn"><![CDATA[Reques Video Conference Channel]]></flowable:channelName>
    <flowable:channelType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[kafka]]></flowable:channelType>
    <flowable:channelDestination xmlns:flowable="http://flowable.org/bpmn"><![CDATA[REQUEST_VIDEO_CONFERENCE_TOPIC]]></flowable:channelDestination>
    <flowable:keyDetectionType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[fixedValue]]></flowable:keyDetectionType>
    <flowable:keyDetectionValue xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceEvent]]></flowable:keyDetectionValue>
  </extensionElements>
</startEvent>

In the debugger, when I push a message to the topic, the message reaches KafkaChannelMessageListenerAdapter.java, but no process instance is created.

Event file:

{
  "key": "requestVideoConferenceEvent",
  "name": "Request Video Conference Event",
  "inboundChannelKeys": [
    "requestVideoConferenceChannel"
  ],
  "correlationParameters": [
    {
      "name": "correlationId",
      "type": "string"
    }
  ],
  "payload": [
    {
      "name": "correlationId",
      "type": "string"
    }
  ]
}

Channel file:

{
  "key": "requestVideoConferenceChannel",
  "category": "channel",
  "name": "Request Video Conference Channel",
  "channelType": "inbound",
  "type": "kafka",
  "topics": ["REQUEST_VIDEO_CONFERENCE_TOPIC"],
  "deserializerType": "json",
  "channelEventKeyDetection": {
    "jsonField": "eventKey"
  }
}

Any help would be appreciated.

Thanks.

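I finally found the issue. It was caused by the correlation parameter on my start event. In org.flowable.engine.impl.eventregistry.BpmnEventRegistryEventConsumer.eventReceived(EventInstance eventInstance), the engine calls findEventSubscriptions(ScopeTypes.BPMN, eventInstance, correlationKeys), which looks up subscriptions stored in ACT_RU_EVENT_SUBSCR whose CONFIGURATION_ column matches the correlation key. With the correlation parameter defined on the start event, that lookup did not match my incoming event, so no process was started. Removing the correlation parameter fixed it.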

Correct config:

<startEvent id="requestVideoConferenceEventTask">
  <extensionElements>
    <flowable:eventType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceEvent]]></flowable:eventType>
    <flowable:eventName xmlns:flowable="http://flowable.org/bpmn"><![CDATA[Request Video Conference Event]]></flowable:eventName>
    <flowable:eventOutParameter xmlns:flowable="http://flowable.org/bpmn" source="correlationId" sourceType="string" target="correlationId"></flowable:eventOutParameter>
    <flowable:channelKey xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceChannel]]></flowable:channelKey>
    <flowable:channelName xmlns:flowable="http://flowable.org/bpmn"><![CDATA[Reques Video Conference Channel]]></flowable:channelName>
    <flowable:channelType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[kafka]]></flowable:channelType>
    <flowable:channelDestination xmlns:flowable="http://flowable.org/bpmn"><![CDATA[REQUEST_VIDEO_CONFERENCE_TOPIC]]></flowable:channelDestination>
    <flowable:keyDetectionType xmlns:flowable="http://flowable.org/bpmn"><![CDATA[fixedValue]]></flowable:keyDetectionType>
    <flowable:keyDetectionValue xmlns:flowable="http://flowable.org/bpmn"><![CDATA[requestVideoConferenceEvent]]></flowable:keyDetectionValue>
  </extensionElements>
</startEvent>

Event file:

{
  "key": "requestVideoConferenceEvent",
  "name": "Request Video Conference Event",
  "inboundChannelKeys": [
    "requestVideoConferenceChannel"
  ],
  "correlationParameters": [],
  "payload": [
    {
      "name": "correlationId",
      "type": "string"
    }
  ]
}
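
To illustrate how these definitions fit together, a message on REQUEST_VIDEO_CONFERENCE_TOPIC might look roughly like the one below. This is only a sketch under assumptions: it assumes the channel file shown earlier (with "jsonField": "eventKey" detection) is the one deployed, so the event key is read from an eventKey field, and the correlationId value is made up for the example.

```json
{
  "eventKey": "requestVideoConferenceEvent",
  "correlationId": "example-correlation-id"
}
```

If the channel instead uses the fixedValue key detection from the startEvent extension elements, the eventKey field would presumably not be needed, since every message on the channel is then treated as requestVideoConferenceEvent.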

Hi @tannth, I’m wondering why you had to define the KafkaChannelMessageListenerAdapter class. None of the tutorials online have this class.
I have defined only the process + event + channel definitions. Isn’t this enough to make the process start when an event is produced on Kafka? With only these 3 definitions, I’m not able to start my process, so I’m wondering if I’m missing something.

Thank you,
Narcisa

Thanks for your response @nrus. The KafkaChannelMessageListenerAdapter class is part of the engine; I didn’t write it myself.

As far as I can tell, we only need to define the process + event + channel, and then the engine works fine (of course, only if your config is correct).

Thanks.
TanNTH