AsyncJobExecutor hangs with async delegate service task (FutureJavaDelegate)

Hi colleagues,

First of all, I would like to say a BIG thank you to the whole Flowable team and all contributors. You are doing a great job.
In our project, we use Flowable 6.6.0 as a choreographer of microservices, together with Kafka, Micronaut, etc. We ran into an intermittent problem when handling a large number of BPs. To reproduce the issue I created a test BP:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:flowable="http://flowable.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.flowable.org/processdef">
  <process id="TestAsyncExecutor" name="TestAsyncExecutor" isExecutable="true">
    <documentation>TestAsyncExecutor</documentation>
    <startEvent id="startEvent1" flowable:formFieldValidation="true"></startEvent>
    <sequenceFlow id="sid-64697C8E-D145-4F95-8E18-5C3B076AA573" sourceRef="startEvent1" targetRef="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55"></sequenceFlow>
    <sequenceFlow id="sid-8E184F50-1F3A-449A-BA63-CADE80684E1B" sourceRef="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" targetRef="sid-D6859BEC-779B-4031-B996-7021409F3FF3"></sequenceFlow>
    <endEvent id="sid-D6859BEC-779B-4031-B996-7021409F3FF3"></endEvent>
    <serviceTask id="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" name="MyServiceTask" flowable:async="true" flowable:exclusive="false" flowable:delegateExpression="${myFutureJavaDelegateBean}"></serviceTask>
  </process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_TestAsyncExecutor">
    <bpmndi:BPMNPlane bpmnElement="TestAsyncExecutor" id="BPMNPlane_TestAsyncExecutor">
      <bpmndi:BPMNShape bpmnElement="startEvent1" id="BPMNShape_startEvent1">
        <omgdc:Bounds height="30.0" width="30.0" x="270.0" y="163.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-D6859BEC-779B-4031-B996-7021409F3FF3" id="BPMNShape_sid-D6859BEC-779B-4031-B996-7021409F3FF3">
        <omgdc:Bounds height="28.0" width="28.0" x="555.0" y="164.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55" id="BPMNShape_sid-60A40660-E7C7-445C-8FAA-0883B3EC8A55">
        <omgdc:Bounds height="80.0" width="100.0" x="390.0" y="138.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge bpmnElement="sid-8E184F50-1F3A-449A-BA63-CADE80684E1B" id="BPMNEdge_sid-8E184F50-1F3A-449A-BA63-CADE80684E1B">
        <omgdi:waypoint x="489.95000000000005" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="555.0" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-64697C8E-D145-4F95-8E18-5C3B076AA573" id="BPMNEdge_sid-64697C8E-D145-4F95-8E18-5C3B076AA573">
        <omgdi:waypoint x="299.9499992392744" y="178.0"></omgdi:waypoint>
        <omgdi:waypoint x="389.9999999999684" y="178.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</definitions>

It is very small: just one service task backed by a delegate. The delegate is a simple FutureJavaDelegate:

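import org.flowable.engine.delegate.DelegateExecution
import org.flowable.engine.delegate.FlowableFutureJavaDelegate

class MyFutureJavaDelegateBean : FlowableFutureJavaDelegate<String, String> {
    // Does the actual work; receives the data prepared below.
    override fun execute(inputData: String): String {
        println("execute: $inputData")
        return inputData
    }

    // Called with the result of execute() once the future has completed.
    override fun afterExecution(execution: DelegateExecution, executionData: String) {
        println("afterExecution: $executionData")
    }

    // Collects the input for execute(); here it is simply the execution id.
    override fun prepareExecutionData(execution: DelegateExecution): String {
        // ${...} is needed so that the id, not the execution's toString(), is printed
        println("prepareExecutionData: ${execution.id}")
        return execution.id
    }
}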

If we set asyncExecutorCorePoolSize and asyncExecutorMaxPoolSize to 1 (yes, only one), this BP never finishes. The reason is that the submit methods of DefaultAsyncTaskExecutor use the same thread pool (executor service) that runs the async jobs. The single worker is busy with the async task, but the future delegate can never complete because there is no free worker left to run it.

    @Override
    public CompletableFuture<?> submit(Runnable task) {
        // executorService is the same pool that runs the async jobs
        return CompletableFuture.runAsync(task, executorService);
    }

    @Override
    public <T> CompletableFuture<T> submit(Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception exception) {
                sneakyThrow(exception);
                return null;
            }
        }, executorService);
    }
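
To make the starvation easier to see outside of Flowable, here is a minimal JDK-only sketch. It is a simplified model and not Flowable's actual code: the class PoolStarvationDemo and the method runJob are hypothetical names, and the blocking get() only stands in for "the worker is held until the future completes".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolStarvationDemo {

    // Hypothetical model: an "async job" hands follow-up work to the same
    // pool it is running on and then waits for that work to finish.
    public static String runJob(int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> {
                // The inner task is queued on the same pool as the outer job.
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "done", pool);
                try {
                    return inner.get(); // holds this worker until the inner task has run
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, pool);
            try {
                job.get(500, TimeUnit.MILLISECONDS);
                return "completed";
            } catch (TimeoutException e) {
                return "starved"; // no free worker ever picked up the inner task
            }
        } finally {
            pool.shutdownNow(); // interrupts a blocked worker so the JVM can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 worker:  " + runJob(1));
        System.out.println("2 workers: " + runJob(2));
    }
}
```

With one worker the job should time out, while with two workers it should complete. Under load the same thing can happen with a larger pool once every worker is waiting on a queued task.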

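As a workaround, we overrode DefaultAsyncTaskExecutor to call CompletableFuture.runAsync and CompletableFuture.supplyAsync without executorService, so the futures run on the JDK's default executor (normally ForkJoinPool.commonPool()) instead of the async job pool: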

override fun submit(task: Runnable): CompletableFuture<*> {
     return CompletableFuture.runAsync(task)
}

@SuppressWarnings("TooGenericExceptionCaught")
override fun <T : Any?> submit(task: Callable<T>): CompletableFuture<T> {
    return CompletableFuture.supplyAsync {
        try {
            return@supplyAsync task.call()
        } catch (exception: Exception) {
            ExceptionUtil.sneakyThrow<RuntimeException>(exception)
            return@supplyAsync null
        }
    }
}

P.S. I know that I can set asyncExecutorMaxPoolSize to 64 or even 128, but under heavy load the same thing can still happen. In our case, even a pool of 16 workers sometimes hung.

Hey @KubakhovDmitry,

Indeed, combining async tasks with future Java delegates can lead to deadlocking yourself.

We are going to discuss internally if we can provide something out of the box.

However, in the meantime, you can provide your own instance of the AsyncTaskInvoker to the ProcessEngineConfiguration. That way you can supply a different AsyncTaskExecutor that is responsible for the future Java delegates.

Another alternative would be to override:

    @Override
    default CompletableFuture<Output> execute(DelegateExecution execution, AsyncTaskInvoker taskInvoker) {
        Input inputData = prepareExecutionData(execution);
        return taskInvoker.submit(() -> execute(inputData));
    }

and provide your own way of creating the CompletableFuture.
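
Either option comes down to creating the delegate's future on threads other than the async job workers. Below is a minimal JDK-only sketch of that idea, with no Flowable types involved. The names DedicatedFuturePoolDemo, runJob, jobPool and futurePool are hypothetical, and the blocking get() only models the job worker being held until the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DedicatedFuturePoolDemo {

    // Hypothetical model: the job runs on jobPool, but the future it waits
    // for is created on a separate executor.
    public static String runJob(ExecutorService jobPool, Executor futureExecutor) throws Exception {
        CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> {
            CompletableFuture<String> inner =
                    CompletableFuture.supplyAsync(() -> "done", futureExecutor);
            try {
                return inner.get(); // the job worker waits, but another thread runs the task
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }, jobPool);
        return job.get(2, TimeUnit.SECONDS);
    }

    public static String demo() throws Exception {
        // Even with a single job worker there is no starvation,
        // because the inner task does not compete for that worker.
        ExecutorService jobPool = Executors.newFixedThreadPool(1);
        ExecutorService futurePool = Executors.newFixedThreadPool(1);
        try {
            return runJob(jobPool, futurePool);
        } finally {
            jobPool.shutdownNow();
            futurePool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

How the second pool is sized and shut down in a real setup depends on the application. The sketch only shows why separating the two pools removes the circular wait.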

Cheers,
Filip