Hi colleagues,
First of all, I would like to say a BIG thank-you to the whole Flowable team and all community members. You are doing a great job.
In our project, we use Flowable 6.6.0 to choreograph microservices together with Kafka, Micronaut, etc. We ran into an intermittent problem when processing a large number of BPs. To reproduce it, I created a test BP:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:flowable="http://flowable.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.flowable.org/processdef">
  <process id="TestAsyncExecutor" name="TestAsyncExecutor" isExecutable="true">
    <documentation>TestAsyncExecutor</documentation>
    <startEvent id="startEvent1" flowable:formFieldValidation="true"></startEvent>
    <sequenceFlow id="sid-64697C8E-D145-4F95-8E18-5C3B076AA573" sourceRef="startEvent1" targetRef="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55"></sequenceFlow>
    <sequenceFlow id="sid-8E184F50-1F3A-449A-BA63-CADE80684E1B" sourceRef="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" targetRef="sid-D6859BEC-779B-4031-B996-7021409F3FF3"></sequenceFlow>
    <endEvent id="sid-D6859BEC-779B-4031-B996-7021409F3FF3"></endEvent>
    <serviceTask id="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" name="MyServiceTask" flowable:async="true" flowable:exclusive="false" flowable:delegateExpression="${myFutureJavaDelegateBean}"></serviceTask>
  </process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_TestAsyncExecutor">
    <bpmndi:BPMNPlane bpmnElement="TestAsyncExecutor" id="BPMNPlane_TestAsyncExecutor">
      <bpmndi:BPMNShape bpmnElement="startEvent1" id="BPMNShape_startEvent1">
        <omgdc:Bounds height="30.0" width="30.0" x="270.0" y="163.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-D6859BEC-779B-4031-B996-7021409F3FF3" id="BPMNShape_sid-D6859BEC-779B-4031-B996-7021409F3FF3">
        <omgdc:Bounds height="28.0" width="28.0" x="555.0" y="164.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" id="BPMNShape_sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55">
        <omgdc:Bounds height="80.0" width="100.0" x="390.0" y="138.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge bpmnElement="sid-8E184F50-1F3A-449A-BA63-CADE80684E1B" id="BPMNEdge_sid-8E184F50-1F3A-449A-BA63-CADE80684E1B">
        <omgdi:waypoint x="489.95000000000005" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="555.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-64697C8E-D145-4F95-8E18-5C3B076AA573" id="BPMNEdge_sid-64697C8E-D145-4F95-8E18-5C3B076AA573">
        <omgdi:waypoint x="299.9499992392744" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="389.9999999999684" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</definitions>
```
It is very small: just one service task (`flowable:async="true"`, `flowable:exclusive="false"`) backed by a delegate. The delegate is a simple `FlowableFutureJavaDelegate`:
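```kotlin
import org.flowable.engine.delegate.DelegateExecution
import org.flowable.engine.delegate.FlowableFutureJavaDelegate

class MyFutureJavaDelegateBean : FlowableFutureJavaDelegate<String, String> {
    // Runs on the async task executor with the value returned by prepareExecutionData().
    override fun execute(inputData: String): String {
        println("execute: $inputData")
        return inputData
    }

    // Receives the result of execute(), which here is already the execution id.
    override fun afterExecution(execution: DelegateExecution, executionData: String) {
        println("afterExecution: $executionData")
    }

    // Called first; ${...} is needed so the id is interpolated, not "<execution>.id".
    override fun prepareExecutionData(execution: DelegateExecution): String {
        println("prepareExecutionData: ${execution.id}")
        return execution.id
    }
}
```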
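To make the rest of the report easier to follow, here is a plain-JDK sketch of the three callbacks above in the order I understand Flowable uses them: `prepareExecutionData` on the calling thread, `execute` on an executor, and `afterExecution` once the future has completed. It does not use Flowable at all; the `FutureDelegate` interface and the `run`/`demo` helpers are hypothetical stand-ins, not Flowable API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FutureDelegateLifecycle {

    // Hypothetical stand-in for the three FlowableFutureJavaDelegate callbacks;
    // the execution is reduced to its id to keep the sketch self-contained.
    interface FutureDelegate<I, O> {
        I prepareExecutionData(String executionId);
        O execute(I input);
        void afterExecution(String executionId, O output);
    }

    // Prepare on the calling thread, run execute() on `executor`,
    // then call afterExecution() with the result once it is available.
    static <I, O> CompletableFuture<Void> run(FutureDelegate<I, O> delegate,
                                            String executionId,
                                            ExecutorService executor) {
        I input = delegate.prepareExecutionData(executionId);
        return CompletableFuture
                .supplyAsync(() -> delegate.execute(input), executor)
                .thenAccept(output -> delegate.afterExecution(executionId, output));
    }

    // Runs an echo delegate (like MyFutureJavaDelegateBean) and records the call order.
    static List<String> demo() throws Exception {
        List<String> log = new CopyOnWriteArrayList<>();
        FutureDelegate<String, String> echo = new FutureDelegate<>() {
            public String prepareExecutionData(String id) { log.add("prepare:" + id); return id; }
            public String execute(String input) { log.add("execute:" + input); return input; }
            public void afterExecution(String id, String out) { log.add("after:" + out); }
        };
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            run(echo, "exec-1", executor).get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // [prepare:exec-1, execute:exec-1, after:exec-1]
    }
}
```

The important point for this report is the middle step: `execute` does not run on the caller's thread but on whatever executor the future is submitted to.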
If we set both `asyncExecutorCorePoolSize` and `asyncExecutorMaxPoolSize` to 1 (yes, only one), this BP never ends. The reason is that the `submit` methods of `DefaultAsyncTaskExecutor` run the delegate's work on the same thread pool (`executorService`) that executes the async jobs. The single worker is busy with the async job, which waits for the future delegate to finish, while the delegate's work sits in the queue of that same pool with no free worker to run it.
```java
@Override
public CompletableFuture<?> submit(Runnable task) {
    return CompletableFuture.runAsync(task, executorService);
}

@Override
public <T> CompletableFuture<T> submit(Callable<T> task) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return task.call();
        } catch (Exception exception) {
            sneakyThrow(exception);
            return null;
        }
    }, executorService);
}
```
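The hang can be reproduced without Flowable. This is a minimal sketch, assuming only that the async job blocks on the delegate's future while that future's work is queued on the same single-thread pool; `runJob` is a hypothetical helper, and the timeout exists only so the demo terminates (in the real case the job simply waits forever).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SinglePoolStarvation {

    // Simulates one async job: it runs on `pool`, submits the delegate's work to the
    // SAME pool and blocks until that work finishes (or the timeout expires).
    static String runJob(ExecutorService pool, long timeoutMillis) throws Exception {
        Future<String> job = pool.submit(() -> {
            Future<String> delegateWork = pool.submit(() -> "delegate done");
            try {
                return delegateWork.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                delegateWork.cancel(false);
                return "starved: no free worker for the delegate";
            }
        });
        return job.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService onePool = Executors.newFixedThreadPool(1); // core = max = 1
        try {
            System.out.println(runJob(onePool, 500)); // starved: no free worker for the delegate
        } finally {
            onePool.shutdownNow();
        }
    }
}
```

With `Executors.newFixedThreadPool(2)` the same call returns `delegate done`, because the second worker can pick up the delegate's work.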
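As a workaround, we overrode `DefaultAsyncTaskExecutor` so that `submit` calls `CompletableFuture.runAsync` and `CompletableFuture.supplyAsync` without passing `executorService`. The delegate's work then runs on `CompletableFuture`'s default async executor (normally `ForkJoinPool.commonPool()`) instead of the job executor's pool: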
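```kotlin
// Overrides in our subclass of DefaultAsyncTaskExecutor: no executor argument,
// so the work goes to CompletableFuture's default async pool.
override fun submit(task: Runnable): CompletableFuture<*> {
    return CompletableFuture.runAsync(task)
}

@Suppress("TooGenericExceptionCaught")
override fun <T : Any?> submit(task: Callable<T>): CompletableFuture<T> {
    return CompletableFuture.supplyAsync {
        try {
            return@supplyAsync task.call()
        } catch (exception: Exception) {
            // Rethrow unchanged; supplyAsync completes the future exceptionally.
            ExceptionUtil.sneakyThrow<RuntimeException>(exception)
            return@supplyAsync null
        }
    }
}
```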
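Our workaround moves the delegate's work off the job pool, but onto a pool that is shared with everything else in the JVM that uses `CompletableFuture`'s default async executor. A variation on the same idea is to give the delegate work its own dedicated pool. The sketch below shows only the JDK side; `SeparatePoolSubmitter` is a hypothetical class whose `submit` methods mirror the shape of the ones above, and wiring it into Flowable (for example from a `DefaultAsyncTaskExecutor` subclass) is an assumption, not something we have tested.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs delegate work on its own pool, separate from the
// pool that executes async jobs, so a job waiting on a delegate cannot starve it.
class SeparatePoolSubmitter implements AutoCloseable {

    private final ExecutorService delegatePool;

    SeparatePoolSubmitter(int threads) {
        this.delegatePool = Executors.newFixedThreadPool(threads);
    }

    CompletableFuture<?> submit(Runnable task) {
        return CompletableFuture.runAsync(task, delegatePool);
    }

    <T> CompletableFuture<T> submit(Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                // A Supplier cannot throw checked exceptions, so wrap in the same
                // CompletionException that supplyAsync uses for unchecked ones.
                throw new CompletionException(e);
            }
        }, delegatePool);
    }

    @Override
    public void close() {
        delegatePool.shutdown();
    }

    // A job pool with a single worker, as in the failing configuration.
    static String demo() throws Exception {
        ExecutorService jobPool = Executors.newFixedThreadPool(1);
        try (SeparatePoolSubmitter submitter = new SeparatePoolSubmitter(2)) {
            Callable<String> delegateWork = () -> "delegate done";
            return jobPool.submit(() -> {
                return submitter.submit(delegateWork).get(5, TimeUnit.SECONDS);
            }).get();
        } finally {
            jobPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // delegate done
    }
}
```

Keeping the two pools apart means a job only ever waits on work that does not compete with it for a worker; the delegate pool still has to be sized for the expected number of concurrent delegates.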
P.S. I know that I can set `asyncExecutorMaxPoolSize` to 64 or even 128. But under heavy load it can still happen: in our case, even 16 workers sometimes hung.
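Why a bigger pool only moves the threshold: if all N workers pick up a job before any delegate work gets a worker, every worker waits on a future that can only run on one of those same N workers. The sketch below (plain JDK, hypothetical `completedJobs` helper) forces that worst-case interleaving with a latch; it assumes `jobs <= workers` and again uses a timeout only so the demo ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolSizeThreshold {

    // Starts `jobs` simulated async jobs on a pool of `workers` threads. Each job submits
    // its delegate work to the same pool and waits for it. Returns how many jobs finished.
    static int completedJobs(int workers, int jobs, long timeoutMillis) throws Exception {
        if (jobs > workers) throw new IllegalArgumentException("sketch assumes jobs <= workers");
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch allJobsRunning = new CountDownLatch(jobs);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < jobs; i++) {
                results.add(pool.submit(() -> {
                    // Wait until every job holds a worker: the worst-case interleaving.
                    allJobsRunning.countDown();
                    allJobsRunning.await();
                    Future<String> delegateWork = pool.submit(() -> "done");
                    try {
                        delegateWork.get(timeoutMillis, TimeUnit.MILLISECONDS);
                        return true;
                    } catch (TimeoutException e) {
                        return false; // no free worker picked up the delegate work
                    }
                }));
            }
            int completed = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) completed++;
            }
            return completed;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completedJobs(4, 3, 1000)); // 3
        System.out.println(completedJobs(4, 4, 300));  // 0
    }
}
```

With 4 workers and 3 jobs, one worker stays free and runs all the delegate work; with 4 workers and 4 jobs, none of the jobs finishes. The same reasoning applies to 16 or 128 workers once enough jobs arrive at the same time.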