We are using Flowable 6.6.0 and trying to write history to a separate NoSQL database, using Kafka as the message queue, by enabling AsyncHistoryExecutorMessageQueueMode.
We can send and receive events through the Kafka queue, but if the message queue fails or is down, the jobs are not processed. The example documentation mentions that
the Async History Executor will be booted in a light mode and will reset any job that was not processed. Is there any configuration that needs to be added for this?
This is the configuration we use:
config.setAsyncHistoryEnabled(true);
config.setAsyncHistoryExecutorMessageQueueMode(true);
config.setJobManager(jobManager());
public HistoryJobMessageHandler getHistoryJobMessageHandler() {
try {
return new HistoryJobMessageHandler();
} catch (Exception e) {
// Pass the exception to the logger so the cause is not lost.
LOG.error("Error initializing HistoryJobMessageHandler", e);
}
return null;
}
public AsyncHistorySender jobManager() {
return new AsyncHistorySender(eventProducerFactory);
}
public AsyncHistoryJobMessageReceiver asyncHistoryJobMessageReceiver() {
AsyncHistoryJobMessageReceiver asyncHistoryJobMessageReceiver = new AsyncHistoryJobMessageReceiver(
springProcessEngineConfigurer().getCommandExecutor(), getHistoryJobMessageHandler(),
springProcessEngineConfigurer().getAsyncExecutor().getJobServiceConfiguration());
asyncHistoryJobMessageReceiver.setAsyncHistoryJobMessageHandler(getHistoryJobMessageHandler());
return asyncHistoryJobMessageReceiver;
}
The custom message handler looks like this:
public class HistoryJobMessageHandler implements AsyncHistoryJobMessageHandler {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public boolean handleJob(HistoryJobEntity historyJobEntity, JsonNode historyData) {
JsonNode historyJsonNode = null;
try {
historyJsonNode = objectMapper.readTree(historyJobEntity.getAdvancedJobHandlerConfigurationByteArrayRef()
.getBytes(ScopeTypes.BPMN));
LOG.debug(historyJsonNode.toString());
getHistoryEventProcessor().processHistoryEvents(historyJsonNode);
} catch (Exception e) {
LOG.error("Exception while Processing Job Entity job ID-"+historyJobEntity.getId(), e);
return false;
}
return true;
}
}
The custom job manager is as follows:
public class AsyncHistorySender extends AbstractMessageBasedJobManager {
private final EventProducerFactory eventProducerFactory;
@Value("${kafka.enabled:true}")
private boolean kafkaEnabled;
@Value("${kafka.bindings.async-history.topic:async-history}")
private String asyncHistoryTopic;
@Inject
public AsyncHistorySender(EventProducerFactory eventProducerFactory) {
this.eventProducerFactory = eventProducerFactory;
}
@Override
protected void sendMessage(JobInfo job) {
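// Build the producer parameters for the async-history topic.
ProducerParms producerParms = KafkaProdParams.builder()
.topic(asyncHistoryTopic)
.build();
// Only the job ID is put on the queue.
var histEvent = HistoryEvent.builder().jobId(job.getId()).build();
EventProducer<HistoryEvent> producer = eventProducerFactory.getProducer(asyncHistoryTopic);
// Assumption: submitSync returns false when the send fails, so report it instead of ignoring the result.
boolean status = producer.submitSync(histEvent, producerParms);
if (!status) {
throw new IllegalStateException("Failed to send history job " + job.getId() + " to topic " + asyncHistoryTopic);
}
}
}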
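As a standalone illustration of the send path in the job manager, here is a minimal sketch of a sender that reports a failed submit instead of silently dropping it. It uses no Flowable or Kafka classes: `SyncProducer` and `CheckedHistorySender` are hypothetical stand-ins for the `EventProducer` and `AsyncHistorySender` above, and the contract that `submitSync` returns false on failure is an assumption, not something confirmed for the real producer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the custom EventProducer; not a Flowable or Kafka API. */
interface SyncProducer {
    /** Assumed contract: true if the message was accepted, false otherwise. */
    boolean submitSync(String jobId);
}

/** Hypothetical sender that surfaces a failed submit to the caller. */
class CheckedHistorySender {
    private final SyncProducer producer;
    // Job IDs whose send failed, kept only so the failure is observable in this sketch.
    private final List<String> unsentJobIds = new ArrayList<>();

    CheckedHistorySender(SyncProducer producer) {
        this.producer = producer;
    }

    void sendMessage(String jobId) {
        boolean sent;
        try {
            sent = producer.submitSync(jobId);
        } catch (RuntimeException e) {
            // The producer itself failed, for example because the broker is unreachable.
            unsentJobIds.add(jobId);
            throw new IllegalStateException("Could not send history job " + jobId, e);
        }
        if (!sent) {
            // The producer returned normally but reported that the send did not succeed.
            unsentJobIds.add(jobId);
            throw new IllegalStateException("Producer rejected history job " + jobId);
        }
    }

    List<String> getUnsentJobIds() {
        return List.copyOf(unsentJobIds);
    }
}
```

With a producer that always returns false, `sendMessage("job-2")` throws and `getUnsentJobIds()` contains `job-2`. Whether the engine then resets such a job depends on how the exception propagates in the real job manager, which this sketch does not model.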