From b9d5abb6b5fdb7b56c2a3e4197d00887c61abf9a Mon Sep 17 00:00:00 2001 From: Jaroslaw Grabowski Date: Tue, 24 May 2016 13:58:04 +0200 Subject: [PATCH] SPARKC-385 add driver.LocalDate conversion Conversion has to be added because driver's LocalDate is not serializable. GettableData now converts LocalDate to Joda's LocalDate. --- .../connector/rdd/CassandraRDDSpec.scala | 39 ++++++++++++++++++- .../sql/CassandraDataFrameSpec.scala | 27 +++++++++++++ .../spark/connector/GettableData.scala | 3 +- .../spark/connector/types/TypeConverter.scala | 1 + .../spark/sql/cassandra/CassandraSQLRow.scala | 1 + 5 files changed, 69 insertions(+), 2 deletions(-) diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala index 40cfc6091..63642ce6a 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala @@ -7,7 +7,7 @@ import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.reflect.runtime.universe.typeTag -import org.joda.time.DateTime +import org.joda.time.{DateTime, LocalDate} import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector @@ -184,6 +184,11 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase { session.execute(s"insert into $ks.tuple_test3 (id, t) VALUES (0, (1, ('foo', 2.3)))") session.execute(nestedTupleTable("tuple_test4")) session.execute(nestedTupleTable("tuple_test5")) + }, + + Future { + session.execute(s"create table $ks.date_test (key int primary key, dd date)") + session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')") } ) } @@ -1032,4 +1037,36 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase { session.execute(s"select count(1) from $ks.tuple_test5").one().getLong(0) should be (2) } } + + it should "write Java dates as C* date type" in { + val dateRow = (7, new Date()) + val dateTimeRow = (8, DateTime.now()) + + sc.parallelize(List(dateRow, dateTimeRow)) + .saveToCassandra(ks, "date_test") + + val resultSet = conn.withSessionDo { session => + session.execute( + s"select count(1) from $ks.date_test where key in (${dateRow._1}, ${dateTimeRow._1})") + } + resultSet.one().getLong(0) should be(2) + } + + it should "read C* row with dates as Java dates" in { + val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda + val row = sc.cassandraTable(ks, "date_test").where("key = 1").first + + row.getInt("key") should be(1) + row.getDate("dd") should be(expected.toDate) + row.getDateTime("dd").toLocalDate should be(expected) + } + + it should "read LocalDate as tuple value with given type" in { + val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda + val date = sc.cassandraTable[(Int, Date)](ks, "date_test").where("key = 1").first._2 + val dateTime = sc.cassandraTable[(Int, DateTime)](ks, "date_test").where("key = 1").first._2 + + date should be(expected.toDate) + dateTime.toLocalDate should be(expected) + } } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala index a5a501f68..d9a962d93 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala @@ -9,6 +9,7 @@ import com.datastax.spark.connector._ import com.datastax.spark.connector.SparkCassandraITFlatSpecBase import com.datastax.spark.connector.cql.CassandraConnector import org.apache.spark.sql.SQLContext +import org.joda.time.LocalDate class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { useCassandraConfig(Seq("cassandra-default.yaml.template")) @@ -55,6 +56,12 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { session.execute(s"CREATE TABLE $ks.tuple_test1 (id int, t Tuple, PRIMARY KEY (id))") session.execute(s"CREATE TABLE $ks.tuple_test2 (id int, t Tuple, PRIMARY KEY (id))") session.execute(s"INSERT INTO $ks.tuple_test1 (id, t) VALUES (1, ('xyz', 3))") + }, + + Future { + session.execute(s"create table $ks.date_test (key int primary key, dd date)") + session.execute(s"create table $ks.date_test2 (key int primary key, dd date)") + session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')") } ) } @@ -185,5 +192,25 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { } } + it should "read and write C* LocalDate columns" in { + val df = sqlContext + .read + .format("org.apache.spark.sql.cassandra") + .options(Map("table" -> "date_test", "keyspace" -> ks, "cluster" -> "ClusterOne")) + .load + + df.count should be (1) + df.first.getDate(1) should be (new LocalDate(1930, 5, 31).toDate) + + df.write + .format("org.apache.spark.sql.cassandra") + .options(Map("table" -> "date_test2", "keyspace" -> ks, "cluster" -> "ClusterOne")) + .save + + conn.withSessionDo { session => + session.execute(s"select count(1) from $ks.date_test2").one().getLong(0) should be (1) + } + } + } \ No newline at end of file diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala index c21d0246a..caf265085 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala @@ -4,7 +4,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions._ -import com.datastax.driver.core.{Row, UDTValue => DriverUDTValue, TupleValue => DriverTupleValue} +import com.datastax.driver.core.{LocalDate, Row, TupleValue => DriverTupleValue, UDTValue => DriverUDTValue} import com.datastax.spark.connector.types.TypeConverter.StringConverter import com.datastax.spark.connector.util.ByteBufferUtil @@ -80,6 +80,7 @@ object GettableData { case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue) case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue) + case localDate: LocalDate => new org.joda.time.LocalDate(localDate.getMillisSinceEpoch) case other => other.asInstanceOf[AnyRef] } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index e2bf2a6bf..1cc52c9ef 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -302,6 +302,7 @@ object TypeConverter { case x: UUID if x.version() == 1 => new Date(x.timestamp()) case x: LocalDate => new Date(x.getMillisSinceEpoch) case x: String => TimestampParser.parse(x) + case x: org.joda.time.LocalDate => x.toDate } } diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala index 865ded9f0..af63d378d 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala @@ -78,6 +78,7 @@ object CassandraSQLRow { value match { case date: Date => new Timestamp(date.getTime) case localDate: LocalDate => new java.sql.Date(subtractTimeZoneOffset(localDate.getMillisSinceEpoch)) + case localDate: org.joda.time.LocalDate => new java.sql.Date(localDate.toDate.getTime) case str: String => UTF8String.fromString(str) case bigInteger: BigInteger => Decimal(bigInteger.toString) case inetAddress: InetAddress => UTF8String.fromString(inetAddress.getHostAddress)