triggerAsync failing in a subprocess

Hi,
I’d like some tips on configuring the AsyncJobExecutor properly to avoid errors like the one in the subject line when using subprocesses.

My error scenario (intermittent…) is the following:


It is a process that performs a remote directory listing and stores a “list” of elements in a process variable. Then, in a sequential subprocess (NOT asynchronous, because otherwise it just won’t start, and not exclusive either), it retrieves each file and then deletes the source. Once all items are processed, the process ends.

In this instance it has failed (well… not actually failed: it stays in RUNNING), and the cause seems to be the “triggerAsync” failing on the last ServiceTask with “Exception data: org.flowable.common.engine.api.FlowableOptimisticLockingException: Execution[ id ‘551608’ ] - activity ‘bpmremotermcommandservice1’ - parent ‘551600’ was updated by another transaction concurrently”.

My system runs on a 2-node cluster where each node (WebSphere Liberty 19) hosts a webapp embedding a ProcessEngine (6.4.2), configured by a custom ProcessEngineCfg (extending CdiJtaProcEngConfig).
I only use ServiceTasks, which are always asynchronous (I need transaction boundaries around them), and some of them are also “triggerable” because they schedule activities on remote systems. In that case I get notified via JMS when the remote task ends, and I release the execution with RuntimeService.triggerAsync(executionId, newVariables).
By design, I also have start and end execution listeners in every template I draw (we need to update our own DB tables whenever a process instance starts or ends), plus a JOB_EXECUTION_FAILURE event listener delegate that changes the state of those same records whenever a job is moved to the dead-letter queue.

My AsyncJobExecutor configuration is the default (max 1 async job per acquisition), but I can’t figure out why there is some sort of concurrency issue between the two nodes.

The exception is intermittent and happens only when asynchronous, triggerable jobs are triggered with triggerAsync inside a subprocess. Outside a subprocess I have no issues, regardless of load.

Any suggestions?

If the error always occurs on the “rm” service task, can you share the code for its delegate (feel free to redact sensitive sections)? My understanding is that it sends a JMS message to a system that removes the file and sends another JMS message back. A triggerable service task works like this:

  • The execute method runs
  • The process enters a wait state and persists to the database
  • At some future point something triggers it
  • The trigger method runs
  • The process proceeds
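The five steps above can be read as a small state machine. The sketch below is a plain-Java model, not Flowable code: `TaskPhase` and `TriggerableTaskModel` are hypothetical names. In the model, a trigger that arrives before the wait state has been committed is rejected outright; in the engine, the analogous situation surfaces as an optimistic-locking failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a triggerable service task's lifecycle; not Flowable API.
enum TaskPhase { NOT_STARTED, EXECUTING, WAITING, TRIGGERED, COMPLETED }

class TriggerableTaskModel {
    private TaskPhase phase = TaskPhase.NOT_STARTED;
    private final List<String> history = new ArrayList<>();

    // 1. The execute method runs inside the job's transaction.
    void execute() {
        advance(TaskPhase.NOT_STARTED, TaskPhase.EXECUTING, "execute");
    }

    // 2. The transaction commits: the wait state is now persisted.
    void commitWaitState() {
        advance(TaskPhase.EXECUTING, TaskPhase.WAITING, "commit");
    }

    // 3-4. Something external triggers the task and the trigger method runs.
    void trigger() {
        advance(TaskPhase.WAITING, TaskPhase.TRIGGERED, "trigger");
    }

    // 5. The process proceeds to the next activity.
    void proceed() {
        advance(TaskPhase.TRIGGERED, TaskPhase.COMPLETED, "proceed");
    }

    TaskPhase phase() { return phase; }

    List<String> history() { return history; }

    // Moves on only from the expected phase: a trigger that arrives while the
    // task is still EXECUTING (wait state not yet committed) fails here.
    private void advance(TaskPhase from, TaskPhase to, String step) {
        if (phase != from) {
            throw new IllegalStateException(step + " not allowed in phase " + phase);
        }
        phase = to;
        history.add(step);
    }
}
```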

If the trigger occurs before the wait state transaction is fully committed, you’ll get an optimistic lock exception. To prevent this you can wrap the JMS call to the external system in a transaction listener like so:

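import org.flowable.common.engine.impl.cfg.TransactionState;
import org.flowable.common.engine.impl.context.Context;
// Defer the external call until the wait-state transaction has committed
Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
    commandContext -> {
        someService.someMethod();
    });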
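To see why registering the call on `TransactionState.COMMITTED` closes the race, here is a stdlib-only model of a transaction that holds post-commit callbacks. `MiniTransaction` is a hypothetical stand-in for the engine's transaction context, not a Flowable class. The point it illustrates: the deferred side effect runs only once the data is durable, and is dropped entirely on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transaction context with COMMITTED listeners.
class MiniTransaction {
    private final List<Runnable> committedListeners = new ArrayList<>();
    private boolean finished = false;

    // Analogous to addTransactionListener(TransactionState.COMMITTED, ...).
    void addCommittedListener(Runnable listener) {
        if (finished) throw new IllegalStateException("transaction already finished");
        committedListeners.add(listener);
    }

    // Listeners run only after the commit, so a reply provoked by the
    // outgoing message cannot overtake the commit.
    void commit() {
        markFinished();
        for (Runnable listener : committedListeners) {
            listener.run();
        }
    }

    // On rollback the deferred side effects are discarded, never run.
    void rollback() {
        markFinished();
        committedListeners.clear();
    }

    private void markFinished() {
        if (finished) throw new IllegalStateException("transaction already finished");
        finished = true;
    }
}
```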

That is exactly what was happening when I was using RuntimeService.trigger(executionId). Now that I’m using the triggerAsync API, I no longer hit this case consistently, only intermittently.

My delegate works like this:

  • execute method
  • JMS to request the schedule of the RM operation
  • JMS acks my ServiceTask and it “waits” after persistence
  • JMS wakes up a subscriber I’ve set up once RM has been executed
  • I correlate the process and triggerAsync the executionID
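The correlation step in the last bullet can be kept in a small registry: the delegate records the execution id under whatever id the remote system hands back, and the JMS subscriber looks it up when the completion message arrives. This is a sketch under assumptions. `CorrelationRegistry` and keying by a remote job id are hypothetical, and the `trigger` callback stands in for `RuntimeService.triggerAsync(executionId, variables)`. In a two-node cluster the mapping would have to live in shared storage; the in-memory map here is only for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical registry linking a remote job id to the waiting execution id.
class CorrelationRegistry {
    private final Map<String, String> executionByRemoteJob = new ConcurrentHashMap<>();
    // Stand-in for runtimeService::triggerAsync (assumption, not wired to Flowable here).
    private final BiConsumer<String, Map<String, Object>> trigger;

    CorrelationRegistry(BiConsumer<String, Map<String, Object>> trigger) {
        this.trigger = trigger;
    }

    // Called from execute() once the remote system has accepted the request.
    void register(String remoteJobId, String executionId) {
        executionByRemoteJob.put(remoteJobId, executionId);
    }

    // Called by the JMS subscriber; remove() makes a duplicate message a no-op.
    Optional<String> onRemoteCompleted(String remoteJobId, Map<String, Object> variables) {
        String executionId = executionByRemoteJob.remove(remoteJobId);
        if (executionId != null) {
            trigger.accept(executionId, variables);
        }
        return Optional.ofNullable(executionId);
    }
}
```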

In this scenario, what would need to be inside a transaction listener? All the business logic in each of my async triggerable service tasks?

Moreover, my process engine configuration has transactionsExternallyManaged set to true; could that be a further issue?

The triggerAsync method sets up a job for the asyncExecutor to do the actual triggering. This way the caller is not blocked while the process runs on to its next wait state.

Ideally, as little as possible that could throw an error should go in the listener: since it runs after the transaction has been committed, it bypasses some of Flowable’s automatic rollback behavior. Typically we’d build the messages outside the listener and put only the actual sending inside it.

I’m not sure how using externally managed JTA transactions will affect the listener; if it fails to execute, I think that would be either a bug or something that needs to be documented.

Huhm ok.

So: all my custom service tasks, triggerable or not, are subclasses of a BaseService superclass that implements only JavaDelegate, and in the overridden execute method I do something like this (the interesting part):

@Override
public void execute(DelegateExecution execution) throws BpmServiceException {
    executeServiceInSubclass(...);
}

where executeServiceInSubclass(...) holds the business code specific to each service task. Your idea is to encapsulate the above code, for every async task (triggerable or not), inside a new transaction listener, like:

@Override
public void execute(DelegateExecution execution) throws BpmServiceException {
    Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED,
        commandContext -> {
            executeServiceInSubclass(...);
        });
}

Did I get it right?

I’m not sure I’m following. Placing all of your business logic inside the listener is a bad idea, since it bypasses Flowable’s error handling. Additionally, things like setting variables may fail. Only the code that actually initiates the JMS message (places it on the queue, etc.) should be inside the listener; everything else should be outside. You could add a second method, afterCommit(), with an empty default implementation to your superclass and call it from your listener.
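The `afterCommit()` suggestion amounts to a template method in the `BaseService` superclass: business logic and message building stay in `executeServiceInSubclass(...)`, inside the transaction, and only the send is deferred. The sketch below models this in plain Java. `PostCommitHooks` is a hypothetical stand-in for `Context.getTransactionContext().addTransactionListener(TransactionState.COMMITTED, ...)`, and `BaseServiceSketch`, `RmCommandService` and the outbox list are illustrative names, not the poster's actual classes. Per-execution data travels through the closure rather than a field, because a delegate bean resolved from an expression may be shared between concurrent executions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transaction's COMMITTED listeners.
class PostCommitHooks {
    private final List<Runnable> hooks = new ArrayList<>();

    void register(Runnable hook) { hooks.add(hook); }

    void commit() { hooks.forEach(Runnable::run); }
}

abstract class BaseServiceSketch {
    // Mirrors JavaDelegate.execute: in-transaction work first, then register the hook.
    final void execute(PostCommitHooks tx) {
        // May throw: normal rollback and error handling still apply here.
        String prepared = executeServiceInSubclass();
        // Capture per-execution data in the closure, not in a shared field.
        tx.register(() -> afterCommit(prepared));
    }

    // Business logic and message building; returns the prepared request (or null).
    protected abstract String executeServiceInSubclass();

    // Empty by default; only services that contact a remote system override it.
    protected void afterCommit(String prepared) { }
}

class RmCommandService extends BaseServiceSketch {
    private final List<String> outbox; // stand-in for a JMS queue

    RmCommandService(List<String> outbox) { this.outbox = outbox; }

    @Override
    protected String executeServiceInSubclass() {
        return "RM /remote/dir/file.txt"; // build the request inside the transaction
    }

    @Override
    protected void afterCommit(String prepared) {
        outbox.add(prepared); // only the actual send runs after commit
    }
}
```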

Yup, I think we’re misunderstanding each other, my bad. :smile:

The business code in my execute(…) invokes an API of a subsystem, which immediately answers with something like “OK, I’ve scheduled the job”; then my task execution ends and the task becomes “triggerable”. I only take care of saving the current execution ID.

I’m not blocked on JMS waiting for a response; I get an immediate reply with the return code of the scheduling call.

When the scheduled operation completes, the subsystem pushes a message onto a queue I’m subscribed to. I intercept it (outside the service task) and, using the CDI-injected RuntimeService instance, I just call triggerAsync with the previously saved execution ID, waking up the ServiceTask.

If all of the work is done asynchronously outside of Flowable’s execution context anyway, then your proposal makes some sense (though it still doesn’t feel right to me). But if you need a more synchronous service task in the future you’ll want to implement JavaDelegate directly.

In our usage all ServiceTasks are nothing more than “triggers” for subsystems, and some of them must wait for the remote execution before proceeding to the next “brick”.

I didn’t understand the part about “implement JavaDelegate directly”: all my service tasks already implement it.
AFAIK, since I only use asynchronous tasks, the AsyncExecutor polls a queue, acquires jobs, persists data before and after the service task execution, and then processes the next task. If a task is “triggerable”, the process stays in a “wait state” with no executor holding it, until .trigger or .triggerAsync is called for its execution.

I had issues with .trigger and concurrent transactions, and that makes total sense: the .trigger API is processed “in real time” by the engine, and the triggerable job may not yet have been persisted by the AsyncExecutor holding it. But with triggerAsync, the trigger should happen on the AsyncExecutor’s next queue poll, so no executor should still be lagging behind on persisting the triggerable job itself.

So I’m still stuck here.
I’ve tried your proposal with the transaction listener, but every instance fails with:
Caused by: java.lang.NullPointerException
at org.flowable.variable.service.impl.persistence.entity.VariableScopeImpl.updateVariableInstance(VariableScopeImpl.java:849)

Question: with only an execution ID at hand, is there a way to “query” the engine to know whether that execution is already persisted and waiting for a “trigger”, so that I can check this before calling triggerAsync(executionID), or (once I know it’s persisted) trigger(executionID)?

Your Java delegate also needs to implement the TriggerableActivityBehavior interface, as it needs to know what to do when the trigger is received. At that point, the logic will check whether async is needed and create a job if so (which might be why it’s not working right now).

Yes: RuntimeService#createExecutionQuery(), passing the id of the wait-state activity to the #activityId() method. However, I would try the TriggerableActivityBehavior approach first, as there should be no need for such a query.
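If you do go the query route, a bounded poll with backoff is gentler than the fixed `Thread.sleep(5000)` used later in this thread: it returns as soon as the execution is ready and gives up after a known number of checks. The helper below is a generic stdlib sketch; `ReadinessPoller` is a hypothetical name. The readiness check is passed in as a `BooleanSupplier`, which in Flowable could wrap something like `runtimeService.createExecutionQuery().executionId(id).activityId(waitActivityId).count() > 0` (an assumption, not tested here). A poll narrows the race rather than removing it, since the execution can still change between the check and the trigger.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a readiness check with capped exponential backoff.
final class ReadinessPoller {
    private ReadinessPoller() { }

    // Returns true as soon as the check passes, false after maxAttempts failed checks
    // (or if the waiting thread is interrupted).
    static boolean awaitReady(BooleanSupplier ready, int maxAttempts,
                              long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
                delay = Math.min(delay * 2, maxDelayMillis); // back off, capped
            }
        }
        return false;
    }
}
```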

Joram is one of the original authors, his advice is likely to be more correct than mine.

A few of my own thoughts:

  • The async executor uses its own thread pool and its own transactions. It wouldn’t know to wait for an existing transaction (it wouldn’t even know about it). Because you have multiple nodes, it might not even be executing on the same node that called .triggerAsync().
  • Because you got a null pointer trying to update a variable, it appears that you are doing at least a little more than triggering another subsystem in executeServiceInSubclass. Updating a process variable needs to happen inside a transaction, so it cannot be done in a post-commit listener.
  • My point about implementing JavaDelegate directly was to handle cases where you need to update variables, etc.
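The error message itself comes from optimistic locking: each execution row carries a revision number, and an update prepared against an older revision matches zero rows, which the engine reports as `FlowableOptimisticLockingException` (thrown from `DbSqlSession.flushUpdates` in the stack traces later in this thread). The in-memory model below is a sketch of that mechanism only, not Flowable's implementation; `VersionedTable` and `StaleUpdateException` are hypothetical. Two jobs for the same execution, possibly on different nodes, both read revision 1, and whichever commits second fails.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of revision-based optimistic locking (not Flowable code).
class StaleUpdateException extends RuntimeException {
    StaleUpdateException(String id) {
        super("Execution[ id '" + id + "' ] was updated by another transaction concurrently");
    }
}

class VersionedTable {
    private final Map<String, Integer> revisions = new HashMap<>();
    private final Map<String, String> activities = new HashMap<>();

    synchronized void insert(String id, String activity) {
        revisions.put(id, 1);
        activities.put(id, activity);
    }

    synchronized int readRevision(String id) {
        return revisions.get(id);
    }

    synchronized String activity(String id) {
        return activities.get(id);
    }

    // Models "UPDATE ... SET REV = REV + 1 WHERE ID = ? AND REV = ?":
    // if another writer already bumped the revision, no row matches and we fail.
    synchronized void update(String id, int expectedRevision, String newActivity) {
        if (revisions.get(id) != expectedRevision) {
            throw new StaleUpdateException(id);
        }
        revisions.put(id, expectedRevision + 1);
        activities.put(id, newActivity);
    }
}
```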

Yup
I had the privilege of meeting him and chatting with him, and with Tijs, a couple of times, most recently at FlowFest’19. :wink:

I’m now adding a TriggerableActivityBehavior implementation to my delegates as well. In case it still doesn’t behave properly: while debugging, I’ve noticed that before triggering my instance, if I watch:

runtimeService.createExecutionQuery().executionId(executionId).list().get(0)

there seems to be a correlation between the failing ones and the attribute:

jobCount = 1

I’ll be back with news. =)

I was also at Flowfest, perhaps the only American in the room…


I’m struggling a bit with this.
My palette file refers to my class using:

<attribute id="delegateexpression" value="${bpmRemoteRmCommandService}" visible="false"/>

My class only implements JavaDelegate. I’ve tried making it implement TriggerableActivityBehavior as well, but, doh, I don’t know what to put inside the “trigger” method I override there.
I tried to figure it out by looking at the ServiceTaskDelegateExpressionActivityBehavior class, but ended up with no idea how to perform the “trigger” and release the task.

This is all I have there (an inspection of the DelegateExecution I receive in the trigger method; signalEvent and signalData are empty).

I don’t know whether something is missing because it’s a CDI JavaDelegate, or whether I just need to cast that execution (or get something from it) to actually release the wait state on the triggerable task.

OK, sorry, my bad.
I had implemented TriggerableActivityBehavior on all my custom service tasks, not only the triggerable ones, so execution actually stopped progressing whenever it hit a task that is non-triggerable by design.

It now seems to wait and resume properly, but I keep getting intermittent errors:

Exception data: org.flowable.common.engine.api.FlowableOptimisticLockingException: Execution[ id '62777' ] - activity 'bpmlogservice2' - parent '62769' was updated by another transaction concurrently

tho. =\

This is the actual scenario:

remoteLS is an asynchronous ServiceTask implementing JavaDelegate and TriggerableActivityBehavior with an empty trigger method, and the subsequent subprocess is sequential, neither asynchronous nor exclusive (obviously).

Here is the trace:

[1/17/20 18:49:53:570 CET] 00006d19 .asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler E Job 65022 failed
org.flowable.common.engine.api.FlowableOptimisticLockingException: Execution[ id ‘65017’ ] - activity ‘bpmlscommandservice1’ - parent ‘65001’ was updated by another transaction concurrently
at org.flowable.common.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:505)
at org.flowable.common.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:292)
at org.flowable.common.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:191)
at org.flowable.common.engine.impl.interceptor.CommandContext.close(CommandContext.java:61)
at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:80)
at org.flowable.common.engine.impl.interceptor.JtaTransactionInterceptor.execute(JtaTransactionInterceptor.java:66)
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.executeJob(ExecuteAsyncRunnable.java:128)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.run(ExecuteAsyncRunnable.java:116)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:818)

[1/17/20 18:49:53:607 CET] 00006d7b .asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler E Job 65086 failed
org.flowable.common.engine.api.FlowableOptimisticLockingException: Execution[ id ‘65017’ ] - activity ‘subprocess1’ - parent ‘65001’ was updated by another transaction concurrently
at org.flowable.common.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:505)
at org.flowable.common.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:292)
at org.flowable.common.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:191)
at org.flowable.common.engine.impl.interceptor.CommandContext.close(CommandContext.java:61)
at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:80)
at org.flowable.common.engine.impl.interceptor.JtaTransactionInterceptor.execute(JtaTransactionInterceptor.java:66)
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.executeJob(ExecuteAsyncRunnable.java:128)
at org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnable.run(ExecuteAsyncRunnable.java:116)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:818)

Hi,

the issue seems to be that I receive my response event, which leads to triggering the triggerable service task, before that task has committed its transaction to the database.

I thought the “triggerAsync” API would help with that. Actually (I know… it’s ugly as hell), this works like a charm:

Thread.sleep(5000);
runtimeService.trigger(executionId, processVariables);

So the question is: since runtimeService is an injected instance of org.flowable.engine.RuntimeService, with only that instance and the executionId, what query could I perform to know whether that executionId is committed, and therefore ready and waiting for its trigger?

Yes, that’s indeed the idea:

  • The main transaction commits
  • The async executor then executes the job (at the earliest, right after the previous transaction has committed)
  • Part of executing the job is sending the JMS message
  • At the end of the job, the process instance is brought into a state that is ready to receive the trigger.

Did you use the suggestion from above to wrap the JMS logic in a transaction listener that fires on the committed event? That way, the JMS message is sent once the process instance is in the right state.

We’ve discussed internally making the transaction listener the default (or at least a config option), but it’s a very low-level thing. This does make me think we should do it.

And I agree completely; moreover, it’s not really my area either.

I didn’t use that hint because each of my asynchronous & triggerable ServiceTasks has business code that schedules an operation, and the task is considered “completed” if the return code of that scheduling call is OK. So I actually need the response to that call to be part of the transaction itself.

The “issue” then becomes that someone could be blazing fast at executing that schedule, fast enough to beat the transaction held by the ServiceTask.
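The race described here can be reproduced deterministically with two threads: a "remote system" that answers instantly, and a transaction that commits either before or after the request is sent. This is a stdlib-only sketch; `RaceDemo` is a hypothetical name, and the `committed` flag stands in for the wait state being visible in the database. It shows that when only the send is deferred until after commit (the transaction-listener approach), even an instant responder observes the committed state. The trade-off discussed above still applies: whatever the task needs from the remote call's immediate return code is then no longer available inside the transaction.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-thread model of "the remote reply overtakes the commit".
final class RaceDemo {
    private RaceDemo() { }

    // Returns what the instant responder observed: "COMMITTED" or "NOT_COMMITTED".
    static String run(boolean sendAfterCommit) {
        AtomicBoolean committed = new AtomicBoolean(false);
        AtomicReference<String> observed = new AtomicReference<>();
        // The remote system replies as soon as it receives the request and
        // immediately tries to trigger the waiting execution.
        Thread responder = new Thread(() ->
                observed.set(committed.get() ? "COMMITTED" : "NOT_COMMITTED"));
        if (sendAfterCommit) {
            committed.set(true);   // the wait state is durable first
            responder.start();     // then the request goes out
            join(responder);
        } else {
            responder.start();     // the request goes out inside the transaction
            join(responder);       // worst case: the reply is handled before the commit
            committed.set(true);
        }
        return observed.get();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```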

Not sure if I’m following fully here: are you saying that part of your execution of the delegate task is that you write to a relational database in the same transaction (hence why you can have it in a post commit)?