- Demonstrate various ways, with and without Kafka Connect, to get data into Kafka topics and then loaded for use by the Kafka Streams API
KStream
- Show some basic usage of the stream processing API
- Common demo prerequisites
- Confluent Platform 4.1
- Maven command
mvn
to compile Java code - By default the
timeout
command is available on most Linux distributions but not Mac OS. Thistimeout
command is used by the bash scripts to terminate consumer processes after a period of time. To install it on a Mac:
# Install coreutils
brew install coreutils
# Add a "gnubin" directory to your PATH
PATH="/usr/local/opt/coreutils/libexec/gnubin:$PATH"
- Command line
kafka-console-producer
producesString
keys andString
values to a Kafka topic. - Client application reads from the Kafka topic using
Serdes.String()
for both key and value.
KAFKA-2526: one cannot use the --key-serializer
argument in the kafka-console-producer
to serialize the key as a Long
. As a result, in this example the key is serialized as a String
. As a workaround, you could write your own kafka.common.MessageReader (e.g. check out the default implementation of LineMessageReader) and then you can specify --line-reader
argument in the kafka-console-producer
.
- Kafka Connect JDBC source connector produces JSON values, and inserts the key using single message transformations, also known as
SMTs
. This is helpful because by default JDBC source connector does not insert a key. - Client application reads from the Kafka topic using
Serdes.String()
for key and a custom JSON Serde for the value.
This example uses a few SMTs including one to cast the key to an int64
. The key needs a Long
Converter, and until KAFKA-6913 is resolved, it has a custom Long Converter.
- Kafka Connect JDBC source connector produces Avro values, and null
String
keys, to a Kafka topic. - Client application reads from the Kafka topic using
SpecificAvroSerde
for the value and then themap
function to convert the stream of messages to haveLong
keys and custom class values.
This example uses a simple message transformation SetSchemaMetadata
with code that has a fix for KAFKA-5164, allowing the connector to set the namespace in the schema. If you do not have the fix for KAFKA-5164, see Example 3b that uses GenericAvro
instead of SpecificAvro
.
- Kafka Connect JDBC source connector produces Avro values, and null
String
keys, to a Kafka topic. - Client application reads from the Kafka topic using
GenericAvroSerde
for the value and then themap
function to convert the stream of messages to haveLong
keys and custom class values.
This example currently uses GenericAvroSerde
and not SpecificAvroSerde
for a specific reason. JDBC source connector currently doesn't set a namespace when it generates a schema name for the data it is producing to Kafka. For SpecificAvroSerde
, the lack of namespace is a problem when trying to match reader and writer schema because Avro uses the writer schema name and namespace to create a classname and tries to load this class, but without a namespace, the class will not be found.
- Java client produces
Long
keys andSpecificAvro
values to a Kafka topic. - Client application reads from the Kafka topic using
Serdes.Long()
for key andSpecificAvroSerde
for the value.
- Kafka Connect JDBC source connector produces Avro values, and null keys, to a Kafka topic.
- KSQL reads from the Kafka topic and then uses
PARTITION BY
to create a new stream of messages withBIGINT
keys.
All examples in this repo demonstrate the Kafka Streams API methods count
and reduce
.
KAFKA-5245: one needs to provide the Serdes twice, (1) when calling KStreamBuilder#stream()
and (2) when calling KStream#groupByKey()
PR-531: Confluent distribution provides packages for GenericAvroSerde
and SpecificAvroSerde
KAFKA-2378: adds APIs to be able to embed Kafka Connect into client applications
After you run ./start.sh
:
- You should see each of the examples run end-to-end
- If you are running Confluent Enterprise, open your browser and navigate to the Control Center web interface Management -> Kafka Connect tab at http://localhost:9021/management/connect to see the two deployed connectors
- Beyond that, the real value of this demo is to see the provided configurations and client code
1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700
1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1
1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400