To forestall any debate about message delivery semantics, please note that this project implements exactly-once as defined by Kafka's documentation. I am aware of why you can't technically have exactly-once, so if you like, you can think of this as at-least-once with deduplication, assuming guaranteed delivery order of messages within a partition.
Exactly-once semantics with Kafka and Spark Streaming can be implemented as the Kafka documentation recommends: store the partition and offset with the data once it's been processed, and then manage each topic partition's offset in the event of a failure/restart. On startup the job queries Cassandra for the last offset of each partition and begins processing from the next offset. Maintaining exactly-once across a whole distributed infrastructure takes some work, because you have to manage partitions and offsets at each step of the way, but it can be done fairly easily if designed up front.
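The restart step above boils down to a small piece of logic: read the last stored offset per partition, then resume each partition at the next offset. A minimal sketch in plain Scala (the `OffsetRecovery` object and `resumeOffsets` method are hypothetical names, not part of this project; in the real job the resulting map would feed the `fromOffsets` parameter of Kafka's direct stream):

```scala
// Minimal sketch of the restart logic, assuming the last processed
// (partition, offset) pairs have already been queried from Cassandra.
// Names here (OffsetRecovery, resumeOffsets) are illustrative only.
object OffsetRecovery {
  /** Resume each partition at the offset after the last one processed. */
  def resumeOffsets(lastStored: Map[Int, Long]): Map[Int, Long] =
    lastStored.map { case (partition, offset) => partition -> (offset + 1) }
}
```

On a clean start (no stored offsets) the map is empty and the stream begins wherever the consumer is configured to start.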
- Install `docker` and `docker-compose` (developed against versions 1.11.1 and 1.6.2 respectively). NOTE: On Linux, add yourself to the `docker` group so you don't require `sudo` to execute these commands.
- Confirm the `ADVERTISED_HOST` environment variable matches your Docker host IP in the `kafka` service located in `./docker/sbt-docker-compose.yml`. On Linux you can find this IP with the command `ifconfig docker0`. On Mac OS X you will need to use the docker-machine IP, which can be found with the command `docker-machine ip`.
Now you can bring the docker services up and perform ad hoc operations against them, or run the automated tests.
This project uses the `sbt-docker-compose` plugin to make it easier to run automated integration tests using the services defined in `docker/sbt-docker-compose.yml`.
To start up the services for the project:
sbt dockerComposeUp
To stop:
sbt dockerComposeStop
To apply the schema to the Cassandra instance, execute `cqlsh` in the container to get a shell, then define the keyspace and tables. Use `docker ps` or inspect the output of `sbt dockerComposeUp` to get the container ID.
docker exec -it [CONTAINER_ID] cqlsh
Define the keyspace and tables:
DROP KEYSPACE IF EXISTS exactlyonce;
CREATE KEYSPACE IF NOT EXISTS exactlyonce WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE IF NOT EXISTS exactlyonce.events (
id bigint,
ts timestamp,
partition int,
offset bigint,
source text,
client text,
health int,
PRIMARY KEY (partition, offset)
) WITH CLUSTERING ORDER BY (offset DESC);
CREATE TABLE IF NOT EXISTS exactlyonce.alerts (
ts timestamp,
source text,
event_count int,
PRIMARY KEY (source, ts));
TRUNCATE exactlyonce.events;
TRUNCATE exactlyonce.alerts;
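The `PRIMARY KEY (partition, offset)` choice on `exactlyonce.events` is what makes replays harmless: Cassandra writes are upserts, so reprocessing a batch after a failure rewrites the same rows instead of duplicating them. A plain-Scala sketch of the idea, with a `Map` standing in for the table (the `DedupSketch` object and `Event` class are illustrative, not this project's API):

```scala
// Illustrative sketch: a Map keyed by (partition, offset) stands in for the
// exactlyonce.events table. As with a Cassandra upsert, re-writing the same
// key during a replay overwrites the row instead of duplicating it.
object DedupSketch {
  final case class Event(partition: Int, offset: Long, source: String)

  /** Upsert a batch of events into the "table", keyed by (partition, offset). */
  def write(table: Map[(Int, Long), Event], batch: Seq[Event]): Map[(Int, Long), Event] =
    batch.foldLeft(table)((t, e) => t + ((e.partition, e.offset) -> e))
}
```

Writing the same batch twice (a simulated replay) leaves the table unchanged, which is the deduplication half of the exactly-once guarantee.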
To run the automated tests (make sure the services are stopped before you do this):
sbt dockerComposeTest
- Bring up docker services
sbt dockerComposeUp
- Create C* schema (schema is above)
- Create a run configuration in IntelliJ: Run -> Edit Configurations -> "Add New Configuration" (+) -> Application -> Enter (including double quotes):
"run-main com.seglo.learningspark.exactlyonce.EventStreamingApp"
-> Add VM parameters: `-Dspark.master=local[*] -Dspark.cassandra.connection.host=<DOCKER-HOST-IP>`
-> Add Program argument: `<DOCKER-HOST-IP>:9092`
- Run EventStreamingApp using the configuration created in step 3, with TestMessageGenerator running in parallel
- Stop docker services
sbt dockerComposeStop
- Build a fat jar:
sbt assembly
- `rsync` the jar to a node on the cluster with the Spark client utilities. Ex)
rsync -v -e "ssh -i ~/.ssh/myprivatekey.pem" -a ./target/scala-2.10/exactlyonce-1.0.jar centos@[HOST/IP OF SPARK CLIENT]:/jobs/
- Create C* schema (schema is above)
- Create the `exactlyonce.events` Kafka topic:
./bin/kafka-topics.sh --create --topic exactlyonce.events --partitions 3 --replication-factor 3 --zookeeper <ZOOKEEPER-IP>:2181
- Run `spark-submit` with the job:
./bin/spark-submit --class com.seglo.learningspark.exactlyonce.EventStreamingApp --master yarn-client --conf spark.cassandra.connection.host=<CASSANDRA-SEED-IP> /demo/exactlyonce-1.0.jar <KAFKA-BROKER-IP>:6667
- Start `TestMessageGenerator` to produce simulated events onto Kafka:
java -cp ./exactlyonce-1.0.jar com.seglo.learningspark.exactlyonce.TestMessageGenerator <KAFKA-BROKER-IP>:6667 exactlyonce.events
CMD: ./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m --jars /jobs/exactlyonce-1.0.jar
NOTES: Don't pass in the cassandra-connector package, because it will conflict with the Hadoop libs on the Guava dependency. Instead, pass the "fat" jar created with the sbt assembly plugin, and then pass the automatically created SparkContext (`sc`) into `SparkDemoApp.testRdd(sc)` to return a CassandraRdd.
Run docker-compose (`-d` to detach). NOTE: This will not work with `./docker/sbt-docker-compose.yml`.
docker-compose up -d
To attach back to containers to see output/logs:
docker-compose logs
Run `cqlsh` in the Cassandra container:
docker exec -it [CONTAINER_ID] cqlsh
Run `kafka-console-consumer` in the Kafka container:
docker exec -it [CONTAINER_ID] /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Open a bash shell:
docker exec -it [CONTAINER_ID] /bin/bash
Publish the modified spotify/kafka image (Kafka upgraded to 0.9.0.1). sbt-docker-compose doesn't support the `build:` parameter, so to use this image we have to build and publish it locally first:
docker build -t kafka09 ./demo/docker/containers/kafka09/kafka