Fixed code highlighting in docs, fixes #105
jacek-lewandowski committed Jul 28, 2014
1 parent 1be2b29 commit 9547985
Showing 8 changed files with 221 additions and 149 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,3 +1,4 @@
* Language specific highlighting in the documentation (#105)
* Fixed a bug which caused problems when a column of VarChar type was used
in where clause. (04fd8d9)
* Fixed an AnyObjectFactory bug which caused problems with instantiation of
77 changes: 50 additions & 27 deletions doc/0_quick_start.md
@@ -30,68 +30,91 @@ This driver is also compatible with Spark distribution provided in

### Preparing example Cassandra schema
Create a simple keyspace and table in Cassandra. Run the following statements in `cqlsh`:

```sql
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);
```
Then insert some example data:

```sql
INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);
```

Now you're ready to write your first Spark program using Cassandra.

### Setting up `SparkContext`
Before creating the `SparkContext`, set the `spark.cassandra.connection.host` property to the address of one
of the Cassandra nodes:

```scala
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
```
Create a `SparkContext`. Replace `127.0.0.1` with the actual address of your Spark Master
(or use `"local"` to run in local mode):

```scala
val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
```
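For local testing, the `"local"` master URL mentioned above can be used instead. A minimal sketch (the application name `"test"` is just a placeholder):

```scala
// Run Spark in local mode inside the current JVM with a single worker thread
val sc = new SparkContext("local", "test", conf)
```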

Enable Cassandra-specific functions on the `SparkContext` and `RDD`:

```scala
import com.datastax.spark.connector._
```

### Setting up `StreamingContext`
Follow the directions above for creating a `SparkConf`.

Create a `StreamingContext`:

```scala
val ssc = new StreamingContext(conf, Seconds(n))
```

Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`:

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
```

Create any of the available or custom Spark streams, for example an Akka Actor stream:

```scala
val stream = ssc.actorStream[String](Props[SimpleActor], actorName, StorageLevel.MEMORY_AND_DISK)
```
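Other standard stream sources work the same way. For example, a plain socket text stream (a sketch; the host and port are placeholders):

```scala
// Receive lines of text from a TCP socket as a DStream[String]
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK)
```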

Writing to Cassandra from a Stream:

```scala
val wc = stream.flatMap(_.split("\\s+"))
.map(x => (x, 1))
.reduceByKey(_ + _)
.saveToCassandra("streaming_test", "words", Seq("word", "count"))
```

### Loading and analyzing data from Cassandra
Use the `sc.cassandraTable` method to view this table as a Spark `RDD`:

```scala
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)
```

### Saving data from RDD to Cassandra
Add two more rows to the table:

```scala
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", Seq("key", "value"))
```


[Next - Connecting to Cassandra](1_connecting.md)
34 changes: 18 additions & 16 deletions doc/1_connecting.md
@@ -22,17 +22,20 @@ spark.cassandra.auth.conf.factory.class | name of the class implementing `AuthC

Example:

```scala
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "192.168.123.10")
.set("spark.cassandra.username", "cassandra")
.set("spark.cassandra.password", "cassandra")

val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)
```

To import Cassandra-specific functions on `SparkContext` and `RDD` objects, call:

```scala
import com.datastax.spark.connector._
```

### Connection management

@@ -68,14 +71,13 @@ and therefore can be safely used in lambdas passed to Spark transformations.
Assuming an appropriately configured `SparkConf` object is stored in the `conf` variable, the following
code creates a keyspace and a table:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(conf).withSessionDo { session =>
session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
session.execute("CREATE TABLE test2.words (word text PRIMARY KEY, count int)")
}
```

[Next - Accessing data](2_loading.md)


103 changes: 64 additions & 39 deletions doc/2_loading.md
@@ -8,28 +8,36 @@ This section describes how to access data from a Cassandra table with Spark.
To get a Spark RDD that represents a Cassandra table,
call the `cassandraTable` method on the `SparkContext` object.

```scala
sc.cassandraTable("keyspace name", "table name")
```

If no explicit type is given to `cassandraTable`, the result of this expression is `CassandraRDD[CassandraRow]`.
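An explicit type parameter may also be supplied. As a sketch (assuming the `test.words` table created below and the row-to-case-class mapping described later in these docs), rows can be read directly into a case class:

```scala
case class WordCount(word: String, count: Int)

// Each row is mapped to a WordCount instance instead of a generic CassandraRow
val typedRdd = sc.cassandraTable[WordCount]("test", "words")
```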

Create this keyspace and table in Cassandra using cqlsh:

```sql
CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.words (word text PRIMARY KEY, count int);
```

Load data into the table:

```sql
INSERT INTO test.words (word, count) VALUES ('foo', 20);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
```

Now you can read that table as an `RDD`:

```scala
val rdd = sc.cassandraTable("test", "words")
// rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.rdd.reader.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:41

rdd.toArray.foreach(println)
// CassandraRow{word: bar, count: 20}
// CassandraRow{word: foo, count: 20}
```

### Reading primitive column values

Expand All @@ -39,33 +47,42 @@ Type conversions are applied on the fly. Use `getOption` variants when you expec

Continuing with the previous example, follow these steps to access individual column values.
Store the first item of the `rdd` in the `firstRow` value:

```scala
val firstRow = rdd.first
// firstRow: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{word: bar, count: 20}
```

Get the number of columns and column names:

```scala
rdd.columnNames // Stream(word, count)
rdd.size // 2
```

Use one of the `getXXX` getters to obtain a column value converted to the desired type:

```scala
firstRow.getInt("count") // 20
firstRow.getLong("count") // 20L
```

Or use the generic `get` method and pass the desired return type directly:

```scala
firstRow.get[Int]("count") // 20
firstRow.get[Long]("count") // 20L
firstRow.get[BigInt]("count") // BigInt(20)
firstRow.get[java.math.BigInteger]("count") // BigInteger(20)
```

### Working with nullable data

When reading potentially `null` data, use the `Option` type on the Scala side to prevent getting a `NullPointerException`.

```scala
firstRow.getIntOption("count") // Some(20)
firstRow.get[Option[Int]]("count") // Some(20)
```

### Reading collections

@@ -78,26 +95,34 @@ Assuming you set up the test keyspace earlier, follow these steps to access a Ca

In the test keyspace, set up a collection set using cqlsh:

```sql
CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
INSERT INTO test.users (username, emails)
VALUES ('someone', {'[email protected]', '[email protected]'});
```

Then in your application, retrieve the first row:

```scala
val row = sc.cassandraTable("test", "users").first
// row: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{username: someone, emails: [[email protected], [email protected]]}
```

Query the collection set in Cassandra from Spark:

```scala
row.getList[String]("emails") // Vector([email protected], [email protected])
row.get[List[String]]("emails") // List([email protected], [email protected])
row.get[Seq[String]]("emails") // List([email protected], [email protected]) :Seq[String]
row.get[IndexedSeq[String]]("emails") // Vector([email protected], [email protected]) :IndexedSeq[String]
row.get[Set[String]]("emails") // Set([email protected], [email protected])
```

It is also possible to convert a collection to its CQL `String` representation:

```scala
row.get[String]("emails") // "[[email protected], [email protected]]"
```

A `null` collection is equivalent to an empty collection, so you don't need to use `get[Option[...]]`
with collections.
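As an illustration (assuming a hypothetical row whose `emails` column is `null`), the regular getters simply return an empty collection:

```scala
row.get[List[String]]("emails")  // List()
row.getList[String]("emails")    // Vector()
```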
20 changes: 12 additions & 8 deletions doc/3_selection.md
@@ -8,9 +8,11 @@

For performance reasons, you should not fetch columns you don't need. You can achieve this with the `select` method.

sc.cassandraTable("test", "users").select("username").toArray.foreach(println)
// CassandraRow{username: noemail}
// CassandraRow{username: someone}
```scala
sc.cassandraTable("test", "users").select("username").toArray.foreach(println)
// CassandraRow{username: noemail}
// CassandraRow{username: someone}
```

The `select` method can be chained: each subsequent call may only select a subset of the columns already selected,
and selecting a non-existent column throws an exception.
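A minimal sketch of chaining, assuming the `test.users` table from the earlier examples:

```scala
// The second select may only pick from the columns chosen by the first one
sc.cassandraTable("test", "users")
  .select("username", "emails")
  .select("username")
  .toArray.foreach(println)
```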
@@ -23,12 +25,14 @@ Also, some CPU cycles are wasted serializing and deserializing objects that woul
included in the result. To avoid this overhead, `CassandraRDD` offers the `where` method, which lets you pass
arbitrary CQL condition(s) to filter the row set on the server.

sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
// CassandraRow[id: KF-334L, model: Ford Mondeo]
// CassandraRow[id: MT-8787, model: Hyundai x35]
```scala
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
// CassandraRow[id: KF-334L, model: Ford Mondeo]
// CassandraRow[id: MT-8787, model: Hyundai x35]

sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
// CassandraRow[id: WX-2234, model: Toyota Yaris]
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
// CassandraRow[id: WX-2234, model: Toyota Yaris]
```

Note: Although the `ALLOW FILTERING` clause is implicitly added to the generated CQL query, not all predicates
are currently allowed by the Cassandra engine. This limitation is going to be addressed in the future