diff --git a/README.md b/README.md index 123199a4e..c76b79c7e 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ The documentation will be generated to: - [Saving datasets to Cassandra](doc/5_saving.md) - [Customizing the object mapping](doc/6_advanced_mapper.md) - [Using Connector in Java](doc/7_java_api.md) + - [Spark Streaming with Cassandra](doc/8_streaming.md) ## License This software is available under the [Apache License, Version 2.0](LICENSE). diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index 63ce0bb9b..e0df2831b 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -67,43 +67,6 @@ Enable Cassandra-specific functions on the `SparkContext` and `RDD`: import com.datastax.spark.connector._ ``` -### Setting up `StreamingContext` -Follow the directions above for creating a `SparkConf` - -Create a `StreamingContext`: - -```scala -val ssc = new StreamingContext(conf, Seconds(n)) -``` - -Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`: - -```scala -import com.datastax.spark.connector._ -import com.datastax.spark.connector.streaming._ -``` - -Create any of the available or custom Spark streams, for example an Akka Actor stream: - -```scala -val stream = ssc.actorStream[String](Props[SimpleStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK) -``` - -Writing to Cassandra from a Stream: - -```scala -val wc = stream.flatMap(_.split("\\s+")) - .map(x => (x, 1)) - .reduceByKey(_ + _) - .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) -``` - -Where `saveToCassandra` accepts - -- keyspaceName: String, tableName: String -- keyspaceName: String, tableName: String, columnNames: SomeColumns -- keyspaceName: String, tableName: String, columnNames: SomeColumns, batchSize: Int - ### Loading and analyzing data from Cassandra Use the `sc.cassandraTable` method to view this table as a Spark `RDD`: diff --git a/doc/8_streaming.md b/doc/8_streaming.md new file mode 100644 index 000000000..f44eceafe --- /dev/null +++ b/doc/8_streaming.md @@ -0,0 +1,91 @@ +# Documentation +## Spark Streaming with Cassandra +Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. +Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Cassandra. + +### The Basic Idea + +#### Spark Streaming +Here is a basic Spark Streaming sample which writes to the console with `wordCounts.print()`: + +Create a StreamingContext with a SparkConf configuration +```scala + val ssc = new StreamingContext(sparkConf, Seconds(1)) +``` + +Create a DStream that will connect to serverIP:serverPort +```scala + val lines = ssc.socketTextStream(serverIP, serverPort) +``` + +Count each word in each batch +```scala + val words = lines.flatMap(_.split(" ")) + val pairs = words.map(word => (word, 1)) + val wordCounts = pairs.reduceByKey(_ + _) +``` + +Print a few of the counts to the console. +Start the computation. 
+```scala
+  wordCounts.print()
+  ssc.start()
+  ssc.awaitTermination() // Wait for the computation to terminate
+```
+
+#### Spark Streaming With Cassandra
+Now let's bring the Cassandra-specific functions on the `StreamingContext` and `RDD` into scope,
+and simply replace printing to the console with piping the output to Cassandra:
+
+```scala
+  import com.datastax.spark.connector.streaming._
+  wordCounts.saveToCassandra("streaming_test", "words")
+```
+
+### Setting up Streaming
+Follow the directions for [creating a `SparkConf`](0_quick_start.md).
+
+#### Create A `StreamingContext`
+The second required parameter is the `batchDuration`, which sets the interval at which streaming data will be divided into batches.
+Note that the Spark API provides `Milliseconds`, `Seconds` and `Minutes`, all of which are accepted as this `Duration`.
+This `Duration` is not to be confused with [scala.concurrent.duration.Duration](http://www.scala-lang.org/api/current/index.html#scala.concurrent.duration.Duration).
+
+```scala
+  val ssc = new StreamingContext(conf, Seconds(n))
+```
+
+#### Enable Saving To Cassandra
+Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`:
+
+```scala
+  import com.datastax.spark.connector.streaming._
+```
+
+#### Creating A Stream and Writing to Cassandra
+Create any of the available or custom Spark streams. The connector currently supports Akka Actor streams, with
+more stream types to be supported in an upcoming release. You can extend the provided `com.datastax.spark.connector.streaming.TypedStreamingActor`:
+
+```scala
+  val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)
+```
+
+##### Configure and start the computation
+Here `streaming_test` is the keyspace name and `words` is the table name.
+
+Saving data:
+```scala
+  val wc = stream.flatMap(_.split("\\s+"))
+    .map(x => (x, 1))
+    .reduceByKey(_ + _)
+    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))
+```
+
+Start the computation:
+```scala
+  ssc.start()
+```
+
+For a more detailed description, as well as write tuning, see [Saving Data to Cassandra](5_saving.md).
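+#### Putting it together
+The following is a minimal, self-contained sketch (not part of the connector sources). It assumes a local master, a
+Cassandra node on 127.0.0.1 and text arriving on TCP port 9999; the object name, host and port are illustrative only,
+and the schema is the one used by the connector demos:
+
+```scala
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import com.datastax.spark.connector.SomeColumns
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.streaming._
+
+object StreamingWordCount extends App {
+
+  val conf = new SparkConf(true)
+    .setMaster("local[2]")
+    .setAppName("streaming-word-count")
+    .set("spark.cassandra.connection.host", "127.0.0.1")
+
+  // Create the keyspace and table before the stream starts (schema borrowed from the demos).
+  CassandraConnector(conf).withSessionDo { session =>
+    session.execute("CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
+    session.execute("CREATE TABLE IF NOT EXISTS streaming_test.words (word TEXT PRIMARY KEY, count COUNTER)")
+  }
+
+  val ssc = new StreamingContext(conf, Seconds(1))
+
+  // Count the words in each batch and pipe the counts to Cassandra.
+  ssc.socketTextStream("127.0.0.1", 9999, StorageLevel.MEMORY_AND_DISK)
+    .flatMap(_.split("\\s+"))
+    .map(word => (word, 1))
+    .reduceByKey(_ + _)
+    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))
+
+  ssc.start()
+  ssc.awaitTermination()
+}
+```
+
+Because `count` is a counter column, each saved batch increments the totals already stored in the table rather than overwriting them.
+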
+ +### Find out more +http://spark.apache.org/docs/latest/streaming-programming-guide.html \ No newline at end of file diff --git a/project/CassandraSparkBuild.scala b/project/CassandraSparkBuild.scala index 74dff77c3..861a7d281 100644 --- a/project/CassandraSparkBuild.scala +++ b/project/CassandraSparkBuild.scala @@ -31,16 +31,14 @@ object CassandraSparkBuild extends Build { lazy val connector = LibraryProject("spark-cassandra-connector", Seq(libraryDependencies ++= Dependencies.connector)) lazy val connectorJava = LibraryProject("spark-cassandra-connector-java", Seq(libraryDependencies ++= Dependencies.connector), - Seq(connector % "compile;runtime->runtime;test->test;it->it,test;provided->provided")) + Seq(connector)) - lazy val demos = Project( - id = "spark-cassandra-connector-demos", - base = file("spark-cassandra-connector-demos"), - settings = demoSettings ++ Seq(libraryDependencies ++= Dependencies.demos), - dependencies = Seq(connector, connectorJava)) + lazy val demos = LibraryProject("spark-cassandra-connector-demos", + demoSettings ++ Seq(libraryDependencies ++= Dependencies.demos), Seq(connector, connectorJava)) def LibraryProject(name: String, dsettings: Seq[Def.Setting[_]], cpd: Seq[ClasspathDep[ProjectReference]] = Seq.empty): Project = - Project(name, file(name), settings = defaultSettings ++ dsettings, dependencies = cpd) configs (IntegrationTest) + Project(name, file(name), settings = defaultSettings ++ dsettings, + dependencies = cpd.map(_.project % "compile;runtime->runtime;test->test;it->it,test;provided->provided")) configs (IntegrationTest) } diff --git a/project/Settings.scala b/project/Settings.scala index 2a32cd3b6..d0103bfb4 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -57,13 +57,8 @@ object Settings extends Build { autoAPIMappings := true ) - lazy val demoSettings = defaultSettings ++ mimaSettings ++ releaseSettings ++ Seq( - javaOptions in run ++= Seq("-Djava.library.path=./sigar","-Xms128m", "-Xmx1024m"), - scalacOptions ++= Seq("-encoding", "UTF-8", s"-target:jvm-${Versions.JDK}", "-deprecation", "-feature", "-language:_", "-unchecked", "-Xlint"), - javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", Versions.JDK, "-target", Versions.JDK, "-Xlint:unchecked", "-Xlint:deprecation"), - ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet, - parallelExecution in ThisBuild := false, - parallelExecution in Global := false + lazy val demoSettings = Seq( + javaOptions in run ++= Seq("-Djava.library.path=./sigar","-Xms128m", "-Xms2G", "-Xmx2G", "-Xmn384M", "-XX:+UseConcMarkSweepGC") ) lazy val mimaSettings = mimaDefaultSettings ++ Seq( diff --git a/spark-cassandra-connector-demos/src/main/resources/application.conf b/spark-cassandra-connector-demos/src/main/resources/application.conf new file mode 100644 index 000000000..b2bf1cb04 --- /dev/null +++ b/spark-cassandra-connector-demos/src/main/resources/application.conf @@ -0,0 +1,23 @@ +#################################### +# Streaming Demo Reference Config File # +#################################### + +spark-cassandra { + + # spark://127.0.0.1@7077,127.0.0.2@7077,127.0.0.3@7077 + # or a local spark://host@7077 + # This defaults to local + spark.master = "local[12]" + # Would normally be `ms` in config but Spark just wants the Long + spark.streaming.batch.duration = 300 + spark.cleaner.ttl = 3600 + spark.app.name = "Streaming App" + spark.cassandra.connection.host = "127.0.0.1" + spark.cassandra.keyspace = "streaming_test" + # The class that implements 
com.datastax.spark.connector.extension.NodeGuardian + spark.cassandra.node.guardian.class = "com.datastax.spark.connector.demo.streaming.StreamingAppNodeGuardian" +} + +streaming-demo { + data = ["words ", "may ", "count "] +} \ No newline at end of file diff --git a/spark-cassandra-connector-demos/src/main/resources/log4j.properties b/spark-cassandra-connector-demos/src/main/resources/log4j.properties new file mode 100644 index 000000000..a668b9524 --- /dev/null +++ b/spark-cassandra-connector-demos/src/main/resources/log4j.properties @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set pattern to %c instead of %l. +# (%l is slower.) + +# output messages into a rolling log file as well as stdout +log4j.rootLogger=WARN,stdout + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Adding this to avoid thrift logging disconnect errors. 
+log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR + +# Avoid "no host ID found" when starting a fresh node +log4j.logger.org.apache.cassandra.db.SystemKeyspace=ERROR + +log4j.logger.com.datastax.spark.connector=INFO diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/BasicReadWriteDemo.scala b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/BasicReadWriteDemo.scala index 07b7315d3..67f4f835d 100644 --- a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/BasicReadWriteDemo.scala +++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/BasicReadWriteDemo.scala @@ -3,7 +3,7 @@ package com.datastax.spark.connector.demo import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector -object BasicReadWriteDemo extends App with DemoApp { +object BasicReadWriteDemo extends DemoApp { CassandraConnector(conf).withSessionDo { session => session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") @@ -16,7 +16,7 @@ object BasicReadWriteDemo extends App with DemoApp { // Read table test.kv and print its contents: val rdd = sc.cassandraTable("test", "key_value").select("key", "value") - rdd.toArray().foreach(println) + rdd.toArray().foreach(row => log.debug(s"$row")) // Write two rows to the test.kv table: val col = sc.parallelize(Seq((4, "fourth row"), (5, "fifth row"))) diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/DemoApp.scala b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/DemoApp.scala index c28d122f7..df9d7e434 100644 --- a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/DemoApp.scala +++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/DemoApp.scala @@ -1,17 +1,22 @@ package com.datastax.spark.connector.demo -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{Logging, SparkContext, SparkConf} -trait DemoApp { +trait DemoApp extends App with Logging { - val sparkMasterHost = "127.0.0.1" - val cassandraHost = "127.0.0.1" + val SparkMasterHost = "127.0.0.1" + + val CassandraHost = "127.0.0.1" // Tell Spark the address of one Cassandra node: - val conf = new SparkConf(true).set("spark.cassandra.connection.host", cassandraHost) + val conf = new SparkConf(true) + .set("spark.cassandra.connection.host", CassandraHost) + .set("spark.cleaner.ttl", "3600") + .setMaster("local[12]") + .setAppName("Demo") // Connect to the Spark cluster: - val sc = new SparkContext("spark://" + sparkMasterHost + ":7077", "demo-program", conf) + lazy val sc = new SparkContext(conf) } object DemoApp { diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/README.md b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/README.md new file mode 100644 index 000000000..db0ae8409 --- /dev/null +++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/README.md @@ -0,0 +1,37 @@ +# To run the demos: +Running the demos takes 3 simple steps: + +## Start Cassandra +### Using CCM +For a local cluster that's already configured: +(http://www.datastax.com/dev/blog/ccm-a-development-tool-for-creating-local-cassandra-clusters) + + sudo ccm start + +To use Apache Cassandra binaries start up Cassandra by invoking + + 
$CASSANDRA_HOME/bin/cassandra -f
+
+## Start Spark
+### Start a standalone master server by executing:
+    ./sbin/start-master.sh
+
+Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers
+to it, or pass as the “master” argument to SparkContext. You can also find this URL on the master’s web UI,
+which is http://localhost:8080 by default.
+### Start one or more workers and connect them to the master via:
+
+    ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
+
+Once you have started a worker, look at the master’s web UI (http://localhost:8080 by default).
+You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
+
+## Run the demo
+From SBT, run the following on the command line, then enter the number of the demo you wish to run:
+
+    sbt spark-cassandra-connector-demos/run
+
+
+
+Or from an IDE, right-click on a particular demo and select 'Run'.
+
diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/TableCopyDemo.scala b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/TableCopyDemo.scala
index 888347e34..3ac28c760 100644
--- a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/TableCopyDemo.scala
+++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/TableCopyDemo.scala
@@ -2,7 +2,7 @@ package com.datastax.spark.connector.demo
 
 import com.datastax.spark.connector.cql.CassandraConnector
 
-object TableCopyDemo extends App with DemoApp {
+object TableCopyDemo extends DemoApp {
 
   import com.datastax.spark.connector._
 
@@ -21,5 +21,5 @@ object TableCopyDemo extends App with DemoApp {
   src.saveToCassandra("test", "destination")
 
   val dest = sc.cassandraTable("test", "destination")
-  dest.toArray().foreach(println)
+  dest.toArray().foreach(row => log.debug(s"$row"))
 }
diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/AkkaStreamingDemo.scala b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/AkkaStreamingDemo.scala
new file mode 100644
index 000000000..5dde41e76
--- /dev/null
+++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/AkkaStreamingDemo.scala
@@ -0,0 +1,238 @@
+package com.datastax.spark.connector.demo.streaming
+
+import scala.collection.immutable
+import akka.actor._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.streaming.TypedStreamingActor
+
+/**
+ * This demo can run against a single node, local or remote.
+ * See the README for running the demos.
+ * 1. Start Cassandra
+ * 2. Start Spark
+ * 3. Run the demo from SBT with: sbt spark-cassandra-connector-demos/run
+ *    Then enter the number for: com.datastax.spark.connector.demo.streaming.AkkaStreamingDemo
+ *    Or right-click to run in an IDE
+ */
+object AkkaStreamingDemo extends App {
+
+  val TableName = "words"
+
+  /* Initialize Akka, Cassandra and Spark */
+  val settings = new SparkCassandraSettings()
+  import settings._
+
+  /** Configures Spark.
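+   * The Cassandra seed host, Spark cleaner TTL, master URL and application name are all read from application.conf via `SparkCassandraSettings`.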
*/
+  val conf = new SparkConf(true)
+    .set("spark.cassandra.connection.host", CassandraSeed)
+    .set("spark.cleaner.ttl", SparkCleanerTtl.toString)
+    .setMaster(SparkMaster)
+    .setAppName(SparkAppName)
+
+  /** Creates the keyspace and table in Cassandra. */
+  CassandraConnector(conf).withSessionDo { session =>
+    session.execute(s"CREATE KEYSPACE IF NOT EXISTS $CassandraKeyspace WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
+    session.execute(s"CREATE TABLE IF NOT EXISTS $CassandraKeyspace.$TableName (word TEXT PRIMARY KEY, count COUNTER)")
+    session.execute(s"TRUNCATE $CassandraKeyspace.$TableName")
+  }
+
+  /** Connect to the Spark cluster: */
+  lazy val sc = new SparkContext(conf)
+
+  /** Creates the Spark Streaming context. */
+  val ssc = new StreamingContext(sc, Milliseconds(300))
+
+  /** Captures Spark's Akka ActorSystem. */
+  lazy val sparkActorSystem = SparkEnv.get.actorSystem
+
+  /** Creates the demo's Akka ActorSystem to ensure its dispatchers are kept separate and there are no naming conflicts.
+   * Unfortunately Spark does not allow users to pass in existing ActorSystems. */
+  val system = ActorSystem("DemoApp")
+
+  import akka.japi.Util.immutableSeq
+  val data = immutableSeq(system.settings.config.getStringList("streaming-demo.data")).toSet
+
+  /** Creates the root supervisor of this simple Akka `ActorSystem` node that you might deploy across a cluster. */
+  val guardian = system.actorOf(Props(new NodeGuardian(ssc, settings, TableName, data)), "node-guardian")
+
+}
+
+/**
+ * The NodeGuardian actor is the root supervisor of this simple Akka application's ActorSystem node that
+ * you might deploy across a cluster.
+ *
+ * Being an Akka supervisor actor, it would normally orchestrate its children and any fault tolerance policies.
+ * For a simple demo no policies are employed, save those embedded in the Akka actor API.
+ *
+ * Demo data for a simple but classic WordCount:
+ * {{{
+ *   val data = immutable.Set("words ", "may ", "count ")
+ * }}}
+ *
+ * The NodeGuardian spins up three child actors (not in this order):
+ *
+ * 1. Streamer
+ * A simple Akka actor which extends [[com.datastax.spark.connector.streaming.TypedStreamingActor]] and ultimately
+ * implements a Spark `Receiver`. This simple receiver calls
+ * {{{
+ *   Receiver.pushBlock[T: ClassTag](data: T)
+ * }}}
+ * when messages of type `String` (for simplicity of the demo) are received. This would typically be data in some
+ * custom envelope of a Scala case class that is Serializable.
+ *
+ * 2. Sender
+ * A simple Akka actor which generates a pre-set number of random tuples based on the initial input `data` noted above,
+ * and sends each random tuple to the [[Streamer]]. The random messages are generated and sent to the stream every
+ * millisecond, with an initial wait of 2 milliseconds.
+ *
+ * 3. Reporter
+ * A simple Akka actor which, when created, starts a scheduled task that runs every millisecond. This task simply
+ * checks whether the expected data has been successfully submitted to and stored in Cassandra. Once the successful
+ * assertion can be made, it signals its supervisor, the NodeGuardian, that the work is completed and the expected state
+ * successfully verified.
It does this by calling the following on the `StreamingContext` (ssc) to know when the + * expected number of entries has been streamed to Spark, and `scale` (the number of messages sent to the stream), + * computed, and saved to Cassandra: + * {{{ + * val rdd = ssc.cassandraTable[WordCount](keyspaceName, tableName).select("word", "count") + * rdd.collect.nonEmpty && rdd.map(_.count).reduce(_ + _) == scale * 2 + * }}} + * + * Where `data` represents the 3 words we computed, we assert the expected three columns were created: + * {{{ + * assert(rdd.collect.length == data.size) + * }}} + * + *@param ssc the Spark `StreamingContext` + * + * @param settings the [[SparkCassandraSettings]] from config + * + * @param tableName the Cassandra table name to use + * + * @param data the demo data for a simple WordCount + */ +class NodeGuardian(ssc: StreamingContext, settings: SparkCassandraSettings, tableName: String, data: immutable.Set[String]) + extends Actor with Logging { + + import scala.concurrent.duration._ + import akka.util.Timeout + import org.apache.spark.storage.StorageLevel + import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions + import com.datastax.spark.connector._ + import InternalStreamingEvent._ + import settings._ + import context.dispatcher + + implicit val timeout = Timeout(5.seconds) + + private val actorName = "stream" + + private val sas = SparkEnv.get.actorSystem + + private val path = ActorPath.fromString(s"$sas/user/Supervisor0/$actorName") + + private val reporter = context.actorOf(Props(new Reporter(ssc, CassandraKeyspace, tableName, data)), "reporter") + + private val stream = ssc.actorStream[String](Props[Streamer], actorName, StorageLevel.MEMORY_AND_DISK) + + /* Defines the work to do in the stream. Placing the import here to explicitly show + that this is where the implicits are used for the DStream's 'saveToCassandra' functions: */ + import com.datastax.spark.connector.streaming._ + + private val wc = stream.flatMap(_.split("\\s+")) + .map(x => (x, 1)) + .reduceByKey(_ + _) + .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"), 1) + + /** Once the stream and sender actors are created, the spark stream's compute configured, the `StreamingContext` is started. */ + ssc.start() + log.info(s"Streaming context started.") + + /* Note that the [[Streamer]] actor is in the Spark actor system. We watch it from the demo + application's actor system. The [[Sender]] will send data to the [[Streamer]] actor. */ + for (actor <- sas.actorSelection(path).resolveOne()) { + + /** For the purposes of the demo, we put an Akka DeathWatch on the stream actor, because this actor stops itself once its + * work is `done` (again, just for a simple demo that does work and stops once expectations are met). */ + context.watch(actor) + + /** Then we inject the [[Sender]] actor with the [[Streamer]] actor ref so it can easily send data to the stream. */ + context.actorOf(Props(new Sender(data.toArray, actor))) + } + + def receive: Actor.Receive = { + /** Akka DeathWatch notification that `ref`, the [[Streamer]] actor we are watching, has terminated itself. + * We message the [[Reporter]], which triggers its scheduled validation task. */ + case Terminated(ref) => reporter ! Report + + /** NodeGuardian actor receives confirmation from the [[Reporter]] of a successful validation. + * We trigger a system shutdown of the Akka node, which calls `shutdown()`. 
*/
+    case Completed => shutdown()
+  }
+
+  /** Stops the `ActorSystem`, the Spark `StreamingContext` and the underlying `SparkContext`. */
+  def shutdown(): Unit = {
+    import scala.concurrent.{Future, Await}
+
+    log.info(s"Stopping '$ssc' and shutting down.")
+    context.system.shutdown()
+    Await.result(Future(context.system.isTerminated), 2.seconds)
+    ssc.stop(true)
+  }
+
+}
+
+/** Simply showing what the streaming actor does for the sake of the demo. It is an
+ * `org.apache.spark.streaming.receivers.Receiver`. This receiver tracks the number
+ * of blocks of data pushed to Spark so that the demo can shut down once we assert
+ * the expected data has been saved to Cassandra.
+ *
+ * The additional behavior of a Counter simply supports the demo shutdown once
+ * the Stream has sent all the randomly generated data to the Spark `DStream` for
+ * processing. Once completed, the [[Streamer]] triggers an Akka DeathWatch by
+ * {{{ self ! PoisonPill }}}
+ *
+ * {{{trait CounterActor extends Actor with Logging {
+ *   protected val scale = 30
+ *   private var count = 0
+ *
+ *   protected def increment(): Unit = {
+ *     count += 1
+ *     if (count == scale) self ! PoisonPill
+ *   }
+ * } }}}
+ */
+class Streamer extends TypedStreamingActor[String] with CounterActor {
+
+  override def push(e: String): Unit = {
+    super.push(e)
+    increment()
+  }
+}
+
+/** A simple Akka actor which generates a pre-set number of random tuples based on initial input
+ * `data`, and sends each random tuple to the [[Streamer]]. The random messages are generated
+ * and sent to the stream every millisecond, with an initial wait of 2 milliseconds. */
+class Sender(val data: Array[String], val to: ActorRef) extends Actor {
+  import context.dispatcher
+  import scala.concurrent.duration._
+
+  private val rand = new scala.util.Random()
+
+  val task = context.system.scheduler.schedule(2.second, 1.millis) {
+    to ! createMessage()
+  }
+
+  override def postStop(): Unit = task.cancel()
+
+  def createMessage(): String = {
+    val x = rand.nextInt(3)
+    data(x) + data(2 - x)
+  }
+
+  def receive: Actor.Receive = {
+    case _ =>
+  }
+}
diff --git a/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/StreamingDemo.scala b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/StreamingDemo.scala
new file mode 100644
index 000000000..bb37dddd7
--- /dev/null
+++ b/spark-cassandra-connector-demos/src/main/scala/com/datastax/spark/connector/demo/streaming/StreamingDemo.scala
@@ -0,0 +1,132 @@
+package com.datastax.spark.connector.demo.streaming
+
+import scala.collection.immutable
+import scala.concurrent.duration._
+import akka.actor.{Actor, PoisonPill}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{Logging, SparkEnv}
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.demo.DemoApp
+
+/**
+ * Creates the `org.apache.spark.streaming.StreamingContext` and then writes asynchronously to the stream.
+ * This is the base for all streaming demos.
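+ * Concrete demos, such as the Kafka and ZeroMQ variants sketched at the end of this file, extend this trait and supply their own input stream.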
+ */ +trait StreamingDemo extends DemoApp { + + val keyspaceName = "streaming_test" + + val tableName = "words" + + val data = immutable.Set("words ", "may ", "count ") + + CassandraConnector(conf).withSessionDo { session => + session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspaceName WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") + session.execute(s"CREATE TABLE IF NOT EXISTS $keyspaceName.$tableName (word TEXT PRIMARY KEY, count COUNTER)") + session.execute(s"TRUNCATE $keyspaceName.$tableName") + } + + val ssc = new StreamingContext(sc, Milliseconds(300)) + + lazy val sparkActorSystem = SparkEnv.get.actorSystem + +} + + +/* Initializes Akka, Cassandra and Spark settings. */ +final class SparkCassandraSettings(rootConfig: Config) { + def this() = this(ConfigFactory.load) + + protected val config = rootConfig.getConfig("spark-cassandra") + + val SparkMaster: String = config.getString("spark.master") + + val SparkAppName: String = config.getString("spark.app.name") + + val SparkCleanerTtl: Int = config.getInt("spark.cleaner.ttl") + + val CassandraSeed: String = config.getString("spark.cassandra.connection.host") + + val CassandraKeyspace = config.getString("spark.cassandra.keyspace") + + val SparkStreamingBatchDuration: Long = config.getLong("spark.streaming.batch.duration") +} + +trait CounterActor extends Actor with Logging { + + protected val scale = 30 + + private var count = 0 + + protected def increment(): Unit = { + count += 1 + if (count == scale) self ! PoisonPill + } +} + +private[demo] object InternalStreamingEvent { + sealed trait Status + case class Pushed(data: AnyRef) extends Status + case object Completed extends Status + case object Report extends Status + case class WordCount(word: String, count: Int) +} + +/** When called upon, the Reporter starts a task which checks at regular intervals whether + * the produced amount of data has all been written to Cassandra from the stream. This allows + * the demo to stop on its own once this assertion is true. It will stop the task and ping + * the `NodeGuardian`, its supervisor, of the `Completed` state. + */ +class Reporter(ssc: StreamingContext, keyspaceName: String, tableName: String, data: immutable.Set[String]) extends CounterActor { + import akka.actor.Cancellable + import com.datastax.spark.connector.streaming._ + import InternalStreamingEvent._ + import context.dispatcher + + private var task: Option[Cancellable] = None + + def receive: Actor.Receive = { + case Report => report() + } + + def done: Actor.Receive = { + case Completed => complete() + } + + def report(): Unit = { + task = Some(context.system.scheduler.schedule(Duration.Zero, 1.millis) { + val rdd = ssc.cassandraTable[WordCount](keyspaceName, tableName).select("word", "count") + if (rdd.collect.nonEmpty && rdd.map(_.count).reduce(_ + _) == scale * 2) { + context.become(done) + self ! Completed + } + }) + } + + def complete(): Unit = { + task map (_.cancel()) + val rdd = ssc.cassandraTable[WordCount](keyspaceName, tableName).select("word", "count") + assert(rdd.collect.length == data.size) + log.info(s"Saved data to Cassandra.") + context.parent ! 
Completed + } +} + +/** + * TODO + * {{{ + * val stream: ReceiverInputDStream[(String, String)] = + * KafkaUtils.createStream(ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + * }}} + */ +trait KafkaStreamingDemo extends StreamingDemo + +/** + * TODO + * ZeroMQ + * {{{ + * val stream: ReceiverInputDStream[String] = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + * }}} + */ +trait ZeroMQStreamingDemo extends StreamingDemo diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/ActorStreamSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/ActorStreamSpec.scala index a276350a3..c806aebc2 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/ActorStreamSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/ActorStreamSpec.scala @@ -2,12 +2,12 @@ package com.datastax.spark.connector.streaming import akka.actor.{Props, Terminated, ActorSystem} import akka.testkit.TestKit -import com.datastax.spark.connector.SomeColumns import org.apache.spark.SparkEnv import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions import org.apache.spark.streaming.{Milliseconds, StreamingContext} import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.SomeColumns import com.datastax.spark.connector.testkit._ class ActorStreamingSpec extends ActorSpec with CounterFixture { @@ -22,7 +22,7 @@ class ActorStreamingSpec extends ActorSpec with CounterFixture { "actorStream" must { "write from the actor stream to cassandra table: streaming_test.words" in { - val stream = ssc.actorStream[String](Props[SimpleStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK) + val stream = ssc.actorStream[String](Props[TestStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK) val wc = stream.flatMap(_.split("\\s+")) .map(x => (x, 1)) @@ -41,13 +41,22 @@ class ActorStreamingSpec extends ActorSpec with CounterFixture { expectMsgPF(duration) { case Terminated(ref) => val rdd = ssc.cassandraTable[WordCount]("streaming_test", "words").select("word", "count") - awaitCond(rdd.toArray().nonEmpty && rdd.map(_.count).reduce(_ + _) == scale * 2) - rdd.toArray().length should be (data.size) + awaitCond(rdd.collect.nonEmpty && rdd.map(_.count).reduce(_ + _) == scale * 2) + rdd.collect.length should be (data.size) } } } } +/** A very basic Akka actor which streams `String` event data to spark. 
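+ * It mixes in `Counter` from the test kit so the spec can tell when the expected number of events has been pushed.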
*/ +class TestStreamingActor extends TypedStreamingActor[String] with Counter { + + override def push(e: String): Unit = { + super.push(e) + increment() + } +} + abstract class ActorSpec(val ssc: StreamingContext, _system: ActorSystem) extends TestKit(_system) with StreamingSpec { diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/StreamingSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/StreamingSpec.scala index 7a2874c5f..13dc47b55 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/StreamingSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/streaming/StreamingSpec.scala @@ -1,7 +1,5 @@ package com.datastax.spark.connector.streaming -import scala.concurrent.duration._ -import akka.actor.Actor import com.datastax.spark.connector.testkit._ /** @@ -35,6 +33,7 @@ import com.datastax.spark.connector.testkit._ */ trait StreamingSpec extends AbstractSpec with CassandraServer with SparkCassandraFixture { import org.apache.spark.streaming.StreamingContext + import scala.concurrent.duration._ val duration = 10.seconds @@ -48,16 +47,4 @@ trait StreamingSpec extends AbstractSpec with CassandraServer with SparkCassandr } } -/** A very basic Akka actor which streams `String` event data to spark. */ -private [streaming] class SimpleStreamingActor extends SparkStreamingActor with Counter { - - def receive: Actor.Receive = { - case e: String => push(e) - } - - def push(e: String): Unit = { - pushBlock(e) - increment() - } -} diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/streaming/StreamingContextFunctions.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/streaming/StreamingContextFunctions.scala index 49e861176..7ff652361 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/streaming/StreamingContextFunctions.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/streaming/StreamingContextFunctions.scala @@ -6,6 +6,8 @@ import com.datastax.spark.connector.rdd.reader.RowReaderFactory import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.receivers.Receiver +import scala.reflect.ClassTag + /** Provides Cassandra-specific methods on `org.apache.spark.streaming.StreamingContext`. * @param ssc the Spark Streaming context */ @@ -21,4 +23,14 @@ class StreamingContextFunctions (ssc: StreamingContext) extends SparkContextFunc /** Simple akka.actor.Actor mixin to implement further with Spark 1.0.1 upgrade. */ trait SparkStreamingActor extends Actor with Receiver +abstract class TypedStreamingActor[T : ClassTag] extends SparkStreamingActor { + + def receive: Actor.Receive = { + case e: T => push(e) + } + + def push(event: T): Unit = + pushBlock(event) + +}