Event registry-How to activate event and channel definition if we deploy them externally?

I have created simple spring-boot application with REST API to deploy event and channel definition files for KAFKA, where I deploy them using below code.

repositoryService.createDeployment()
.name(“Deployment of Event definition-two”)
.addInputStream(file.getName(), bis)
.deploy();

I want to know once these files are deployed how to activate them so that Kafka-consumer will start working and wait for arriving messages.

NOTE:I have tried putting files in resources folder and that works smoothly when i start the application but i want to deploy files externally.

1 Like

Is the file name correct, i.e. having a .channel and .event extension?

Deploying them with the correct extensions should be doing that. Can you describe what you mean with ‘externally’?

FIle names are correct with extensions, when i put same files in resources->eventRegisty directory everything is working fine.

Here “Externally” means I have create REST endpoint to upload files and deploying them using code mentioned above.

Ok, now I’m following. Have you tried getting the EventRepositoryService from the EventRegistryEngine and deploying programmatically there? That one for sure has the right set of deployers to process those files.

Hello,

i think i have quite the same problem.
I used the Flowable Designer to create a Inbound-Channel and an Event and deployed them.
Now i have an external Spring Boot Application and i try to use the previous deployed Channel-Definition so that the Application connects to the topic specified in the Channel-Definition and can receive Events.

I played around with the EventRegistryEngine, EventRegistry and EventRepositoryService but i can’t get it to work that the application connect to the Kafka-Topic over the deployed Channel-Definition

@JensV : can you share the models (channel/event/process) that you’re using for this? It’s hard to say anything without them.

Hello @joram

it’s really simple at the moment.

I have the following channel

{
  "type": "kafka",
  "channelType": "inbound",
  "deserializerType": "json",
  "topics": [
    "settlementOrderFile-in-0"
  ],
  "channelEventKeyDetection": {
    "jsonField": "eventName"
  },
  "name": "Settlement Order File",
  "key": "settlementOrderFile"
}

and the following event

{
  "payload": [
    {
      "name": "filename",
      "type": "string"
    },
    {
      "name": "file",
      "type": "json"
    },
    {
      "name": "eventName",
      "type": "string"
    }
  ],
  "name": "Test Event Name",
  "key": "testEvent-Name"
}

I can successfully subscribe to the topic via code with the following two classes

@Configuration
public class KafkaChannelRegistrationConfig {

    final EventRegistry eventRegistry;
    final EventRepositoryService eventRepositoryService;
    final KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor;

    @Autowired
    public KafkaChannelRegistrationConfig(
            BeanFactory beanFactory,
            KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor
    ) {
        this.kafkaChannelDefinitionProcessor = kafkaChannelDefinitionProcessor;
        this.kafkaChannelDefinitionProcessor.setBeanFactory(beanFactory);

        EventRegistryEngine eventRegistryEngine = EventRegistryEngines.getDefaultEventRegistryEngine();
        this.eventRepositoryService = eventRegistryEngine.getEventRepositoryService();
        this.eventRegistry = eventRegistryEngine.getEventRegistry();

    }

    @Bean
    public SettlementOrderFileTopicCreator settlementOrderFileTopicCreator() {
        return new SettlementOrderFileTopicCreator(eventRepositoryService, eventRegistry, kafkaChannelDefinitionProcessor);
    }
}

@Slf4j
public class SettlementOrderFileTopicCreator {

    public SettlementOrderFileTopicCreator(
            EventRepositoryService eventRepositoryService,
            EventRegistry eventRegistry,
            KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor
    ) {
        ChannelModel channelModel = eventRepositoryService.getChannelModelByKey("settlementOrderFile");
        kafkaChannelDefinitionProcessor.registerChannelModel(channelModel, "", eventRegistry, eventRepositoryService, false);
    }
}

Then I try to catch the event with the following two classes
but that didn’t work, no event arrived.

@SpringBootApplication
public class SettlementFileServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SettlementFileServiceApplication.class, args);
    }

    @Bean
    public EngineConfigurationConfigurer<SpringAppEngineConfiguration> customAppEngineConfigurer(SettlementFileProcessingService settlementFileProcessingService) {
        return engineConfiguration -> {
            engineConfiguration.addEventRegistryEventConsumer(settlementFileProcessingService.getConsumerKey(), settlementFileProcessingService);
        };
    }
}

