AsyncHistory not resetting jobs in MessageQueueMode

We are using Flowable 6.6.0 and are trying to write history to a separate NoSQL database, using Kafka as the message queue, by enabling AsyncHistoryExecutorMessageQueueMode.

We are able to send and receive events through the Kafka queue, but when the message queue fails or is down, the jobs are not processed. The example documentation mentions that the Async History Executor will be booted in a light mode and will reset any job that has not been processed.

Is there any configuration that needs to be added for this?

The configuration we use is as follows:

config.setAsyncHistoryEnabled(true);
config.setAsyncHistoryExecutorMessageQueueMode(true);
config.setJobManager(jobManager());

public HistoryJobMessageHandler getHistoryJobMessageHandler() {
    try {
        return new HistoryJobMessageHandler();
    } catch (Exception e) {
        LOG.error( " Error initializing HistoryJobMessageHandler");
    }
    return null;
}

public AsyncHistorySender jobManager() {
    return new AsyncHistorySender(eventProducerFactory);
}

public AsyncHistoryJobMessageReceiver asyncHistoryJobMessageReceiver() {
    AsyncHistoryJobMessageReceiver asyncHistoryJobMessageReceiver = new AsyncHistoryJobMessageReceiver(
            springProcessEngineConfigurer().getCommandExecutor(),
            getHistoryJobMessageHandler(),
            springProcessEngineConfigurer().getAsyncExecutor().getJobServiceConfiguration());
    asyncHistoryJobMessageReceiver.setAsyncHistoryJobMessageHandler(getHistoryJobMessageHandler());
    return asyncHistoryJobMessageReceiver;
}

The custom message handler looks like this:

public class HistoryJobMessageHandler implements AsyncHistoryJobMessageHandler {

    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public boolean handleJob(HistoryJobEntity historyJobEntity, JsonNode historyData) {
        JsonNode historyJsonNode = null;
        try {
            historyJsonNode = objectMapper.readTree(historyJobEntity.getAdvancedJobHandlerConfigurationByteArrayRef()
                    .getBytes(ScopeTypes.BPMN));
            LOG.debug(historyJsonNode.toString());
            getHistoryEventProcessor().processHistoryEvents(historyJsonNode);
        } catch (Exception e) {
            LOG.error("Exception while Processing Job Entity job ID-" + historyJobEntity.getId(), e);
            return false;
        }
        return true;
    }
}

                        

                        

The custom job manager is as follows:

public class AsyncHistorySender extends AbstractMessageBasedJobManager {

    private final EventProducerFactory eventProducerFactory;

    @Value("${kafka.enabled:true}")
    private boolean kafkaEnabled;

    @Value("${kafka.bindings.async-history.topic:async-history}")
    private String asyncHistoryTopic;

    @Inject
    public AsyncHistorySender(EventProducerFactory eventProducerFactory) {
        this.eventProducerFactory = eventProducerFactory;
    }

    @Override
    protected void sendMessage(JobInfo job) {
        try {
            ProducerParms producerParms = KafkaProdParams.builder()
                    .topic(asyncHistoryTopic)
                    .build();
            var histEvent = HistoryEvent.builder().jobId(job.getId()).build();
            EventProducer<HistoryEvent> producer = eventProducerFactory.getProducer(asyncHistoryTopic);
            boolean status = producer.submitSync(histEvent, producerParms);
        } catch (Exception e) {
            throw e;
        }
    }
}
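
One detail in sendMessage that may be worth ruling out: the boolean returned by submitSync is stored but never checked. If the producer signals a failed send by returning false rather than by throwing (an assumption, since the EventProducer contract is not shown in this thread), the failure would pass silently. A minimal sketch of surfacing that case follows. SyncProducer and MessageNotSentException are hypothetical stand-ins, not Flowable or Kafka types.

```java
// Toy model only: SyncProducer and MessageNotSentException are hypothetical names.
class SendStatusSketch {

    /** Stand-in for the producer; assumed to return false when the message was not accepted. */
    interface SyncProducer {
        boolean submitSync(String jobId);
    }

    /** Raised when the producer reports that the message was not accepted. */
    static class MessageNotSentException extends RuntimeException {
        MessageNotSentException(String jobId) {
            super("History job message was not accepted by the queue, job id: " + jobId);
        }
    }

    /** Sends the job id and turns a false status into an exception instead of ignoring it. */
    static void sendMessage(SyncProducer producer, String jobId) {
        boolean accepted = producer.submitSync(jobId);
        if (!accepted) {
            throw new MessageNotSentException(jobId);
        }
    }

    public static void main(String[] args) {
        // A producer that accepts the message: no exception.
        sendMessage(id -> true, "job-1");

        // A producer that rejects the message: the failure becomes visible.
        try {
            sendMessage(id -> false, "job-2");
        } catch (MessageNotSentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether an exception is the right reaction depends on how the surrounding job manager treats failures in sendMessage; the sketch only shows how to avoid dropping the status.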

The reset thread/runnable is initialized here: AbstractAsyncExecutor.java in the flowable/flowable-engine repository on GitHub (main branch).

Do you see that thread running in your system? In message queue mode, the reset runnable will delete the old job and reinsert a new one with the same data (which triggers the message sending again). Can you put a breakpoint in DefaultJobManager#unacquire to see if it is triggered?
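
To make the delete-and-reinsert behaviour described above concrete, here is a small in-memory sketch. It is a toy model under stated assumptions, not Flowable code: Job, JobStore, the lock-expiry field, and the sender callback are all hypothetical names, and the real reset runnable works against the database inside engine commands.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: none of these types are Flowable classes.
class ResetExpiredJobsSketch {

    /** Hypothetical stand-in for a history job row. */
    static class Job {
        final String id;
        final String payload;
        final long lockExpiresAt; // epoch millis after which the job counts as stuck

        Job(String id, String payload, long lockExpiresAt) {
            this.id = id;
            this.payload = payload;
            this.lockExpiresAt = lockExpiresAt;
        }
    }

    /** Hypothetical in-memory job table; the sender callback stands in for sendMessage(job). */
    static class JobStore {
        private final Map<String, Job> jobs = new LinkedHashMap<>();
        private final Consumer<Job> sender;
        private int nextId = 1;

        JobStore(Consumer<Job> sender) {
            this.sender = sender;
        }

        /** Inserts a job under a new id and triggers a message for it. */
        Job insert(String payload, long lockExpiresAt) {
            Job job = new Job("job-" + nextId++, payload, lockExpiresAt);
            jobs.put(job.id, job);
            sender.accept(job);
            return job;
        }

        /** Deletes every job whose lock has expired and reinserts a copy; returns the reset count. */
        int resetExpired(long now, long lockMillis) {
            List<Job> expired = new ArrayList<>();
            for (Job job : jobs.values()) {
                if (job.lockExpiresAt <= now) {
                    expired.add(job);
                }
            }
            for (Job old : expired) {
                jobs.remove(old.id);                    // delete the old job
                insert(old.payload, now + lockMillis);  // same data, new id, message sent again
            }
            return expired.size();
        }

        List<String> ids() {
            return new ArrayList<>(jobs.keySet());
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        JobStore store = new JobStore(job -> sent.add(job.id));

        store.insert("history-a", 1_000L); // lock already expired at now = 2000
        store.insert("history-b", 5_000L); // lock still valid at now = 2000

        int reset = store.resetExpired(2_000L, 1_000L);
        System.out.println(reset + " job(s) reset, messages sent for: " + sent);
        System.out.println("jobs now in store: " + store.ids());
    }
}
```

In this model only the first job is reset: it is removed and re-added under a new id, and the sender callback fires a third time. If nothing comparable happens in the real system, that points at the reset runnable not running, which is what the breakpoint should confirm.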