Different behaviour async executor in flowable5 compatibility mode

Hi,

My config is as follows (leaving out irrelevant info)
@Bean(destroyMethod = “destroy”)
public ProcessEngineFactoryBean processEngine(ManagedAsyncJobExecutor asyncJobExecutor) {
final SpringProcessEngineConfiguration spec = new SpringProcessEngineConfiguration();

    spec.setFlowable5CompatibilityEnabled(true);
    spec.setFlowable5CompatibilityHandlerFactory(springFlowable5CompatibilityHandlerFactory());

    spec.setAsyncExecutorActivate(configuration.isAsyncExecutorActivate());
    spec.setAsyncExecutor(asyncJobExecutor);

    ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
    factoryBean.setProcessEngineConfiguration(spec);
    return factoryBean;
}

@Bean
public ManagedAsyncJobExecutor asyncJobExecutor(ExecutorService executorService) {
    ManagedAsyncJobExecutor jobExecutor = new ManagedAsyncJobExecutor();
    jobExecutor.setExecutorService(executorService);
    jobExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(1000);
    jobExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(1000);


    return jobExecutor;
}

@Bean
public ExecutorService executorService() {
    return Executors.newCachedThreadPool();
}

The behaviour of a flowable5 process seems to be different though compared to when the process is executed by the flowable6 engine. This happens when executing a JavaDelegate that is both set to async and exclusive. The job is locked for 60 seconds in compatibility mode before executing, while this does not happen in flowable 6 engine that executes after the wait time.

While we do not have any cases where we actively use the exclusive flag, this does not matter much, also the delegate is executed after the timeout, but I wanted to report this here and also ask if this is the expected behaviour?

The behavior should be the same, as the v5 engine simply hands it off to the v6 engine, so the 60 seconds is odd (is it always exactly 60 seconds? Or does it vary?). Is there anything specific about your process definition? Can you post the relevant definition snippet?

Hi Joram,

The only thing specific is that the JavaDelegate is set to async and exclusive. I tried replicating the behaviour with a unit test, but I failed to reproduce it. I kept the implementation of the AsyncExecutor the same (DefaultAsyncJobExecutor).

One thing that seems to differ is the following: on my running server, it seems that the async job is placed in the job table, and the lockTime is set in ExecutionEntityManager#updateProcessInstanceLockTime(String processInstanceId). This happens in both scenario’s.

But after that things differ: on my server, the job is picked up in AcquireAsyncJobsDueRunnable and finally the asyncExecutor will execute it. In contract, my unit test seems to continue always immediately when passing ExecuteAsyncRunnable#isHandledByV5Engine().

Screenshot from 2017-09-07 10-40-29

It seems to me that the behaviour of async is vastly different compared to the unit test, which makes me think that this is just a configuration error on my part. However, I have yet to discover why it behaves differently on flowable 6 (once I redeploy the process).

package be.mlippens.test

import org.flowable.engine.*;
import org.flowable.engine.common.AbstractEngineConfiguration;
import org.flowable.engine.common.api.delegate.event.FlowableEvent;
import org.flowable.engine.common.api.delegate.event.FlowableEventListener;
import org.flowable.engine.compatibility.DefaultFlowable5CompatibilityHandlerFactory;
import org.flowable.engine.delegate.event.FlowableEngineEventType;
import org.flowable.engine.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.flowable.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration;
import org.flowable.engine.repository.DeploymentProperties;
import org.flowable.engine.task.Task;
import org.flowable.engine.test.Deployment;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;


public class FlowableAsyncAndExclusiveBehaviourTest {

    private ProcessEngine processEngine;

    private Logger logger = LoggerFactory.getLogger(getClass());

