Multi-Instance and then waiting for the events

Here is the use-case:

I have a collection of Objects (called Attempts in my code), each one will send a Kafka Message.

Now I need to wait for all responses to come back. The number of request/responses is identical, so if my collection has 10 objects, I need to wait for 10 messages to come back. But the number is dynamic so can’t hardcode it.

I’m lost how to model this.

This is the process I currently have (it’s not working)

<process id="pda-business-rules" name="pda-business-rules" isExecutable="true">
<startEvent id="startEvent1" flowable:formFieldValidation="true"></startEvent>
<serviceTask id="sid-DD13CEBE-7FA0-4A83-809D-14938322A886" name="Invoke BRE">
  <multiInstanceLoopCharacteristics isSequential="false" flowable:collection="attempts" flowable:elementVariable="attempt"></multiInstanceLoopCharacteristics>
</serviceTask>
<sequenceFlow id="sid-36E4E287-39AA-47DF-98D0-AFE55E2C4996" sourceRef="startEvent1" targetRef="sid-DD13CEBE-7FA0-4A83-809D-14938322A886"></sequenceFlow>
<endEvent id="sid-60367E20-1EC4-4064-BBEC-786D5B149638"></endEvent>
<intermediateCatchEvent id="sid-EF59401E-9EDE-4493-8D28-CFB8BF3DB955">
  <messageEventDefinition></messageEventDefinition>
</intermediateCatchEvent>
<sequenceFlow id="sid-8A1DA0FB-88D4-447D-822D-F9C2693CDFC4" sourceRef="sid-DD13CEBE-7FA0-4A83-809D-14938322A886" targetRef="sid-EF59401E-9EDE-4493-8D28-CFB8BF3DB955"></sequenceFlow>
<sequenceFlow id="sid-9DF4AD1D-379C-48BD-8DE7-911D15537C31" sourceRef="sid-EF59401E-9EDE-4493-8D28-CFB8BF3DB955" targetRef="sid-60367E20-1EC4-4064-BBEC-786D5B149638"></sequenceFlow>
<textAnnotation id="sid-06F7DB96-FC42-416E-B61D-3D01EC96E09B">
  <text>number of events = collection.size</text>
</textAnnotation>
<association id="sid-382CF897-49BB-4C8E-B4A1-A62A19B31BEC" sourceRef="sid-EF59401E-9EDE-4493-8D28-CFB8BF3DB955" targetRef="sid-06F7DB96-FC42-416E-B61D-3D01EC96E09B" associationDirection="None"></association>
<textAnnotation id="sid-C81DE594-851B-4F54-B789-5D84E380F3C9">
  <text>multi-instance, will send once per item in the collection</text>
</textAnnotation>
<association id="sid-1152FB3F-1117-4D65-A4A5-A2B3FA84FA19" sourceRef="sid-DD13CEBE-7FA0-4A83-809D-14938322A886" targetRef="sid-C81DE594-851B-4F54-B789-5D84E380F3C9" associationDirection="None"></association>

image

If someone could point me into the right direction, I’m happy to share the working result :wink:

Thanks

Alright, figured out a solution!

The workaround I implemented is like following:

A Collection is passed into the process. Using the multi-instance, I inject one Attempt in the parallel subprocess.
Multi-instance Type: parallel
Collection: attempts
Element variable: attempt

I sent out a message on my first delegate, wait for the response on the Intermediate Catching Event.

Now the real trick is the second delegate puts his output in a key that is attempt+executionId

    execution.setVariable(ProcessConstants.ATTEMPT + "-" + execution.getId(), attempt);

The last arrow contains an execution listener that collects all the attempts.
public class BusinessRulesAggregation implements ExecutionListener {
@Override
public void notify(DelegateExecution execution) {

    List<Attempt> attempts = new ArrayList<>();

    Map<String, Object> variables = execution.getVariables();
    for (String variableName : variables.keySet()) {
        if (variableName.startsWith(ProcessConstants.ATTEMPT + "-")) {
            attempts.add((Attempt) variables.get(variableName));
        }
    }

    execution.setVariable(ProcessConstants.ATTEMPTS_LIST, attempts);
}

}

