From 8c0202f4900c7242f33c01fab1b0d04ad6de77b5 Mon Sep 17 00:00:00 2001
From: Helena Edelson
Date: Thu, 7 Aug 2014 10:44:21 -0400
Subject: [PATCH] #133 Changes from PR and scala api docs updated.

---
 doc/0_quick_start.md                             | 10 +++++-----
 doc/5_saving.md                                  |  4 ++--
 doc/6_advanced_mapper.md                         |  2 +-
 .../spark/connector/writer/TableWriterSpec.scala | 13 +++++++++++++
 .../spark/connector/rdd/ColumnSelector.scala     |  8 +++++++-
 .../spark/connector/writer/TableWriter.scala     |  4 ++--
 6 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md
index 3481617aa..3590080ba 100644
--- a/doc/0_quick_start.md
+++ b/doc/0_quick_start.md
@@ -86,7 +86,7 @@ import com.datastax.spark.connector.streaming._
 Create any of the available or custom Spark streams, for example an Akka Actor stream:
 
 ```scala
-val stream = ssc.actorStream[String](Props[SimpleActor], actorName, StorageLevel.MEMORY_AND_DISK)
+val stream = ssc.actorStream[String](Props[SimpleStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK)
 ```
 
 Writing to Cassandra from a Stream:
@@ -95,14 +95,14 @@ Writing to Cassandra from a Stream:
 val wc = stream.flatMap(_.split("\\s+"))
     .map(x => (x, 1))
     .reduceByKey(_ + _)
-    .saveToCassandra("streaming_test", "words", Seq("word", "count"))
+    .saveToCassandra("streaming_test", "words")
 ```
 
 Where `saveToCassandra` accepts
 
 - keyspaceName: String, tableName: String
-- keyspaceName: String, tableName: String, columnNames: Seq[String]
-- keyspaceName: String, tableName: String, columnNames: Seq[String], batchSize: Option[Int]
+- keyspaceName: String, tableName: String, columnNames: SomeColumns
+- keyspaceName: String, tableName: String, columnNames: SomeColumns, batchSize: Int
 
 ### Loading and analyzing data from Cassandra
 Use the `sc.cassandraTable` method to view this table as a Spark `RDD`:
@@ -119,7 +119,7 @@ Add two more rows to the table:
 
 ```scala
 val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
-collection.saveToCassandra("test", "kv", Seq("key", "value"))
+collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
 ```
 
diff --git a/doc/5_saving.md b/doc/5_saving.md
index 4d7dca9a2..f95d29edd 100644
--- a/doc/5_saving.md
+++ b/doc/5_saving.md
@@ -12,7 +12,7 @@ keyspace name, table name and a list of columns. Make sure to include at least a
 
 ```scala
 collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
-collection.saveToCassandra("test", "words", Seq("word", "count"))
+collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
 ```
 
 cqlsh:test> select * from words;
@@ -34,7 +34,7 @@ to be saved.
 This example provides more information on property-column naming conventions.
 ```scala
 case class WordCount(word: String, count: Long)
 collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
-collection.saveToCassandra("test", "words", Seq("word", "count"))
+collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
 ```
 
 cqlsh:test> select * from words;
diff --git a/doc/6_advanced_mapper.md b/doc/6_advanced_mapper.md
index dc62709fe..14b8954ee 100644
--- a/doc/6_advanced_mapper.md
+++ b/doc/6_advanced_mapper.md
@@ -52,7 +52,7 @@ sc.cassandraTable[WordCount]("test", "words").toArray
 // Array(WordCount(bar,20), WordCount(foo,10))
 
 sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40)))
-  .saveToCassandra("test", "words", Seq("word", "count"))
+  .saveToCassandra("test", "words", SomeColumns("word", "count"))
 ```
 
 ### Writing custom `ColumnMapper` implementations
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala
index a8ff9868d..3a20005b1 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala
@@ -120,6 +120,19 @@ class TableWriterSpec extends FlatSpec with Matchers with BeforeAndAfter with Ca
     }
   }
 
+  it should "distinguish (deprecated) implicit `seqToSomeColumns`" in {
+    val col = Seq((2, 1L, None))
+    sc.parallelize(col).saveToCassandra("write_test", "key_value", Seq("key", "group"))
+    conn.withSessionDo { session =>
+      val result = session.execute("SELECT * FROM write_test.key_value").all()
+      result should have size 1
+      for (row <- result) {
+        row.getInt(0) should be (2)
+        row.getString(2) should be (null)
+      }
+    }
+  }
+
   it should "write collections" in {
     val col = Seq(
       (1, Vector("item1", "item2"), Set("item1", "item2"), Map("key1" -> "value1", "key2" -> "value2")),
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ColumnSelector.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ColumnSelector.scala
index c2833f383..d6154f756 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ColumnSelector.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ColumnSelector.scala
@@ -1,7 +1,13 @@
 package com.datastax.spark.connector.rdd
 
-private[connector] trait ColumnSelector
+sealed trait ColumnSelector
 
 case object AllColumns extends ColumnSelector
 
 case class SomeColumns(columns: String*) extends ColumnSelector
+object SomeColumns {
+  @deprecated("Use com.datastax.spark.connector.rdd.SomeColumns instead of Seq", "1.0")
+  implicit def seqToSomeColumns(columns: Seq[String]): SomeColumns =
+    SomeColumns(columns: _*)
+}
+
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
index 26665f49f..27cb5a956 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
@@ -4,7 +4,7 @@ import java.io.IOException
 
 import com.datastax.driver.core.{Session, BatchStatement, PreparedStatement, ConsistencyLevel}
 import com.datastax.spark.connector.cql.{ColumnDef, Schema, TableDef, CassandraConnector}
-import com.datastax.spark.connector.rdd.{SomeColumns, ColumnSelector}
+import com.datastax.spark.connector.rdd.{AllColumns, SomeColumns, ColumnSelector}
 import com.datastax.spark.connector.util.CountingIterator
 import org.apache.spark.{Logging, TaskContext}
@@ -166,7 +166,7 @@ object TableWriter {
       .getOrElse(throw new IOException(s"Table not found: $keyspaceName.$tableName"))
     val selectedColumns = columnNames match {
       case SomeColumns(names @ _*) => names
-      case _ => tableDef.allColumns.map(_.columnName).toSeq
+      case AllColumns => tableDef.allColumns.map(_.columnName).toSeq
     }
     val rowWriter = implicitly[RowWriterFactory[T]].rowWriter(tableDef, selectedColumns)
     new TableWriter[T](connector, tableDef, rowWriter, batchSizeInBytes, batchSizeInRows, parallelismLevel, consistencyLevel)
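
Reviewer note (not part of the patch): a minimal sketch of what this change looks like at a call site. It assumes the `test.kv` table from the quick-start doc and an existing `SparkContext` named `sc`; the row values are illustrative, and the imports follow the diff above.

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.SomeColumns

val rows = sc.parallelize(Seq(("key5", 5), ("key6", 6)))

// Preferred after this patch: name the columns with SomeColumns.
rows.saveToCassandra("test", "kv", SomeColumns("key", "value"))

// Omitting the selector writes all columns of the target table.
rows.saveToCassandra("test", "kv")

// A Seq still compiles via the deprecated seqToSomeColumns conversion,
// but now triggers a deprecation warning at the call site.
rows.saveToCassandra("test", "kv", Seq("key", "value"))
```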
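Sealing `ColumnSelector` is what lets `TableWriter` replace the old `case _` wildcard with an explicit `case AllColumns`: the compiler can now prove the match exhaustive, so a future selector added to the hierarchy surfaces as a compile-time warning instead of falling silently into the wildcard. A self-contained sketch of that pattern, using toy stand-ins rather than the real connector classes:

```scala
// Toy stand-ins mirroring the sealed hierarchy from ColumnSelector.scala;
// not the real connector classes.
sealed trait ColumnSelector
case object AllColumns extends ColumnSelector
case class SomeColumns(columns: String*) extends ColumnSelector

// Mirrors the selection logic in TableWriter: a named subset for
// SomeColumns, every column for AllColumns. No wildcard case is needed,
// because the sealed trait makes the match checkably exhaustive.
def selectedColumns(selector: ColumnSelector, allColumns: Seq[String]): Seq[String] =
  selector match {
    case SomeColumns(names @ _*) => names
    case AllColumns              => allColumns
  }

selectedColumns(SomeColumns("word", "count"), Seq("word", "count", "extra"))
// => Seq("word", "count")
selectedColumns(AllColumns, Seq("word", "count", "extra"))
// => Seq("word", "count", "extra")
```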
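The new TableWriterSpec test passes a plain `Seq` without importing `seqToSomeColumns`; that works because the conversion lives in the `SomeColumns` companion object, and companions of the expected type are part of Scala's implicit search scope. A minimal standalone illustration of that mechanism, using a hypothetical `Wrapped` type that is not from the connector:

```scala
import scala.language.implicitConversions

case class Wrapped(values: String*)

object Wrapped {
  // Found automatically whenever a Wrapped is expected, with no import
  // at the call site, because companion objects are in implicit scope.
  @deprecated("Pass Wrapped(...) directly", "1.0")
  implicit def seqToWrapped(values: Seq[String]): Wrapped = Wrapped(values: _*)
}

def describe(w: Wrapped): String = w.values.mkString(", ")

describe(Wrapped("key", "group"))  // explicit, the preferred style
describe(Seq("key", "group"))      // converted, with a deprecation warning
```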