Hello,
I’m trying to make a PoC of a job orchestrator with BPMN and Flowable, but I’m having trouble with the parallelization of Java tasks.
Here’s the input my BPMN process receives:
[
{
"id": "foo",
"dependencies": []
},
{
"id": "bar",
"dependencies": [ "foo" ]
},
{
"id": "baz",
"dependencies": [ "foo" ]
},
{
"id": "qux",
"dependencies": [ "bar" ]
}
]
Each of the objects in the JSON above represents a job that my job orchestrator is supposed to execute. Also, some jobs depend on others and should only be started once all of their dependencies have been executed. With the input above, the jobs should be executed in the following order:
The first thing I tried was to make batches (List<Job>
) of jobs and execute them via a multi-instance call activity that starts multiple instances of the job execution subprocess in parallel. The batches I used were:
[ "foo" ]
[ "bar", "baz" ]
-
[ "qux" ]
Unfortunately, this is not ideal since in a real-world scenariobar
andbaz
could take a different amount of time to execute, let’s say 1 minute forbar
and 5 forbaz
. Sincebar
would finish much sooner, it would be perfect if I were able to schedulequx
to execute immediately afterwards and not have it wait forbaz
.
The second thing I tried was to use signals and the following two diagrams:
Main process:
Subprocess:

The idea is that the “Start Job Selection” Java task sends a signal that starts the “Job Selection” subprocess. The “Trigger Job Execution” task figures out what jobs can be executed and starts the “Job Execution” subprocess (not the one in the main diagram, but the one from the subprocess diagram). Finally, the “Execute Job” task prints a message and sends a signal that once again starts the “Job Selection” subprocess. An example of how the whole thing works (my implementation can be found here: https://github.com/nictas/flowable-test):
- “Start Job Selection” starts the “Job Selection” subprocess.
- The job selection subprocess figures out that the only job that can currently be executed is
foo
, as it has no dependencies, so it starts the “Job Execution” subprocess for it. -
foo
is executed and the job execution subprocess then sends a signal that once again starts the job selection subprocess. - The job selection subprocess now knows that
foo
has been executed and it then starts two job execution subprocesses in parallel forbar
andbaz
. - Both
bar
andbaz
finish and each job execution subprocess sends a separate signal that starts a new instance of the job selection subprocess. This results in two job selection subprocesses running concurrently and this is where the problem starts. - Both instances of the job selection subprocess figure out that
qux
should be executed and start two job execution subprocesses. As a result,qux
is executed twice, which is a problem. - Finally, the two instances of the job selection subprocess see that all jobs have been executed and send two signals back to the main process (to the “All Jobs Done” catching event) and this results in the following exception:
org.flowable.common.engine.api.FlowableOptimisticLockingException: ActivityInstanceEntity[id=29, activityId=allJobsDoneSignalIntermediateCatchEvent, activityName=All Jobs Done, executionId= 13] was updated by another transaction concurrently
at org.flowable.common.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:577)
at org.flowable.common.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:364)
at org.flowable.common.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)
at org.flowable.common.engine.impl.interceptor.CommandContext.close(CommandContext.java:69)
at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:92)
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.executeJob(ExecuteAsyncRunnable.java:127)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.run(ExecuteAsyncRunnable.java:115)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I feel like this is not the right way to accomplish what I want, but I’ve ran out of ideas. The use-case seems simple and yet I don’t see a way of implementing it. Can anyone give me a hint of what I can do?