This is the code repo containing the examples from the "Multiple Events in one topic" presentation at Kafka Summit Europe 2021.
We are going to run all the examples against Confluent Cloud, so before diving into the code you'll need to make sure you have a few things set up.
First, if you don't already have a Confluent Cloud account, go ahead and create one now. Next, you'll need to install the Confluent CLI. With those two steps out of the way, you're all set!
For the Kafka broker and Schema Registry instances, you're going to use the ccloud-stack utility to get everything up and running.
The open source library ccloud_library.sh provides functions for interacting with Confluent Cloud, including ccloud-stack.
Run these commands to download ccloud_library.sh and source it:
wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh
With that done, let's create the stack of Confluent resources:
EXAMPLE=schema-registry-kafka-summit-2021-emea
ccloud::create_ccloud_stack
NOTE: Make sure you destroy all resources when the workshop concludes.
The create command generates a local config file, java-service-account-NNNNN.config, when it completes. The NNNNN represents the service account id.
Let's take a quick look at the file:
cat stack-configs/java-service-account-*.config
You should see something like this:
# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>
We'll use these properties to connect the KafkaProducer, KafkaConsumer, and Kafka Streams applications to the brokers and Schema Registry on Confluent Cloud.
To get started running any of the code, you'll need to configure the required properties. In the src/main/resources directory you'll find a file named config.properties.orig that contains some required configuration settings.
Run this command to make a copy of the file:
cat src/main/resources/config.properties.orig > src/main/resources/config.properties
Then run this command to append the required Confluent Cloud configuration to the properties file:
cat stack-configs/java-service-account-*.config >> src/main/resources/config.properties
We need this step because the configuration file contains the API key and password the code will use to interact with the broker and Schema Registry on Confluent Cloud.
For that reason, this repo ignores the config.properties file, so it should never get checked in!
NOTE: It's important that you complete this step as directed, as step 5 relies on having src/main/resources/config.properties properly set up.
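For reference, here's a minimal sketch of how code can pick up the merged properties; the repo's own classes may load them differently, and the path below assumes you run from the project root:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class ConfigLoader {

    // Loads the merged Confluent Cloud and application settings created above.
    public static Properties loadProperties(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream input = new FileInputStream(path)) {
            props.load(input);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = loadProperties("src/main/resources/config.properties");
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
    }
}

The later sketches in this README reuse this hypothetical ConfigLoader to keep them short.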
Now you need to create topics for the examples to use. Using the properties you've copied over in a previous step,
run this gradle command:
NOTE: If the gradlew wrapper script isn't present in your checkout, run gradle wrapper first to generate it (this requires a local Gradle installation).
./gradlew createTopics
You'll see some output on the console concluding with something like:
Created all topics successfully
Under the covers, the gradle command executes the main method of io.confluent.developer.Topics. Topics is a helper class that uses the Admin interface to create topics on your brokers in Confluent Cloud. You can log into Confluent Cloud now and inspect the topics through the UI. Keep the Confluent Cloud UI open as you'll need it in the next step.
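As a rough idea of what such a helper does with the Admin client (the partition and replication counts below are illustrative, not necessarily what Topics uses):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        // Reuse the merged Confluent Cloud properties (see the ConfigLoader sketch above).
        Properties props = ConfigLoader.loadProperties("src/main/resources/config.properties");

        try (Admin admin = Admin.create(props)) {
            List<NewTopic> topics = List.of(
                new NewTopic("avro-events", 1, (short) 3),
                new NewTopic("avro-wrapped-events", 1, (short) 3),
                new NewTopic("proto-event", 1, (short) 3));
            // Block until the brokers confirm that every topic was created.
            admin.createTopics(topics).all().get();
            System.out.println("Created all topics successfully");
        }
    }
}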
Next you'll need to register some schemas. If you recall from the presentation, when you have an Avro schema where the top-level element is a union, you need to register the individual schemas in the union first. Then you'll register the container schema itself, along with references to the schemas making up the union element.
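To make the ordering concrete, doing this by hand with the Schema Registry Java client would look roughly like the sketch below; the schema strings are trimmed-down stand-ins rather than the repo's actual page_view.avsc and purchase.avsc, and the endpoint and credentials are the placeholders from the config shown earlier:

import java.util.List;
import java.util.Map;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

public class RegisterUnionSketch {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(
            "https://<SR ENDPOINT>", 10,
            Map.of("basic.auth.credentials.source", "USER_INFO",
                   "basic.auth.user.info", "<SR API KEY>:<SR API SECRET>"));

        // Hypothetical, trimmed-down versions of the referenced schemas.
        String pageView = "{\"type\":\"record\",\"name\":\"PageView\","
            + "\"namespace\":\"io.confluent.developer.avro\","
            + "\"fields\":[{\"name\":\"url\",\"type\":\"string\"}]}";
        String purchase = "{\"type\":\"record\",\"name\":\"Purchase\","
            + "\"namespace\":\"io.confluent.developer.avro\","
            + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"}]}";

        // Step 1: register each schema in the union under its own subject.
        client.register("page-view", new AvroSchema(pageView));
        client.register("purchase", new AvroSchema(purchase));

        // Step 2: register the top-level union, referencing the subjects registered above.
        String allEvents = "[\"io.confluent.developer.avro.PageView\","
            + "\"io.confluent.developer.avro.Purchase\"]";
        List<SchemaReference> references = List.of(
            new SchemaReference("io.confluent.developer.avro.PageView", "page-view", 1),
            new SchemaReference("io.confluent.developer.avro.Purchase", "purchase", 1));
        Map<String, String> resolvedReferences = Map.of(
            "io.confluent.developer.avro.PageView", pageView,
            "io.confluent.developer.avro.Purchase", purchase);
        client.register("avro-events-value",
            new AvroSchema(allEvents, references, resolvedReferences, null));
    }
}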
Fortunately, the gradle Schema Registry plugin makes this easy for us. Here's the configuration in the build.gradle file:
register {
    subject('page-view', 'src/main/avro/page_view.avsc', 'AVRO')
    subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
    subject('avro-events-value', 'src/main/avro/all_events.avsc', 'AVRO')
        .addReference("io.confluent.developer.avro.PageView", "page-view", 1)
        .addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
}
Now to register these schemas, run this in the command line:
./gradlew registerSchemasTask
Under the covers, the registerSchemasTask uses the config.properties file to set the Schema Registry endpoint, username, and password for Confluent Cloud.
This task runs quickly, and you should see some text followed by this result in the console:
BUILD SUCCESSFUL
You don't need to register schemas for the Protobuf example: you'll run that producer with auto.register.schemas=true, and the Protobuf serializer will recursively register any proto files imported by the main schema. For the second Avro example, which uses a record to wrap the union, you've already registered the referenced schemas, so auto-registration works for the "wrapper" record as well.
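For a sense of what the Protobuf producer configuration looks like, here's a hedged sketch (again reusing the hypothetical ConfigLoader from above; the repo's DataProducer wires this up for real):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

public class ProtoProducerConfigSketch {

    // Builds producer properties for the Protobuf topic.
    public static Properties protoProducerProps() throws Exception {
        Properties props = ConfigLoader.loadProperties("src/main/resources/config.properties");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        // Let the serializer register the main schema plus any imported .proto files.
        props.put("auto.register.schemas", true);
        return props;
    }
}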
Using the Confluent Cloud UI you opened in the previous step, you can view the uploaded schemas by clicking on the Schema Registry tab and then clicking on the individual schemas to inspect them.
Now let's produce some records to the Confluent Cloud brokers. The io.confluent.developer.clients.DataProducer class runs three KafkaProducer clients serially, producing records to the avro-events, avro-wrapped-events, and proto-event topics.
Each producer sends different event types to its topic, in either Avro or Protobuf format.
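The key detail for the topic backed by the top-level union is the serializer configuration: auto-registration is turned off and the latest registered (union) schema is used instead. Here's a hedged sketch of what that can look like; the String keys and the builder fields on the generated PageView and Purchase classes are guesses for illustration, and DataProducer's actual settings may differ:

import java.util.Properties;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.developer.avro.PageView;
import io.confluent.developer.avro.Purchase;

public class UnionTopicProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = ConfigLoader.loadProperties("src/main/resources/config.properties");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // With a top-level union registered via references, look up the latest
        // registered schema instead of auto-registering each event's own schema.
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);

        try (KafkaProducer<String, SpecificRecordBase> producer = new KafkaProducer<>(props)) {
            // Hypothetical builder calls; the generated classes' fields may differ.
            PageView view = PageView.newBuilder().setUrl("https://acme.example/cart").build();
            Purchase purchase = Purchase.newBuilder().setItem("flux-capacitor").build();
            producer.send(new ProducerRecord<>("avro-events", "customer-1", view));
            producer.send(new ProducerRecord<>("avro-events", "customer-1", purchase));
        }
    }
}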
To produce the records, run this command:
./gradlew runProducer
You should see a few statements printed as each producer sends records to the Confluent Cloud brokers.
Next, let's take a look at the io.confluent.developer.clients.MultiEventConsumer class. The MultiEventConsumer runs three KafkaConsumer instances serially and prints some details about the consumed records to the console. This example shows one possible approach to working with multiple event types in a single topic.
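One common pattern, sketched below, is to deserialize into the generated classes and branch on the concrete type of each value; this is a simplified stand-in for what MultiEventConsumer does, not a copy of it:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.developer.avro.PageView;
import io.confluent.developer.avro.Purchase;

public class MultiEventConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = ConfigLoader.loadProperties("src/main/resources/config.properties");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-event-sketch");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Deserialize into the generated SpecificRecord classes rather than GenericRecord.
        props.put("specific.avro.reader", true);

        try (KafkaConsumer<String, SpecificRecordBase> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("avro-events"));
            ConsumerRecords<String, SpecificRecordBase> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, SpecificRecordBase> record : records) {
                // Branch on the concrete event type coming out of the single topic.
                if (record.value() instanceof PageView) {
                    System.out.println("page view: " + record.value());
                } else if (record.value() instanceof Purchase) {
                    System.out.println("purchase: " + record.value());
                }
            }
        }
    }
}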
To run the consumer execute:
./gradlew runConsumer
Then you should see some details about each record printed in the console.
Last, but not least, we have a basic Kafka Streams application demonstrating one possible approach to handling multiple event types from a topic.
The Kafka Streams app, io.confluent.developer.streams.MultiEventKafkaStreamsExample, uses a ValueTransformerWithKey to pull details out of each record and build up a new record type, a CustomerInfo object.
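A simplified version of such a transformer might look like this; the CustomerInfo package, builder methods, and the output topic in the comment are guesses for illustration, not the repo's actual code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import io.confluent.developer.avro.CustomerInfo;
import io.confluent.developer.avro.PageView;
import io.confluent.developer.avro.Purchase;

// Wired into the topology roughly as:
//   builder.stream("avro-events").transformValues(CustomerInfoTransformerSketch::new).to("customer-info");
public class CustomerInfoTransformerSketch
        implements ValueTransformerWithKey<String, SpecificRecord, CustomerInfo> {

    @Override
    public void init(ProcessorContext context) { }

    @Override
    public CustomerInfo transform(String customerId, SpecificRecord value) {
        // Hypothetical builder and field accessors; the generated classes may differ.
        CustomerInfo.Builder info = CustomerInfo.newBuilder().setCustomerId(customerId);
        if (value instanceof PageView) {
            info.setLastPageViewed(((PageView) value).getUrl());
        } else if (value instanceof Purchase) {
            info.setLastItemPurchased(((Purchase) value).getItem());
        }
        return info.build();
    }

    @Override
    public void close() { }
}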
To run the streams application use this command:
./gradlew runKafkaStreamsExample
After a few seconds, you'll see some details on the console about the new CustomerInfo records created by extracting fields from the different event types coming from the source topic. Once you see the print statements on the console, enter CTRL+C to stop the streams application.
This concludes the demo from the presentation. Please stick around, look through the code and schema files, and experiment a bit to get a feel for working with schema references and multi-event topics.
Make sure you tear down the Confluent Cloud resources you created at the beginning of this mini-tutorial by running:
ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
where $SERVICE_ACCOUNT_ID is the number from the java-service-account-NNNNN.config file name.