Camel usable outside Camel Task? (e.g. in JavaDelegate or Expression)

Hello,

Is there a way to retrieve a Camel object (bean?) or the flowable camel context outside a Camel Task?

  • Either in a JavaDelegate (of a Service Task for example)
  • or in an Expression (in the Expression field of a Service Task using #{camel.callRoute(‘direct:myRoute’)}) for example?

The use-case could be to have a triggerable Service Task (not available for a Camel Task for now) using camel in its JavaDelegate (simulates a triggerable Camel Task) or in its Expression.

Thanks
Best Regards
William

1 Like

if using spring, you could inject the camelcontext into the delegate (spring) bean of your expression.

Hello,
Thanks for your answer :grin:
I have already tried (using @autowired), but the camel context is null.
Would you have a link on an example somewhere, because I was unable to find one?
Or a small example would be helpful.
Thanks again
Have a good day.
William.

I know, that this works out-of-the-box with spring-boot:

http://camel.apache.org/spring-boot.html

The CamelContext is registered as Spring bean “camelContext”.

Hi @wberges,

It’s hard to say why you don’t get the camel context without seeing your spring/camel configuration.

As an alternative you could try creating your own ActivityBehavior based on the original CamelBehavior and add the triggerable behavior. I havent had time to test it but something like this might work for you:

package mypackage;

import org.apache.camel.Exchange;
import org.flowable.camel.ExchangeUtils;
import org.flowable.camel.FlowableEndpoint;
import org.flowable.camel.impl.CamelBehaviorDefaultImpl;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.context.Context;
import org.flowable.engine.compatibility.Flowable5CompatibilityHandler;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.impl.util.Flowable5Util;

public class MyTriggerableCamelBehavior extends CamelBehaviorDefaultImpl {

    @Override
    public void execute(DelegateExecution execution) {
        setAppropriateCamelContext(execution);

        final FlowableEndpoint endpoint = createEndpoint(execution);
        final Exchange exchange = createExchange(execution, endpoint);

        try {
            endpoint.process(exchange);
        } catch (Exception e) {
            throw new FlowableException("Exception while processing exchange", e);
        }
        execution.setVariables(ExchangeUtils.prepareVariables(exchange, endpoint));

        boolean isV5Execution = false;
        if ((Context.getCommandContext() != null && Flowable5Util.isFlowable5ProcessDefinitionId(Context.getCommandContext(), execution.getProcessDefinitionId())) ||
                (Context.getCommandContext() == null && Flowable5Util.getFlowable5CompatibilityHandler() != null)) {

            isV5Execution = true;
        }

        handleCamelException(exchange, execution, isV5Execution);
    }

    @Override
    public void trigger(DelegateExecution execution, String signalName, Object signalData) {
        boolean isV5Execution = false;
        if ((Context.getCommandContext() != null && Flowable5Util.isFlowable5ProcessDefinitionId(Context.getCommandContext(), execution.getProcessDefinitionId())) ||
                (Context.getCommandContext() == null && Flowable5Util.getFlowable5CompatibilityHandler() != null)) {

            isV5Execution = true;
        }

        if (isV5Execution) {
            Flowable5CompatibilityHandler compatibilityHandler = Flowable5Util.getFlowable5CompatibilityHandler();
            compatibilityHandler.leaveExecution(execution);
            return;
        }
        leave(execution);
    }
}

You can then reference this class in your serviceTask (but note that the triggerable flag on it will be ignored, this servicetask will always be triggerable).
Alternatively you can use a CamelTask and set the camelBehaviorClass field extension (available in BPMN but not in modeler) to your class.

You then can trigger the servicetask by using the runtime API or by finishing a camel route with a call to the Flowable producer to("flowable://my-process/myactivityid").

Hope that this helps.

Regards,
Paul

Thanks Paul, yes, it helps! :slight_smile:
In the meantime, I was able to retrieve the Process Engine Camel Context using the following code:

{   // Get CamelContext from the engine configuration (if available).
    ProcessEngineConfiguration engineConfiguration = org.flowable.engine.impl.context.Context.getProcessEngineConfiguration();
    SpringProcessEngineConfiguration springConfiguration = (SpringProcessEngineConfiguration) engineConfiguration;
    String camelContextValue = springConfiguration.getDefaultCamelContext();
    // Get the CamelContext object and set the super's member variable.
    Object ctx = springConfiguration.getApplicationContext().getBean(camelContextValue);
    if (!(ctx instanceof CamelContext)) {
        throw new FlowableException("Could not find CamelContext named " + camelContextValue + ".");
    }
    return (CamelContext) ctx;
} 

Is it valid?

I will also try your solution.

Thanks again
Best Regards
William

Hello again @pstapleton,

It seems that the code I provided above is the content of the setAppropriateCamelContext method you suggested… :slight_smile:

Thanks a lot for the tip!!
I was not aware that we could use another delegate than a basical JavaDelegate.
Now I know that I can use a CamelBehaviorDefaultImpl class as JavaDelegate in my BPMN process and get a Camel EndPoint based on the Id of the service task.

Thanks again for your post, because there are no example of such camel usage (or I was unable to find one).
Best Regards
William

Hello,

After some tests, I was able to trigger my Camel Delegate, thanks @pstapleton :slight_smile:
Unfortunately, no way to retrieve parameters…

The process I use is the following:

  1. Create a BPMN process called myProcessCamel with a ServiceTask

  2. Create a Class extending CamelBehaviorDefaultImpl with both execute and trigger methods

  3. Define a Service Task with ID myDelegateTest in my BPMN (it seems that isTriggerable="true" doesn’t matter)

  4. Define a Camel route with route named
    from("flowable:myProcessCamel:myDelegateTest?copyVariablesToProperties=true")

  5. In the Execute method of the Delegate, create then execute the Camel endpoint associated to this ServiceTask ID (as you mentionned above)

    setAppropriateCamelContext(execution);        
    // Create a Camel EndPoint based on the ServiceTask Id. 
    // Equivalent to from("flowable:myProcessCamel:myTaskId"...) 
    final FlowableEndpoint endpoint = createEndpoint(execution);
    final Exchange exchange = createExchange(execution, endpoint);
    exchange.setProperty("executionId", execution.getId());
    endpoint.process(exchange); // Execute route of endpoint
    
  6. In the Camel route (endpoint), in a process method, extract the executionId (and some other parameters) from properties, store them in the Camel body then call another ActiveMQ route (simulate asynchronous answer)

  7. In this Camel ActiveMQ route, create a map bodyProcessVariables from the body then trigger the Delegate (asynchronous response) using
    runtimeService.triggerAsync(executionId, bodyProcessVariables);

Unfortunately, the only overridable and triggerable method of CamelBehaviorDefaultImpl is the following:

@Override
public void trigger(DelegateExecution execution, String signalName, Object signalData) {

It is well called following my runtimeService.triggerAsync call, but without input parameter.
It’s the same if I call in place

runtimeService.trigger(executionId, processVariables);
or runtimeService.trigger(executionId, processVariables, null);

I was unable to find another overridable method with parameters to call. :frowning:
Any idea about how to retrieve parameters in a trigger method?

Thanks in advance
Best Regards
William

After additional searches, I was able to find the original code of your answer here.

  if (!handleCamelException(exchange, execution, isV5Execution)) {
      //...            
      leave(execution);
  }

It seems to be called in the execute method if any Camel error wasn’t handled.
But should we call it also in the trigger method? What is its purpose in this case?

Hi @wberges,

As you identified, a way to pass properties (etc) from Camel back to the Triggerable ServiceTask was to set them as process variables and access them in the trigger method from the execution.

Regarding the use of leave(execution) I have had a look again and I have seen that there is some behaviour in Flowable that I did not expect.

I created my CamelBehavior by setting the camelBehaviorClass field extension and using a CamelTask. This results in the behavior being created by the class org.flowable.engine.impl.bpmn.parser.factory.DefaultActivityBehaviorFactory and used directly when executing in Flowable. If you do it in this way the trigger(execution, signalEvent, signalData) method needs to call leave(execution) for the process to continue its execution.

However, when you use a normal triggerable ServiceTask with class set to my CamelBehavior there is a difference in behaviour. The CamelBehavior class gets an additional wrapper from org.flowable.engine.impl.bpmn.helper.ClassDelegate which has its trigger method called and then calls the CamelBehavior trigger method.

In the ClassDelegate the code looks like this:

@Override
public void trigger(DelegateExecution execution, String signalName, Object signalData) {
    if (activityBehaviorInstance == null) {
        activityBehaviorInstance = getActivityBehaviorInstance();
    }

    if (activityBehaviorInstance instanceof TriggerableActivityBehavior) {
        ((TriggerableActivityBehavior) activityBehaviorInstance).trigger(execution, signalName, signalData);
        if(triggerable) {
            leave(execution);
        }
    } else {
        throw new FlowableException("signal() can only be called on a " + TriggerableActivityBehavior.class.getName() + " instance");
    }
}

Here an additional leave(execution) is called - once in the CamelBehavior and once in the ClassDelegate - and this is what is causing your issue. So in your case it’s ok to remove the leave(execution) method call, as it gets called elsewhere.

For me this difference in behaviour was quite unexpected and as I hadn’t tested with a ServiceTask I didn’t see the issue earlier.

I hope the above makes sense. Let me know if I should clarify something.

Regards,
Paul

Hello @pstapleton,

Thanks for the detailed inquiry and explanations :slight_smile:

I continue my tests.

Best Regards
William

@pstapleton I’m afraid it isn’t so ‘simple’… :frowning:

I have continued my tests.

  • My first test is a serial process with 3 tasks: 1 Service task (Camel Delegate with trigger) then 2 other Camel Tasks.
    Everything is fine if I don’t call leave(execution) in the trigger. I finish successfully the process.

  • My second test adds a second Service Task (a standard JavaDelegate this time without trigger) just after the first Service Task (Camel Delegate with trigger). It adds also an exclusive gateway just after this second Service Task to go to either Camel Task 1 or 2.
    image
    And in this case… I never go to the second Service Task (Delegate 02).
    I had to re-enable my leave(execution) in the trigger method to be able to go to the next task…

    And no more OptimisticLock exception.

I believed intially it was due to the fact that I had async="true" on the second Service Task, but it’s the same with or without it.
In this configuration, I haven’t found for now the attribute configuration requesting or not the leave(execution).

Annoying isn’t it?

I continue my tests…

I confirm the problem.

I have simplified my process:

image

  <process id="myProcessCamel" name="myProcessCamel" isExecutable="true">
    <startEvent id="startEvent1"></startEvent>
    <sequenceFlow id="start_to_1" sourceRef="startEvent1" targetRef="DelegateTest01"></sequenceFlow>
    <serviceTask id="DelegateTest01" name="Delegate 01" flowable:class="com.tests.services.DelegateTest01"></serviceTask>
    <sequenceFlow id="task1_to_2" sourceRef="DelegateTest01" targetRef="Delegate02"></sequenceFlow>
    <serviceTask id="Delegate02" name="Delegate 02" flowable:async="true" flowable:exclusive="false" flowable:class="com.tests.services.DelegateTest02"></serviceTask>
    <sequenceFlow id="task2_to_end" sourceRef="Delegate02" targetRef="end"></sequenceFlow>
    <endEvent id="end"></endEvent>
  </process>

My First Service Task:

@Service
public class DelegateTest01 extends CamelBehaviorDefaultImpl
{
    @Override
    public void execute(DelegateExecution execution) 
    {   
        try 
        {
            System.out.println("In delegate Execute before endpoint call: " + execution);
            setAppropriateCamelContext(execution);        
            // Create a Camel EndPoint based on the ServiceTask Id. Equivalent to from("flowable:myProcessCamel:myTaskId"...) 
            final FlowableEndpoint endpoint = createEndpoint(execution);
            final Exchange exchange = createExchange(execution, endpoint);
            // Execute route of endpoint
            endpoint.process(exchange);
            execution.setVariables(ExchangeUtils.prepareVariables(exchange, endpoint));
            System.out.println("In delegate Execute after endpoint call: " + execution);
            Boolean isV5Execution = false; // Always false in my config (flowable 6.4.0)
            handleCamelException(exchange, execution, isV5Execution);
        } 
        catch (Exception e) 
        {
            throw new FlowableException("DelegateTest01 Exception", e);
        }
    }
    @Override
    public void trigger(DelegateExecution execution, String signalName, Object signalData) 
    {
        System.out.println("In trigger method: " + execution);
        // DON'T CALL 'leave(execution)' or there will be an OptimisticLock exception!!!!! (or not... :) 
        // leave(execution);
    }  
}

My Second Service Task:

@Service
public class DelegateTest02 implements JavaDelegate
{
    @Override
    public void execute(DelegateExecution execution) 
    {
        try 
        {
            System.out.println("In delegate02 Execute " + execution);
        } 
        catch (Exception e) 
        {
            throw new FlowableException("DelegateTest02 Exception", e);
        }
    }
}

In this configuration (no leave(execution) in the DelegateTest01::trigger method), the execution stops after this DelegateTest01::trigger method (I never go in DelegateTest02::execute).
If I enable leave(execution), it works… :frowning:

Ok, I was able to reproduce the OptimisticLock problem by adding flowable:triggerable="true" to my first Service Task.
<serviceTask id="DelegateTest01" name="Delegate 01" flowable:class="com.tests.services.DelegateTest01" flowable:triggerable="true"></serviceTask>
It’s certainly due to the code if(triggerable) {leave(execution);} you mentioned above.

It means that, in a Camel triggerable task (based on a Service Task), the triggerable flag is not ignored :slight_smile:.
We MUST add the flowable:triggerable="true" attribute if we want to have the same behaviour as the Service triggerable task, meaning no explicit leave(execution) call needed in the trigger method (what makes sense).

And it also means that we really have to be careful about attributes we set (what I can understand) and to know in details how the engine works…

@pstapleton Do you agree with this conclusion? :slight_smile:

BTW, isn’t it a possibility to modify the default implementation of CamelBehaviour::execute in order to also manage the triggerable flag?

It would allow to use the default execute (no override so) if we don’t have additional actions in addition to the default Camel route call, and would move the call to leave(execution) in the trigger method if the task is declared triggerable in the BPMN (like for the standard Service Task).

Cheers
William

Hi @wberges ,

What you wrote makes sense. I was mistaken when I thought the triggerable flag was ignored completely. Though as you have seen, if you set the ServiceTask as not triggerable then the process can get stuck as well. So it really takes a bit of trial and error to get everything right.

I think there are some inconsistencies in the implementation in the engine, and I will try to create a separate post on the issues I have seen. Perhaps there are some improvements which would simplify things here.

Anyway, I hope that it’s working for you now.

Regards,
Paul

Yep, it works, and again, thanks a lot for your support :slight_smile:
Best Regards
William