Hi,
I am trying to implement an async flow using a Camel task followed by a receive task. The message queue being used is RabbitMQ. The Camel task is able to send a message to the queue, but the system does not find the receive task when a reply message arrives. Is there something I am missing here?
The BPMN file looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:xsd="http://www.w3.org/2001/XMLSchema"
             xmlns:activiti="http://activiti.org/bpmn"
             xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
             xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
             xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
             xmlns:flowable="http://flowable.org/bpmn"
             typeLanguage="http://www.w3.org/2001/XMLSchema"
             expressionLanguage="http://www.w3.org/1999/XPath"
             targetNamespace="http://www.flowable.org/processdef">
  <process id="PingPongEx" isExecutable="true">
    <startEvent id="start"></startEvent>
    <serviceTask id="pingCamel" name="camelTask" flowable:exclusive="false" flowable:type="camel"></serviceTask>
    <sequenceFlow id="flow2" sourceRef="pingCamel" targetRef="receiveAsyncTask"></sequenceFlow>
    <endEvent id="end"></endEvent>
    <serviceTask id="servicetask1" name="Initialize Users and Products" flowable:class="com.example.tasks.InitVariablesTask"></serviceTask>
    <sequenceFlow id="flow3" sourceRef="servicetask1" targetRef="pingCamel"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="start" targetRef="servicetask1"></sequenceFlow>
    <receiveTask id="receiveAsyncTask" name="Receive Task"></receiveTask>
    <sequenceFlow id="flow5" sourceRef="receiveAsyncTask" targetRef="end"></sequenceFlow>
  </process>
</definitions>
Below is my Camel route configuration:
// Outbound route: triggered by the Camel task, publishes the process instance id to RabbitMQ.
this.from("flowable:PingPongEx:pingCamel")
    .log(LoggingLevel.INFO, this.logger, "Sent request body: ${body}")
    .process(exchange -> System.out.println("Properties: " + exchange.getProperties()))
    .process(exchange -> {
        final EventMessage msg = new EventMessage();
        msg.setId(exchange.getProperty("PROCESS_ID_PROPERTY", String.class));
        // msg.setMessage((String) exchange.getIn().getBody());
        exchange.getIn().setBody(this.objectMapper.writeValueAsString(msg));
    })
    .to("rabbitmq:localhost:5672/tasks?username=xxxxx&password=xxxxx&autoDelete=false&routingKey=camel&queue=flowable-sender");

// Inbound route: consumes the reply from RabbitMQ and signals the receive task.
this.from("rabbitmq:localhost:5672/tasks?username=xxxxx&password=xxxxx&autoDelete=false&routingKey=camel&queue=flowable-reciever")
    .log(LoggingLevel.INFO, this.logger, "Rabbit MQ Message Received: ${body}")
    .unmarshal().json(JsonLibrary.Jackson, EventMessage.class)
    .log(LoggingLevel.INFO, this.logger, "Unmarshalled Response: ${body.id}")
    .process(exchange -> {
        final EventMessage msg = exchange.getIn().getBody(EventMessage.class);
        exchange.getIn().setBody(msg.getMessage());
        exchange.setProperty("PROCESS_ID_PROPERTY", msg.getId());
    })
    .to("flowable:PingPongEx:receiveAsyncTask");
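For completeness, EventMessage is a plain POJO that Jackson serializes to and from JSON. The class itself is not shown above, so the following is only a minimal sketch of its shape, inferred from the accessors the routes call (setId, getId, getMessage); the field names id and message are assumptions.

```java
// Hypothetical sketch of the EventMessage payload; the real class is not shown in this post.
public class EventMessage {
    // Process instance id, used to correlate the reply with the waiting process.
    private String id;
    // Free-form payload carried alongside the id.
    private String message;

    // Jackson needs a no-argument constructor to deserialize the JSON body.
    public EventMessage() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
```

With this shape, a message that only has its id set would serialize with a null message, which is what the outbound route currently produces because the setMessage call is commented out.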
Exception in the logs:
org.flowable.common.engine.api.FlowableException: Could not find activity receiveAsyncTask for processId 45e551be-9984-11e9-a559-f4d108675850 in defined timeout of 5000 ms.
at org.flowable.camel.FlowableProducer.signal(FlowableProducer.java:141) ~[flowable-camel-6.4.1.jar:6.4.1]
at org.flowable.camel.FlowableProducer.process(FlowableProducer.java:73) ~[flowable-camel-6.4.1.jar:6.4.1]
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) ~[camel-core-2.23.2.jar:2.23.2]
at org.apache.camel.component.rabbitmq.RabbitConsumer.doHandleDelivery(RabbitConsumer.java:104) [camel-rabbitmq-2.23.2.jar:2.23.2]
at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:79) [camel-rabbitmq-2.23.2.jar:2.23.2]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_171]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_171]
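My reading of the error is that the Flowable producer keeps looking for an execution waiting at receiveAsyncTask for the given process instance id and gives up after the timeout (5000 ms here). The following is only a simplified, self-contained sketch of that poll-until-timeout pattern in plain Java; it is not Flowable's actual code, and the names pollUntilFound, lookup, timeoutMs and intervalMs are made up for illustration.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class PollWithTimeout {

    // Repeatedly calls lookup until it yields a value or timeoutMs has elapsed.
    // Returns an empty Optional when nothing was found within the timeout.
    static <T> Optional<T> pollUntilFound(Supplier<Optional<T>> lookup,
                                          long timeoutMs,
                                          long intervalMs) throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result;
            }
            if (System.nanoTime() >= deadline) {
                return Optional.empty();
            }
            Thread.sleep(intervalMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A lookup that never succeeds, like a query for a waiting execution
        // that does not exist for the given process instance id.
        Optional<String> missing = pollUntilFound(() -> Optional.<String>empty(), 200, 50);
        System.out.println("execution found: " + missing.isPresent());

        // A lookup that succeeds on the third attempt, like a process instance
        // that reaches the receive task shortly after the reply arrives.
        int[] attempts = {0};
        Optional<String> found = pollUntilFound(
                () -> ++attempts[0] >= 3 ? Optional.of("receiveAsyncTask") : Optional.<String>empty(),
                1000, 10);
        System.out.println("execution found: " + found.isPresent());
    }
}
```

If that reading is right, the exception means the lookup stayed empty for the whole window, so either no execution is waiting at the receive task yet or the process instance id in the reply does not match.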