FlowableEngineEventType: support for batches

Hi Flowable-Team,

Are there plans to support batch-related events in future releases? They have been available in Camunda 7 since version 7.5, but not in Flowable.

Our use case: we want to migrate process instances automatically on application start; see my GitHub project Flowable PI migration

and the analogous one for Camunda: Camunda PI migration

Because we schedule the migration as a batch so that it does not delay application startup, we need a way to observe the migration results. The intuitive approach I expected was a global event listener, which Flowable supports, but I cannot find any BATCH_ related events among its event types.

An alternative solution could be a technical BPMN process like this

It would be nice if you could add this, guys.

Thanks
Alex

Hey @alex.chabatar,

It is correct that we do not have those batch events. However, you can still achieve the same thing by listening for the ENTITY_UPDATED event, capturing the Batch that gets updated, and checking its status.

For example, something like:

public class BatchCompletedListener extends BaseEntityEventListener {

    public BatchCompletedListener() {
        super(true, Batch.class);
    }

    @Override
    public Collection<? extends FlowableEventType> getTypes() {
        return List.of(FlowableEngineEventType.ENTITY_UPDATED);
    }

    @Override
    protected void onUpdate(FlowableEvent event) {
        FlowableEntityEvent entityEvent = (FlowableEntityEvent) event;
        Batch batch = (Batch) entityEvent.getEntity();
        if (batch.getCompleteTime() != null) {
            // Custom implementation
        }
    }
}

However, if you are interested in higher-level events, we are more than happy to review a PR in this area.

Cheers,
Filip

Thanks @filiphr, I will try this.
Cheers, Alex

@filiphr I have tried to integrate it into my test project, but it is not working. Could you have a look, please? I added the event listener, but it is never invoked when I migrate a process instance. You can reproduce this by starting the app with the initial BPMN file deployed, starting a process instance via the REST API, then changing something simple in the BPMN definition and redeploying or restarting the app. The batch is scheduled and migrates the process instance, but execution never reaches the listener code.

Thanks
Alex

Thanks for sharing that @alex.chabatar. I tried it out, and indeed it does not go through there, because the event isn’t thrown for that.

In any case, I think exposing a higher-level event listener for this does make sense.

Cheers,
Filip

Thanks @filiphr, I was able to find a solution with a higher-level listener:

package org.flowable.bpm.examples.springboot.engine;

import static java.util.Collections.emptyList;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.flowable.batch.api.BatchService;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.context.Context;
import org.flowable.engine.impl.jobexecutor.ProcessInstanceMigrationStatusJobHandler;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.impl.persistence.entity.JobEntity;

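/**
 * Listens for JOB_EXECUTION_SUCCESS and, for migration status jobs, logs the
 * result of each batch part once the batch reports the status "completed".
 */
@Slf4j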
public class BatchCompletedListener extends AbstractFlowableEngineEventListener {

    private static final Set<String> JOB_HANDLER_TYPES = Set.of(ProcessInstanceMigrationStatusJobHandler.TYPE);

    private static final String BATCH_ID = "batchId";
    private static final String BATCH_DOCUMENT_JSON = "batchDocumentJson";

    private static final String BATCH_STATUS_COMPLETED = "completed";
    private static final String BATCH_PART_STATUS_SUCCESS = "success";
    private static final String BATCH_PART_STATUS_FAIL = "fail";
    private static final String BATCH_PART_RESULT = "batchPartResult";

    @Override
    public Collection<? extends FlowableEventType> getTypes() {
        return List.of(FlowableEngineEventType.JOB_EXECUTION_SUCCESS);
    }

    @Override
    protected void jobExecutionSuccess(FlowableEngineEntityEvent event) {
        if (event.getEntity() instanceof JobEntity jobEntity && JOB_HANDLER_TYPES.contains(jobEntity.getJobHandlerType())) {
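            var batchId = getBatchIdFromHandlerCfg(jobEntity.getJobHandlerConfiguration());
            if (batchId == null) {
                return; // the handler configuration carried no readable batch id
            }
            var batch = batchService().getBatch(batchId);
            if (batch != null && BATCH_STATUS_COMPLETED.equals(batch.getStatus())) {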
                log.info("Completed Migration Batch '{}': {}",
                        batchId, batch.getBatchDocumentJson(BATCH_DOCUMENT_JSON));
                Optional.ofNullable(batchService().findBatchPartsByBatchId(batchId)).orElse(emptyList()).forEach(batchPart -> {
                    var batchPartStatus = batchPart.getStatus();
                    if (BATCH_PART_STATUS_SUCCESS.equals(batchPartStatus)) {
                        log.info("Batch part '{}' (batchId '{}') completed successfully for process instance '{}'",
                                batchPart.getId(), batchId, batchPart.getScopeId());
                    } else if (BATCH_PART_STATUS_FAIL.equals(batchPartStatus)) {
                        log.error("Batch part '{}' (batchId '{}') failed for process instance '{}': {}",
                                batchPart.getId(), batchId, batchPart.getScopeId(),
                                batchPart.getResultDocumentJson(BATCH_PART_RESULT));
                    }
                });
            }
        }
    }

    private ProcessEngineConfigurationImpl processEngineConfiguration() {
        return Context.getProcessEngineConfiguration();
    }

    private BatchService batchService() {
        return processEngineConfiguration().getBatchServiceConfiguration().getBatchService();
    }

    private static String getBatchIdFromHandlerCfg(String handlerCfg) {
        try {
            var cfgAsJson = getObjectMapper().readTree(handlerCfg);
            if (cfgAsJson.has(BATCH_ID)) {
                return cfgAsJson.get(BATCH_ID).asText();
            }
            return null;
        } catch (IOException e) {
            return null;
        }
    }

    private static ObjectMapper getObjectMapper() {
        if (CommandContextUtil.getCommandContext() != null) {
            return CommandContextUtil.getProcessEngineConfiguration().getObjectMapper();
        } else {
            return new ObjectMapper();
        }
    }

}

That’s another solution as well :)
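
If one summary per batch is more useful than one log line per part, the per-part check in the listener above could be folded into a small aggregation step. The following is only a minimal sketch in plain Java: `Part` and `Summary` are hypothetical stand-ins for the values the listener reads via `getId()`, `getScopeId()` and `getStatus()`, not Flowable types, and the status strings are simply copied from the constants in the listener.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartSummary {

    // Status strings copied from the listener's constants (assumed, not taken from the Flowable API).
    static final String SUCCESS = "success";
    static final String FAIL = "fail";

    /** Hypothetical stand-in for the fields read from a batch part. */
    record Part(String id, String scopeId, String status) {}

    /** Aggregated outcome of one batch. */
    record Summary(int succeeded, int failed, List<String> failedScopeIds) {
        boolean allSucceeded() {
            return failed == 0;
        }
    }

    /** Counts successful parts and collects the scope ids of failed ones. */
    static Summary summarize(List<Part> parts) {
        int succeeded = 0;
        List<String> failedScopeIds = new ArrayList<>();
        for (Part part : parts) {
            if (SUCCESS.equals(part.status())) {
                succeeded++;
            } else if (FAIL.equals(part.status())) {
                failedScopeIds.add(part.scopeId());
            }
            // Parts with any other status are ignored, as in the listener above.
        }
        return new Summary(succeeded, failedScopeIds.size(), List.copyOf(failedScopeIds));
    }

    public static void main(String[] args) {
        Summary summary = summarize(List.of(
                new Part("p1", "pi-1", SUCCESS),
                new Part("p2", "pi-2", FAIL),
                new Part("p3", "pi-3", SUCCESS)));
        // prints "2 migrated, failed: [pi-2]"
        System.out.println(summary.succeeded() + " migrated, failed: " + summary.failedScopeIds());
    }
}
```

In the listener, the `Part` values would be built from the batch parts returned by `findBatchPartsByBatchId`, and `allSucceeded()` could decide whether to log at info or error level.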