
Commit

#115 Added streaming demo with documentation, new streaming page to docs, new README for running all demos.
Helena Edelson committed Aug 19, 2014
1 parent 9ad7682 commit 6a6c869
Showing 16 changed files with 604 additions and 79 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -53,6 +53,7 @@ The documentation will be generated to:
- [Saving datasets to Cassandra](doc/5_saving.md)
- [Customizing the object mapping](doc/6_advanced_mapper.md)
- [Using Connector in Java](doc/7_java_api.md)
- [Spark Streaming with Cassandra](doc/8_streaming.md)

## License
This software is available under the [Apache License, Version 2.0](LICENSE).
37 changes: 0 additions & 37 deletions doc/0_quick_start.md
@@ -67,43 +67,6 @@ Enable Cassandra-specific functions on the `SparkContext` and `RDD`:
import com.datastax.spark.connector._
```

### Setting up `StreamingContext`
Follow the directions above for creating a `SparkConf`

Create a `StreamingContext`:

```scala
val ssc = new StreamingContext(conf, Seconds(n))
```

Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`:

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
```

Create any of the available or custom Spark streams, for example an Akka Actor stream:

```scala
val stream = ssc.actorStream[String](Props[SimpleStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK)
```

Writing to Cassandra from a Stream:

```scala
val wc = stream.flatMap(_.split("\\s+"))
.map(x => (x, 1))
.reduceByKey(_ + _)
.saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))
```

Where `saveToCassandra` accepts

- keyspaceName: String, tableName: String
- keyspaceName: String, tableName: String, columnNames: SomeColumns
- keyspaceName: String, tableName: String, columnNames: SomeColumns, batchSize: Int

### Loading and analyzing data from Cassandra
Use the `sc.cassandraTable` method to view this table as a Spark `RDD`:

91 changes: 91 additions & 0 deletions doc/8_streaming.md
@@ -0,0 +1,91 @@
# Documentation
## Spark Streaming with Cassandra
Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams.
Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Cassandra.

### The Basic Idea

#### Spark Streaming
Here is a basic Spark Streaming sample which writes to the console with `wordCounts.print()`:

Create a StreamingContext with a SparkConf configuration
```scala
val ssc = new StreamingContext(sparkConf, Seconds(1))
```

Create a DStream that will connect to serverIP:serverPort
```scala
val lines = ssc.socketTextStream(serverIP, serverPort)
```

Count each word in each batch
```scala
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
```

Print a few of the counts to the console.
Start the computation.
```scala
wordCounts.print()
ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate
```

#### Spark Streaming With Cassandra
Now let's bring the Cassandra-specific functions on the `StreamingContext` and `RDD` into scope,
and simply replace printing to the console with piping the output to Cassandra:

```scala
import com.datastax.spark.connector.streaming._
wordCounts.saveToCassandra("streaming_test", "words")
```
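Putting the pieces together, here is a minimal end-to-end sketch. The master, socket host/port, and Cassandra host below are placeholders, not part of this commit; the keyspace and table names are the ones used in the example above:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector.streaming._

object StreamingWordCountSketch extends App {
  // Hypothetical local configuration; adjust the master and Cassandra host as needed.
  val conf = new SparkConf(true)
    .setMaster("local[2]")                        // streaming needs at least 2 local threads
    .setAppName("streaming-sketch")
    .set("spark.cassandra.connection.host", "127.0.0.1")

  val ssc = new StreamingContext(conf, Seconds(1))

  // Placeholder socket source; any supported stream can be used instead.
  val lines = ssc.socketTextStream("127.0.0.1", 9999)

  lines.flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .saveToCassandra("streaming_test", "words")   // keyspace and table from the example above

  ssc.start()
  ssc.awaitTermination()
}
```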

### Setting up Streaming
Follow the directions for [creating a `SparkConf`](0_quick_start.md).

#### Create A `StreamingContext`
The second required parameter is the `batchDuration`, which sets the interval at which streaming data will be divided into batches.
Note that the Spark API provides `Milliseconds`, `Seconds`, and `Minutes` factories, all of which are accepted as this `Duration`.
This `Duration` is not to be confused with [scala.concurrent.duration.Duration](http://www.scala-lang.org/api/current/index.html#scala.concurrent.duration.Duration).

```scala
val ssc = new StreamingContext(conf, Seconds(n))
```
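For illustration, assuming `conf` is the `SparkConf` built as in the quick start, any of the factories can supply the batch interval (the 500 ms value here is arbitrary):

```scala
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

// Milliseconds(500), Seconds(5) and Minutes(1) all produce an
// org.apache.spark.streaming.Duration usable as the batch interval.
val ssc = new StreamingContext(conf, Milliseconds(500))
```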

#### Enable Saving To Cassandra
Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`:

```scala
import com.datastax.spark.connector.streaming._
```

#### Creating A Stream and Writing to Cassandra
Create any of the available or custom Spark streams. The connector currently supports Akka Actor streams, with more to come in the next release.
You can extend the provided `com.datastax.spark.connector.streaming.TypedStreamingActor`:

```scala
val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)
```
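For illustration, a minimal sketch of such a receiver actor using Spark's `ActorHelper`. This is an assumption about how a receiver actor can be written against Spark Streaming, not the connector's `TypedStreamingActor` source; the actor name matches the quick-start example:

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// A plain Akka actor that feeds received strings into Spark Streaming;
// TypedStreamingActor wraps the same pattern.
class SimpleStreamingActor extends Actor with ActorHelper {
  def receive: Actor.Receive = {
    case s: String => store(s)   // hand the element to Spark
  }
}
```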

##### Configure and start the computation
In the following example, `streaming_test` is the keyspace name and `words` is the table name:

Saving data:
```scala
val wc = stream.flatMap(_.split("\\s+"))
.map(x => (x, 1))
.reduceByKey(_ + _)
.saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))
```
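Note that the keyspace and table must exist before the stream writes to them. A minimal sketch of creating them with `CassandraConnector`, the same helper used by the demos in this commit; the exact schema here is an assumption:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// Create the keyspace and table assumed by the example above.
CassandraConnector(conf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS streaming_test " +
    "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS streaming_test.words (word text PRIMARY KEY, count int)")
}
```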

Start the computation:
```scala
ssc.start()
```
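As in the basic example above, you can then block until the streaming computation is stopped:

```scala
ssc.awaitTermination()
```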

For a more detailed description as well as tuning writes, see [Saving Data to Cassandra](5_saving.md).

### Find out more
http://spark.apache.org/docs/latest/streaming-programming-guide.html
12 changes: 5 additions & 7 deletions project/CassandraSparkBuild.scala
@@ -31,16 +31,14 @@ object CassandraSparkBuild extends Build {
lazy val connector = LibraryProject("spark-cassandra-connector", Seq(libraryDependencies ++= Dependencies.connector))

lazy val connectorJava = LibraryProject("spark-cassandra-connector-java", Seq(libraryDependencies ++= Dependencies.connector),
Seq(connector % "compile;runtime->runtime;test->test;it->it,test;provided->provided"))
Seq(connector))

lazy val demos = Project(
id = "spark-cassandra-connector-demos",
base = file("spark-cassandra-connector-demos"),
settings = demoSettings ++ Seq(libraryDependencies ++= Dependencies.demos),
dependencies = Seq(connector, connectorJava))
lazy val demos = LibraryProject("spark-cassandra-connector-demos",
demoSettings ++ Seq(libraryDependencies ++= Dependencies.demos), Seq(connector, connectorJava))

def LibraryProject(name: String, dsettings: Seq[Def.Setting[_]], cpd: Seq[ClasspathDep[ProjectReference]] = Seq.empty): Project =
Project(name, file(name), settings = defaultSettings ++ dsettings, dependencies = cpd) configs (IntegrationTest)
Project(name, file(name), settings = defaultSettings ++ dsettings,
dependencies = cpd.map(_.project % "compile;runtime->runtime;test->test;it->it,test;provided->provided")) configs (IntegrationTest)

}

9 changes: 2 additions & 7 deletions project/Settings.scala
@@ -57,13 +57,8 @@ object Settings extends Build {
autoAPIMappings := true
)

lazy val demoSettings = defaultSettings ++ mimaSettings ++ releaseSettings ++ Seq(
javaOptions in run ++= Seq("-Djava.library.path=./sigar","-Xms128m", "-Xmx1024m"),
scalacOptions ++= Seq("-encoding", "UTF-8", s"-target:jvm-${Versions.JDK}", "-deprecation", "-feature", "-language:_", "-unchecked", "-Xlint"),
javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", Versions.JDK, "-target", Versions.JDK, "-Xlint:unchecked", "-Xlint:deprecation"),
ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet,
parallelExecution in ThisBuild := false,
parallelExecution in Global := false
lazy val demoSettings = Seq(
javaOptions in run ++= Seq("-Djava.library.path=./sigar","-Xms128m", "-Xms2G", "-Xmx2G", "-Xmn384M", "-XX:+UseConcMarkSweepGC")
)

lazy val mimaSettings = mimaDefaultSettings ++ Seq(
@@ -0,0 +1,23 @@
####################################
# Streaming Demo Reference Config File #
####################################

spark-cassandra {

# spark://127.0.0.1@7077,127.0.0.2@7077,127.0.0.3@7077
# or a local spark://host@7077
# This defaults to local
spark.master = "local[12]"
# Would normally be `ms` in config but Spark just wants the Long
spark.streaming.batch.duration = 300
spark.cleaner.ttl = 3600
spark.app.name = "Streaming App"
spark.cassandra.connection.host = "127.0.0.1"
spark.cassandra.keyspace = "streaming_test"
# The class that implements com.datastax.spark.connector.extension.NodeGuardian
spark.cassandra.node.guardian.class = "com.datastax.spark.connector.demo.streaming.StreamingAppNodeGuardian"
}

streaming-demo {
data = ["words ", "may ", "count "]
}
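A minimal sketch of how settings like these could be loaded with Typesafe Config and wired into a `StreamingContext`. This is illustrative only, not the demo's actual code:

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val config = ConfigFactory.load().getConfig("spark-cassandra")

val conf = new SparkConf(true)
  .setMaster(config.getString("spark.master"))
  .setAppName(config.getString("spark.app.name"))
  .set("spark.cassandra.connection.host", config.getString("spark.cassandra.connection.host"))
  .set("spark.cleaner.ttl", config.getInt("spark.cleaner.ttl").toString)

// "Would normally be `ms` in config but Spark just wants the Long"
val ssc = new StreamingContext(conf, Milliseconds(config.getLong("spark.streaming.batch.duration")))
```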
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# for production, you should probably set pattern to %c instead of %l.
# (%l is slower.)

# output messages into a rolling log file as well as stdout
log4j.rootLogger=WARN,stdout

# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Adding this to avoid thrift logging disconnect errors.
log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR

# Avoid "no host ID found" when starting a fresh node
log4j.logger.org.apache.cassandra.db.SystemKeyspace=ERROR

log4j.logger.com.datastax.spark.connector=INFO
@@ -3,7 +3,7 @@ package com.datastax.spark.connector.demo
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

object BasicReadWriteDemo extends App with DemoApp {
object BasicReadWriteDemo extends DemoApp {

CassandraConnector(conf).withSessionDo { session =>
session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
@@ -16,7 +16,7 @@ object BasicReadWriteDemo extends App with DemoApp {

// Read table test.kv and print its contents:
val rdd = sc.cassandraTable("test", "key_value").select("key", "value")
rdd.toArray().foreach(println)
rdd.toArray().foreach(row => log.debug(s"$row"))

// Write two rows to the test.kv table:
val col = sc.parallelize(Seq((4, "fourth row"), (5, "fifth row")))
@@ -1,17 +1,22 @@
package com.datastax.spark.connector.demo

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{Logging, SparkContext, SparkConf}

trait DemoApp {
trait DemoApp extends App with Logging {

val sparkMasterHost = "127.0.0.1"
val cassandraHost = "127.0.0.1"
val SparkMasterHost = "127.0.0.1"

val CassandraHost = "127.0.0.1"

// Tell Spark the address of one Cassandra node:
val conf = new SparkConf(true).set("spark.cassandra.connection.host", cassandraHost)
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", CassandraHost)
.set("spark.cleaner.ttl", "3600")
.setMaster("local[12]")
.setAppName("Demo")

// Connect to the Spark cluster:
val sc = new SparkContext("spark://" + sparkMasterHost + ":7077", "demo-program", conf)
lazy val sc = new SparkContext(conf)
}

object DemoApp {
@@ -0,0 +1,37 @@
# To run the demos
Running the demos takes three simple steps:

## Start Cassandra
### Using CCM
For a local cluster that's already configured with [CCM](http://www.datastax.com/dev/blog/ccm-a-development-tool-for-creating-local-cassandra-clusters):

sudo ccm start

To use the Apache Cassandra binaries, start Cassandra by invoking:

    $CASSANDRA_HOME/bin/cassandra -f

## Start Spark
### Start a standalone master server by executing:
./sbin/start-master.sh

Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers
to it, or pass as the “master” argument to SparkContext. You can also find this URL on the master’s web UI,
which is http://localhost:8080 by default.
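For instance, a demo's `SparkConf` could point at that URL instead of the default `local[12]` used by `DemoApp`; the host and port below are placeholders:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(true)
  .setMaster("spark://127.0.0.1:7077")   // the spark://HOST:PORT URL printed by the master
  .setAppName("Demo")
  .set("spark.cassandra.connection.host", "127.0.0.1")
```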
### Start one or more workers and connect them to the master via:

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
Once you have started a worker, look at the master’s web UI (http://localhost:8080 by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).

## Run the demo
From SBT, run the following on the command line, then enter the number of the demo you wish to run:

sbt spark-cassandra-connector-demos/run



Or from an IDE, right click on a particular demo and 'run'.

@@ -2,7 +2,7 @@ package com.datastax.spark.connector.demo

import com.datastax.spark.connector.cql.CassandraConnector

object TableCopyDemo extends App with DemoApp {
object TableCopyDemo extends DemoApp {

import com.datastax.spark.connector._

@@ -21,5 +21,5 @@ object TableCopyDemo extends App with DemoApp {
src.saveToCassandra("test", "destination")

val dest = sc.cassandraTable("test", "destination")
dest.toArray().foreach(println)
dest.toArray().foreach(row => log.debug(s"$row"))
}
