This example demonstrates the following Kafka Streams features, along with an HTTP-based interactive query service:
- Data ingestion
- Data transformation using a Kafka Streams DSL-based implementation
- Managing local state with key-value stores
- Interactive query service with HTTP endpoints
The implementation is based on the ClarkNet dataset, which must be downloaded to a local folder.
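The heart of such a pipeline is only a few lines of the Streams DSL. The sketch below is illustrative rather than the example's actual code: the application id, topic names, and the store name access-count-per-host are taken from names that appear later in this README, but the broker address (localhost:9092), the assumption that the input topic carries raw ClarkNet log lines, and the "host is the first whitespace-separated field" parsing are assumptions made for this sketch.

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, Serialized}
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{Consumed, KafkaStreams, StreamsBuilder, StreamsConfig}

object WebLogPipelineSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address

  val builder = new StreamsBuilder

  // Raw ClarkNet log lines, as ingested into the input topic.
  val logLines = builder.stream("server-log-dsl", Consumed.`with`(Serdes.String(), Serdes.String()))

  // Assumption: the host is the first whitespace-separated field of a log line.
  val hostOfLine: KeyValueMapper[String, String, String] =
    (_: String, line: String) => line.split(" ")(0)

  // Re-key by host and count accesses per host into a named key-value store;
  // the interactive query service later reads this store over HTTP.
  val accessCounts = logLines
    .selectKey[String](hostOfLine)
    .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
    .count(Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]]("access-count-per-host"))

  // Also publish the running counts to an output topic.
  accessCounts.toStream.to("processed-log", Produced.`with`(Serdes.String(), Serdes.Long()))

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}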
By default the application runs with an embedded local Kafka server. If you want to run separate Kafka and ZooKeeper servers instead, change kafka.localserver to false in application.conf.
To run the application, follow the steps below.
This example application depends on kafka-streams-scala and kafka-streams-query. Ensure that you have the proper versions of these libraries on your classpath. Note that this example uses Scala 2.12.4 and Kafka 1.0.0.
If you've made local changes to kafka-streams-query, you'll need to publish them to your local Ivy repository by running sbt publishLocal from within the ./lib/ directory.
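If you manage these dependencies yourself, a build.sbt fragment along the following lines is what's needed. The group/artifact coordinates below are assumed from the library names above, and the <version> placeholders are intentional; substitute whatever versions you actually use, including any you published locally.

// Hypothetical build.sbt fragment; replace the <version> placeholders with real versions.
scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
  "com.lightbend"    %% "kafka-streams-scala" % "<version>",
  "com.lightbend"    %% "kafka-streams-query" % "<version>",
  "org.apache.kafka"  % "kafka-streams"       % "1.0.0"
)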
Running separate ZooKeeper and Kafka servers (the next step) is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server. Note, however, that if you want to run the application in distributed mode (see below for details of running in distributed mode), you must run separate Kafka and ZooKeeper servers.
Start ZooKeeper and Kafka, if not already running. You can download Kafka 1.0.0 for Scala 2.12 here, then follow the Quick Start instructions for running ZooKeeper and Kafka, steps 1 and 2.
Download the ClarkNet dataset and put it in a convenient local folder.
Copy src/main/resources/application-dsl.conf.template to src/main/resources/application-dsl.conf.
Edit src/main/resources/application-dsl.conf and set the entry for directorytowatch to match the folder where you installed the ClarkNet dataset.
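These entries are plain Typesafe Config settings, so they can be read with ConfigFactory. The sketch below is illustrative only; the key paths are assumptions based on the names used in this README and may differ from what the example code actually uses.

import com.typesafe.config.ConfigFactory

object ConfigSketch extends App {
  // Loads application.conf from the classpath (use -Dconfig.resource=application-dsl.conf
  // to point at the DSL-specific file instead).
  val config = ConfigFactory.load()

  // Key paths assumed from the names used in this README.
  val directoryToWatch = config.getString("dcos.kafka.loader.directorytowatch")
  val useEmbeddedKafka = config.getBoolean("kafka.localserver")

  println(s"watching $directoryToWatch, embedded Kafka: $useEmbeddedKafka")
}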
Note that you can run the application with a bundled local Kafka server by setting kafka.localserver to true in the application.conf file.
Creating the topics manually (the next step) is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server and creates all the necessary topics on its own. Note, however, that if you want to run the application in distributed mode (see below for details of running in distributed mode), you need to run separate Kafka and ZooKeeper servers, and the topics should have more than one partition.
Create the topics using the kafka-topics.sh command that comes with the Kafka distribution. We'll refer to the directory where you installed Kafka as $KAFKA_HOME. Run the following commands:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic logerr-dsl
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic server-log-dsl
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic processed-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic summary-access-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic windowed-summary-access-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic summary-payload-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic windowed-summary-payload-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic avro-topic
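If you prefer to create the same topics from code rather than the CLI, Kafka's AdminClient can do it as well. This is not part of the example application, just an optional sketch; the broker address localhost:9092 is an assumption.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicsSketch extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)

  // Same topics as the kafka-topics.sh commands above: 3 partitions, replication factor 1.
  val topics = Seq(
    "logerr-dsl", "server-log-dsl", "processed-log",
    "summary-access-log", "windowed-summary-access-log",
    "summary-payload-log", "windowed-summary-payload-log", "avro-topic"
  ).map(name => new NewTopic(name, 3, 1.toShort))

  admin.createTopics(topics.asJava).all().get()
  admin.close()
}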
Now run the application as follows:
$ sbt
> clean
> compile
> dsl
This will start the application. Now you can query the global state using curl:
$ ## The example application has a timer to `touch` the files in the watched
$ ## directory 1 minute after the app starts to trigger the streaming to begin. Touch
$ ## the ClarkNet dataset again, or add new files, to stream more entries.
$
$ ## Fetch the number of accesses made to the host world.std.com as per the downloaded
$ ## data file
$ curl http://localhost:7070/weblog/access/world.std.com
15
$
$ ## If you specify ALL as the key-name then it will fetch a list of all key-value pairs
$ ## from all the stores that have the access information with the same application id
$ curl http://localhost:7070/weblog/access/ALL
[["204.249.225.59",1],["access9.accsyst.com",2],["cssu24.cs.ust.hk",1],["cyclom1-1-6.intersource.com",1],["d24-1.cpe.Brisbane.aone.net.au",1],["er6.rutgers.edu",1],["world.std.com",3]]
$
$ ## If you specify COUNT as the key-name then it will fetch the approximate total
$ ## number of entries across all the stores that have the access information
$ ## with the same application id
$ curl http://localhost:7070/weblog/access/COUNT
7
$ ## Query access counts by a range of keys. The "from" key must be less than the "to"
$ ## key. For example, "a.com" < "z.org"
$ curl http://localhost:7070/weblog/access/range/a.com/z.org
[["access9.accsyst.com",4],["cssu24.cs.ust.hk",2],["cyclom1-1-6.intersource.com",2],["d24-1.cpe.Brisbane.aone.net.au",2],["er6.rutgers.edu",2],["reddit.com",2],["world.std.com",6]]
$
$ ## Query a time window for a key. The "from" and "to" parameters must be represented
$ ## as milliseconds since epoch (long numbers). The "from" time must be less than the
$ ## "to" time. Stream elements are windowed using ingest time, not event time. For
$ ## example, get all time windows for world.std.com from epoch 0 to the current epoch.
$ curl http://localhost:7070/weblog/access/win/world.std.com/0/$(date +%s%3N)
[[1517518200000,6],[1517518260000,3]]
$ ##
$ ## Fetch the number of bytes in the reply for queries to the host
$ ## world.std.com as per the downloaded data file
$ curl http://localhost:7070/weblog/bytes/world.std.com
124532
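Under the hood, each of these endpoints corresponds to a lookup against a Kafka Streams state store via the interactive-query API. The sketch below shows that mapping; the store names are taken from the internal topic names listed at the end of this README, and the exact wiring in the example (and in kafka-streams-query) may differ.

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

object LocalQuerySketch {
  def queries(streams: KafkaStreams): Unit = {
    val accessStore = streams.store(
      "access-count-per-host", QueryableStoreTypes.keyValueStore[String, java.lang.Long]())

    // /weblog/access/<host>  -> point lookup for a single host
    println(accessStore.get("world.std.com"))

    // /weblog/access/ALL     -> iterate over all key-value pairs in the store
    val all = accessStore.all()
    while (all.hasNext) { val kv = all.next(); println(s"${kv.key} -> ${kv.value}") }
    all.close()

    // /weblog/access/COUNT   -> approximate number of entries in the store
    println(accessStore.approximateNumEntries())

    // /weblog/access/range/a.com/z.org -> range scan between two keys
    val ranged = accessStore.range("a.com", "z.org")
    while (ranged.hasNext) println(ranged.next())
    ranged.close()

    // /weblog/access/win/<host>/<from>/<to> -> fetch from the windowed store
    val windowedStore = streams.store(
      "windowed-access-count-per-host", QueryableStoreTypes.windowStore[String, java.lang.Long]())
    val windows = windowedStore.fetch("world.std.com", 0L, System.currentTimeMillis())
    while (windows.hasNext) println(windows.next())
    windows.close()
  }
}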
The HTTP query layer is designed to work even when your application runs in distributed mode. Running your Kafka Streams application in distributed mode means that all instances must have the same application id.
In order to run the application in distributed mode, you need to run external Kafka and ZooKeeper servers. Set kafka.localserver to false to enable this.
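The mechanism that makes a distributed query layer possible is Kafka Streams' instance metadata: when every instance sets StreamsConfig.APPLICATION_SERVER_CONFIG to its own REST host:port (the dcos.http.interface and dcos.http.port values here), any instance can discover which instance owns a given key and forward the request there. The sketch below only illustrates that mechanism; the actual routing logic lives in kafka-streams-query and may look different.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{QueryableStoreTypes, StreamsMetadata}

object DistributedQuerySketch {
  // Which application instance hosts the data for this key in the given store?
  def ownerOf(streams: KafkaStreams, storeName: String, host: String): String = {
    val md: StreamsMetadata = streams.metadataForKey(storeName, host, Serdes.String().serializer())
    s"${md.hostInfo().host()}:${md.hostInfo().port()}"
  }

  // Serve the lookup locally; a real HTTP layer would forward the request to
  // ownerOf(...) when the key is not hosted by this instance.
  def localGet(streams: KafkaStreams, storeName: String, host: String): Option[Long] = {
    val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
    Option(store.get(host)).map(Long2long)
  }
}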
Here are the steps you need to follow to run the application in distributed mode. We assume here that you are running both instances on the same node with different port numbers; it's fairly easy to scale this out to different nodes.
$ sbt
> dslPackage/universal:packageZipTarball
This creates a distribution under the folder <project home>/build.
$ pwd
<project home>
$ cd build/dsl/target/universal
$ ls
dslpackage-0.0.1.tgz
## unpack the distribution
$ tar xvfz dslpackage-0.0.1.tgz
$ cd dslpackage-0.0.1
$ ls
bin conf lib
$ cd conf
$ ls
application.conf logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/dslpackage-0.0.1
Ensure the following:
- ZooKeeper and Kafka are running
- All topics mentioned above are created with more than 1 partition
- The folder mentioned in directorytowatch in application.conf contains the data file
$ pwd
<...>/dslpackage-0.0.1
$ bin/dslpackage
This starts a single instance of the application. After some time you will see data printed on the console with the host access information from the data file.
In the log file, created under <...>/dslpackage-0.0.1/logs, check that the REST service has started and note the host and port details. It should be something like localhost:7070 (the default setting in application.conf).
If you decide to run multiple instances of the application, you may choose to split the dataset into two parts and keep them in different folders. You also need to copy the current distribution to another folder and start the second instance from there, since you need to run it with changed settings in application.conf. Say we want to copy it into a folder named clarknet-2.
$ cp <project home>/build/dsl/target/universal/dslpackage-0.0.1.tgz clarknet-2
$ cd clarknet-2
$ tar xvfz dslpackage-0.0.1.tgz
## unpack the distribution
$ cd dslpackage-0.0.1
$ ls
bin conf lib
$ cd conf
$ ls
application.conf logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/dslpackage-0.0.1
The following settings need to be changed in application.conf before you can run the second instance:
- dcos.kafka.statestoredir - the folder where the local state information gets persisted by Kafka Streams. This has to be different for every new instance you set up.
- dcos.kafka.loader.directorytowatch - the data folder, since we want to ingest different data in the two instances.
- dcos.http.interface and dcos.http.port - the REST service endpoint. If the node is not different, the interface can be localhost for both instances (the ports, of course, must differ).
$ pwd
<...>/dslpackage-0.0.1
$ bin/dslpackage
This will start the second instance. Check the log file to verify that the REST endpoints are properly started.
The idea of a distributed interactive query interface is to allow the user to query for all keys using any of the endpoints where the REST service is running. Assume that the two instances are running at localhost:7070 and localhost:7071.
Here are a few examples:
## world.std.com was loaded by the first instance of the app
## Query using the endpoint corresponding to the first instance gives the correct result
$ curl localhost:7070/weblog/access/world.std.com
14
## we get the correct result even if we query using the endpoint of the second instance
$ curl localhost:7071/weblog/access/world.std.com
14
## ppp19.glas.apc.org was loaded by the second instance of the app
## Query using the endpoint corresponding to the first instance also gives the correct result
$ curl localhost:7070/weblog/access/ppp19.glas.apc.org
17
When running in distributed mode, Kafka Streams state stores are backed by internal Kafka Streams topics so that state can be restored on a different instance of the app if there's a failure. To reset to a clean state you can use the Kafka Streams Application Reset tool. This will delete the internal Kafka Streams topics associated with the specified application id. Note that you must have delete.topic.enable set to true in your broker configuration in order to delete topics.
An example run of this tool:
$ ./kafka-streams-application-reset.sh \
--application-id kstream-weblog-processing \
--bootstrap-servers kafka-0-broker:9092 \
--zookeeper localhost:2181
No input or intermediate topics specified. Skipping seek.
Deleting all internal/auto-created topics for application kstream-weblog-processing
Topic kstream-weblog-processing-windowed-access-count-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-payload-size-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-access-count-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-payload-size-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-payload-size-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-access-count-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-payload-size-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-access-count-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Done.