From 9547985d5eb620189b439c36832747353ee690f2 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Mon, 28 Jul 2014 14:43:57 +0200 Subject: [PATCH] Fixed code highlighting in docs, fixes #105 --- CHANGES.txt | 1 + doc/0_quick_start.md | 77 +++++++++++++++++++---------- doc/1_connecting.md | 34 +++++++------ doc/2_loading.md | 103 ++++++++++++++++++++++++--------------- doc/3_selection.md | 20 +++++--- doc/4_mapper.md | 32 ++++++------ doc/5_saving.md | 15 +++--- doc/6_advanced_mapper.md | 88 ++++++++++++++++++--------------- 8 files changed, 221 insertions(+), 149 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 83ec79b17..90873f473 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,4 @@ + * Language specific highlighting in the documentation (#105) * Fixed a bug which caused problems when a column of VarChar type was used in where clause. (04fd8d9) * Fixed an AnyObjectFactory bug which caused problems with instantiation of diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index c5f15a1ab..54aa66609 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -30,68 +30,91 @@ This driver is also compatible with Spark distribution provided in ### Preparing example Cassandra schema Create a simple keyspace and table in Cassandra. Run the following statements in `cqlsh`: - - CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; - CREATE TABLE test.kv(key text PRIMARY KEY, value int); + +```sql +CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; +CREATE TABLE test.kv(key text PRIMARY KEY, value int); +``` Then insert some example data: - INSERT INTO test.kv(key, value) VALUES ('key1', 1); - INSERT INTO test.kv(key, value) VALUES ('key2', 2); +```sql +INSERT INTO test.kv(key, value) VALUES ('key1', 1); +INSERT INTO test.kv(key, value) VALUES ('key2', 2); +``` Now you're ready to write your first Spark program using Cassandra. ### Setting up `SparkContext` Before creating the `SparkContext`, set the `spark.cassandra.connection.host` property to the address of one of the Cassandra nodes: - - val conf = new SparkConf(true) - .set("spark.cassandra.connection.host", "127.0.0.1") + +```scala +val conf = new SparkConf(true) + .set("spark.cassandra.connection.host", "127.0.0.1") +``` Create a `SparkContext`. 
Substitute `127.0.0.1` with the actual address of your Spark Master (or use `"local"` to run in local mode): - - val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf) + +```scala +val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf) +``` Enable Cassandra-specific functions on the `SparkContext` and `RDD`: - - import com.datastax.spark.connector._ + +```scala +import com.datastax.spark.connector._ +``` ### Setting up `StreamingContext` Follow the directions above for creating a `SparkConf` Create a `StreamingContext`: - val ssc = new StreamingContext(conf, Seconds(n)) +```scala +val ssc = new StreamingContext(conf, Seconds(n)) +``` Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`: - import com.datastax.spark.connector._ - import com.datastax.spark.connector.streaming._ + +```scala +import com.datastax.spark.connector._ +import com.datastax.spark.connector.streaming._ +``` Create any of the available or custom Spark streams, for example an Akka Actor stream: - val stream = ssc.actorStream[String](Props[SimpleActor], actorName, StorageLevel.MEMORY_AND_DISK) +```scala +val stream = ssc.actorStream[String](Props[SimpleActor], actorName, StorageLevel.MEMORY_AND_DISK) +``` Writing to Cassandra from a Stream: - val wc = stream.flatMap(_.split("\\s+")) - .map(x => (x, 1)) - .reduceByKey(_ + _) - .saveToCassandra("streaming_test", "words", Seq("word", "count")) +```scala +val wc = stream.flatMap(_.split("\\s+")) + .map(x => (x, 1)) + .reduceByKey(_ + _) + .saveToCassandra("streaming_test", "words", Seq("word", "count")) +``` ### Loading and analyzing data from Cassandra Use the `sc.cassandraTable` method to view this table as a Spark `RDD`: - val rdd = sc.cassandraTable("test", "kv") - println(rdd.count) - println(rdd.first) - println(rdd.map(_.getInt("value")).sum) +```scala +val rdd = sc.cassandraTable("test", "kv") +println(rdd.count) +println(rdd.first) +println(rdd.map(_.getInt("value")).sum) +``` ### Saving data from RDD to Cassandra Add two more rows to the table: - - val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4))) - collection.saveToCassandra("test", "kv", Seq("key", "value")) + +```scala +val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4))) +collection.saveToCassandra("test", "kv", Seq("key", "value")) +``` [Next - Connecting to Cassandra](1_connecting.md) diff --git a/doc/1_connecting.md b/doc/1_connecting.md index 431baa270..37219bde5 100644 --- a/doc/1_connecting.md +++ b/doc/1_connecting.md @@ -22,17 +22,20 @@ spark.cassandra.auth.conf.factory.class | name of the class implementing `AuthC Example: - val conf = new SparkConf(true) - .set("spark.cassandra.connection.host", "192.168.123.10") - .set("spark.cassandra.username", "cassandra") - .set("spark.cassandra.password", "cassandra") - - val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf) +```scala +val conf = new SparkConf(true) + .set("spark.cassandra.connection.host", "192.168.123.10") + .set("spark.cassandra.username", "cassandra") + .set("spark.cassandra.password", "cassandra") + +val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf) +``` To import Cassandra-specific functions on `SparkContext` and `RDD` objects, call: - import com.datastax.spark.connector._ - +```scala +import com.datastax.spark.connector._ +``` ### Connection management @@ -68,14 +71,13 @@ and therefore can be safely used in lambdas passed to Spark transformations. 
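As a minimal sketch of that pattern (assuming the `test.kv` table from the quick start exists and `sc` is an already configured `SparkContext`; the RDD and key names below are illustrative only):

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// CassandraConnector is serializable, so the closure below can capture it
// and ship it to the executors together with the task.
val connector = CassandraConnector(sc.getConf)

val keys = sc.parallelize(Seq("key1", "key2"))
val values = keys.map { key =>
  connector.withSessionDo { session =>
    // One query per element - fine for a sketch, not an efficient access pattern.
    session.execute("SELECT value FROM test.kv WHERE key = ?", key).one().getInt("value")
  }
}
```

The connector value is created once on the driver and simply captured by the lambda, so each task borrows a session on its executor instead of configuring its own cluster object.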
 Assuming an appropriately configured `SparkConf` object is stored in the `conf` variable, the following code creates a keyspace and a table:
 
-    import com.datastax.spark.connector.cql.CassandraConnector
-
-    CassandraConnector(conf).withSessionDo { session =>
-      session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
-      session.execute("CREATE TABLE test2.words (word text PRIMARY KEY, count int)")
-    }
+```scala
+import com.datastax.spark.connector.cql.CassandraConnector
+
+CassandraConnector(conf).withSessionDo { session =>
+  session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
+  session.execute("CREATE TABLE test2.words (word text PRIMARY KEY, count int)")
+}
+```
 
 [Next - Accessing data](2_loading.md)
-
-
diff --git a/doc/2_loading.md b/doc/2_loading.md
index 3dd5cf7b4..1aa659678 100644
--- a/doc/2_loading.md
+++ b/doc/2_loading.md
@@ -8,28 +8,36 @@ This section describes how to access data from Cassandra table with Spark.
 To get a Spark RDD that represents a Cassandra table, call the `cassandraTable` method on the `SparkContext` object.
 
-    sc.cassandraTable("keyspace name", "table name")
-
+```scala
+sc.cassandraTable("keyspace name", "table name")
+```
+
 If no explicit type is given to `cassandraTable`, the result of this expression is `CassandraRDD[CassandraRow]`.
 
 Create this keyspace and table in Cassandra using cqlsh:
 
-    cqlsh> CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
-    cqlsh> CREATE TABLE test.words (word text PRIMARY KEY, count int);
+```sql
+CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
+CREATE TABLE test.words (word text PRIMARY KEY, count int);
+```
 
 Load data into the table:
 
-    cqlsh> INSERT INTO test.words (word, count) VALUES ('foo', 20);
-    cqlsh> INSERT INTO test.words (word, count) VALUES ('bar', 20);
+```sql
+INSERT INTO test.words (word, count) VALUES ('foo', 20);
+INSERT INTO test.words (word, count) VALUES ('bar', 20);
+```
 
 Now you can read that table as `RDD`:
 
-    val rdd = sc.cassandraTable("test", "words")
-    // rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.rdd.reader.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:41
+```scala
+val rdd = sc.cassandraTable("test", "words")
+// rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.rdd.reader.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:41
 
-    rdd.toArray.foreach(println)
-    // CassandraRow{word: bar, count: 20}
-    // CassandraRow{word: foo, count: 20}
+rdd.toArray.foreach(println)
+// CassandraRow{word: bar, count: 20}
+// CassandraRow{word: foo, count: 20}
+```
 
 ### Reading primitive column values
 
@@ -39,33 +47,42 @@ Type conversions are applied on the fly. Use `getOption` variants when you expec
 Continuing with the previous example, follow these steps to access individual column values. Store the first item of the rdd in the firstRow value.
- - val firstRow = rdd.first - // firstRow: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{word: bar, count: 20} - + +```scala +val firstRow = rdd.first +// firstRow: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{word: bar, count: 20} +``` + Get the number of columns and column names: - rdd.columnNames // Stream(word, count) - rdd.size // 2 +```scala +rdd.columnNames // Stream(word, count) +rdd.size // 2 +``` -Use one of `getXXX` getters to obtain a column value converted to desired type: - - firstRow.getInt("count") // 20 - firstRow.getLong("count") // 20L +Use one of `getXXX` getters to obtain a column value converted to desired type: +```scala +firstRow.getInt("count") // 20 +firstRow.getLong("count") // 20L +``` Or use a generic get to query the table by passing the return type directly: - firstRow.get[Int]("count") // 20 - firstRow.get[Long]("count") // 20L - firstRow.get[BigInt]("count") // BigInt(20) - firstRow.get[java.math.BigInteger]("count") // BigInteger(20) +```scala +firstRow.get[Int]("count") // 20 +firstRow.get[Long]("count") // 20L +firstRow.get[BigInt]("count") // BigInt(20) +firstRow.get[java.math.BigInteger]("count") // BigInteger(20) +``` ### Working with nullable data When reading potentially `null` data, use the `Option` type on the Scala side to prevent getting a `NullPointerException`. - firstRow.getIntOption("count") // Some(20) - firstRow.get[Option[Int]]("count") // Some(20) +```scala +firstRow.getIntOption("count") // Some(20) +firstRow.get[Option[Int]]("count") // Some(20) +``` ### Reading collections @@ -78,26 +95,34 @@ Assuming you set up the test keyspace earlier, follow these steps to access a Ca In the test keyspace, set up a collection set using cqlsh: - cqlsh> CREATE TABLE test.users (username text PRIMARY KEY, emails SET); - cqlsh> INSERT INTO test.users (username, emails) - VALUES ('someone', {'someone@email.com', 's@email.com'}); +```sql +CREATE TABLE test.users (username text PRIMARY KEY, emails SET); +INSERT INTO test.users (username, emails) + VALUES ('someone', {'someone@email.com', 's@email.com'}); +``` Then in your application, retrieve the first row: - - val row = sc.cassandraTable("test", "users").first - // row: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{username: someone, emails: [someone@email.com, s@email.com]} + +```scala +val row = sc.cassandraTable("test", "users").first +// row: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{username: someone, emails: [someone@email.com, s@email.com]} +``` Query the collection set in Cassandra from Spark: - row.getList[String]("emails") // Vector(someone@email.com, s@email.com) - row.get[List[String]]("emails") // List(someone@email.com, s@email.com) - row.get[Seq[String]]("emails") // List(someone@email.com, s@email.com) :Seq[String] - row.get[IndexedSeq[String]]("emails") // Vector(someone@email.com, s@email.com) :IndexedSeq[String] - row.get[Set[String]]("emails") // Set(someone@email.com, s@email.com) +```scala +row.getList[String]("emails") // Vector(someone@email.com, s@email.com) +row.get[List[String]]("emails") // List(someone@email.com, s@email.com) +row.get[Seq[String]]("emails") // List(someone@email.com, s@email.com) :Seq[String] +row.get[IndexedSeq[String]]("emails") // Vector(someone@email.com, s@email.com) :IndexedSeq[String] +row.get[Set[String]]("emails") // Set(someone@email.com, s@email.com) +``` It is also possible to convert a collection to CQL `String` representation: - 
row.get[String]("emails") // "[someone@email.com, s@email.com]" +```scala +row.get[String]("emails") // "[someone@email.com, s@email.com]" +``` A `null` collection is equivalent to an empty collection, therefore you don't need to use `get[Option[...]]` with collections. diff --git a/doc/3_selection.md b/doc/3_selection.md index 131e7d039..0d3448c53 100644 --- a/doc/3_selection.md +++ b/doc/3_selection.md @@ -8,9 +8,11 @@ to speed up processing. For performance reasons, you should not fetch columns you don't need. You can achieve this with the `select` method. - sc.cassandraTable("test", "users").select("username").toArray.foreach(println) - // CassandraRow{username: noemail} - // CassandraRow{username: someone} +```scala +sc.cassandraTable("test", "users").select("username").toArray.foreach(println) +// CassandraRow{username: noemail} +// CassandraRow{username: someone} +``` The `select` method can be chained. Every next call can be used to select a subset of columns already selected. Selecting a non-existing column would result in throwing an exception. @@ -23,12 +25,14 @@ Also, some CPU cycles are wasted serializing and deserializing objects that woul included in the result. To avoid this overhead, `CassandraRDD` offers the `where` method, which lets you pass arbitrary CQL condition(s) to filter the row set on the server. - sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println) - // CassandraRow[id: KF-334L, model: Ford Mondeo] - // CassandraRow[id: MT-8787, model: Hyundai x35] +```scala +sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println) +// CassandraRow[id: KF-334L, model: Ford Mondeo] +// CassandraRow[id: MT-8787, model: Hyundai x35] - sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println) - // CassandraRow[id: WX-2234, model: Toyota Yaris] +sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println) +// CassandraRow[id: WX-2234, model: Toyota Yaris] +``` Note: Although the `ALLOW FILTERING` clause is implicitly added to the generated CQL query, not all predicates are currently allowed by the Cassandra engine. This limitation is going to be addressed in the future diff --git a/doc/4_mapper.md b/doc/4_mapper.md index 385af7968..89f39f833 100644 --- a/doc/4_mapper.md +++ b/doc/4_mapper.md @@ -7,12 +7,13 @@ This section describes how to store Cassandra rows in Scala tuples or objects of Instead of mapping your Cassandra rows to objects of the `CassandraRow` class, you can directly unwrap column values into tuples of desired type. - sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray - // Array((bar,20), (foo,10)) +```scala +sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray +// Array((bar,20), (foo,10)) - sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray - // Array((20,bar), (10,foo)) - +sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray +// Array((20,bar), (10,foo)) +``` ### Mapping rows to (case) objects Define a case class with properties named the same as the Cassandra columns. @@ -20,9 +21,11 @@ For multi-word column identifiers, separate each word by an underscore in Cassan and use the camel case convention on the Scala side. 
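For instance, given a hypothetical `test.user_info` table (not created anywhere in this guide) with columns `user_name` and `email_count`, the matching property names would be `userName` and `emailCount`:

```scala
// Hypothetical schema, for illustration only:
//   CREATE TABLE test.user_info (user_name text PRIMARY KEY, email_count int);
case class UserInfo(userName: String, emailCount: Int)

// user_name -> userName, email_count -> emailCount
val users = sc.cassandraTable[UserInfo]("test", "user_info")
```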
 Then provide the explicit class name when invoking `cassandraTable`:
-    case class WordCount(word: String, count: Int)
-    sc.cassandraTable[WordCount]("test", "words").toArray
-    // Array(WordCount(bar,20), WordCount(foo,10))
+```scala
+case class WordCount(word: String, count: Int)
+sc.cassandraTable[WordCount]("test", "words").toArray
+// Array(WordCount(bar,20), WordCount(foo,10))
+```
 
 The column-property naming convention is:
@@ -48,10 +51,11 @@ The class doesn't necessarily need to be a case class. The only requirements are:
 Property values might also be set by Scala-style setters. The following class is also compatible:
 
-    class WordCount extends Serializable {
-      var word: String = ""
-      var count: Int = 0
-    }
-
-
+```scala
+class WordCount extends Serializable {
+  var word: String = ""
+  var count: Int = 0
+}
+```
+
 [Next - Saving data](5_saving.md)
diff --git a/doc/5_saving.md b/doc/5_saving.md
index 06b386f6f..4d7dca9a2 100644
--- a/doc/5_saving.md
+++ b/doc/5_saving.md
@@ -10,8 +10,10 @@ keyspace name, table name and a list of columns. Make sure to include at least a
 
 ## Saving a collection of tuples
 
-    collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
-    collection.saveToCassandra("test", "words", Seq("word", "count"))
+```scala
+val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
+collection.saveToCassandra("test", "words", Seq("word", "count"))
+```
 
     cqlsh:test> select * from words;
 
@@ -29,10 +31,11 @@ When saving a collection of objects of a user-defined class, the items to be sav
 must provide appropriately named public property accessors for getting every column to be saved.
 More information on property-column naming conventions is described [here](4_mapper.md).
 
-    case class WordCount(word: String, count: Long)
-    collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
-    collection.saveToCassandra("test", "words", Seq("word", "count"))
-
+```scala
+case class WordCount(word: String, count: Long)
+val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
+collection.saveToCassandra("test", "words", Seq("word", "count"))
+```
 
     cqlsh:test> select * from words;
 
diff --git a/doc/6_advanced_mapper.md b/doc/6_advanced_mapper.md
index 515ec2e1f..dc62709fe 100644
--- a/doc/6_advanced_mapper.md
+++ b/doc/6_advanced_mapper.md
@@ -16,21 +16,23 @@ implementations are included.
 To work with Java classes, use `JavaBeanColumnMapper`. Make sure your objects are `Serializable`, otherwise Spark won't be able to send them over the network.
- import com.datastax.spark.connector.mapper.JavaBeanColumnMapper - class WordCount extends Serializable { - private var _word: String = "" - private var _count: Int = 0 - def setWord(word: String) { _word = word } - def setCount(count: Int) { _count = count } - override def toString = _word + ":" + _count - } - - object WordCount { - implicit object Mapper extends JavaBeanColumnMapper[WordCount] - } - - sc.cassandraTable[WordCount]("test", "words").toArray - // Array(bar:20, foo:10) +```scala +import com.datastax.spark.connector.mapper.JavaBeanColumnMapper +class WordCount extends Serializable { + private var _word: String = "" + private var _count: Int = 0 + def setWord(word: String) { _word = word } + def setCount(count: Int) { _count = count } + override def toString = _word + ":" + _count +} + +object WordCount { + implicit object Mapper extends JavaBeanColumnMapper[WordCount] +} + +sc.cassandraTable[WordCount]("test", "words").toArray +// Array(bar:20, foo:10) +``` To save objects of class `WordCount`, you'll need to define getters. @@ -38,18 +40,20 @@ To save objects of class `WordCount`, you'll need to define getters. If for some reason you wish to associate a column of a different name than the property, you may pass a column translation `Map` to a `DefaultColumnMapper` or `JavaBeanColumnMapper`: - case class WordCount(w: String, c: Int) - - object WordCount { - implicit object Mapper extends DefaultColumnMapper[WordCount]( - Map("w" -> "word", "c" -> "count")) - } +```scala +case class WordCount(w: String, c: Int) + +object WordCount { + implicit object Mapper extends DefaultColumnMapper[WordCount]( + Map("w" -> "word", "c" -> "count")) +} - sc.cassandraTable[WordCount]("test", "words").toArray - // Array(WordCount(bar,20), WordCount(foo,10)) +sc.cassandraTable[WordCount]("test", "words").toArray +// Array(WordCount(bar,20), WordCount(foo,10)) - sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40))) - .saveToCassandra("test", "words", Seq("word", "count")) +sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40))) + .saveToCassandra("test", "words", Seq("word", "count")) +``` ### Writing custom `ColumnMapper` implementations To define column mappings for your classes, create an appropriate implicit object implementing @@ -59,26 +63,30 @@ To define column mappings for your classes, create an appropriate implicit objec To map a Cassandra column to a field of user-defined type, register custom `TypeConverter` implementations. 
For example, imagine you want emails to be stored in custom `Email` class, wrapping a string: - case class EMail(email: String) +```scala +case class EMail(email: String) +``` To tell the connector how to read and write fields of type `EMail`, you need to define two type converters - from `String` to `Email` and from `Email` to `String`: - import com.datastax.spark.connector.types._ - import scala.reflect.runtime.universe._ +```scala +import com.datastax.spark.connector.types._ +import scala.reflect.runtime.universe._ - object StringToEMailConverter extends TypeConverter[EMail] { - def targetTypeTag = typeTag[EMail] - def convertPF = { case str: String => EMail(str) } - } +object StringToEMailConverter extends TypeConverter[EMail] { + def targetTypeTag = typeTag[EMail] + def convertPF = { case str: String => EMail(str) } +} + +object EMailToStringConverter extends TypeConverter[String] { + def targetTypeTag = typeTag[String] + def convertPF = { case EMail(str) => str } +} - object EMailToStringConverter extends TypeConverter[String] { - def targetTypeTag = typeTag[String] - def convertPF = { case EMail(str) => str } - } - - TypeConverter.registerConverter(StringToEMailConverter) - TypeConverter.registerConverter(EMailToStringConverter) +TypeConverter.registerConverter(StringToEMailConverter) +TypeConverter.registerConverter(EMailToStringConverter) +``` Now you can map any Cassandra text or ascii column to `EMail` instance. The registration step must be performed before creating any RDDs you wish to @@ -100,3 +108,5 @@ In the same way, when writing an `RDD` back to Cassandra, an appropriate implici `RowWriter` are used to extract column values from every RDD item and bind them to an INSERT `PreparedStatement`. Please refer to the ScalaDoc for more details. + +[Next - Using Connector in Java](7_java_api.md)