Using signals and messages to parallelize JavaTask executions

Hello,

I’m trying to make a PoC of a job orchestrator with BPMN and Flowable, but I’m having trouble with the parallelization of Java tasks.
Here’s the input my BPMN process receives:

[
  {
    "id": "foo",
    "dependencies": []
  },
  {
    "id": "bar",
    "dependencies": [ "foo" ]
  },
  {
    "id": "baz",
    "dependencies": [ "foo" ]
  },
  {
    "id": "qux",
    "dependencies": [ "bar" ]
  }
]

Each of the objects in the JSON above represents a job that my job orchestrator is supposed to execute. Also, some jobs depend on others and should only be started once all of their dependencies have been executed. With the input above, the jobs should be executed in the following order:
[image: task-execution-order]
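For reference, each entry of this JSON can be modeled with a simple class along these lines (a simplified sketch; the actual class in my repository may look a bit different):

import java.util.List;

// One entry of the JSON input: a job with an id and the ids of the jobs it depends on.
public class Job {

    private final String id;
    private final List<String> dependencies;

    public Job(String id, List<String> dependencies) {
        this.id = id;
        this.dependencies = dependencies;
    }

    public String getId() {
        return id;
    }

    public List<String> getDependencies() {
        return dependencies;
    }
}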
The first thing I tried was to make batches (List<Job>) of jobs and execute them via a multi-instance call activity that starts multiple instances of the job execution subprocess in parallel. The batches I used were:

  1. [ "foo" ]
  2. [ "bar", "baz" ]
  3. [ "qux" ]
    Unfortunately, this is not ideal since in a real-world scenario bar and baz could take a different amount of time to execute, let’s say 1 minute for bar and 5 for baz. Since bar would finish much sooner, it would be perfect if I were able to schedule qux to execute immediately afterwards and not have it wait for baz.
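Computing those batches from the dependency graph is straightforward; roughly like this (a sketch of the idea only, using the Job class from above, not the exact code from my repository):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BatchPlanner {

    // Splits the jobs into batches: each batch contains all jobs whose
    // dependencies are already covered by the previous batches.
    // For the input above this yields [foo], [bar, baz], [qux].
    public static List<List<Job>> toBatches(List<Job> jobs) {
        List<List<Job>> batches = new ArrayList<>();
        Set<String> done = new HashSet<>();
        List<Job> remaining = new ArrayList<>(jobs);
        while (!remaining.isEmpty()) {
            List<Job> batch = new ArrayList<>();
            for (Job job : remaining) {
                if (done.containsAll(job.getDependencies())) {
                    batch.add(job);
                }
            }
            if (batch.isEmpty()) {
                throw new IllegalStateException("Cyclic or unknown dependencies in: " + remaining);
            }
            for (Job job : batch) {
                done.add(job.getId());
            }
            remaining.removeAll(batch);
            batches.add(batch);
        }
        return batches;
    }
}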

The second thing I tried was to use signals and the following two diagrams:
Main process:
[image: main process diagram]

Subprocess:
[image: sub-process diagram]
The idea is that the “Start Job Selection” Java task sends a signal that starts the “Job Selection” subprocess. The “Trigger Job Execution” task figures out which jobs can be executed and starts the “Job Execution” subprocess (not the one in the main diagram, but the one from the subprocess diagram). Finally, the “Execute Job” task prints a message and sends a signal that once again starts the “Job Selection” subprocess. Here’s an example of how the whole thing works (my implementation can be found here: https://github.com/nictas/flowable-test):

  1. “Start Job Selection” starts the “Job Selection” subprocess.
  2. The job selection subprocess figures out that the only job that can currently be executed is foo, as it has no dependencies, so it starts the “Job Execution” subprocess for it.
  3. foo is executed and the job execution subprocess then sends a signal that once again starts the job selection subprocess.
  4. The job selection subprocess now knows that foo has been executed and it then starts two job execution subprocesses in parallel for bar and baz.
  5. Both bar and baz finish, and each job execution subprocess sends a separate signal that starts a new instance of the job selection subprocess. This results in two job selection subprocesses running concurrently, and this is where the problem starts.
  6. Both instances of the job selection subprocess figure out that qux should be executed and start two job execution subprocesses. As a result, qux is executed twice, which is a problem.
  7. Finally, the two instances of the job selection subprocess see that all jobs have been executed and send two signals back to the main process (to the “All Jobs Done” catching event), which results in the following exception:
org.flowable.common.engine.api.FlowableOptimisticLockingException: ActivityInstanceEntity[id=29, activityId=allJobsDoneSignalIntermediateCatchEvent, activityName=All Jobs Done, executionId= 13] was updated by another transaction concurrently
	at org.flowable.common.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:577)
	at org.flowable.common.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:364)
	at org.flowable.common.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)
	at org.flowable.common.engine.impl.interceptor.CommandContext.close(CommandContext.java:69)
	at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:92)
	at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
	at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
	at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
	at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.executeJob(ExecuteAsyncRunnable.java:127)
	at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.run(ExecuteAsyncRunnable.java:115)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
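For completeness, the “Execute Job” task boils down to a JavaDelegate roughly like this (a simplified sketch: the jobId variable, the signal name and the injected RuntimeService are placeholders, the real code is in the repository linked above):

import org.flowable.engine.RuntimeService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;

// Sketch of the "Execute Job" task: prints the job it received and then
// broadcasts the signal that triggers a new "Job Selection" round.
public class ExecuteJobDelegate implements JavaDelegate {

    private final RuntimeService runtimeService;

    public ExecuteJobDelegate(RuntimeService runtimeService) {
        this.runtimeService = runtimeService;
    }

    @Override
    public void execute(DelegateExecution execution) {
        String jobId = (String) execution.getVariable("jobId");
        System.out.println("Executing job " + jobId);

        // Same effect as the signal throw at the end of the subprocess diagram.
        runtimeService.signalEventReceived("jobSelectionSignal");
    }
}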

I feel like this is not the right way to accomplish what I want, but I’ve run out of ideas. The use case seems simple, and yet I don’t see a way of implementing it. Can anyone give me a hint about what I can do?

Not sure if you have read the True parallel service task execution with Flowable blog post.

Depending on whether you need your orchestration to happen in one transaction or in multiple ones, you can do something similar to what the blog post suggests with 6.6.0.

You can also look at this from another angle. Instead of creating a BPMN process that is able to handle your JSON input, why not create a BPMN XML out of your JSON input, using parallel gateways where needed?

If you can create the XML on the fly, then you can play with making the different service tasks async (executed in 2 different transactions) or with the true parallel service tasks (possible from 6.6.0).
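For your sample input, a rough sketch with the BpmnModel API could look like this (just to illustrate the idea; the ids, the delegate class name and the asynchronous flag are placeholders that you would derive from your JSON):

import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.ImplementationType;
import org.flowable.bpmn.model.ParallelGateway;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;

public class JobModelBuilder {

    // Builds: start -> foo -> fork -> (bar | baz) -> join -> qux -> end
    public static BpmnModel buildModel() {
        Process process = new Process();
        process.setId("generatedJobProcess");

        StartEvent start = new StartEvent();
        start.setId("start");
        EndEvent end = new EndEvent();
        end.setId("end");

        process.addFlowElement(start);
        process.addFlowElement(end);
        process.addFlowElement(gateway("fork"));
        process.addFlowElement(gateway("join"));
        process.addFlowElement(serviceTask("foo"));
        process.addFlowElement(serviceTask("bar"));
        process.addFlowElement(serviceTask("baz"));
        process.addFlowElement(serviceTask("qux"));

        process.addFlowElement(flow("start", "foo"));
        process.addFlowElement(flow("foo", "fork"));
        process.addFlowElement(flow("fork", "bar"));
        process.addFlowElement(flow("fork", "baz"));
        process.addFlowElement(flow("bar", "join"));
        process.addFlowElement(flow("baz", "join"));
        process.addFlowElement(flow("join", "qux"));
        process.addFlowElement(flow("qux", "end"));

        BpmnModel bpmnModel = new BpmnModel();
        bpmnModel.addProcess(process);
        return bpmnModel;
    }

    private static ServiceTask serviceTask(String id) {
        ServiceTask task = new ServiceTask();
        task.setId(id);
        task.setName("Execute " + id);
        task.setAsynchronous(true); // each job runs in its own transaction
        task.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_CLASS);
        task.setImplementation("com.example.JobDelegate"); // placeholder delegate class
        return task;
    }

    private static ParallelGateway gateway(String id) {
        ParallelGateway gateway = new ParallelGateway();
        gateway.setId(id);
        return gateway;
    }

    private static SequenceFlow flow(String sourceRef, String targetRef) {
        SequenceFlow flow = new SequenceFlow(sourceRef, targetRef);
        flow.setId("flow-" + sourceRef + "-" + targetRef);
        return flow;
    }
}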

Cheers,
Filip


Thanks for the info @filiphr! Just to make sure I understood, by “making a BPMN XML on the fly” you mean that I could construct a BPMN diagram programmatically via the BpmnModel API, right? Like in this example?
Also, that BPMN should look kind of like this, right?

Yes, that is what I meant.

Keep in mind that you still need to deploy this BpmnModel via the RepositoryService:

repositoryService.createDeployment()
        .addString(jsonResourceName, jsonString)
        .addBpmnModel(jsonResourceName + ".bpmn", bpmnModel)
        .enableDuplicateFiltering()
        .deploy();

Here jsonResourceName can be the name of your JSON input, jsonString is the JSON content itself, and bpmnModel is the BpmnModel you created from the JSON.
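After that you can start the generated process as usual, for example (assuming generatedJobProcess is the id you gave the generated Process):

runtimeService.startProcessInstanceByKey("generatedJobProcess");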

Cheers,
Filip
