Merge pull request datastax#999 from datastax/SPARKC-399
SPARKC-399: Removed CassandraSQLContext
jacek-lewandowski authored Jul 14, 2016
2 parents b291636 + 9f363e9 commit 92fc20b
Showing 15 changed files with 247 additions and 476 deletions.
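For users of the connector, the practical migration is from `CassandraSQLContext` with underscore-mangled option keys to a plain `SQLContext` plus the connector implicits, as the updated files below illustrate. A minimal before/after sketch, assuming an existing `SparkContext` named `sc` and an illustrative `test.words` table:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._

// Before this PR: a connector-specific SQL context and underscore-based option keys.
// val csc = new CassandraSQLContext(sc)
// csc.setKeyspace("test")
// val before = csc.read
//   .format("org.apache.spark.sql.cassandra")
//   .options(Map("table" -> "words", "keyspace" -> "test",
//     "spark_cassandra_input_split_size_in_mb" -> "128"))
//   .load()

// After this PR: a plain SQLContext, the cassandraFormat helper from the
// connector implicits, and option keys in their normal dotted form.
val sqlContext = new SQLContext(sc)
val words = sqlContext.read
  .cassandraFormat("words", "test")
  .option("spark.cassandra.input.split.size_in_mb", "128")
  .load()
```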
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
2.0.0 M1
 * Removed CassandraSQLContext and underscore-based options (SPARKC-399)
* Upgrade to Spark 2.0.0-preview (SPARKC-396)
- Removed Twitter demo because there is no spark-streaming-twitter package available anymore
- Removed Akka Actor demo because there is no support for such streams anymore
6 changes: 1 addition & 5 deletions doc/14_data_frames.md
@@ -129,10 +129,6 @@ Accessing data Frames using Spark SQL involves creating temporary tables and specifying the
source as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to
establish a relation between the CassandraTable and the internally used DataSource.

Because of a limitation in SparkSQL, SparkSQL `OPTIONS` must have their
`.` characters replaced with `_`. This means `spark.cassandra.input.split.size_in_mb` becomes
`spark_cassandra_input_split_size_in_mb`.

Example Creating a Source Using Spark SQL:

Create Relation with the Cassandra table test.words
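The example itself is collapsed in this view; a plausible sketch of the kind of statement the passage describes, registering `test.words` through the data source (the exact statement in the file may differ, and the `pushdown` option is an assumption based on the connector's documented options):

```scala
// Hedged sketch: create a temporary table backed by the Cassandra data source,
// then query it with Spark SQL. Assumes a SQLContext named sqlContext and an
// existing Cassandra table test.words.
sqlContext.sql(
  """CREATE TEMPORARY TABLE words
    |USING org.apache.spark.sql.cassandra
    |OPTIONS (
    |  table "words",
    |  keyspace "test",
    |  pushdown "true"
    |)""".stripMargin)

sqlContext.sql("SELECT * FROM words").show()
```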
@@ -214,7 +210,7 @@ the option which it represents to a `DataFrameReader` or `DataFrameWriter`.
Suppose we want to set `spark.cassandra.read.timeout_ms` to 7 seconds on some `DataFrameReader`; we can do this in either of two ways:
```scala
option("spark_cassandra_read_timeout_ms", "7000")
option("spark.cassandra.read.timeout_ms", "7000")
```
Since this setting is represented by `CassandraConnectorConf.ReadTimeoutParam` we can simply do:
```scala
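// Likely the line collapsed below this point: the same option expressed
// through the connector's ConfigParameter constant. Assumes
// com.datastax.spark.connector.cql.CassandraConnectorConf is in scope.
option(CassandraConnectorConf.ReadTimeoutParam.name, "7000")
```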
5 changes: 0 additions & 5 deletions doc/reference.md
@@ -118,11 +118,6 @@ retrieve size from C*. Can be set manually now</td>
<td>default</td>
<td>Sets the default Cluster to inherit configuration from</td>
</tr>
<tr>
<td><code>sql.keyspace</code></td>
<td>None</td>
<td>Sets the default keyspace</td>
</tr>
</table>
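With the `sql.keyspace` default removed, the keyspace is named explicitly on each relation instead; a small sketch using the option keys that appear elsewhere in this diff (the `test.words` table and the `sqlContext` value are illustrative):

```scala
import org.apache.spark.sql.cassandra._

// No connector-level default keyspace anymore: pass keyspace and table
// per read (or use the cassandraFormat(table, keyspace) shorthand).
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
```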


@@ -1,14 +1,16 @@
package com.datastax.spark.connector.demo

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra.CassandraSQLContext

/** This demo creates a table in Cassandra, populates it with sample data,
* then queries it using SparkSQL and finally displays the query results to the standard output.
* You need to start Cassandra on local node prior to executing this demo. */
object SQLDemo extends DemoApp {

val cc = new CassandraSQLContext(sc)
val sqlContext = new SQLContext(sc)

CassandraConnector(conf).withSessionDo { session =>
session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
@@ -22,9 +24,9 @@ object SQLDemo extends DemoApp {
session.execute("INSERT INTO test.sql_demo(key, grp, value) VALUES (6, 2, 2.8)")
}

cc.setKeyspace("test")
val rdd = cc.sql("SELECT grp, max(value) AS mv FROM sql_demo GROUP BY grp ORDER BY mv")
rdd.collect().foreach(println) // [2, 4.0] [1, 10.0]
sqlContext.read.cassandraFormat("sql_demo", "test").load().createOrReplaceTempView("test_sql_demo")
val rdd = sqlContext.sql("SELECT grp, max(value) AS mv FROM test_sql_demo GROUP BY grp ORDER BY mv")
rdd.collect().foreach(println) // [2, 4.0] [1, 10.0]

sc.stop()
}
@@ -1,72 +1,65 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.embedded.SparkTemplate._
import org.apache.spark.sql.SQLContext

import com.datastax.spark.connector.embedded.EmbeddedCassandra
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.toDataFrameFunctions

import org.apache.spark.sql.cassandra.CassandraSQLContext
import com.datastax.spark.connector.{SparkCassandraITFlatSpecBase, _}

class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase {

useCassandraConfig(Seq("cassandra-password-auth.yaml.template"))
useSparkConf(defaultConf)

// Wait for the default user to be created in Cassandra.
Thread.sleep(1000)

val conf = defaultConf
conf.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
conf.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

"A CassandraConnector" should "authenticate with username and password when using native protocol" in {
val conn2 = CassandraConnector(conf)
conn2.withSessionDo { session =>
assert(session !== null)
assert(session.isClosed === false)
assert(session.getCluster.getMetadata.getClusterName != null)
}
}

it should "pick up user and password from SparkConf" in {
val conf = defaultConf
.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

// would throw exception if connection unsuccessful
val conn2 = CassandraConnector(conf)
conn2.withSessionDo { session => }
}

"A DataFrame" should "read and write data with valid auth" in {

val csc = new CassandraSQLContext(sc)
import csc.implicits._

val conf = defaultConf
.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

val conn = CassandraConnector(conf)

val personDF1 = sc.parallelize(Seq(
("Andy", 28, "America"),
("Kaushal", 25, "India"),
("Desanth", 27, "India"),
("Mahendra", 26, "Rajasthan")))
.toDF("name", "age", "address")

createKeyspace(conn.openSession())
personDF1.createCassandraTable(ks, "authtest", Some(Array("address")), Some(Array("age")))(conn)

val options = Map("spark_cassandra_auth_username" -> "cassandra",
"spark_cassandra_auth_password" -> "cassandra",
"keyspace" -> ks, "table" -> "authtest")

personDF1.write.format("org.apache.spark.sql.cassandra").options(options).save();
val personDF2 = csc.read.format("org.apache.spark.sql.cassandra").options(options).load();

personDF2.count should be(4)
}
useCassandraConfig(Seq("cassandra-password-auth.yaml.template"))
useSparkConf(defaultConf)

// Wait for the default user to be created in Cassandra.
Thread.sleep(1000)

val conf = defaultConf
conf.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
conf.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

"A CassandraConnector" should "authenticate with username and password when using native protocol" in {
val conn2 = CassandraConnector(conf)
conn2.withSessionDo { session =>
assert(session !== null)
assert(session.isClosed === false)
assert(session.getCluster.getMetadata.getClusterName != null)
}
}

it should "pick up user and password from SparkConf" in {
val conf = defaultConf
.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

// would throw exception if connection unsuccessful
val conn2 = CassandraConnector(conf)
conn2.withSessionDo { session => }
}

"A DataFrame" should "read and write data with valid auth" in {
val sqlContext = new SQLContext(sc)

val conf = defaultConf
.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra")
.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra")

val conn = CassandraConnector(conf)

val personDF1 = sqlContext.sparkSession.createDataFrame(Seq(
("Andy", 28, "America"),
("Kaushal", 25, "India"),
("Desanth", 27, "India"),
("Mahendra", 26, "Rajasthan"))).toDF("name", "age", "address")

createKeyspace(conn.openSession())
personDF1.createCassandraTable(ks, "authtest", Some(Array("address")), Some(Array("age")))(conn)

val options = Map("spark.cassandra.auth.username" -> "cassandra",
"spark.cassandra.auth.password" -> "cassandra",
"keyspace" -> ks, "table" -> "authtest")

personDF1.write.format("org.apache.spark.sql.cassandra").options(options).save()
val personDF2 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(options).load()

personDF2.count should be(4)
}
}
@@ -99,7 +99,7 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
Map(
"table" -> "kv_copy",
"keyspace" -> ks,
"spark_cassandra_output_ttl" -> "300"
"spark.cassandra.output.ttl" -> "300"
)
)
.save()
Expand Up @@ -238,8 +238,9 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging
}
}

case class Test(val epoch:Long, val uri:String, val browser:String, val customer_id:Int)
case class TestPartialColumns(val epoch:Long, val browser:String, val customer_id:Int)
case class Test(epoch: Long, uri: String, browser: String, customer_id: Int)

case class TestPartialColumns(epoch: Long, browser: String, customer_id: Int)

object PushdownEverything extends CassandraPredicateRules {
override def apply(
@@ -2,6 +2,7 @@ package com.datastax.spark.connector.sql

import scala.concurrent.Future

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._

import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
@@ -56,33 +57,33 @@ class CassandraSQLClusterLevelSpec extends SparkCassandraITFlatSpecBase {
}
)

var cc: CassandraSQLContext = null
var sqlContext: SQLContext = null

private val cluster1 = "cluster1"
private val cluster2 = "cluster2"

override def beforeAll() {
cc = new CassandraSQLContext(sc)
cc.setConf(cluster1,
sqlContext = new SQLContext(sc)
sqlContext.setCassandraConf(cluster1,
ConnectionHostParam.option(getHost(0).getHostAddress) ++ ConnectionPortParam.option(getPort(0)))
cc.setConf(cluster2,
sqlContext.setCassandraConf(cluster2,
ConnectionHostParam.option(getHost(1).getHostAddress) ++ ConnectionPortParam.option(getPort(1)))
}

it should "allow to join tables from different clusters" in {
cc.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1")
cc.read.cassandraFormat("test2", ks, cluster2).load().createOrReplaceTempView("c2_test2")
sqlContext.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1")
sqlContext.read.cassandraFormat("test2", ks, cluster2).load().createOrReplaceTempView("c2_test2")

val result = cc.sql(s"SELECT * FROM c1_test1 AS test1 JOIN c2_test2 AS test2 WHERE test1.a = test2.a").collect()
val result = sqlContext.sql(s"SELECT * FROM c1_test1 AS test1 JOIN c2_test2 AS test2 WHERE test1.a = test2.a").collect()
result should have length 2
}

it should "allow to write data to another cluster" in {
cc.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1")
cc.read.cassandraFormat("test3", ks, cluster2).load().createOrReplaceTempView("c2_test3")
sqlContext.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1")
sqlContext.read.cassandraFormat("test3", ks, cluster2).load().createOrReplaceTempView("c2_test3")

val insert = cc.sql(s"INSERT INTO TABLE c2_test3 SELECT * FROM c1_test1 AS t1").collect()
val result = cc.sql(s"SELECT * FROM c2_test3 AS test3").collect()
val insert = sqlContext.sql(s"INSERT INTO TABLE c2_test3 SELECT * FROM c1_test1 AS t1").collect()
val result = sqlContext.sql(s"SELECT * FROM c2_test3 AS test3").collect()
result should have length 5
}
}
