Process element duplicate execution

Hi,
I’m using Flowable 6.7.2 and we have problem with twice element execution.
Imagine simple proces START->SCRIPT_TASK(asynch)-> STOP.

We have 4 servers each have ThreadPoolSize=1. In flowable there is notion of “asyncJobLockTime”, so when server executes processes it create rows in ACT_RU_JOB with (lock_exp_time) which default is 5 minutes. Each server can create as many rows(jobs) with these state as the value asyncExecutorThreadPoolQueueSize which is default 2048. Now we have problem because these queue is not executed by one thread in 5 minutes, which mean that another server reset “lock_exp_time” and tries to execute it.
From Flowable perspective everything is ok beacause server which executes job from queue whet it it its get FlowableOptimisticLockingException because job has been executed, but in out example the webservice call is duplicated…which is very bad

In my opinion something is wrong in these concept,maybe you should check if job from your queue is still valid to execute (pre optimistick check), or maybe we doing something wrong with Flowable config

If a job is locked for 5 minutes, it’s assumed that the node has crashed, and thus another engine should take over.

Additionally, the 5 minutes timeout is there because async work shouldn’t generally take more than 5 minutes, definitely if you take into account that a database transaction is kept open for 5 minutes too (some db’s don’t even allow this).

You can configure the timeout setting on the process engine configuration, but it is worthwhile checking why the async work takes > 5 minutes.

Thanks for your replay
Our webserice call takes about 10 seconds. Maybe I give you simple example to simulate these behaviur(of course if you have some snippet for creating test case in multi instance env I could write it)

You have to execute these program concurrently (two instances) to simulate two servers on singe db

Program

	public static void main(final String[] args) {
		int threadPoolSize = 1;
		StandaloneProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration();
		cfg.setJdbcUrl("jdbc:postgresql://host:5432/flowable").setJdbcUsername("flowable").setJdbcPassword("flowable")
				.setJdbcDriver("org.postgresql.Driver").setDatabaseSchemaUpdate(AbstractEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);

		cfg.setAsyncExecutorNumberOfRetries(0);
		cfg.setAsyncHistoryExecutorNumberOfRetries(0);

		cfg.setAsyncExecutorActivate(true);
		cfg.setHandleProcessEngineExecutorsAfterEngineCreate(true);
		cfg.setAsyncExecutorMaxPoolSize(threadPoolSize);
		cfg.setAsyncExecutorCorePoolSize(threadPoolSize);
		cfg.setAsyncHistoryExecutorCorePoolSize(threadPoolSize);
		cfg.setAsyncHistoryExecutorMaxPoolSize(threadPoolSize);

		AsyncJobExecutorConfiguration asyncExecutorConfiguration = cfg.getAsyncExecutorConfiguration();
		asyncExecutorConfiguration.setGlobalAcquireLockEnabled(true);
		asyncExecutorConfiguration.setGlobalAcquireLockPrefix("NodeId_" + System.nanoTime());
		asyncExecutorConfiguration.setTimerLockTime(Duration.ofMinutes(1)); //changed default to force error more quickly
		asyncExecutorConfiguration.setAsyncJobLockTime(Duration.ofMinutes(1));//changed default to force error more quickly

		asyncExecutorConfiguration.setResetExpiredJobsInterval(Duration.ofSeconds(5));//changed default to force error more quickly
		asyncExecutorConfiguration.setMoveTimerExecutorPoolSize(threadPoolSize);

		ProcessEngine processEngine = cfg.buildProcessEngine();
		RepositoryService repositoryService = processEngine.getRepositoryService();
//		repositoryService.createDeployment().addClasspathResource("error.bpmn20.xml").deploy(); uncomment these for first time to deploy process
		RuntimeService runtimeService = processEngine.getRuntimeService();

		for (int i = 0; i < 20; i++) {
			runtimeService.startProcessInstanceByKey("error");

		}
	}

Process

<?xml version="1.0" encoding="UTF-8"?>
<definitions
	xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:xsd="http://www.w3.org/2001/XMLSchema"
	xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
	xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
	xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
	xmlns:flowable="http://flowable.org/bpmn"
	typeLanguage="http://www.w3.org/2001/XMLSchema"
	expressionLanguage="http://www.w3.org/1999/XPath"
	targetNamespace="http://www.flowable.org/processdef">

	<process id="error" name="Error"
		isExecutable="true">

		<startEvent id="start" />
		<sequenceFlow sourceRef="start" targetRef="script" />
		<scriptTask id="script" name="Script"
			scriptFormat="groovy" flowable:async="true">
			<script>
				println 'start script '+new Date()+ " processInstanceId="+execution.getProcessInstanceId()
				Thread.sleep(10000)
				println 'stop script '+new Date()+ " processInstanceId="+execution.getProcessInstanceId()
			</script>
		</scriptTask>

<sequenceFlow sourceRef="script" targetRef="end" />
		<endEvent id="end" />

	</process>

</definitions>

