How to make use of Vertx Eventbus in Service Task?

Hi

I’m using a Vertx Eventbus for communication with a different verticle. This should ideally send a reply back.
This is the code I’m using for that purpose.

public class ServiceValidateDelegate implements JavaDelegate {
	@Override
	public void execute(DelegateExecution execution) {
		String name = execution.getVariable("name").toString();
		Vertx vertx = (Vertx) execution.getVariable("process");
		vertx.eventBus().send("service.validate", name, reply -> {
			System.out.println("service reply : " + reply.result());
		});
	}}

But an exception occurs on this line

vertx.eventBus().send("service.validate", name, reply -> {
    			System.out.println("service reply : " + reply.result());
    		});

java.lang.NullPointerException
at org.srm.Vertx.Service.Flowable.Delegates.ServiceValidateDelegate.execute(ServiceValidateDelegate.java:14)
at org.flowable.engine.impl.delegate.invocation.JavaDelegateInvocation.invoke(JavaDelegateInvocation.java:35)
at org.flowable.engine.impl.delegate.invocation.DelegateInvocation.proceed(DelegateInvocation.java:35)
at org.flowable.engine.impl.delegate.invocation.DefaultDelegateInterceptor.handleInvocation(DefaultDelegateInterceptor.java:26)
at org.flowable.engine.impl.bpmn.behavior.ServiceTaskJavaDelegateActivityBehavior.execute(ServiceTaskJavaDelegateActivityBehavior.java:59)
at org.flowable.engine.impl.bpmn.helper.ClassDelegate.execute(ClassDelegate.java:203)
at org.flowable.engine.impl.agenda.ContinueProcessOperation.executeActivityBehavior(ContinueProcessOperation.java:264)
at org.flowable.engine.impl.agenda.ContinueProcessOperation.executeSynchronous(ContinueProcessOperation.java:158)
at org.flowable.engine.impl.agenda.ContinueProcessOperation.continueThroughFlowNode(ContinueProcessOperation.java:113)
at org.flowable.engine.impl.agenda.ContinueProcessOperation.continueThroughSequenceFlow(ContinueProcessOperation.java:311)
at org.flowable.engine.impl.agenda.ContinueProcessOperation.run(ContinueProcessOperation.java:79)
at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperation(CommandInvoker.java:88)
at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperations(CommandInvoker.java:72)
at org.flowable.engine.impl.interceptor.CommandInvoker.execute(CommandInvoker.java:56)
at org.flowable.engine.impl.interceptor.BpmnOverrideContextInterceptor.execute(BpmnOverrideContextInterceptor.java:25)
at org.flowable.common.engine.impl.interceptor.TransactionContextInterceptor.execute(TransactionContextInterceptor.java:53)
at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:71)
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
at org.flowable.engine.impl.TaskServiceImpl.complete(TaskServiceImpl.java:208)
at org.srm.Vertx.Service.Flowable.FlowableStartEvent.lambda$0(FlowableStartEvent.java:73)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:261)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:239)
at io.vertx.core.eventbus.impl.EventBusImpl$InboundDeliveryContext.next(EventBusImpl.java:565)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$5(EventBusImpl.java:524)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

Any ideas on this?

Hi Sid

Isn’t vertx null?

Regards
Martin

Hi Martin,

You were right, vertx obj was null. It working now. :slightly_smiling_face:
Though I do have a followup question.

vertx works asynchronously
so if i change the code in this manner -

@Override
	public void execute(DelegateExecution execution) {
		String name = execution.getVariable("name").toString();
		Vertx vertx = (Vertx) execution.getVariable("process");
		vertx.eventBus().send("service.validate", name, reply -> {
			**execution.setVariable("approved", reply.result());**
		});
	}}

It doesn’t seem to wait for the vertx task to complete so that it can fetch the result for execution.setVariable and that throws an exception.

Any ideas on what could be done?

I’m assuming that vertx callback will not run in the same context as the one where the execute method is being ran, hence why it probably throws an exception.