    private String bpmnXml = "<?xml version='1.0' encoding='UTF-8'?>\n" +
            "<definitions xmlns=\"http://www.omg.org/spec/BPMN/20100524/MODEL\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:flowable=\"http://flowable.org/bpmn\" xmlns:bpmndi=\"http://www.omg.org/spec/BPMN/20100524/DI\" xmlns:omgdc=\"http://www.omg.org/spec/DD/20100524/DC\" xmlns:omgdi=\"http://www.omg.org/spec/DD/20100524/DI\" typeLanguage=\"http://www.w3.org/2001/XMLSchema\" expressionLanguage=\"http://www.w3.org/1999/XPath\" targetNamespace=\"http://www.flowable.org/processdef\">\n" +
            "    <process id=\"0cbf54fd-c374-4439-827f-d115376f57b1\" name=\"testmodel_1\" isExecutable=\"true\">\n" +
            "        <documentation>model</documentation>\n" +
            "        <startEvent id=\"startEvent1\"/>\n" +
            "        <userTask id=\"sid-C04D28E5-326D-4FD4-987B-E1CCE1A9EA7E\" name=\"arbitrary task\" flowable:candidateGroups=\"admin\"/>\n" +
            "        <sequenceFlow id=\"sid-8D6CD306-53A8-44E9-AC0A-F12AAD3B601C\" sourceRef=\"startEvent1\" targetRef=\"sid-C04D28E5-326D-4FD4-987B-E1CCE1A9EA7E\"/>\n" +
            "        <scriptTask flowable:exclusive=\"true\" id=\"sid-9348F7BE-9DCD-4449-8FC3-28CC38B472AF\" name=\"a simple script&#xa;\" flowable:async=\"true\" scriptFormat=\"groovy\" flowable:autoStoreVariables=\"false\">\n" +
            "            <script><![CDATA[println \"foobar\"]]></script>\n" +
            "        </scriptTask>\n" +
            "        <sequenceFlow id=\"sid-4DF614E1-6C76-4DBE-B070-F3B3D125CC11\" sourceRef=\"sid-C04D28E5-326D-4FD4-987B-E1CCE1A9EA7E\" targetRef=\"sid-9348F7BE-9DCD-4449-8FC3-28CC38B472AF\"/>\n" +
            "        <endEvent id=\"sid-BAB2326A-A8B2-4901-976B-1A62A1128066\"/>\n" +
            "        <sequenceFlow id=\"sid-D4AA1339-8F5B-478D-86FC-BBC738D17ADB\" sourceRef=\"sid-9348F7BE-9DCD-4449-8FC3-28CC38B472AF\" targetRef=\"sid-BAB2326A-A8B2-4901-976B-1A62A1128066\"/>\n" +
            "    </process>\n" +
            "    <bpmndi:BPMNDiagram id=\"BPMNDiagram_0cbf54fd-c374-4439-827f-d115376f57b1\">\n" +
            "        <bpmndi:BPMNPlane bpmnElement=\"0cbf54fd-c374-4439-827f-d115376f57b1\" id=\"BPMNPlane_0cbf54fd-c374-4439-827f-d115376f57b1\">\n" +
            "            <bpmndi:BPMNShape bpmnElement=\"startEvent1\" id=\"BPMNShape_startEvent1\">\n" +
            "                <omgdc:Bounds height=\"30.0\" width=\"30.0\" x=\"100.0\" y=\"163.0\"/>\n" +
            "            </bpmndi:BPMNShape>\n" +
            "            <bpmndi:BPMNShape bpmnElement=\"sid-C04D28E5-326D-4FD4-987B-E1CCE1A9EA7E\" id=\"BPMNShape_sid-C04D28E5-326D-4FD4-987B-E1CCE1A9EA7E\">\n" +
            "                <omgdc:Bounds height=\"80.0\" width=\"100.0\" x=\"165.0\" y=\"135.0\"/>\n" +
            "            </bpmndi:BPMNShape>\n" +
            "            <bpmndi:BPMNShape bpmnElement=\"sid-9348F7BE-9DCD-4449-8FC3-28CC38B472AF\" id=\"BPMNShape_sid-9348F7BE-9DCD-4449-8FC3-28CC38B472AF\">\n" +
            "                <omgdc:Bounds height=\"80.0\" width=\"100.0\" x=\"163.71427235797933\" y=\"299.99999897820607\"/>\n" +
            "            </bpmndi:BPMNShape>\n" +
            "            <bpmndi:BPMNShape bpmnElement=\"sid-BAB2326A-A8B2-4901-976B-1A62A1128066\" id=\"BPMNShape_sid-BAB2326A-A8B2-4901-976B-1A62A1128066\">\n" +
            "                <omgdc:Bounds height=\"28.0\" width=\"28.0\" x=\"391.7142723579793\" y=\"325.99999897820607\"/>\n" +
            "            </bpmndi:BPMNShape>\n" +
            "            <bpmndi:BPMNEdge bpmnElement=\"sid-8D6CD306-53A8-44E9-AC0A-F12AAD3B601C\" id=\"BPMNEdge_sid-8D6CD306-53A8-44E9-AC0A-F12AAD3B601C\">\n" +
            "                <omgdi:waypoint x=\"129.9932545528355\" y=\"177.5502023634149\"/>\n" +
            "                <omgdi:waypoint x=\"165.0\" y=\"176.5\"/>\n" +
            "            </bpmndi:BPMNEdge>\n" +
            "            <bpmndi:BPMNEdge bpmnElement=\"sid-D4AA1339-8F5B-478D-86FC-BBC738D17ADB\" id=\"BPMNEdge_sid-D4AA1339-8F5B-478D-86FC-BBC738D17ADB\">\n" +
            "                <omgdi:waypoint x=\"263.7142723579793\" y=\"339.99999897820607\"/>\n" +
            "                <omgdi:waypoint x=\"391.7142723579793\" y=\"339.99999897820607\"/>\n" +
            "            </bpmndi:BPMNEdge>\n" +
            "            <bpmndi:BPMNEdge bpmnElement=\"sid-4DF614E1-6C76-4DBE-B070-F3B3D125CC11\" id=\"BPMNEdge_sid-4DF614E1-6C76-4DBE-B070-F3B3D125CC11\">\n" +
            "                <omgdi:waypoint x=\"214.688308448489\" y=\"215.0\"/>\n" +
            "                <omgdi:waypoint x=\"214.02596390949032\" y=\"299.99999897820607\"/>\n" +
            "            </bpmndi:BPMNEdge>\n" +
            "        </bpmndi:BPMNPlane>\n" +
            "    </bpmndi:BPMNDiagram>\n" +
            "</definitions>";

