Kafka listener - how to implement the handler

I created a flow with the ui-modeler, created the “start event registry event”, created the eventDefinition and channelDefinition. Each time there is a message in Kafka, I got a notification in the execute() method of my service task. However I am not sure how this service task should look like? Should I implement JavaDelegate? In this case, how can I get the message payload? Only through getVariable() method (which is not very convenient)

Another solution is to use a method annotated @KafkaListener(topics = “flowable-pipeline-test”),

then I got the payload (ConsumerRecord), but I kind of bypass Flowable as it works without defining event and channel definition…

So what’s the best practive to implement such a serviceTask ?

Thanks for your help!

Not sure I’m following what you’re trying to achieve: if you have a start event registry event, that’s where the flow startes when the event is received. What follows afterwards, is up to you: it can be an automatic step (a service task) or a user task with form, …

The point of this is, is that for the person modeling the process, the fact that it’s coming over kafka doesn’t matter: an event is simply received and all that follows is business logic.

Thanks Joram.
Actually in your conference :slight_smile: Using Kafka: Anatomy of the Flowable event registry - Joram Barrez & Filip Hrisafov - YouTube), at 25:19, there is this ReviewChannelAdapter class, where you use @KafkaListener, so I was wondering if we use such annotation, why do we need to define the event and channel definition files since messages from Kafka are received thanks to the @kafkaListener annotation.
By the way, I don’t find this class and some others classes that you are showing in Github flowable-examples/flowable-kafka at master · flowable/flowable-examples · GitHub

Thanks again for your quick response.

There are different ways you can receive events. One is to directly use the Kafka inbound event handling from Flowable (which btw is the same as using @KafkaListener from Spring, since we are using Spring Kafka for this), and the other is to use your own @KafkaListener. Between the video from our conference and the final release we improved the handling a bit.

The ReviewChannelAdapter class from the video is obsolete and is replaced by a Json definition channel-reviewInboundChannel.channel.

If you want to use your own inbound channel, then you can use expression as the type and set adapterDelegateExpression to your own @KafkaListener. Your bean needs to then implement InboundEventChannelAdapter which will get the channel. Keep in mind that you shouldn’t reuse the same bean more than once, or implement a correct handling for the InboundEventChannelAdapter#setInboundChannelModel.

I would suggest reading and Introducing the Flowable Event Registry – Flowable Blog and Flowable Business Processing from Kafka Events – Flowable Blog, if you haven’t read them yet

1 Like

Thanks Filip, now all is clear :slight_smile:
Also just read your blog, this was very helpful and exactly what I’m looking for.
Thanks again.
Michel