Hi,
I have a trouble when I restart the app with async history SQS and ElasticSearch
Settings
@Configuration
@AutoConfigureAfter({ProcessEngineAutoConfiguration.class, SQSConfig.class})
public class AsyncHistory {
@Value("${aws.sqs.flowable.history.queue.name}")
private String queue;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private ObjectMapper objectMapper;
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestinationName(queue);
jmsTemplate.setConnectionFactory(connectionFactory);
return jmsTemplate;
}
@Bean
public SQSMessageBasedJobManager jobManager() {
SQSMessageBasedJobManager jobManager = new SQSMessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate());
jobManager.setQueue(queue);
jobManager.setObjectMapper(objectMapper);
return jobManager;
}
@Bean
@ConditionalOnBean(SQSMessageBasedJobManager.class)
public EngineConfigurationConfigurer<SpringProcessEngineConfiguration> customProcessEngineConfigurer() {
return configuration -> {
configuration.setDbHistoryUsed(false);
configuration.setAsyncHistoryEnabled(true);
configuration.setAsyncHistoryExecutorActivate(true);
configuration.setAsyncHistoryExecutorMessageQueueMode(true);
configuration.setJobManager(jobManager());
};
}
}
@Slf4j
@RequiredArgsConstructor
public class SQSMessageBasedJobManager extends AbstractMessageBasedJobManager {
@Getter @Setter
private JmsTemplate jmsTemplate;
@Getter @Setter
private String queue;
@Getter @Setter
private ObjectMapper objectMapper;
@Override
protected void sendMessage(JobInfo job) {
try {
if (job instanceof HistoryJobEntity) {
HistoryJobEntity jobEntity = (HistoryJobEntity) job;
if (jobEntity.getAdvancedJobHandlerConfigurationByteArrayRef() != null) {
byte[] array = jobEntity.getAdvancedJobHandlerConfigurationByteArrayRef().getBytes();
JsonNode historyJsonNode = objectMapper.readTree(array);
if (historyJsonNode != null && historyJsonNode.has("type")
&& HistoryJsonConstants.TYPE_VARIABLE_CREATED
.equals(historyJsonNode.get("type").asText())) {
jmsTemplate.convertAndSend(queue, job.getId());
}
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
}
StackTrace
java.lang.NullPointerException: null
at org.flowable.job.service.impl.util.CommandContextUtil.getJobByteArrayEntityManager(CommandContextUtil.java:112) ~[flowable-job-service-6.4.1.jar:6.4.1]
at org.flowable.job.service.impl.util.CommandContextUtil.getJobByteArrayEntityManager(CommandContextUtil.java:108) ~[flowable-job-service-6.4.1.jar:6.4.1]
at org.flowable.job.service.impl.persistence.entity.JobByteArrayRef.ensureInitialized(JobByteArrayRef.java:128) ~[flowable-job-service-6.4.1.jar:6.4.1]
at org.flowable.job.service.impl.persistence.entity.JobByteArrayRef.getBytes(JobByteArrayRef.java:53) ~[flowable-job-service-6.4.1.jar:6.4.1]
at ....bpm.async.history.SQSMessageBasedJobManager.sendMessage(SQSMessageBasedJobManager.java:41) ~[main/:na]
at org.flowable.job.service.impl.asyncexecutor.message.AbstractMessageBasedJobManager$1.execute(AbstractMessageBasedJobManager.java:109) ~[flowable-job-service-6.4.1.jar:6.4.1]
at org.flowable.app.spring.SpringTransactionContext$2.afterCommit(SpringTransactionContext.java:79) ~[flowable-app-engine-spring-6.4.1.jar:6.4.1]
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:134) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:122) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:948) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:785) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152) ~[spring-tx-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.flowable.common.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:46) ~[flowable-spring-common-6.4.1.jar:6.4.1]
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30) ~[flowable-engine-common-6.4.1.jar:6.4.1]
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56) ~[flowable-engine-common-6.4.1.jar:6.4.1]
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51) ~[flowable-engine-common-6.4.1.jar:6.4.1]
at org.flowable.job.service.impl.asyncexecutor.ResetExpiredJobsRunnable.resetJobs(ResetExpiredJobsRunnable.java:97) [flowable-job-service-6.4.1.jar:6.4.1]
at org.flowable.job.service.impl.asyncexecutor.ResetExpiredJobsRunnable.run(ResetExpiredJobsRunnable.java:61) [flowable-job-service-6.4.1.jar:6.4.1]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]