#133 Changes from PR and scala api docs updated.
Helena Edelson committed Aug 7, 2014
1 parent 22dd434 commit 8c0202f
Showing 6 changed files with 30 additions and 11 deletions.
10 changes: 5 additions & 5 deletions doc/0_quick_start.md
@@ -86,7 +86,7 @@ import com.datastax.spark.connector.streaming._
Create any of the available or custom Spark streams, for example an Akka Actor stream:

```scala
-val stream = ssc.actorStream[String](Props[SimpleActor], actorName, StorageLevel.MEMORY_AND_DISK)
+val stream = ssc.actorStream[String](Props[SimpleStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK)
```
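The actor class itself is not shown in this diff. As a minimal sketch, `SimpleStreamingActor` might look something like the following, assuming Spark's `ActorHelper` receiver trait (the message handling is illustrative, not the connector's actual demo code):

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Illustrative only: pushes every String message it receives into the stream.
class SimpleStreamingActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s)
  }
}
```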

Writing to Cassandra from a Stream:
@@ -95,14 +95,14 @@ Writing to Cassandra from a Stream:
val wc = stream.flatMap(_.split("\\s+"))
  .map(x => (x, 1))
  .reduceByKey(_ + _)
-  .saveToCassandra("streaming_test", "words", Seq("word", "count"))
+  .saveToCassandra("streaming_test", "words")
```

Where `saveToCassandra` accepts any of the following argument lists (sketched in use below):

- keyspaceName: String, tableName: String
-- keyspaceName: String, tableName: String, columnNames: Seq[String]
-- keyspaceName: String, tableName: String, columnNames: Seq[String], batchSize: Option[Int]
+- keyspaceName: String, tableName: String, columnNames: SomeColumns
+- keyspaceName: String, tableName: String, columnNames: SomeColumns, batchSize: Int
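For illustration, the two new `SomeColumns` forms in use (the batch size of 64 is an arbitrary assumption):

```scala
// A stream of (word, count) pairs, as in the reduceByKey example above.
val counts = stream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)

// Save only the named columns.
counts.saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))

// The same, with an explicit batch size.
counts.saveToCassandra("streaming_test", "words", SomeColumns("word", "count"), 64)

// Nothing is written until the streaming context is started:
ssc.start()
ssc.awaitTermination()
```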

### Loading and analyzing data from Cassandra
Use the `sc.cassandraTable` method to view this table as a Spark `RDD`:
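The example under this heading is collapsed in the diff; presumably it resembles the following sketch (the `test.kv` table comes from earlier in the quick start):

```scala
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)  // number of rows in test.kv
println(rdd.first)  // an arbitrary CassandraRow from the table
```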
@@ -119,7 +119,7 @@ Add two more rows to the table:

```scala
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", Seq("key", "value"))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
```


4 changes: 2 additions & 2 deletions doc/5_saving.md
@@ -12,7 +12,7 @@ keyspace name, table name and a list of columns. Make sure to include at least a

```scala
collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection.saveToCassandra("test", "words", Seq("word", "count"))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
```

cqlsh:test> select * from words;
@@ -34,7 +34,7 @@ to be saved. This example provides more information on property-column naming co
```scala
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
-collection.saveToCassandra("test", "words", Seq("word", "count"))
+collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
```

cqlsh:test> select * from words;
2 changes: 1 addition & 1 deletion doc/6_advanced_mapper.md
@@ -52,7 +52,7 @@ sc.cassandraTable[WordCount]("test", "words").toArray
// Array(WordCount(bar,20), WordCount(foo,10))

sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40)))
.saveToCassandra("test", "words", Seq("word", "count"))
.saveToCassandra("test", "words", SomeColumns("word", "count"))
```

### Writing custom `ColumnMapper` implementations
@@ -120,6 +120,19 @@ class TableWriterSpec extends FlatSpec with Matchers with BeforeAndAfter with Ca
}
}

it should "distinguish (deprecated) implicit `seqToSomeColumns`" in {
val col = Seq((2, 1L, None))
sc.parallelize(col).saveToCassandra("write_test", "key_value", Seq("key", "group"))
conn.withSessionDo { session =>
val result = session.execute("SELECT * FROM write_test.key_value").all()
result should have size 1
for (row <- result) {
row.getInt(0) should be (2)
row.getString(2) should be (null)
}
}
}

it should "write collections" in {
val col = Seq(
(1, Vector("item1", "item2"), Set("item1", "item2"), Map("key1" -> "value1", "key2" -> "value2")),
@@ -1,7 +1,13 @@
package com.datastax.spark.connector.rdd

-private[connector] trait ColumnSelector
+sealed trait ColumnSelector
case object AllColumns extends ColumnSelector
case class SomeColumns(columns: String*) extends ColumnSelector

+object SomeColumns {
+  @deprecated("Use com.datastax.spark.connector.rdd.SomeColumns instead of Seq", "1.0")
+  implicit def seqToSomeColumns(columns: Seq[String]): SomeColumns =
+    SomeColumns(columns: _*)
+}
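The deprecated implicit keeps call sites written against the old `Seq[String]` overloads compiling. A sketch of both styles, assuming a `SparkContext` `sc` configured for Cassandra and the `test.words` table from the docs above:

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.SomeColumns

val rdd = sc.parallelize(Seq(("cat", 30), ("fox", 40)))

// Preferred from 1.0 on: pass SomeColumns explicitly.
rdd.saveToCassandra("test", "words", SomeColumns("word", "count"))

// Legacy style: still compiles via the implicit seqToSomeColumns,
// but now triggers a deprecation warning.
rdd.saveToCassandra("test", "words", Seq("word", "count"))
```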


@@ -4,7 +4,7 @@ import java.io.IOException

import com.datastax.driver.core.{Session, BatchStatement, PreparedStatement, ConsistencyLevel}
import com.datastax.spark.connector.cql.{ColumnDef, Schema, TableDef, CassandraConnector}
-import com.datastax.spark.connector.rdd.{SomeColumns, ColumnSelector}
+import com.datastax.spark.connector.rdd.{AllColumns, SomeColumns, ColumnSelector}
import com.datastax.spark.connector.util.CountingIterator

import org.apache.spark.{Logging, TaskContext}
@@ -166,7 +166,7 @@ object TableWriter {
    .getOrElse(throw new IOException(s"Table not found: $keyspaceName.$tableName"))
  val selectedColumns = columnNames match {
    case SomeColumns(names @ _*) => names
-   case _ => tableDef.allColumns.map(_.columnName).toSeq
+   case AllColumns => tableDef.allColumns.map(_.columnName).toSeq
  }
  val rowWriter = implicitly[RowWriterFactory[T]].rowWriter(tableDef, selectedColumns)
  new TableWriter[T](connector, tableDef, rowWriter, batchSizeInBytes, batchSizeInRows, parallelismLevel, consistencyLevel)
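With `ColumnSelector` now sealed, the compiler can check this match for exhaustiveness, which is why the catch-all `case _` is replaced by an explicit `case AllColumns`.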
