I have a requirement to run parallel user tasks, and each user task should be escalated after its due date. When I try to run the workflow (attached)…the result is not consistent. I am having difficulty making all timers start in parallel. Is there any other way I can design this workflow?
I am still having this issue. Is there anyone who can help me with this? I have also set the async executor to true in my yml file.
Hi Santosh,
parallel timers should work.
This is not exactly your case. Could you create a JUnit test for your case, please?
Martin
I can confirm that the parallel timers work fine most of the time, but randomly one of them fails with this message:
“Error while evaluating expression: ${divRequestUserServiceImpl.getCandidateUsers(execution)}”.
Most of the time this works, but once in a while one of the timers from ACT_RU_TIMER_JOB fails.
What is the best way to handle these cases? I am thinking of reading the ACT_RU_DEADLETTER_JOB table, putting the failed jobs back into the ACT_RU_TIMER_JOB table, and retrying them. Any suggestions?
package com.gm.gpsc.dpofweb.process;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.flowable.engine.ManagementService;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.common.api.FlowableException;
import org.flowable.engine.impl.test.PluggableFlowableTestCase;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.test.Deployment;
import org.flowable.job.api.JobQuery;
import org.flowable.job.api.TimerJobQuery;
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class IntermediateTimerEventTest extends PluggableFlowableTestCase {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
@Autowired
private RuntimeService runtimeService;
@Autowired
private ManagementService managementService;
@Autowired
private TaskService taskService;
@Autowired
private ProcessEngineConfiguration processEngineConfiguration;
@Test
@Deployment
public void testParallelTimerEvents() throws Exception {
// Set the clock fixed
Date startTime = new Date();
// After process start, there should be timer created
ProcessInstance pi = runtimeService.startProcessInstanceByKey("division-request");
TimerJobQuery jobQuery = managementService.createTimerJobQuery().processInstanceId(pi.getId());
assertEquals(4, jobQuery.count());
// After setting the clock to '50 minutes and 5 seconds' past the start time, all timers should fire in parallel
processEngineConfiguration.getClock().setCurrentTime(new Date(startTime.getTime() + ((50 * 60 * 1000) + 5000)));
try {
waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(
this.processEngineConfiguration, jobQuery, 300000L, 250L,true
);
assertEquals(0, jobQuery.count());
//assertProcessEnded(pi.getProcessInstanceId());
assertEquals("Timer paths must be executed exactly 4 times without failure repetition",
4, IntermediateTimerEventTestCounter.getCount());
} finally {
processEngineConfiguration.getClock().reset();
}
}
public static void waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(ProcessEngineConfiguration processEngineConfiguration, TimerJobQuery jobQuery, long maxMillisToWait, long intervalMillis,
boolean shutdownExecutorWhenFinished) {
AsyncExecutor asyncExecutor = processEngineConfiguration.getAsyncExecutor();
asyncExecutor.start();
processEngineConfiguration.setAsyncExecutorActivate(true);
try {
Timer timer = new Timer();
InterruptTask task = new InterruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
try {
areJobsAvailable = areJobsOrExecutableTimersAvailable(jobQuery);
} catch (Throwable t) {
// Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
// isolation level doesn't allow READ of the table
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new FlowableException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
if (shutdownExecutorWhenFinished) {
processEngineConfiguration.setAsyncExecutorActivate(false);
asyncExecutor.shutdown();
}
}
}
public static boolean areJobsOrExecutableTimersAvailable(TimerJobQuery jobQuery) {
boolean emptyJobs = jobQuery.list().isEmpty();
if (emptyJobs) {
emptyJobs = jobQuery.executable().list().isEmpty();
return !emptyJobs;
} else {
return true;
}
}
private static class InterruptTask extends TimerTask {
// volatile: written by the Timer thread, read by the waiting test thread
protected volatile boolean timeLimitExceeded;
protected Thread thread;
public InterruptTask(Thread thread) {
this.thread = thread;
}
public boolean isTimeLimitExceeded() {
return timeLimitExceeded;
}
@Override
public void run() {
timeLimitExceeded = true;
thread.interrupt();
}
}
}
2018-05-10 19:31:14.425 INFO 29544 --- [ main] o.f.j.s.i.a.AbstractAsyncExecutor : Shutting down the async job executor [org.flowable.spring.SpringAsyncExecutor].
2018-05-10 19:31:14.425 INFO 29544 --- [uire-async-jobs] o.f.j.s.i.a.AcquireAsyncJobsDueRunnable : stopped async job due acquisition
2018-05-10 19:31:14.426 INFO 29544 --- [et-expired-jobs] o.f.j.s.i.a.ResetExpiredJobsRunnable : stopped resetting expired jobs
2018-05-10 19:31:14.427 INFO 29544 --- [uire-timer-jobs] o.f.j.s.i.a.AcquireTimerJobsRunnable : stopped async job due acquisition
Do you know why
fails?
By default, job retries are set to 3: the job executor tries to execute a job 3 times. After that, the job is moved to the dead letter queue.
When you want to re-execute the job, you can make it active again, but first you should try to fix the issue that caused the exception.
Martin
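The advice above (fix the cause first, then make the dead-letter job active again) could be sketched roughly as follows. This is only a minimal sketch under stated assumptions, not Flowable's own code: `DeadLetterOps`, `reactivateAll`, and the job ids are hypothetical names introduced here so the example runs without an engine. In a real application the two interface methods would presumably delegate to `ManagementService.createDeadLetterJobQuery()` and `ManagementService.moveDeadLetterJobToExecutableJob(jobId, retries)`, instead of editing the ACT_RU_* tables by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeadLetterRetrySketch {

    /**
     * Hypothetical port over the two engine operations this sketch needs.
     * Assumed mapping onto Flowable's ManagementService:
     *   listDeadLetterJobIds -> createDeadLetterJobQuery().processInstanceId(id).list()
     *   reactivate           -> moveDeadLetterJobToExecutableJob(jobId, retries)
     */
    public interface DeadLetterOps {
        List<String> listDeadLetterJobIds(String processInstanceId);
        void reactivate(String jobId, int retries);
    }

    /** Re-activates every dead-letter job of one process instance and returns the ids it moved. */
    public static List<String> reactivateAll(DeadLetterOps ops, String processInstanceId, int retries) {
        if (retries < 1) {
            throw new IllegalArgumentException("retries must be at least 1");
        }
        List<String> moved = new ArrayList<>();
        // Iterate over a copy so the loop is unaffected if the backing list shrinks while jobs are moved.
        for (String jobId : new ArrayList<>(ops.listDeadLetterJobIds(processInstanceId))) {
            ops.reactivate(jobId, retries);
            moved.add(jobId);
        }
        return moved;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the dead letter queue; the ids are made up for illustration.
        final List<String> deadLetter = new ArrayList<>(Arrays.asList("timer-1", "timer-2"));
        DeadLetterOps inMemory = new DeadLetterOps() {
            @Override
            public List<String> listDeadLetterJobIds(String processInstanceId) {
                return deadLetter;
            }

            @Override
            public void reactivate(String jobId, int retries) {
                deadLetter.remove(jobId);
            }
        };
        List<String> moved = reactivateAll(inMemory, "some-process-instance-id", 3);
        System.out.println("re-activated: " + moved + ", still dead-lettered: " + deadLetter);
    }
}
```

Re-activating a job only helps once the cause of the expression failure is fixed; otherwise the job will most likely use up its retries and land in the dead letter queue again.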