Can you try this instead of the lines you marked with **:

org.flowable.engine.impl.util.CommandContextUtil.getProcessEngineConfiguration().getRuntimeService().setVariable(execution.getId,“approved”, reply.result() )

This way, you’ll create a new command context that is independent from the original one and goes through the service instead of setting it directly on the execution.

Hi joram,

Thanks for the reply :grinning:
I changed the code this way.
But it seems to throw the same exception after running the code.

    @Override
    	public void execute(DelegateExecution execution) {
    		String name = execution.getVariable("name", false).toString();
    		try {
    			MainDeployment.vertx.eventBus().send("service.validate", name, reply -> {
    				Boolean res = Boolean.parseBoolean(reply.result().body().toString());
    				org.flowable.engine.impl.util.CommandContextUtil.getProcessEngineConfiguration().getRuntimeService()
    				.setVariable(execution.getId(), "approved", res);
    			});
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}

This is the exception being thrown

org.flowable.common.engine.api.FlowableException: Unknown property used in expression: ${approved}
	at org.flowable.common.engine.impl.el.JuelExpression.getValue(JuelExpression.java:50)
	at org.flowable.engine.impl.el.UelExpressionCondition.evaluate(UelExpressionCondition.java:37)
	at org.flowable.engine.impl.util.condition.ConditionUtil.hasTrueCondition(ConditionUtil.java:47)
	at org.flowable.engine.impl.bpmn.behavior.ExclusiveGatewayActivityBehavior.leave(ExclusiveGatewayActivityBehavior.java:75)
	at org.flowable.engine.impl.bpmn.behavior.FlowNodeActivityBehavior.execute(FlowNodeActivityBehavior.java:39)
	at org.flowable.engine.impl.agenda.ContinueProcessOperation.executeActivityBehavior(ContinueProcessOperation.java:264)
	at org.flowable.engine.impl.agenda.ContinueProcessOperation.executeSynchronous(ContinueProcessOperation.java:158)
	at org.flowable.engine.impl.agenda.ContinueProcessOperation.continueThroughFlowNode(ContinueProcessOperation.java:113)
	at org.flowable.engine.impl.agenda.ContinueProcessOperation.continueThroughSequenceFlow(ContinueProcessOperation.java:311)
	at org.flowable.engine.impl.agenda.ContinueProcessOperation.run(ContinueProcessOperation.java:79)
	at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperation(CommandInvoker.java:88)
	at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperations(CommandInvoker.java:72)
	at org.flowable.engine.impl.interceptor.CommandInvoker.execute(CommandInvoker.java:56)
	at org.flowable.engine.impl.interceptor.BpmnOverrideContextInterceptor.execute(BpmnOverrideContextInterceptor.java:25)
	at org.flowable.common.engine.impl.interceptor.TransactionContextInterceptor.execute(TransactionContextInterceptor.java:53)
	at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:71)
	at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
	at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56)
	at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51)
	at org.flowable.engine.impl.TaskServiceImpl.complete(TaskServiceImpl.java:208)
	at org.srm.Vertx.Service.Flowable.FlowableStartEvent.lambda$0(FlowableStartEvent.java:74)
	at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:261)
	at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:239)
	at io.vertx.core.eventbus.impl.EventBusImpl$InboundDeliveryContext.next(EventBusImpl.java:565)
	at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$5(EventBusImpl.java:524)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
	at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I’ve set ${approved} in the bpmn20.xml file.

    <sequenceFlow id="flow2" sourceRef="exclusivegateway1"
    			targetRef="servicetask1">
    			<conditionExpression xsi:type="tFormalExpression"><![CDATA[${approved}]]></conditionExpression>
    		</sequenceFlow>
<sequenceFlow id="flow7" sourceRef="exclusivegateway1"
			targetRef="servicetask3">
			<conditionExpression xsi:type="tFormalExpression"><![CDATA[${!approved}]]></conditionExpression>
		</sequenceFlow>