When you run this program your (two instances at once) you shuld get error (after 1 minut) FlowableOptimisticLockingException and on console you see that process instance for example processInstanceId=137551 was executed twice on two JVMs (one on JVM 1 one on JVM 2)

With the settings you have provided, this behavior makes sense:

  • you have 1 thread available
  • your jobs take 10 seconds
  • you’re using the default queue size of 2048 with default acquire settings

This means that the async executor will keep filling up the internal queue with work. However, as you only have one worker thread , after a while the jobs. you’ll take on will have been sitting in the queue for a long time and are deemed lost by the other engines.

If you’re forcing it to 1 thread, you also need to make the queue size smaller.

This is correct - one process instance will be transactionally rolled back and only one process instance will be in the database.

If you don’t want this behavior either make the queue size smaller if you use less threads or make the timeout larger. The suggestion of doing another check is impossible to do, because the job could just have been picked up by another engine and handed off to the threadpool. When this engine then crashes, there would again be no way of knowing whether the job was correctfully executed or not by other engines.

I could change default “internal queue size” but please think about that is dangerous(duplicate execution could have very bad impact of system). Imagine that all my settings in Flowable are on defaults, there is chance that in this queue of 2048 elements exists one/single which could takes 5 minutes(some call to heavy load system) so in multi server env the elements after this “problematic” could execute twice. Rember that Flowable transaction on db != our system transaction so we add some data do our system twice.

Of course I could change “internal queue size” but on which size?. I have no guarantee that in some corner cases in this queue dosen’t appers element wich abnormal execution time which force another servers to reset “exp_time_”. From our perspective duplicate execution is dissaster, now we are thinking about setting these queue on size=1. I understand role of thes queue and performance benefits but not by twice execution price.

Maybe there shiuld be another solution on detection of node crash(some table with node_registry and last ping_time), or maybe node which is alive should have some timer which increse his act_ru_job (exp_time_). So if node is alive it sending update to db like

updata act_ru_job set exp_time_=now()+asyncJobLockTime where lock_owner=my_uuid
(timer run period should be asyncJobLockTime /2 )

Sory that I’m bothering you but this is very important

Hi Mirek,

I tried to address duplicate execution problem some time ago.
The problem was:
async process execution with email sending, sends emails (in some occasions) 2x.
(I can imagine more complicated usecase)

One of the possible solutions:
Custom job handler for email sending and custom email sending behavior. The behavior creates a new job, which will take care about email sending in the future by handler. We do not expect result of email sending in the process. So when process is successfully executed, the email sending job is created. If not, the job does not exists and no email is sent.

May be you could use something similar for your duplicate execution.
Regards
Martin

And please remember that Flowable is a transactional engine. The fact that in your use case this is not matching, doesn’t mean you have imply we made an architectural mistake (this algorithm has been like this since 2010 when Activiti was started).

In this particular use case, you seem to want exactly-once semantics. Flowable is not such an engine. In fact, typically this often would be modeled with compensating actions in case errors are detected, taking into account the relational transactional semantics.

This is not that a common pattern these days. Systems these days are typically idempotent or compensating - which have proven to scale.

That’s an option we could look into. It is also something you could add already yourself today. Do note the timer needs to be not a timer in the sense of Flowable, as it would compete with other jobs.

I understand. But also keep in mind that the async executor is a component that is running at high-scale / high-performance for millions of jobs / hour at many places (and this is for the Flowable customers we know, not even thinking about the open source user we have no visibility on). The blogpost series Handling asynchronous operations with Flowable – Part 1: Introducing the new Async Executor we did a while ago are based on real-life data.

That’s an option indeed for non-transactional service invocations. We’ve seen people implementing this pattern using the post-commit for this to make sure the process instance is in the correct state (which of course then needs special handling in case that fails … ). Another option could be using the external worker task and implement exactly-once semantics in a specialised (micro) service.

1 Like

Ok thanks,
I end with code below which I run after engine creation

DataSource dataSource = cfg.getDataSource(); //cfg is FlowableConfig (StandaloneProcessEngineConfiguration)
	new Thread(() -> {
			while (true) {
				try {
					Thread.sleep(cfg.getAsyncExecutor().getAsyncJobLockTimeInMillis() / 2);
					try (Connection connection = dataSource.getConnection()) {
						try (PreparedStatement prepareStatement = connection.prepareStatement(
								"update act_ru_job set lock_exp_time_=? where lock_owner_=? and lock_exp_time_ is not null")) {
							LocalDateTime timestamp =
									LocalDateTime.now().plus(cfg.getAsyncExecutor().getAsyncJobLockTimeInMillis(), ChronoUnit.MILLIS);
							prepareStatement.setTimestamp(1, Timestamp.valueOf(timestamp));
							prepareStatement.setString(2, cfg.getAsyncExecutor().getLockOwner());
							prepareStatement.execute();
						}
					}
				} catch (Exception e) {
					log.error(e.getMessage(),e);
				}
			}
		}).start();