There are downsides to this approach:

  • You can only use this approach once per process execution. You’ll have to clean up the execution first.
  • Doesn’t work in parallel

It would be great to be able to collect all the objects from a parallel process and merge them automatically in a collection. Would that be on the roadmap?
.

In case anyone wants to see the final process:

  <message id="bre_response_message" name="bre_response_message"></message>
  <process id="pda-business-rules" name="pda-business-rules" isExecutable="true">
    <startEvent id="startEvent1" flowable:formFieldValidation="true"></startEvent>
    <subProcess id="sid-F9EC6CE9-E8BE-4081-BD66-F49D64C8DAF2" name="subProcess">
      <multiInstanceLoopCharacteristics isSequential="false" flowable:collection="attempts" flowable:elementVariable="attempt"></multiInstanceLoopCharacteristics>
      <startEvent id="sid-00AF2F46-B7DB-4062-9A69-8F74BBDA20EF" flowable:formFieldValidation="true"></startEvent>
      <serviceTask id="sid-A3377DF9-9B98-4822-B2B0-1C733B73E451" name="Send to BRE" flowable:async="true" flowable:exclusive="false" flowable:delegateExpression="${businessRulesDelegate}"></serviceTask>
      <intermediateCatchEvent id="sid-2D8DE6C0-36CC-45E1-B759-13D21E6BDE31">
        <messageEventDefinition messageRef="bre_response_message"></messageEventDefinition>
      </intermediateCatchEvent>
      <endEvent id="sid-892EB7CE-E9E7-4B52-B412-4C5E536E6857"></endEvent>
      <serviceTask id="sid-1976648B-E793-4103-8F6F-EB9B7F97A625" name="Map BRE response to Attempt" flowable:async="true" flowable:exclusive="false" flowable:delegateExpression="${breResponseDelegate}"></serviceTask>
      <sequenceFlow id="sid-8B426163-DA99-4978-89A4-79C232AAEE12" sourceRef="sid-00AF2F46-B7DB-4062-9A69-8F74BBDA20EF" targetRef="sid-A3377DF9-9B98-4822-B2B0-1C733B73E451"></sequenceFlow>
      <sequenceFlow id="sid-8E8ADB49-824D-489A-B2AD-19E30397DCCD" sourceRef="sid-A3377DF9-9B98-4822-B2B0-1C733B73E451" targetRef="sid-2D8DE6C0-36CC-45E1-B759-13D21E6BDE31"></sequenceFlow>
      <sequenceFlow id="sid-590C60D0-490F-4622-9CC5-351E51BAC09F" sourceRef="sid-2D8DE6C0-36CC-45E1-B759-13D21E6BDE31" targetRef="sid-1976648B-E793-4103-8F6F-EB9B7F97A625"></sequenceFlow>
      <sequenceFlow id="sid-0CCC3EA1-9061-49A8-AFFD-742E5FCD483E" sourceRef="sid-1976648B-E793-4103-8F6F-EB9B7F97A625" targetRef="sid-892EB7CE-E9E7-4B52-B412-4C5E536E6857"></sequenceFlow>
    </subProcess>
    <sequenceFlow id="sid-36E4E287-39AA-47DF-98D0-AFE55E2C4996" sourceRef="startEvent1" targetRef="sid-F9EC6CE9-E8BE-4081-BD66-F49D64C8DAF2"></sequenceFlow>
    <endEvent id="sid-54FFD1DE-32E6-4596-AA63-94D9FD098AA8"></endEvent>
    <sequenceFlow id="sid-F156D259-2BA9-4B3A-9112-9FA0386380B8" sourceRef="sid-F9EC6CE9-E8BE-4081-BD66-F49D64C8DAF2" targetRef="sid-54FFD1DE-32E6-4596-AA63-94D9FD098AA8">
      <extensionElements>
        <flowable:executionListener event="take" delegateExpression="${businessRulesAggregation}"></flowable:executionListener>
      </extensionElements>
    </sequenceFlow>
  </process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_pda-business-rules">
    <bpmndi:BPMNPlane bpmnElement="pda-business-rules" id="BPMNPlane_pda-business-rules">
      <bpmndi:BPMNShape bpmnElement="startEvent1" id="BPMNShape_startEvent1">
        <omgdc:Bounds height="30.0" width="30.0" x="45.0" y="332.5"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-F9EC6CE9-E8BE-4081-BD66-F49D64C8DAF2" id="BPMNShape_sid-F9EC6CE9-E8BE-4081-BD66-F49D64C8DAF2">
        <omgdc:Bounds height="193.0" width="585.5" x="240.0" y="248.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-00AF2F46-B7DB-4062-9A69-8F74BBDA20EF" id="BPMNShape_sid-00AF2F46-B7DB-4062-9A69-8F74BBDA20EF">
        <omgdc:Bounds height="30.0" width="30.0" x="285.0" y="318.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-A3377DF9-9B98-4822-B2B0-1C733B73E451" id="BPMNShape_sid-A3377DF9-9B98-4822-B2B0-1C733B73E451">
        <omgdc:Bounds height="80.0" width="100.0" x="368.5" y="293.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-2D8DE6C0-36CC-45E1-B759-13D21E6BDE31" id="BPMNShape_sid-2D8DE6C0-36CC-45E1-B759-13D21E6BDE31">
        <omgdc:Bounds height="30.0" width="30.0" x="510.0" y="318.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-892EB7CE-E9E7-4B52-B412-4C5E536E6857" id="BPMNShape_sid-892EB7CE-E9E7-4B52-B412-4C5E536E6857">
        <omgdc:Bounds height="28.0" width="28.0" x="765.0" y="319.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-1976648B-E793-4103-8F6F-EB9B7F97A625" id="BPMNShape_sid-1976648B-E793-4103-8F6F-EB9B7F97A625">
        <omgdc:Bounds height="80.0" width="100.0" x="615.0" y="293.0"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape bpmnElement="sid-54FFD1DE-32E6-4596-AA63-94D9FD098AA8" id="BPMNShape_sid-54FFD1DE-32E6-4596-AA63-94D9FD098AA8">
        <omgdc:Bounds height="28.0" width="28.0" x="900.0" y="330.5"></omgdc:Bounds>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge bpmnElement="sid-0CCC3EA1-9061-49A8-AFFD-742E5FCD483E" id="BPMNEdge_sid-0CCC3EA1-9061-49A8-AFFD-742E5FCD483E">
        <omgdi:waypoint x="714.9499999998662" y="333.0"></omgdi:waypoint>
        <omgdi:waypoint x="765.0" y="333.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-36E4E287-39AA-47DF-98D0-AFE55E2C4996" id="BPMNEdge_sid-36E4E287-39AA-47DF-98D0-AFE55E2C4996">
        <omgdi:waypoint x="74.94998098050706" y="347.47581859798817"></omgdi:waypoint>
        <omgdi:waypoint x="240.0" y="347.2097422019166"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-8E8ADB49-824D-489A-B2AD-19E30397DCCD" id="BPMNEdge_sid-8E8ADB49-824D-489A-B2AD-19E30397DCCD">
        <omgdi:waypoint x="468.45000000000005" y="333.0"></omgdi:waypoint>
        <omgdi:waypoint x="510.0" y="333.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-F156D259-2BA9-4B3A-9112-9FA0386380B8" id="BPMNEdge_sid-F156D259-2BA9-4B3A-9112-9FA0386380B8">
        <omgdi:waypoint x="825.45" y="344.49999999999994"></omgdi:waypoint>
        <omgdi:waypoint x="900.0" y="344.49999999999994"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-8B426163-DA99-4978-89A4-79C232AAEE12" id="BPMNEdge_sid-8B426163-DA99-4978-89A4-79C232AAEE12">
        <omgdi:waypoint x="314.94999869873214" y="333.0"></omgdi:waypoint>
        <omgdi:waypoint x="368.49999999995447" y="333.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge bpmnElement="sid-590C60D0-490F-4622-9CC5-351E51BAC09F" id="BPMNEdge_sid-590C60D0-490F-4622-9CC5-351E51BAC09F">
        <omgdi:waypoint x="539.9499990675947" y="333.0"></omgdi:waypoint>
        <omgdi:waypoint x="615.0" y="333.0"></omgdi:waypoint>
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</definitions>

Thanks for sharing your detailed solution!