@SpringBootApplication
public class SettlementFileServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SettlementFileServiceApplication.class, args);
    }

    @Bean
    public EngineConfigurationConfigurer<SpringAppEngineConfiguration> customAppEngineConfigurer(SettlementFileProcessingService settlementFileProcessingService) {
        return engineConfiguration -> {
            engineConfiguration.addEventRegistryEventConsumer(settlementFileProcessingService.getConsumerKey(), settlementFileProcessingService);
        };
    }
}

Up to this point I do not use the *.event and *.channel Files. I subscribe to the topic via code.
On the log-output i can see that my application successfully subscribe to the topic

2021-05-17 09:39:13.771  INFO [settlement-file-service,                ,                ,     ] 16472 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = xxxxxxx
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#settlementOrderFile-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#settlementOrderFile
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

If i put the two files (*.channel and *.event i postet before) into the eventregistry-folder on the classpath all works fine.

The Process looks like this

In that last screenshot, it looks like you didn’t configure the inbound event (only mapped the event). Can you give that a go.

Also, since I see you’re using Flowable Design, you can follow this guide: How-To: Getting Started with Channels and Events · Flowable Enterprise Documentation

Continuing the discussion from Event registry-How to activate event and channel definition if we deploy them externally?:

Hi joram,

I too trying to deploy the event definition and channel definition by programatically, below is the code

public FlowableEventRegistryApplication(RepositoryService repositoryService,
                                            EventRepositoryService eventRepositoryService, RuntimeService runtimeService) {
        this.repositoryService = repositoryService;
        this.eventRepositoryService = eventRepositoryService;
        this.runtimeService = runtimeService;
        try {
            String parentPath = "/Users/appasahebneelawani/Composition/Flowable/configFiles/eventregistry/";

            File eventFile = new File(parentPath + "event-one.event"); //Don't forget the .event extension!

            byte[] fileContent = Files.readAllBytes(eventFile.toPath());

            this.eventRepositoryService.createDeployment()
                .name("My event")
                .addEventDefinitionBytes(eventFile.getName(), fileContent)
                .deploy();

            File inboundChannelFile = new File(parentPath + "inbound.channel"); //Don't forget the .channel extension!
            byte[] fileContentInbound = Files.readAllBytes(inboundChannelFile.toPath());

            File outboundChannelFile = new File(parentPath + "outbound.channel"); //Don't forget the .channel extension!
            byte[] fileContentOutbound = Files.readAllBytes(outboundChannelFile.toPath());

            this.eventRepositoryService.createDeployment()
                .name("Test channel")
                .addChannelDefinitionBytes(inboundChannelFile.getName(),  fileContentInbound)
                .deploy();

            this.eventRepositoryService.createDeployment()
                .name("Test channel")
                .addChannelDefinitionBytes(outboundChannelFile.getName(),  fileContentOutbound)
                .deploy();



            String bpmnXmlFilePath =
                "/Users/appasahebneelawani/Composition/Flowable/configFiles/processes/event-test1-process" +
                    ".bpmn20.xml";

            File bpmnXmlFile = new File(bpmnXmlFilePath); //Don't forget the .channel extension!

            deployment = this.repositoryService.createDeployment()
                .addInputStream(bpmnXmlFile.getName(), new FileInputStream(bpmnXmlFile))
                .deploy();
            

        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

For channel and event definition I am using the example from below link.

When I push the message from rabbitMq, I am able to start the process but the message is not getting received properly.

2024-05-03 15:22:57.854 INFO 57375 — [ main] c.n.e.f.FlowableEventRegistryApplication : Application started.
Found process definition : Process to test events
2024-05-03 15:22:58.141 INFO 57375 — [ main] c.n.e.f.FlowableEventRegistryApplication : Processes deployed: 1
2024-05-03 15:22:58.147 INFO 57375 — [ main] c.n.e.f.FlowableEventRegistryApplication : Events deployed: 3
2024-05-03 15:23:02.240 INFO 57375 — [ task-1] c.n.e.flowableeventregistry.Logger : LOG: Process eventTest1:1:ebcfe630-0932-11ef-bba6-aa6652ee0e59 activity theTask: null
2024-05-03 15:23:02.241 DEBUG 57375 — [ task-1] c.n.e.flowableeventregistry.Logger : LOG: var customer := null

It will be great help you could provide some hints.

Thanks,
Appasaheb