Multi timers for parallel user tasks

bpmn20

I have a requirement to run parallel user tasks, and each user task should be escalated after its due date. When I run the attached workflow, the results are not consistent: I am having difficulty making all timers start in parallel. Is there another way I can design this workflow?

I am still having this issue. Can anyone help me with this? I have also set the async executor to true in my yml file.

Hi Santosh,

Parallel timers should work.


This is not exactly your case. Could you create a JUnit test for your case, please?

Martin

I can confirm that the parallel timers work fine most of the time, but at random a job fails with this message:
"Error while evaluating expression: ${divRequestUserServiceImpl.getCandidateUsers(execution)}".
Most of the time this works, but once in a while one of the timers from ACT_RU_TIMER_JOB fails.
What is the best way to handle these cases? I am thinking of reading the ACT_RU_DEADLETTER_JOB table, putting the failed jobs back into the ACT_RU_TIMER_JOB table, and retrying them. Any suggestions?

bpmn20

package com.gm.gpsc.dpofweb.process;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.flowable.engine.ManagementService;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.common.api.FlowableException;
import org.flowable.engine.impl.test.PluggableFlowableTestCase;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.test.Deployment;
import org.flowable.job.api.JobQuery;
import org.flowable.job.api.TimerJobQuery;
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class IntermediateTimerEventTest extends PluggableFlowableTestCase {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

@Autowired
private RuntimeService runtimeService;

@Autowired
private ManagementService managementService;

@Autowired
private TaskService taskService;

@Autowired
private ProcessEngineConfiguration processEngineConfiguration;

@Test
@Deployment
public void testParallelTimerEvents() throws Exception {
    // Remember the start time so the engine clock can be advanced relative to it
    Date startTime = new Date();

    // After process start, four timer jobs should have been created
    ProcessInstance pi = runtimeService.startProcessInstanceByKey("division-request");
    TimerJobQuery jobQuery = managementService.createTimerJobQuery().processInstanceId(pi.getId());
    assertEquals(4, jobQuery.count());        
    // After setting the clock to '50 minutes and 5 seconds' later, all timers should fire in parallel
    processEngineConfiguration.getClock().setCurrentTime(new Date(startTime.getTime() + ((50 * 60 * 1000) + 5000)));
    try {
        waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(
                this.processEngineConfiguration, jobQuery, 300000L, 250L,true
        );            
        assertEquals(0, jobQuery.count());
        //assertProcessEnded(pi.getProcessInstanceId());
        assertEquals("Timer paths must be executed exactly 4 times without failure repetition",
                4, IntermediateTimerEventTestCounter.getCount());
    } finally {
        processEngineConfiguration.getClock().reset();
    }
}

public static void waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(ProcessEngineConfiguration processEngineConfiguration, TimerJobQuery jobQuery, long maxMillisToWait, long intervalMillis,
        boolean shutdownExecutorWhenFinished) {

    AsyncExecutor asyncExecutor = processEngineConfiguration.getAsyncExecutor();
    asyncExecutor.start();
    processEngineConfiguration.setAsyncExecutorActivate(true);

    try {
        Timer timer = new Timer();
        InterruptTask task = new InterruptTask(Thread.currentThread());
        timer.schedule(task, maxMillisToWait);
        boolean areJobsAvailable = true;
        try {
            while (areJobsAvailable && !task.isTimeLimitExceeded()) {
                Thread.sleep(intervalMillis);
                try {
                    areJobsAvailable = areJobsOrExecutableTimersAvailable(jobQuery);
                } catch (Throwable t) {
                    // Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
                    // isolation level doesn't allow READ of the table
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            timer.cancel();
        }
        if (areJobsAvailable) {
            throw new FlowableException("time limit of " + maxMillisToWait + " was exceeded");
        }

    } finally {
        if (shutdownExecutorWhenFinished) {
            processEngineConfiguration.setAsyncExecutorActivate(false);
            asyncExecutor.shutdown();
        }
    }
}

public static boolean areJobsOrExecutableTimersAvailable(TimerJobQuery jobQuery) {
    boolean emptyJobs = jobQuery.list().isEmpty();
    if (emptyJobs) {
        emptyJobs = jobQuery.executable().list().isEmpty();
        return !emptyJobs;
    } else {
        return true;
    }
}

private static class InterruptTask extends TimerTask {

    protected boolean timeLimitExceeded;
    protected Thread thread;

    public InterruptTask(Thread thread) {
        this.thread = thread;
    }

    public boolean isTimeLimitExceeded() {
        return timeLimitExceeded;
    }

    @Override
    public void run() {
        timeLimitExceeded = true;
        thread.interrupt();
    }
}

}

2018-05-10 19:31:14.425 INFO 29544 --- [ main] o.f.j.s.i.a.AbstractAsyncExecutor : Shutting down the async job executor [org.flowable.spring.SpringAsyncExecutor].
2018-05-10 19:31:14.425 INFO 29544 --- [uire-async-jobs] o.f.j.s.i.a.AcquireAsyncJobsDueRunnable : stopped async job due acquisition
2018-05-10 19:31:14.426 INFO 29544 --- [et-expired-jobs] o.f.j.s.i.a.ResetExpiredJobsRunnable : stopped resetting expired jobs
2018-05-10 19:31:14.427 INFO 29544 --- [uire-timer-jobs] o.f.j.s.i.a.AcquireTimerJobsRunnable : stopped async job due acquisition

Do you know why

fails?

By default, the number of job retries is set to 3: the job executor tries to execute a job 3 times. After that, the job is moved to the dead letter queue.

If you want to re-execute the job, you can make it active again, but first you should try to fix the issue that caused the exception.
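
As a rough illustration of making dead letter jobs active again, here is a minimal sketch. The `DeadLetterRetry` class and its `JobGateway` interface are hypothetical names introduced only for this example; they are not Flowable types. The comments note which `ManagementService` calls the two methods would presumably wrap in a real application, so treat that mapping as an assumption to check against your Flowable version rather than a definitive implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: put the dead letter jobs of one process instance back on the
 * executable queue instead of editing the ACT_RU_* tables by hand.
 */
public class DeadLetterRetry {

    /** Hypothetical abstraction over the engine's job API (not a Flowable type). */
    public interface JobGateway {
        // Assumption: with Flowable this could wrap
        // managementService.createDeadLetterJobQuery().processInstanceId(id).list()
        List<String> findDeadLetterJobIds(String processInstanceId);

        // Assumption: with Flowable this could wrap
        // managementService.moveDeadLetterJobToExecutableJob(jobId, retries)
        void moveToExecutable(String jobId, int retries);
    }

    /**
     * Moves every dead letter job of the given process instance back to the
     * executable queue with a fresh retry count and returns the moved job ids.
     */
    public static List<String> retryDeadLetterJobs(JobGateway gateway,
                                                   String processInstanceId,
                                                   int retries) {
        if (retries < 1) {
            throw new IllegalArgumentException("retries must be at least 1");
        }
        List<String> moved = new ArrayList<>();
        for (String jobId : gateway.findDeadLetterJobIds(processInstanceId)) {
            gateway.moveToExecutable(jobId, retries);
            moved.add(jobId);
        }
        return moved;
    }
}
```

Going through the engine's API rather than copying rows between tables lets the engine keep the job state consistent. As noted above, retrying only helps once the cause of the expression failure has been addressed.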

Martin