    @Before
    public void setup(){
        StandaloneInMemProcessEngineConfiguration configuration = (StandaloneInMemProcessEngineConfiguration)ProcessEngineConfiguration.createStandaloneInMemProcessEngineConfiguration();
        configuration.setFlowable5CompatibilityEnabled(true);
        configuration.setFlowable5CompatibilityHandlerFactory(new DefaultFlowable5CompatibilityHandlerFactory());

        configuration.setAsyncExecutorActivate(true);
        configuration.setAsyncExecutor(new DefaultAsyncJobExecutor());

        configuration.setDatabaseSchemaUpdate(AbstractEngineConfiguration.DB_SCHEMA_UPDATE_DROP_CREATE);


        processEngine = configuration.buildProcessEngine();

        processEngine.getRuntimeService().addEventListener(new FlowableEventListener() {
            @Override
            public void onEvent(FlowableEvent flowableEvent) {
                if (flowableEvent.getType().equals(FlowableEngineEventType.PROCESS_COMPLETED)) {
                    logger.info("Process completed!");
                }
            }

            @Override
            public boolean isFailOnException() {
                return false;
            }
        });
    }

    @After
    public void teardown(){
        processEngine.close();
    }

    @Test
    public void testWithFlowable5Deployment() throws Exception {
        RepositoryService repositoryService = processEngine.getRepositoryService();
        RuntimeService runtimeService = processEngine.getRuntimeService();
        TaskService taskService = processEngine.getTaskService();

        repositoryService.createDeployment()
                .disableSchemaValidation()
                .addInputStream("test-process.bpmn20.xml", new ByteArrayInputStream(bpmnXml.getBytes(StandardCharsets.UTF_8)))
                .deploymentProperty(DeploymentProperties.DEPLOY_AS_FLOWABLE5_PROCESS_DEFINITION, true)
                .deploy();

        runtimeService
                .createProcessInstanceBuilder()
                .processDefinitionKey("0cbf54fd-c374-4439-827f-d115376f57b1")
                .start();


        Task task = taskService
                .createTaskQuery()
                .singleResult();

        taskService.complete(task.getId());
    }
}

Thanks for posting the unit test. At first glance, the unit tests seems to be okay. In fact, the async executor is booted up in the regular way (the way it’s used for real) and not in a unit-test way. So that’s all good.

Not sure what’s going on that would explain the 1 minute delay (although it’s too round of a number to be coincidence)… so not sure where to look next … if you find any bit of information extra, please post here.