Flowable resume process instance upon receiving a Kafka event in a service task

I am new to Flowable. Basically, I am creating a simple process which performs the followings service tasks (async jobs):

START → fare calculation → wallet deduction → generate passes → close order call → END

In each of the delegate service, I need to perform some interaction with a 3rd party system through kafka messaging. Once we receive data, we validate it before passing to the next delegate.

For example, in the wallet deduction service we are producing kafka event to the 3rd party and 3rd party sends success/failure. If failure we retry 3 times before pushing to dead letter. If success go to next service - pass generation.

So, we have written some Synchronous Kafka reply templates in Spring Boot as follows:

class WalletPayment implements JavaDelegate {
public void execute(DelegateExecution execution) {


RequestReplyFuture<String, String, String> sendAndReceive = replyKafkaTemplate.sendAndReceive(record, Duration.ofSeconds(timeoutInSec));
SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get(); // Need to make this async
if(validate(sendResult)){
execution.setStatus(“status”, “PAID”); // proceed pass generation task…
}else{
throw new RuntimeException(); // causes retry
}


}
}

This is simple and Kafka messaging is just synchronous.

Here we need to wait till we get a reply and also prone to suffer timeout. We want to make Kafka to interact async. It can increase no. of threads in the application at runtime.

Problem is I am not getting how to make kafka async here so that the process instance stops once producer produces data async way and process instance resumes back when the consumer receives the event. Can anyone help me what is the preferred approach here ?

Environment:
Spring Boot, kafka version 2, Flowable 6.5.0

Thanks in advance.

So I have broken down now fare calculation into 2 different tasks.
One is fare calculation service task, where I send an async kafka msg to 3rd party.

I have written down a consumer (async) that will now consume the 3rd party reply and I wish to complete it as a user task. But I am not understanding how I can store the Kafka consumer data in the process instance variable.

List tasks = taskService.createTaskQuery().processInstanceId(pid).list();
Task task = tasks.get(0);
task.setOwner(“CL_USER”);
taskService.claim(task.getId(), “CL_USER”);
taskService.complete(task.getId()); // I want to set kafka msg in the process instance here !!! :sob:

Is it something we can do ?
Thanks in Advance for any sort of help/solution.

Anyone ??? PLease :sob::sob::sob::sob:

Not sure I’m fully following your use case, but why not use the default send/receive for kafka as it’s in Flowable? That construct implements the use case you’re describing, no?

There is a method that accepts a map of variables that will be set on the process instance: void complete(String taskId, Map<String, Object> variables);