From e98d150a42676f95e9a9031dc91d0562a596cbd0 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Fri, 8 Jul 2016 15:47:01 +0200 Subject: [PATCH 1/2] SPARKC-399: Removed CassandraSQLContext --- CHANGES.txt | 1 + doc/reference.md | 5 - .../spark/connector/demo/SQLDemo.scala | 12 +- .../CassandraAuthenticatedConnectorSpec.scala | 123 +++++++------ .../sql/CassandraDataSourceSpec.scala | 5 +- .../sql/CassandraSQLClusterLevelSpec.scala | 23 +-- .../connector/sql/CassandraSQLSpec.scala | 163 +++++++++--------- .../spark/connector/util/ConfigCheck.scala | 15 +- .../sql/cassandra/CassandraCatalog.scala | 109 ------------ .../sql/cassandra/CassandraSQLContext.scala | 130 -------------- .../sql/cassandra/CassandraSession.scala | 28 --- .../apache/spark/sql/cassandra/package.scala | 70 ++++++++ 12 files changed, 237 insertions(+), 447 deletions(-) delete mode 100644 spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala delete mode 100644 spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala delete mode 100644 spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSession.scala diff --git a/CHANGES.txt b/CHANGES.txt index cac677d2b..0bbdcf869 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.0 M1 + * Removed CassandraSqlContext (SPARKC-399) * Upgrade to Spark 2.0.0-preview (SPARKC-396) - Removed Twitter demo because there is no spark-streaming-twitter package available anymore - Removed Akka Actor demo becaues there is no support for such streams anymore diff --git a/doc/reference.md b/doc/reference.md index 6af611988..e8cadbf86 100644 --- a/doc/reference.md +++ b/doc/reference.md @@ -118,11 +118,6 @@ retrieve size from C*. Can be set manually now default Sets the default Cluster to inherit configuration from - - sql.keyspace - None - Sets the default keyspace - diff --git a/spark-cassandra-connector-demos/simple-demos/src/main/scala/com/datastax/spark/connector/demo/SQLDemo.scala b/spark-cassandra-connector-demos/simple-demos/src/main/scala/com/datastax/spark/connector/demo/SQLDemo.scala index cc60ff74e..f25993df5 100644 --- a/spark-cassandra-connector-demos/simple-demos/src/main/scala/com/datastax/spark/connector/demo/SQLDemo.scala +++ b/spark-cassandra-connector-demos/simple-demos/src/main/scala/com/datastax/spark/connector/demo/SQLDemo.scala @@ -1,14 +1,16 @@ package com.datastax.spark.connector.demo +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.cassandra._ + import com.datastax.spark.connector.cql.CassandraConnector -import org.apache.spark.sql.cassandra.CassandraSQLContext /** This demo creates a table in Cassandra, populates it with sample data, * then queries it using SparkSQL and finally displays the query results to the standard output. * You need to start Cassandra on local node prior to executing this demo. 
*/ object SQLDemo extends DemoApp { - val cc = new CassandraSQLContext(sc) + val sqlContext = new SQLContext(sc) CassandraConnector(conf).withSessionDo { session => session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") @@ -22,9 +24,9 @@ object SQLDemo extends DemoApp { session.execute("INSERT INTO test.sql_demo(key, grp, value) VALUES (6, 2, 2.8)") } - cc.setKeyspace("test") - val rdd = cc.sql("SELECT grp, max(value) AS mv FROM sql_demo GROUP BY grp ORDER BY mv") - rdd.collect().foreach(println) // [2, 4.0] [1, 10.0] + sqlContext.read.cassandraFormat("sql_demo", "test").load().createOrReplaceTempView("test_sql_demo") + val rdd = sqlContext.sql("SELECT grp, max(value) AS mv FROM test_sql_demo GROUP BY grp ORDER BY mv") + rdd.collect().foreach(println) // [2, 4.0] [1, 10.0] sc.stop() } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala index 298a43623..93cf4b2a7 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala @@ -1,72 +1,65 @@ package com.datastax.spark.connector.cql -import com.datastax.spark.connector.embedded.SparkTemplate._ +import org.apache.spark.sql.SQLContext -import com.datastax.spark.connector.embedded.EmbeddedCassandra -import com.datastax.spark.connector.SparkCassandraITFlatSpecBase -import com.datastax.spark.connector.toDataFrameFunctions - -import org.apache.spark.sql.cassandra.CassandraSQLContext +import com.datastax.spark.connector.{SparkCassandraITFlatSpecBase, _} class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase { - useCassandraConfig(Seq("cassandra-password-auth.yaml.template")) - useSparkConf(defaultConf) - - // Wait for the default user to be created in Cassandra. 
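As an aside on the `SQLDemo` rewrite above: with `CassandraSQLContext` gone, the demo registers a temporary view and queries it through plain Spark SQL. The same aggregation can also be expressed directly with the DataFrame API, with no view at all. A minimal sketch, assuming the demo's `test.sql_demo` table and an in-scope `SparkContext` named `sc` already configured for the connector:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._          // provides cassandraFormat
import org.apache.spark.sql.functions.max

val sqlContext = new SQLContext(sc)

// Load test.sql_demo as a DataFrame and aggregate without a temp view.
val grouped = sqlContext.read
  .cassandraFormat("sql_demo", "test")           // table, keyspace
  .load()
  .groupBy("grp")
  .agg(max("value").as("mv"))
  .orderBy("mv")

grouped.collect().foreach(println)               // expected: [2,4.0] [1,10.0], as in the demo
```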
- Thread.sleep(1000) - - val conf = defaultConf - conf.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") - conf.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") - - "A CassandraConnector" should "authenticate with username and password when using native protocol" in { - val conn2 = CassandraConnector(conf) - conn2.withSessionDo { session => - assert(session !== null) - assert(session.isClosed === false) - assert(session.getCluster.getMetadata.getClusterName != null) - } - } - - it should "pick up user and password from SparkConf" in { - val conf = defaultConf - .set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") - .set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") - - // would throw exception if connection unsuccessful - val conn2 = CassandraConnector(conf) - conn2.withSessionDo { session => } - } - - "A DataFrame" should "read and write data with valid auth" in { - - val csc = new CassandraSQLContext(sc) - import csc.implicits._ - - val conf = defaultConf - .set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") - .set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") - - val conn = CassandraConnector(conf) - - val personDF1 = sc.parallelize(Seq( - ("Andy", 28, "America"), - ("Kaushal", 25, "India"), - ("Desanth", 27, "India"), - ("Mahendra", 26, "Rajasthan"))) - .toDF("name", "age", "address") - - createKeyspace(conn.openSession()) - personDF1.createCassandraTable(ks, "authtest", Some(Array("address")), Some(Array("age")))(conn) - - val options = Map("spark_cassandra_auth_username" -> "cassandra", - "spark_cassandra_auth_password" -> "cassandra", - "keyspace" -> ks, "table" -> "authtest") - - personDF1.write.format("org.apache.spark.sql.cassandra").options(options).save(); - val personDF2 = csc.read.format("org.apache.spark.sql.cassandra").options(options).load(); - - personDF2.count should be(4) - } + useCassandraConfig(Seq("cassandra-password-auth.yaml.template")) + useSparkConf(defaultConf) + + // Wait for the default user to be created in Cassandra. 
+ Thread.sleep(1000) + + val conf = defaultConf + conf.set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") + conf.set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") + + "A CassandraConnector" should "authenticate with username and password when using native protocol" in { + val conn2 = CassandraConnector(conf) + conn2.withSessionDo { session => + assert(session !== null) + assert(session.isClosed === false) + assert(session.getCluster.getMetadata.getClusterName != null) + } + } + + it should "pick up user and password from SparkConf" in { + val conf = defaultConf + .set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") + .set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") + + // would throw exception if connection unsuccessful + val conn2 = CassandraConnector(conf) + conn2.withSessionDo { session => } + } + + "A DataFrame" should "read and write data with valid auth" in { + val sqlContext = new SQLContext(sc) + + val conf = defaultConf + .set(DefaultAuthConfFactory.UserNameParam.name, "cassandra") + .set(DefaultAuthConfFactory.PasswordParam.name, "cassandra") + + val conn = CassandraConnector(conf) + + val personDF1 = sqlContext.sparkSession.createDataFrame(Seq( + ("Andy", 28, "America"), + ("Kaushal", 25, "India"), + ("Desanth", 27, "India"), + ("Mahendra", 26, "Rajasthan"))).toDF("name", "age", "address") + + createKeyspace(conn.openSession()) + personDF1.createCassandraTable(ks, "authtest", Some(Array("address")), Some(Array("age")))(conn) + + val options = Map("spark_cassandra_auth_username" -> "cassandra", + "spark_cassandra_auth_password" -> "cassandra", + "keyspace" -> ks, "table" -> "authtest") + + personDF1.write.format("org.apache.spark.sql.cassandra").options(options).save() + val personDF2 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(options).load() + + personDF2.count should be(4) + } } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala index 192cf5834..304588c7d 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala @@ -238,8 +238,9 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging } } -case class Test(val epoch:Long, val uri:String, val browser:String, val customer_id:Int) -case class TestPartialColumns(val epoch:Long, val browser:String, val customer_id:Int) +case class Test(epoch: Long, uri: String, browser: String, customer_id: Int) + +case class TestPartialColumns(epoch: Long, browser: String, customer_id: Int) object PushdownEverything extends CassandraPredicateRules { override def apply( diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLClusterLevelSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLClusterLevelSpec.scala index 3c51405c5..7798fdfb7 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLClusterLevelSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLClusterLevelSpec.scala @@ -2,6 +2,7 @@ package com.datastax.spark.connector.sql import scala.concurrent.Future +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.cassandra._ import 
com.datastax.spark.connector.SparkCassandraITFlatSpecBase @@ -56,33 +57,33 @@ class CassandraSQLClusterLevelSpec extends SparkCassandraITFlatSpecBase { } ) - var cc: CassandraSQLContext = null + var sqlContext: SQLContext = null private val cluster1 = "cluster1" private val cluster2 = "cluster2" override def beforeAll() { - cc = new CassandraSQLContext(sc) - cc.setConf(cluster1, + sqlContext = new SQLContext(sc) + sqlContext.setCassandraConf(cluster1, ConnectionHostParam.option(getHost(0).getHostAddress) ++ ConnectionPortParam.option(getPort(0))) - cc.setConf(cluster2, + sqlContext.setCassandraConf(cluster2, ConnectionHostParam.option(getHost(1).getHostAddress) ++ ConnectionPortParam.option(getPort(1))) } it should "allow to join tables from different clusters" in { - cc.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1") - cc.read.cassandraFormat("test2", ks, cluster2).load().createOrReplaceTempView("c2_test2") + sqlContext.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1") + sqlContext.read.cassandraFormat("test2", ks, cluster2).load().createOrReplaceTempView("c2_test2") - val result = cc.sql(s"SELECT * FROM c1_test1 AS test1 JOIN c2_test2 AS test2 WHERE test1.a = test2.a").collect() + val result = sqlContext.sql(s"SELECT * FROM c1_test1 AS test1 JOIN c2_test2 AS test2 WHERE test1.a = test2.a").collect() result should have length 2 } it should "allow to write data to another cluster" in { - cc.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1") - cc.read.cassandraFormat("test3", ks, cluster2).load().createOrReplaceTempView("c2_test3") + sqlContext.read.cassandraFormat("test1", ks, cluster1).load().createOrReplaceTempView("c1_test1") + sqlContext.read.cassandraFormat("test3", ks, cluster2).load().createOrReplaceTempView("c2_test3") - val insert = cc.sql(s"INSERT INTO TABLE c2_test3 SELECT * FROM c1_test1 AS t1").collect() - val result = cc.sql(s"SELECT * FROM c2_test3 AS test3").collect() + val insert = sqlContext.sql(s"INSERT INTO TABLE c2_test3 SELECT * FROM c1_test1 AS t1").collect() + val result = sqlContext.sql(s"SELECT * FROM c2_test3 AS test3").collect() result should have length 5 } } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala index 501d81dab..eb6dbf863 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala @@ -2,19 +2,18 @@ package com.datastax.spark.connector.sql import scala.concurrent.Future -import org.apache.spark.sql.cassandra.CassandraSQLContext -import org.scalatest.ParallelTestExecution +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.cassandra._ import com.datastax.spark.connector.SparkCassandraITFlatSpecBase import com.datastax.spark.connector.cql.CassandraConnector -import com.datastax.spark.connector.embedded.SparkTemplate._ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { useCassandraConfig(Seq("cassandra-default.yaml.template")) useSparkConf(defaultConf) val conn = CassandraConnector(defaultConf) - var cc: CassandraSQLContext = null + var sqlContext: SQLContext = null val ks1 = ks + "_1" val ks2 = ks + "_2" @@ -320,234 +319,239 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { ) } - cc = new 
CassandraSQLContext(sc) - cc.setKeyspace(ks1) + sqlContext = new SQLContext(sc) + Seq("index_test", "varint_test", "timestamp_conversion_bug", "uuid_inet_type", "export_table", + "objects_copy", "udts", "test_collection", "test_data_type1", "test_data_type", "tuple_test1", + "test3", "test2", "test1") + .foreach(t => sqlContext.read.cassandraFormat(t, ks1).load().createOrReplaceTempView(s"ks1_$t")) + Seq("test3", "test2") + .foreach(t => sqlContext.read.cassandraFormat(t, ks2).load().createOrReplaceTempView(s"ks2_$t")) it should "allow to select all rows" in { - val result = cc.sql(s"SELECT * FROM test1").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1").collect() result should have length 8 } it should "allow to select rows with index columns" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE g = 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE g = 2").collect() result should have length 4 } it should "allow to select rows with indexed columns that do not belong to partition key" in { - val result = cc.sql(s"SELECT * FROM $ks1.index_test WHERE id1 = 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_index_test WHERE id1 = 2").collect() result should have length 1 } it should "allow to select rows with indexed columns that belong to partition key" in { - val result = cc.sql(s"SELECT * FROM $ks1.index_test WHERE ipk1 = 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_index_test WHERE ipk1 = 2").collect() result should have length 1 } it should "allow to select rows with indexed partition and regular columns" in { - val result = cc.sql(s"SELECT * FROM $ks1.index_test WHERE ipk1 = 2 and id1 = 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_index_test WHERE ipk1 = 2 and id1 = 2").collect() result should have length 1 } it should "allow to select rows with >= clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b >= 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b >= 2").collect() result should have length 4 } it should "allow to select rows with > clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b > 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b > 2").collect() result should have length 0 } it should "allow to select rows with < clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b < 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b < 2").collect() result should have length 4 } it should "allow to select rows with <= clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b <= 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b <= 2").collect() result should have length 8 } it should "allow to select rows with in clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b in (1,2)").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b in (1,2)").collect() result should have length 8 } it should "allow to select rows with in clause pushed down" in { - val query = cc.sql(s"SELECT * FROM test2 WHERE a in (1,2)") + val query = sqlContext.sql(s"SELECT * FROM ks1_test2 WHERE a in (1,2)") query.queryExecution.sparkPlan.toString should not include ("Filter (") // No Spark Filter Step val result = query.collect() result should have length 6 } it should "allow to select rows with or clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b = 2 or b = 1").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE 
b = 2 or b = 1").collect() result should have length 8 } it should "allow to select rows with != clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b != 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b != 2").collect() result should have length 4 } it should "allow to select rows with <> clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b <> 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b <> 2").collect() result should have length 4 } it should "allow to select rows with not in clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b not in (1,2)").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b not in (1,2)").collect() result should have length 0 } it should "allow to select rows with is not null clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE b is not null").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE b is not null").collect() result should have length 8 } it should "allow to select rows with like clause" in { - val result = cc.sql(s"SELECT * FROM test2 WHERE name LIKE '%om' ").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test2 WHERE name LIKE '%om' ").collect() result should have length 1 } it should "allow to select rows with between clause" in { - val result = cc.sql(s"SELECT * FROM test2 WHERE a BETWEEN 1 AND 2 ").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test2 WHERE a BETWEEN 1 AND 2 ").collect() result should have length 6 } it should "allow to select rows with alias" in { - val result = cc.sql(s"SELECT a AS a_column, b AS b_column FROM test2").collect() + val result = sqlContext.sql(s"SELECT a AS a_column, b AS b_column FROM ks1_test2").collect() result should have length 9 } it should "allow to select rows with distinct column" in { - val result = cc.sql(s"SELECT DISTINCT a FROM test2").collect() + val result = sqlContext.sql(s"SELECT DISTINCT a FROM ks1_test2").collect() result should have length 3 } it should "allow to select rows with limit clause" in { - val result = cc.sql(s"SELECT * FROM test1 limit 2").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 limit 2").collect() result should have length 2 } it should "allow to select rows with order by clause" in { - val result = cc.sql(s"SELECT * FROM test1 order by d").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 order by d").collect() result should have length 8 } it should "allow to select rows with group by clause" in { - val result = cc.sql(s"SELECT count(*) FROM test1 GROUP BY b").collect() + val result = sqlContext.sql(s"SELECT count(*) FROM ks1_test1 GROUP BY b").collect() result should have length 2 } it should "allow to select rows with union clause" in { - val result = cc.sql(s"SELECT test1.a FROM $ks1.test1 AS test1 UNION DISTINCT SELECT test2.a FROM $ks1.test2 AS test2").collect() + val result = sqlContext.sql(s"SELECT test1.a FROM ks1_test1 AS test1 UNION DISTINCT SELECT test2.a FROM ks1_test2 AS test2").collect() result should have length 3 } it should "allow to select rows with union distinct clause" in { - val result = cc.sql(s"SELECT test1.a FROM $ks1.test1 AS test1 UNION DISTINCT SELECT test2.a FROM $ks1.test2 AS test2").collect() + val result = sqlContext.sql(s"SELECT test1.a FROM ks1_test1 AS test1 UNION DISTINCT SELECT test2.a FROM ks1_test2 AS test2").collect() result should have length 3 } it should "allow to select rows with union all clause" in { - val result = cc.sql(s"SELECT 
test1.a FROM $ks1.test1 AS test1 UNION ALL SELECT test2.a FROM $ks1.test2 AS test2").collect() + val result = sqlContext.sql(s"SELECT test1.a FROM ks1_test1 AS test1 UNION ALL SELECT test2.a FROM ks1_test2 AS test2").collect() result should have length 17 } it should "allow to select rows with having clause" in { - val result = cc.sql(s"SELECT count(*) FROM test1 GROUP BY b HAVING count(b) > 4").collect() + val result = sqlContext.sql(s"SELECT count(*) FROM ks1_test1 GROUP BY b HAVING count(b) > 4").collect() result should have length 0 } it should "allow to select rows with partition column clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE a = 1 and b = 1 and c = 1").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE a = 1 and b = 1 and c = 1").collect() result should have length 4 } it should "allow to select rows with partition column and cluster column clause" in { - val result = cc.sql(s"SELECT * FROM test1 WHERE a = 1 and b = 1 and c = 1 and d = 1 and e = 1").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test1 WHERE a = 1 and b = 1 and c = 1 and d = 1 and e = 1").collect() result should have length 1 } it should "allow to insert into another table" in { - val result = cc.sql(s"INSERT INTO TABLE test3 SELECT a, b, c FROM test2").collect() - val result2 = cc.sql(s"SELECT a, b, c FROM test3").collect() + val result = sqlContext.sql(s"INSERT INTO TABLE ks1_test3 SELECT a, b, c FROM ks1_test2").collect() + val result2 = sqlContext.sql(s"SELECT a, b, c FROM ks1_test3").collect() result2 should have length 9 } it should "allow to insert into another table in different keyspace" in { - val result = cc.sql(s"INSERT INTO TABLE $ks2.test3 SELECT test2.a, test2.b, test2.c FROM $ks1.test2 as test2").collect() - val result2 = cc.sql(s"SELECT test3.a, test3.b, test3.c FROM $ks2.test3 as test3").collect() + val result = sqlContext.sql(s"INSERT INTO TABLE ks2_test3 SELECT test2.a, test2.b, test2.c FROM ks1_test2 as test2").collect() + val result2 = sqlContext.sql(s"SELECT test3.a, test3.b, test3.c FROM ks2_test3 as test3").collect() result2 should have length 9 } it should "allow to join two tables" in { - val result = cc.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM $ks1.test1 AS test1 " + - s"JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM ks1_test1 AS test1 " + + s"JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 4 } it should "allow to join two tables from different keyspaces" in { - val result = cc.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM $ks1.test1 AS test1 " + - s"JOIN $ks2.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM ks1_test1 AS test1 " + + s"JOIN ks2_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 4 } it should "allow to inner join two tables" in { - val result = cc.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM $ks1.test1 AS test1 " + - s"INNER JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test1.a, test1.b, test1.c, test2.a FROM ks1_test1 AS test1 " + + s"INNER JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = 
test2.b AND test1.c = test2.c").collect() result should have length 4 } it should "allow to left join two tables" in { - val result = cc.sql(s"SELECT test1.a, test1.b, test1.c, test1.d, test1.e, test1.f FROM $ks1.test1 AS test1 " + - s"LEFT JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test1.a, test1.b, test1.c, test1.d, test1.e, test1.f FROM ks1_test1 AS test1 " + + s"LEFT JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 8 } it should "allow to left outer join two tables" in { - val result = cc.sql(s"SELECT test1.a, test1.b, test1.c, test1.d, test1.e, test1.f FROM $ks1.test1 AS test1 " + - s"LEFT OUTER JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test1.a, test1.b, test1.c, test1.d, test1.e, test1.f FROM ks1_test1 AS test1 " + + s"LEFT OUTER JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 8 } it should "allow to right join two tables" in { - val result = cc.sql(s"SELECT test2.a, test2.b, test2.c FROM $ks1.test1 AS test1 " + - s"RIGHT JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test2.a, test2.b, test2.c FROM ks1_test1 AS test1 " + + s"RIGHT JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 12 } it should "allow to right outer join two tables" in { - val result = cc.sql(s"SELECT test2.a, test2.b, test2.c FROM $ks1.test1 AS test1 " + - s"RIGHT OUTER JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test2.a, test2.b, test2.c FROM ks1_test1 AS test1 " + + s"RIGHT OUTER JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 12 } it should "allow to full join two tables" in { - val result = cc.sql(s"SELECT test2.a, test2.b, test2.c FROM $ks1.test1 AS test1 " + - s"FULL JOIN $ks1.test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() + val result = sqlContext.sql(s"SELECT test2.a, test2.b, test2.c FROM ks1_test1 AS test1 " + + s"FULL JOIN ks1_test2 AS test2 ON test1.a = test2.a AND test1.b = test2.b AND test1.c = test2.c").collect() result should have length 16 } it should "allow to select rows for collection columns" in { - val result = cc.sql(s"SELECT * FROM test_collection").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test_collection").collect() result should have length 1 } it should "allow to select rows for data types of ASCII, INT, FLOAT, DOUBLE, BIGINT, BOOLEAN, DECIMAL, INET, TEXT, TIMESTAMP, UUID, VARINT, SMALLINT" in { - val result = cc.sql(s"SELECT * FROM test_data_type").collect() + val result = sqlContext.sql(s"SELECT * FROM ks1_test_data_type").collect() result should have length 1 } it should "allow to insert rows for data types of ASCII, INT, FLOAT, DOUBLE, BIGINT, BOOLEAN, DECIMAL, INET, TEXT, TIMESTAMP, UUID, VARINT, SMALLINT" in { - val result = cc.sql(s"INSERT INTO TABLE test_data_type1 SELECT * FROM test_data_type").collect() - val result1 = cc.sql(s"SELECT * FROM test_data_type1").collect() + val result = sqlContext.sql(s"INSERT INTO TABLE 
ks1_test_data_type1 SELECT * FROM ks1_test_data_type").collect() + val result1 = sqlContext.sql(s"SELECT * FROM ks1_test_data_type1").collect() result1 should have length 1 } it should "allow to select specified non-UDT columns from a table containing some UDT columns" in { - val result = cc.sql(s"SELECT key, name FROM udts").collect() + val result = sqlContext.sql(s"SELECT key, name FROM ks1_udts").collect() result should have length 1 val row = result.head row.getInt(0) should be(1) @@ -556,10 +560,7 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { //TODO: SPARK-9269 is opened to address Set matching issue. I change the Set data type to List for now it should "allow to select UDT collection column and nested UDT column" in { - - val cc = new CassandraSQLContext(sc) - cc.setKeyspace(ks1) - val result = cc + val result = sqlContext .read .format("org.apache.spark.sql.cassandra") .options( @@ -574,10 +575,7 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { } it should "allow writing UDTs to C* tables" in { - - val cc = new CassandraSQLContext(sc) - cc.setKeyspace(ks1) - val result = cc + val result = sqlContext .read .format("org.apache.spark.sql.cassandra") .options( @@ -599,42 +597,35 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { // Regression test for #454: java.util.NoSuchElementException thrown when accessing timestamp field using CassandraSQLContext it should "allow to restrict a clustering timestamp column value" in { - val cc = new CassandraSQLContext(sc) - cc.setKeyspace(ks1) - cc.cassandraSql("select objectid, meterid, utcstamp from export_table where meterid = 4317 and utcstamp > '2013-07-26 20:30:00-0700'").collect() + sqlContext.sql("select objectid, meterid, utcstamp from ks1_export_table where meterid = 4317 and utcstamp > '2013-07-26 20:30:00-0700'").collect() } it should "allow to min/max timestamp column" in { - val cc = new CassandraSQLContext(sc) - cc.setKeyspace(ks1) - cc.cassandraSql("select k, min(d), max(d) from timestamp_conversion_bug group by k").collect() + sqlContext.sql("select k, min(d), max(d) from ks1_timestamp_conversion_bug group by k").collect() } it should "be able to push down filter on UUID and Inet columns" in { - val cc = new CassandraSQLContext(sc) - cc.setKeyspace(ks1) - val result = cc.cassandraSql( + val result = sqlContext.sql( "select * " + - "from uuid_inet_type " + + "from ks1_uuid_inet_type " + "where b > '74.125.239.135'").collect() result should have length 1 - val result1 = cc.cassandraSql( + val result1 = sqlContext.sql( "select * " + - "from uuid_inet_type " + + "from ks1_uuid_inet_type " + "where a < '123e4567-e89b-12d3-a456-426655440000'").collect() result1 should have length 1 - val result2 = cc.cassandraSql( + val result2 = sqlContext.sql( "select * " + - "from uuid_inet_type " + + "from ks1_uuid_inet_type " + "where a = '123e4567-e89b-12d3-a456-426655440000' and b = '74.125.239.135'").collect() result2 should have length 1 } it should "be able to push down filter on varint columns" in { - val cc = new CassandraSQLContext(sc) - cc.sql( + sqlContext.sql( s""" - |SELECT * FROM $ks1.varint_test + |SELECT * FROM ks1_varint_test |WHERE id = 1234567891234 | AND series = 1234567891235 | AND rollup_minutes = 1234567891236 @@ -643,7 +634,7 @@ class CassandraSQLSpec extends SparkCassandraITFlatSpecBase { } it should "read C* Tuple using sql" in { - val df = cc.sql(s"SELECT * FROM $ks1.tuple_test1") + val df = sqlContext.sql(s"SELECT * FROM ks1_tuple_test1") df.count should be (1) 
df.first.getStruct(1).getString(0) should be ("xyz") diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/util/ConfigCheck.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/util/ConfigCheck.scala index 4e3cc6370..1142ee3f1 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/util/ConfigCheck.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/util/ConfigCheck.scala @@ -1,13 +1,14 @@ package com.datastax.spark.connector.util -import com.datastax.spark.connector.cql.{CassandraConnectionFactory, AuthConfFactory, CassandraConnectorConf} -import com.datastax.spark.connector.rdd.ReadConf -import com.datastax.spark.connector.types.ColumnTypeConf -import com.datastax.spark.connector.writer.WriteConf import org.apache.commons.configuration.ConfigurationException import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf -import org.apache.spark.sql.cassandra.{CassandraSourceRelation, CassandraSQLContext} +import org.apache.spark.sql.cassandra.{CassandraSQLContextParams, CassandraSourceRelation} + +import com.datastax.spark.connector.cql.{AuthConfFactory, CassandraConnectionFactory, CassandraConnectorConf} +import com.datastax.spark.connector.rdd.ReadConf +import com.datastax.spark.connector.types.ColumnTypeConf +import com.datastax.spark.connector.writer.WriteConf /** * Helper class to throw exceptions if there are environment variables in the spark.cassandra @@ -26,7 +27,7 @@ object ConfigCheck { CassandraConnectorConf.Properties ++ AuthConfFactory.Properties ++ CassandraConnectionFactory.Properties ++ - CassandraSQLContext.Properties ++ + CassandraSQLContextParams.Properties ++ CassandraSourceRelation.Properties ++ ColumnTypeConf.Properties @@ -36,6 +37,7 @@ object ConfigCheck { /** * Checks the SparkConf Object for any unknown spark.cassandra.* properties and throws an exception * with suggestions if an unknown property is found. 
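Because the hunk above swaps `CassandraSQLContext.Properties` for `CassandraSQLContextParams.Properties` in the set of valid static properties, the removed `spark.cassandra.sql.keyspace` option is no longer a known `spark.cassandra.*` key. A small hedged sketch of how `ConfigCheck.checkConfig` could be exercised to confirm that; the host value is a placeholder and the exact exception type is whatever `ConfigCheck` throws for unknown keys:

```scala
import org.apache.spark.SparkConf
import com.datastax.spark.connector.util.ConfigCheck

// spark.cassandra.sql.keyspace was defined only by the removed
// CassandraSQLContext (KSNameParam), so after this patch it should be
// reported as an unknown spark.cassandra.* property.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder host
  .set("spark.cassandra.sql.keyspace", "test")          // option removed by SPARKC-399

ConfigCheck.checkConfig(conf)  // expected to throw, listing the unknown key with suggestions
```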
+ * * @param conf SparkConf object to check */ def checkConfig(conf: SparkConf): Unit = { @@ -84,6 +86,7 @@ object ConfigCheck { /** * Exception to be thrown when unknown properties are found in the SparkConf + * * @param unknownProps Properties that have no mapping to known Spark Cassandra Connector properties * @param suggestionMap A map possibly containing suggestions for each of of the unknown properties */ diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala deleted file mode 100644 index 0bd0facf2..000000000 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala +++ /dev/null @@ -1,109 +0,0 @@ -package org.apache.spark.sql.cassandra - -import scala.util.Try - -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.cassandra.CassandraSourceRelation._ -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.aggregate.Count -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.LogicalRelation - -import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, Schema} - -private[cassandra] class CassandraCatalog( - cs: CassandraSession, - fnResourceLoader: FunctionResourceLoader, - fnRegistry: FunctionRegistry, - conf: CatalystConf, - hadoopConf: Configuration -) extends SessionCatalog(cs.externalCatalog, fnResourceLoader, fnRegistry, conf, hadoopConf) { - - private val csc = cs.wrapped match { - case sqlCtx: CassandraSQLContext => sqlCtx - case sqlCtx: SQLContext => - val ctx = new CassandraSQLContext(cs) - cs.setWrappedContext(ctx) - ctx - } - - val caseSensitive: Boolean = true - - /** A cache of Spark SQL data source tables that have been accessed. 
Cache is thread safe.*/ - private[cassandra] val cachedDataSourceTables: LoadingCache[TableIdentifier, LogicalPlan] = { - val cacheLoader = new CacheLoader[TableIdentifier, LogicalPlan]() { - override def load(tableIdent: TableIdentifier): LogicalPlan = { - logDebug(s"Creating new cached data source for $tableIdent") - buildRelation(tableIdent) - } - } - CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) - } - - override def lookupRelation(tableIdent: TableIdentifier, alias: Option[String]): LogicalPlan = { - Try(cachedDataSourceTables.get(tableIdent)) - .map(plan => alias.map(a => SubqueryAlias(a, plan)).getOrElse(plan)) - .getOrElse(super.lookupRelation(tableIdent, alias)) - } - - /** Build logic plan from a CassandraSourceRelation */ - private def buildRelation(tableIdentifier: TableIdentifier): LogicalPlan = { - val (cluster, database, table) = getClusterDBTableNames(tableIdentifier) - val tableRef = TableRef(table, database, Option(cluster)) - val sourceRelation = CassandraSourceRelation(tableRef, csc, CassandraSourceOptions()) - SubqueryAlias(table, LogicalRelation(sourceRelation)) - } - - /** Return cluster, database and table names from a table identifier*/ - private def getClusterDBTableNames(tableIdent: TableIdentifier): (String, String, String) = { - val database = tableIdent.database.getOrElse(csc.getKeyspace) - val table = tableIdent.table - (csc.getCluster, database, table) - } - - override def databaseExists(db: String): Boolean = { - val cluster = csc.getCluster - val tableRef = TableRef("", db, Option(cluster)) - val schema = Schema.fromCassandra(getCassandraConnector(tableRef), Some(db)) - schema.keyspaces.nonEmpty || super.databaseExists(db) - } - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - val (cluster, database, table) = getClusterDBTableNames(tableIdent) - val cached = cachedDataSourceTables.asMap().containsKey(tableIdent) - if (cached) { - true - } else { - val tableRef = TableRef(table, database, Option(cluster)) - val schema = Schema.fromCassandra(getCassandraConnector(tableRef)) - val tabDef = - for (ksDef <- schema.keyspaceByName.get(database); - tabDef <- ksDef.tableByName.get(table)) yield tabDef - tabDef.nonEmpty - } - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - cachedDataSourceTables.refresh(tableIdent) - } - - - override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { - if (name.funcName.toLowerCase == "count") { - return Count(children) - } - super.lookupFunction(name, children) - } - - private def getCassandraConnector(tableRef: TableRef) : CassandraConnector = { - val sparkConf = csc.sparkContext.getConf.clone() - val sqlConf = csc.getAllConfs - val conf = consolidateConfs(sparkConf, sqlConf, tableRef, Map.empty) - new CassandraConnector(CassandraConnectorConf(conf)) - } -} diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala deleted file mode 100644 index f8974cb6e..000000000 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala +++ /dev/null @@ -1,130 +0,0 @@ -package org.apache.spark.sql.cassandra - -import java.util.NoSuchElementException - -import com.datastax.spark.connector.util.ConfigParameter -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SQLContext, execution => sparkexecution} - -/** Allows to execute 
SQL queries against Cassandra and access results as - * `SchemaRDD` collections. Predicate pushdown to Cassandra is supported. - * - * Example: - * {{{ - * import com.datastax.spark.connector._ - * - * val sparkMasterHost = "127.0.0.1" - * val cassandraHost = "127.0.0.1" - * - * // Tell Spark the address of one Cassandra node: - * val conf = new SparkConf(true).set("spark.cassandra.connection.host", cassandraHost) - * - * // Connect to the Spark cluster: - * val sc = new SparkContext("spark://" + sparkMasterHost + ":7077", "example", conf) - * - * // Create CassandraSQLContext: - * val cc = new CassandraSQLContext(sc) - * - * // Execute SQL query: - * val rdd = cc.sql("SELECT * FROM keyspace.table ...") - * - * }}} */ -class CassandraSQLContext(ss: CassandraSession) extends SQLContext(ss) { - import org.apache.spark.sql.cassandra.CassandraSQLContext._ - - def this(sc: SparkContext) = this(new CassandraSession(sc)) - - def cassandraSession = this.sparkSession.asInstanceOf[CassandraSession] - - ss.setWrappedContext(this) - - /** Set default Cassandra keyspace to be used when accessing tables with unqualified names. */ - def setKeyspace(ks: String) = { - this.setConf(KSNameParam.name, ks) - } - - /** Set current used database name. Database is equivalent to keyspace */ - def setDatabase(db: String) = setKeyspace(db) - - /** Set current used cluster name */ - def setCluster(cluster: String) = { - this.setConf(SqlClusterParam.name, cluster) - } - - /** Get current used cluster name */ - def getCluster : String = this.getConf(SqlClusterParam.name, SqlClusterParam.default) - - /** - * Returns keyspace/database set previously by [[setKeyspace]] or throws IllegalStateException if - * keyspace has not been set yet. - */ - def getKeyspace: String = { - try { - this.getConf(KSNameParam.name) - } catch { - case _: NoSuchElementException => - throw new IllegalStateException("Default keyspace not set. Please call CassandraSqlContext#setKeyspace.") - } - } - - /** Executes SQL query against Cassandra and returns DataFrame representing the result. */ - def cassandraSql(cassandraQuery: String): DataFrame = sql(cassandraQuery) - - /** Set the Spark Cassandra Connector configuration parameters */ - def setConf(options: Map[String, String]): CassandraSQLContext = { - setConf(SqlClusterParam.default, options) - } - - /** Set the Spark Cassandra Connector configuration parameters which will be used when accessing - * a given cluster */ - def setConf( - cluster: String, - options: Map[String, String]): CassandraSQLContext = { - checkOptions(options) - for ((k, v) <- options) setConf(s"$cluster/$k", v) - this - } - - /** Set the Spark Cassandra Connector configuration parameters which will be used when accessing - * a given keyspace in a given cluster */ - def setConf( - cluster: String, - keyspace: String, - options: Map[String, String]): CassandraSQLContext = { - checkOptions(options) - for ((k, v) <- options) setConf(s"$cluster:$keyspace/$k", v) - this - } - - private def checkOptions(options: Map[String, String]): Unit = { - options.keySet.foreach { name => - require(DefaultSource.confProperties.contains(name), - s"Unrelated parameter. You can only set the following parameters: ${DefaultSource.confProperties.mkString(", ")}") - } - } -} - -object CassandraSQLContext { - // Should use general used database than Cassandra specific keyspace? - // Other source tables don't have keyspace concept. We should make - // an effort to set CassandraSQLContext a more database like to join - // tables from other sources. 
Keyspace is equivalent to database in SQL world - val ReferenceSection = "Cassandra SQL Context Options" - - val KSNameParam = ConfigParameter[Option[String]]( - name = "spark.cassandra.sql.keyspace", - section = ReferenceSection, - default = None, - description = """Sets the default keyspace""") - - val SqlClusterParam = ConfigParameter[String]( - name = "spark.cassandra.sql.cluster", - section = ReferenceSection, - default = "default", - description = "Sets the default Cluster to inherit configuration from") - - val Properties = Seq( - KSNameParam, - SqlClusterParam - ) -} diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSession.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSession.scala deleted file mode 100644 index 995401d1f..000000000 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSession.scala +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.spark.sql.cassandra - -import org.apache.spark.SparkContext -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.internal.SessionState - -class CassandraSession(sc: SparkContext) extends SparkSession(sc) { - - import CassandraSession._ - - override lazy val sessionState: SessionState = new CassandraSessionState(this) - -} - -object CassandraSession { - - class CassandraSessionState(session: CassandraSession) extends SessionState(session) { - override lazy val catalog: SessionCatalog = new CassandraCatalog( - session, - functionResourceLoader, - functionRegistry, - conf, - newHadoopConf()) - } - -} - diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/package.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/package.scala index 2cd75f72e..ecbd94d8b 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/package.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/package.scala @@ -1,5 +1,9 @@ package org.apache.spark.sql +import scala.language.implicitConversions + +import com.datastax.spark.connector.util.ConfigParameter + package object cassandra { /** A data frame format used to access Cassandra through Connector */ @@ -52,4 +56,70 @@ package object cassandra { } } + implicit class CassandraSQLContextFunctions(val sqlContext: SQLContext) extends AnyVal { + + import org.apache.spark.sql.cassandra.CassandraSQLContextParams._ + + /** Set current used cluster name */ + def setCluster(cluster: String): SQLContext = { + sqlContext.setConf(SqlClusterParam.name, cluster) + sqlContext + } + + /** Get current used cluster name */ + def getCluster: String = sqlContext.getConf(SqlClusterParam.name, SqlClusterParam.default) + + /** Set the Spark Cassandra Connector configuration parameters */ + def setCassandraConf(options: Map[String, String]): SQLContext = { + setCassandraConf(SqlClusterParam.default, options) + sqlContext + } + + /** Set the Spark Cassandra Connector configuration parameters which will be used when accessing + * a given cluster */ + def setCassandraConf( + cluster: String, + options: Map[String, String]): SQLContext = { + checkOptions(options) + for ((k, v) <- options) sqlContext.setConf(s"$cluster/$k", v) + sqlContext + } + + /** Set the Spark Cassandra Connector configuration parameters which will be used when accessing + * a given keyspace in a given cluster */ + def setCassandraConf( + cluster: String, + keyspace: 
String, + options: Map[String, String]): SQLContext = { + checkOptions(options) + for ((k, v) <- options) sqlContext.setConf(s"$cluster:$keyspace/$k", v) + sqlContext + } + + private def checkOptions(options: Map[String, String]): Unit = { + options.keySet.foreach { name => + require(DefaultSource.confProperties.contains(name), + s"Unrelated parameter. You can only set the following parameters: ${DefaultSource.confProperties.mkString(", ")}") + } + } + } + + object CassandraSQLContextParams { + // Should use general used database than Cassandra specific keyspace? + // Other source tables don't have keyspace concept. We should make + // an effort to set CassandraSQLContext a more database like to join + // tables from other sources. Keyspace is equivalent to database in SQL world + val ReferenceSection = "Cassandra SQL Context Options" + + val SqlClusterParam = ConfigParameter[String]( + name = "spark.cassandra.sql.cluster", + section = ReferenceSection, + default = "default", + description = "Sets the default Cluster to inherit configuration from") + + val Properties = Seq( + SqlClusterParam + ) + } + } From 9f363e94928052eedc4124a217bd39a5fcb4f1a8 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 14 Jul 2016 12:40:19 +0200 Subject: [PATCH 2/2] SPARKC-399: Replace underscore based properties in favour of dot based properties --- CHANGES.txt | 2 +- doc/14_data_frames.md | 6 +--- .../CassandraAuthenticatedConnectorSpec.scala | 4 +-- .../sql/CassandraDataFrameSpec.scala | 2 +- .../spark/sql/cassandra/DefaultSource.scala | 31 +++++-------------- 5 files changed, 13 insertions(+), 32 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0bbdcf869..35ad6af84 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 2.0.0 M1 - * Removed CassandraSqlContext (SPARKC-399) + * Removed CassandraSqlContext and underscore based options (SPARKC-399) * Upgrade to Spark 2.0.0-preview (SPARKC-396) - Removed Twitter demo because there is no spark-streaming-twitter package available anymore - Removed Akka Actor demo becaues there is no support for such streams anymore diff --git a/doc/14_data_frames.md b/doc/14_data_frames.md index 94cc5d695..16bf77b00 100644 --- a/doc/14_data_frames.md +++ b/doc/14_data_frames.md @@ -129,10 +129,6 @@ Accessing data Frames using Spark SQL involves creating temporary tables and spe source as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to establish a relation between the CassandraTable and the internally used DataSource. -Because of a limitation in SparkSQL, SparkSQL `OPTIONS` must have their -`.` characters replaced with `_`. This means `spark.cassandra.input.split.size_in_mb` becomes -`spark_cassandra_input_split_size_in_mb`. - Example Creating a Source Using Spark SQL: Create Relation with the Cassandra table test.words @@ -214,7 +210,7 @@ the option which it represents to a `DataFrameReader` or `DataFrameWriter`. 
Suppose we want to set `spark.cassandra.read.timeout_ms` to 7 seconds on some `DataFrameReader`, we can do this both ways: ```scala -option("spark_cassandra_read_timeout_ms", "7000") +option("spark.cassandra.read.timeout_ms", "7000") ``` Since this setting is represented by `CassandraConnectorConf.ReadTimeoutParam` we can simply do: ```scala diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala index 93cf4b2a7..f547d2b1a 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala @@ -53,8 +53,8 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase { createKeyspace(conn.openSession()) personDF1.createCassandraTable(ks, "authtest", Some(Array("address")), Some(Array("age")))(conn) - val options = Map("spark_cassandra_auth_username" -> "cassandra", - "spark_cassandra_auth_password" -> "cassandra", + val options = Map("spark.cassandra.auth.username" -> "cassandra", + "spark.cassandra.auth.password" -> "cassandra", "keyspace" -> ks, "table" -> "authtest") personDF1.write.format("org.apache.spark.sql.cassandra").options(options).save() diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala index 97a6e4301..20299f7d9 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala @@ -99,7 +99,7 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { Map( "table" -> "kv_copy", "keyspace" -> ks, - "spark_cassandra_output_ttl" -> "300" + "spark.cassandra.output.ttl" -> "300" ) ) .save() diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala index e6b02e844..8c9611faa 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DefaultSource.scala @@ -25,9 +25,9 @@ import com.datastax.spark.connector.writer.WriteConf * keyspace "keyspace", * cluster "test_cluster", * pushdown "true", - * spark_cassandra_input_page_row_size "10", - * spark_cassandra_output_consistency_level "ONE", - * spark_cassandra_connection_timeout_ms "1000" + * spark.cassandra.input.fetch.size_in_rows "10", + * spark.cassandra.output.consistency.level "ONE", + * spark.cassandra.connection.timeout_ms "1000" * ) */ class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider with Logging { @@ -40,9 +40,9 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr * keyspace -- keyspace name, required * cluster -- cluster name, optional, default name is "default" * pushdown -- true/false, optional, default is true - * Cassandra connection settings -- optional, e.g. spark_cassandra_connection_timeout_ms - * Cassandra Read Settings -- optional, e.g. 
spark_cassandra_input_page_row_size - * Cassandra Write settings -- optional, e.g. spark_cassandra_output_consistency_level + * Cassandra connection settings -- optional, e.g. spark.cassandra.connection.timeout_ms + * Cassandra Read Settings -- optional, e.g. spark.cassandra.input.fetch.size_in_rows + * Cassandra Write settings -- optional, e.g. spark.cassandra.output.consistency.level * * When push_down is true, some filters are pushed down to CQL. * @@ -135,24 +135,9 @@ object DefaultSource { AuthConfFactory.Properties.map(_.name) ++ DefaultAuthConfFactory.properties - // Dot is not allowed in Options key for Spark SQL parsers, so convert . to _ - // Map converted property to origin property name - // TODO check SPARK 1.4 it may be fixed - private val propertiesMap : Map[String, String] = { - confProperties.map(prop => (prop.replace(".", "_"), prop)).toMap - } - /** Construct a map stores Cassandra Conf settings from options */ - def buildConfMap(parameters: Map[String, String]) : Map[String, String] = { - val confMap = mutable.Map.empty[String, String] - for (convertedProp <- propertiesMap.keySet) { - val setting = parameters.get(convertedProp) - if (setting.nonEmpty) { - confMap += propertiesMap(convertedProp) -> setting.get - } - } - confMap.toMap - } + def buildConfMap(parameters: Map[String, String]): Map[String, String] = + parameters.filterKeys(confProperties.contains) /** Check whether the provider is Cassandra datasource or not */ def cassandraSource(provider: String) : Boolean = {
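Taken together, the two patches replace `CassandraSQLContext` with a plain `SQLContext` plus the implicits in `org.apache.spark.sql.cassandra`, and accept dot-separated connector option keys directly. A minimal end-to-end sketch of the resulting API, assuming a running `SparkContext` `sc` with `spark.cassandra.connection.host` set and an existing Cassandra table `test.words`; the table, view, and option values are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra._    // cassandraFormat, setCluster, setCassandraConf

val sqlContext = new SQLContext(sc)        // plain SQLContext instead of CassandraSQLContext

// Per-cluster settings now go through the implicit helpers added in package.scala
// (formerly CassandraSQLContext.setConf).
sqlContext.setCassandraConf("default", Map("spark.cassandra.connection.host" -> "127.0.0.1"))

// There is no setKeyspace anymore: qualify tables by registering temporary views.
sqlContext.read
  .cassandraFormat("words", "test")        // table, keyspace (optional cluster as a third argument)
  .option("spark.cassandra.input.fetch.size_in_rows", "100")  // dot-based key, accepted after patch 2
  .load()
  .createOrReplaceTempView("test_words")

val rows = sqlContext.sql("SELECT * FROM test_words").collect()
```

Note that `setCassandraConf` only accepts keys listed in `DefaultSource.confProperties`, mirroring the validation the old `CassandraSQLContext.setConf` performed.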