kogito-kafka-quickstart-quarkus

Kogito with Kafka

Description

A quickstart project that processes travellers using rules. It illustrates how easily Kogito processes and rules can work with Apache Kafka.

This example shows:

  • consuming events from a Kafka topic and starting a new process instance for each event
  • each process instance expects traveller information in JSON format
  • the traveller is then processed by rules and, based on the outcome of the processing (processed or not):
    • if successfully processed, the traveller information is logged and the updated information is sent to another Kafka topic
    • if not processed, the traveller information is logged and the process instance finishes without sending a reply to a Kafka topic
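
The flow above can be sketched in a few lines of plain Python. This is only an illustration of the branching, not the project's implementation: the function name `handle_event` and the `rule` parameter are hypothetical, and the real decision is made by the project's rule files, which are not reproduced here.

```python
def handle_event(event, rule):
    """Walk one traveller event through the flow described above.

    `rule` is a hypothetical stand-in for the project's business rules:
    any callable that takes the traveller dict and returns True or False.
    Returns the updated traveller to send as a reply, or None when skipped.
    """
    traveller = dict(event["data"])  # copy so the incoming event is untouched
    if rule(traveller):
        traveller["processed"] = True
        print(f"Processed traveller: {traveller}")
        return traveller  # would be sent to the reply topic
    print(f"Skipped traveller: {traveller}")
    return None  # process instance ends without a reply
```

Passing a different `rule` callable is enough to exercise both paths without a running Kafka broker.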

Screenshots of the process definition:

  • Diagram Properties (top)
  • Diagram Properties (bottom)
  • Diagram Properties (process variables)
  • Start Message
  • Start Message (Assignments)
  • Process Traveler Business Rule (top)
  • Process Traveler Business Rule (bottom)
  • Process Traveler Business Rule (Assignments)
  • Process Traveler Gateway
  • Process Traveler Gateway Yes Connector
  • Process Traveler Gateway No Connector
  • Log Traveler Script Task
  • Skip Traveler Script Task
  • Processed Traveler End Message
  • Processed Traveler End Message (Assignments)
  • Skip Traveler End

Infrastructure requirements

This quickstart requires Apache Kafka to be available. By default it expects Kafka on localhost at the default port (9092).

  • Install and start the Kafka server / ZooKeeper

https://kafka.apache.org/quickstart

In addition, two topics are needed:

  • travellers
  • processedtravellers

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic travellers
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic processedtravellers

Messages on these topics are expected to have no key.

Build and run

Prerequisites

You will need:

  • Java 11+ installed
  • Environment variable JAVA_HOME set accordingly
  • Maven 3.6.2+ installed

When using native image compilation, you will also need:

  • GraalVM 19.3+ installed
  • Environment variable GRAALVM_HOME set accordingly
  • GraalVM native image compilation also needs the native-image extension: https://www.graalvm.org/docs/reference-manual/native-image/
  • Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too. Please refer to the GraalVM installation documentation for more details.

Compile and Run in Local Dev Mode

mvn clean package quarkus:dev    

NOTE: In Quarkus dev mode you can take advantage of hot reload for business assets like processes, rules and decision tables, as well as Java code. There is no need to redeploy or restart your running application.

Compile and Run using Local Native Image

Note that this requires GRAALVM_HOME to point to a valid GraalVM installation.

mvn clean package -Pnative

To run the native executable generated in target/, execute:

./target/kogito-kafka-quickstart-{version}-runner

Use the application

Using this application is as simple as putting a message in cloud event format on the travellers topic.

  • Watch the processedtravellers topic to verify that messages are processed

Execute in a separate terminal session:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processedtravellers

  • Send a message that should be processed to the travellers topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers

Content (cloud event format):

{
  "specversion": "0.3",
  "id": "21627e26-31eb-43e7-8343-92a696fd96b1",
  "source": "",
  "type": "TravellersMessageDataEvent_3",
  "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]",
  "data": {
    "firstName": "Jan",
    "lastName": "Kowalski",
    "email": "[email protected]",
    "nationality": "Polish"
  }
}

One-liner:

{"specversion": "0.3","id": "21627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "TravellersMessageDataEvent_3", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "Jan", "lastName" : "Kowalski", "email" : "[email protected]", "nationality" : "Polish"}}
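
Typing the one-liner by hand is error-prone, so a small helper can generate it. The sketch below builds an event with the same fields as the sample and a fresh id. The function name `build_traveller_event` is hypothetical, and it is an assumption that the consumer accepts a plain ISO-8601 timestamp without the bracketed zone suffix used in the sample.

```python
import json
import uuid
from datetime import datetime, timezone


def build_traveller_event(first_name, last_name, email, nationality):
    """Return a single-line cloud event string for the travellers topic."""
    event = {
        "specversion": "0.3",
        "id": str(uuid.uuid4()),  # fresh id per message
        "source": "",
        "type": "TravellersMessageDataEvent_3",
        # Assumption: plain ISO-8601 is accepted in place of the sample's
        # "+02:00[Europe/Warsaw]" style timestamp.
        "time": datetime.now(timezone.utc).isoformat(),
        "data": {
            "firstName": first_name,
            "lastName": last_name,
            "email": email,
            "nationality": nationality,
        },
    }
    # json.dumps emits no newlines by default, so the result can be pasted
    # as one line into the console producer.
    return json.dumps(event)


if __name__ == "__main__":
    # The email address here is a placeholder, not taken from the sample.
    print(build_traveller_event("Jan", "Kowalski", "jan.kowalski@example.com", "Polish"))
```

The printed line can be pasted into the kafka-console-producer.sh session started above.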

Sending this message triggers successful processing of the traveller and puts another message on the processedtravellers topic with the following content (cloud event format):

{
  "specversion": "0.3",
  "id": "86f69dd6-7145-4188-aeaa-e44622eeec86",
  "source": "",
  "type": "TravellersMessageDataEvent_3", 
  "time": "2019-10-03T16:22:40.373523+02:00[Europe/Warsaw]",
  "data": {
    "firstName": "Jan",
    "lastName": "Kowalski",
    "email": "[email protected]",
    "nationality": "Polish",
    "processed": true
  },
  "kogitoProcessinstanceId": "4fb091c2-82f7-4655-8687-245a4ab07483",
  "kogitoParentProcessinstanceId": null,
  "kogitoRootProcessinstanceId": null,
  "kogitoProcessId": "Travellers",
  "kogitoRootProcessId": null,
  "kogitoProcessinstanceState": "1",
  "kogitoReferenceId": null
}

A number of extension attributes whose names start with kogito provide context about the execution and the event producer.
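
When reading replies programmatically, it can help to separate the traveller payload from these context attributes. A minimal sketch, assuming the reply arrives as a JSON string shaped like the sample above (the function name `split_reply` is hypothetical):

```python
import json


def split_reply(raw):
    """Split a reply event into (traveller data, kogito context attributes)."""
    event = json.loads(raw)
    # Collect every top-level attribute whose name starts with "kogito".
    context = {k: v for k, v in event.items() if k.startswith("kogito")}
    return event["data"], context
```

For the sample reply, the second value would contain entries such as kogitoProcessId and kogitoProcessinstanceId, while the first holds the traveller with its processed flag.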

To take the other path of the process, put the following message on the travellers topic.

  • Send a message that should be skipped to the travellers topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers

With the following content (cloud event format):

{
  "specversion": "0.3",
  "id": "31627e26-31eb-43e7-8343-92a696fd96b1",
  "source": "",
  "type": "TravellersMessageDataEvent_3",
  "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]",
  "data": {
    "firstName": "John",
    "lastName": "Doe",
    "email": "[email protected]",
    "nationality": "American"
  }
}

One-liner:

{"specversion": "0.3","id": "31627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "TravellersMessageDataEvent_3", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "John", "lastName" : "Doe", "email" : "[email protected]", "nationality" : "American"}}

This will not result in a message being sent to the processedtravellers topic.