From 8133089c5ca42b1ffe4995300d06752235918c0b Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Tue, 14 Jun 2016 20:08:33 -0700
Subject: [PATCH] SPARKC-391: Fix conversion of LocalDate to Joda LocalDate

We transform LocalDate objects into Joda LocalDate objects so that we have a
serializable AnyRef for moving around in Spark. Previously this was done by
just using the default constructor, which assumes the provided timestamp is
in the default timezone. This is incorrect at certain times of the year in
certain places, since LocalDate gives back its timestamp in GMT.

Previously we were also using some of our own home-grown normalization of
timestamps for converting to and from java.sql.Date. This led to errors and
difficult-to-maintain code. In this commit we replace all of the previous
methods with a pass-through to Joda's internal normalizing machinery. In
addition, we now run all of our Date tests in both PST and CET on Jenkins.

This PR fixes the instantiation of various Date objects when reading a C*
table with date-type columns. When a C* date column is read, its values are
eventually converted to java.sql.Date, java.util.Date, Joda's DateTime, or
other types. This PR makes the returned dates consistent: the connector
returns date objects set to midnight in the user's default timezone. For
example, if I choose to get a java.util.Date out of the C* value 2016-12-25,
I would get (on my laptop):

    java.util.Date = Sun Dec 25 00:00:00 CET 2016

Because the default timezone is cached by Spark, we cannot run tests against
different timezones in a single JVM. This PR therefore introduces tests for
two different timezones that are executed in two separate JVMs.
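To make the timezone bug described above concrete, here is a minimal sketch
(for illustration only, not part of the patch itself) contrasting the old and
the new conversion, assuming the Java driver's LocalDate and Joda Time are on
the classpath:

    import com.datastax.driver.core.{LocalDate => DriverLocalDate}
    import org.joda.time.{LocalDate => JodaLocalDate}

    val fromCassandra = DriverLocalDate.fromYearMonthDay(2016, 12, 25)

    // Old conversion: getMillisSinceEpoch is midnight 2016-12-25 in GMT, but
    // the Joda constructor interprets those millis in the JVM's default
    // timezone, so west of GMT (e.g. in PST) this yields 2016-12-24.
    val shifted = new JodaLocalDate(fromCassandra.getMillisSinceEpoch)

    // New conversion: copy the year/month/day fields and let Joda normalize
    // them, so the result is 2016-12-25 regardless of the default timezone.
    val exact = new JodaLocalDate(
      fromCassandra.getYear, fromCassandra.getMonth, fromCassandra.getDay)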
---
 project/Settings.scala | 4 +
 .../connector/rdd/CassandraRDDSpec.scala | 10 +-
 .../rdd/typeTests/AbstractDateTypeTest.scala | 85 +++++++++++++++++++
 .../rdd/typeTests/AbstractTypeTest.scala | 2 +-
 .../rdd/typeTests/DateTypeCETTest.scala | 15 ++++
 .../rdd/typeTests/DateTypePSTTest.scala | 15 ++++
 .../rdd/typeTests/DateTypeTest.scala | 30 -------
 .../sql/CassandraDataFrameDateBehaviors.scala | 84 ++++++++++++++++++
 .../sql/CassandraDataFrameDateCETSpec.scala | 15 ++++
 .../sql/CassandraDataFrameDatePSTSpec.scala | 15 ++++
 .../sql/CassandraDataFrameSpec.scala | 30 -------
 .../spark/connector/GettableData.scala | 5 +-
 .../spark/connector/types/TypeConverter.scala | 79 +++++++++++------
 .../spark/sql/cassandra/CassandraSQLRow.scala | 14 +--
 .../spark/connector/GettableDataSpec.scala | 45 ++++++++++
 .../connector/types/TypeConverterTest.scala | 13 ++-
 16 files changed, 350 insertions(+), 111 deletions(-)
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala
 delete mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala
 create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala
 create mode 100644 spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala

diff --git a/project/Settings.scala b/project/Settings.scala index b7afaadbf..4fdda3664 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -259,6 +259,8 @@ object Settings extends Build { if (test.name.toLowerCase.contains("auth")) "auth" else if (test.name.toLowerCase.contains("ssl")) "ssl" else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec" + else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec" + else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec" else test.name.reverse.dropWhile(_ != '.').reverse } @@ -274,6 +276,8 @@ object Settings extends Build { else if (pkgName.contains(".repl")) "repl" else if (pkgName.contains(".streaming")) "streaming" else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec" + else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec" + else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec" else "other" } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala index d2079ec38..60caecfa7 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala @@ -1059,16 +1059,16 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase { val row = sc.cassandraTable(ks, "date_test").where("key = 1").first row.getInt("key") should be(1) - row.getDate("dd") should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate) - row.getDateTime("dd").toLocalDate should be(expected) + row.getDate("dd") should be(expected.toDateTimeAtStartOfDay.toDate) + row.get[LocalDate]("dd") should be(expected) } it should "read LocalDate as tuple value with given type" in { val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda val date = sc.cassandraTable[(Int, Date)](ks, "date_test").where("key = 1").first._2 - val dateTime = sc.cassandraTable[(Int, DateTime)](ks, "date_test").where("key = 1").first._2 + val localDate = sc.cassandraTable[(Int, LocalDate)](ks, "date_test").where("key = 1").first._2 - date should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate) - dateTime.toLocalDate should be(expected) + date should be(expected.toDateTimeAtStartOfDay.toDate) + localDate should be(expected) } } diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala new file mode 100644 index 000000000..baee1fb38 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala @@ -0,0 +1,85 @@ +package com.datastax.spark.connector.rdd.typeTests + +import java.sql.{Date => SqlDate} +import java.text.SimpleDateFormat +import java.util.{TimeZone, Date => UtilDate} + +import scala.reflect.ClassTag + +import org.joda.time.{DateTime, DateTimeZone, LocalDate => JodaLocalDate} + +import com.datastax.driver.core.{Row, LocalDate => DriverLocalDate} +import com.datastax.spark.connector.rdd.reader.RowReaderFactory +import com.datastax.spark.connector.types.TypeConverter +import com.datastax.spark.connector.writer.RowWriterFactory + +/** + * This should be executed in a separate JVM, as Catalyst caches the
default time zone + */ +abstract class AbstractDateTypeTest[TestType: ClassTag]( + val testTimeZone: TimeZone)( + implicit + tConverter: TypeConverter[TestType], + rowReaderNormal: RowReaderFactory[(TestType, TestType, TestType, TestType)], + rowReaderCollection: RowReaderFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])], + rowReaderNull: RowReaderFactory[(TestType, TestType, Option[TestType], Set[TestType], Map[TestType, TestType], Seq[TestType])], + rowWriterNormal: RowWriterFactory[(TestType, TestType, TestType, TestType)], + rowWriterCollection: RowWriterFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])], + rowWriterNull: RowWriterFactory[(TestType, TestType, Null, Null, Null, Null)]) + extends AbstractTypeTest[TestType, DriverLocalDate] { + + TimeZone.setDefault(testTimeZone) + DateTimeZone.setDefault(DateTimeZone.forTimeZone(testTimeZone)) + + private val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d).*""".r + + protected def stringToDate(str: String): TestType + + protected def dateToString(date: TestType): String + + override protected val typeName: String = "date" + + override protected lazy val typeData: Seq[TestType] = Seq( + stringToDate("2015-05-01"), + stringToDate("2015-05-10"), + stringToDate("2015-05-20"), + stringToDate("1950-03-05")) + + override protected lazy val addData: Seq[TestType] = Seq( + stringToDate("2011-05-01"), + stringToDate("2011-05-10"), + stringToDate("2011-05-20"), + stringToDate("1950-01-01")) + + override def convertToDriverInsertable(testValue: TestType): DriverLocalDate = { + dateToString(testValue) match { + case dateRegx(year, month, day) => + DriverLocalDate.fromYearMonthDay(year.toInt, month.toInt, day.toInt) + } + } + + override def getDriverColumn(row: Row, colName: String): TestType = { + val ld = row.getDate(colName) + stringToDate(ld.toString) + } +} + +abstract class DateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[UtilDate](timeZone) { + private val format = new SimpleDateFormat("yyyy-MM-dd") + + override def stringToDate(str: String): UtilDate = format.parse(str) + + override def dateToString(date: UtilDate): String = format.format(date) +} + +abstract class DateTimeTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[DateTime](timeZone) { + override def stringToDate(str: String): DateTime = JodaLocalDate.parse(str).toDateTimeAtStartOfDay + + override def dateToString(date: DateTime): String = date.toLocalDate.toString +} + +abstract class SqlDateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[SqlDate](timeZone) { + override def stringToDate(str: String): SqlDate = SqlDate.valueOf(str) + + override def dateToString(date: SqlDate): String = date.toString +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala index 1e60dfda1..564c01f2e 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala @@ -15,7 +15,7 @@ import com.datastax.spark.connector.writer.RowWriterFactory /** * A template class for testing that various CQL Types work with the spark Cassandra Connector - * When creating an iimplementationof this test you must provide two types, + * When creating an implementation of this test 
you must provide two types, * The TestType is used to extract values from Cassandra ie: sc.cassandraTable[TestType] * and saving values in cassandra via sc.parallelize(x:TesType).saveToCassandra() * The DriverType is used for inserting values into C* via the javaDriver diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala new file mode 100644 index 000000000..7344d8fe1 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala @@ -0,0 +1,15 @@ +package com.datastax.spark.connector.rdd.typeTests + +import java.util.TimeZone + +/** + * The following tests are executed in a separate JVM + */ +class DateTimeTypeCETTest extends DateTimeTypeTest(TimeZone.getTimeZone("CET")) { +} + +class DateTypeCETTest extends DateTypeTest(TimeZone.getTimeZone("CET")) { +} + +class SqlDateTypeCETTest extends SqlDateTypeTest(TimeZone.getTimeZone("CET")) { +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala new file mode 100644 index 000000000..aec737bbe --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala @@ -0,0 +1,15 @@ +package com.datastax.spark.connector.rdd.typeTests + +import java.util.TimeZone + +/** + * The following tests are executed in a separate JVM + */ +class DateTimeTypePSTTest extends DateTimeTypeTest(TimeZone.getTimeZone("PST")) { +} + +class DateTypePSTTest extends DateTypeTest(TimeZone.getTimeZone("PST")) { +} + +class SqlDateTypePSTTest extends SqlDateTypeTest(TimeZone.getTimeZone("PST")) { +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala deleted file mode 100644 index 5d8c72480..000000000 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.datastax.spark.connector.rdd.typeTests - -import java.sql.Date - -import com.datastax.driver.core.Row -import com.datastax.driver.core.LocalDate - -class DateTypeTest extends AbstractTypeTest[Date, LocalDate]{ - override protected val typeName: String = "date" - - override protected val typeData: Seq[Date] = Seq("2015-05-01", "2015-05-10", "2015-05-20","1950-03-05") - override protected val addData: Seq[Date] = Seq("2011-05-01", "2011-05-10", "2011-05-20","1950-01-01") - - override def getDriverColumn(row: Row, colName: String): Date = { - val ld = row.getDate(colName) - Date.valueOf(ld.toString) - } - - implicit def strToDate(str: String) : Date = java.sql.Date.valueOf(str) - - val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r - - override def convertToDriverInsertable(testValue: Date): LocalDate = { - testValue.toString match { case dateRegx(year, month, day) => - LocalDate.fromYearMonthDay(year.toInt, month.toInt, day.toInt) - } - } - - -} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala new file mode 100644 index 000000000..97fcc4750 --- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala @@ -0,0 +1,84 @@ +package com.datastax.spark.connector.sql + +import java.sql.Date +import java.util.TimeZone + +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext} +import org.joda.time.DateTimeZone +import org.scalatest.FlatSpec + +import com.datastax.driver.core.LocalDate +import com.datastax.spark.connector.SparkCassandraITSpecBase +import com.datastax.spark.connector.cql.CassandraConnector + +trait CassandraDataFrameDateBehaviors extends SparkCassandraITSpecBase { + this: FlatSpec => + + useCassandraConfig(Seq("cassandra-default.yaml.template")) + useSparkConf(defaultConf) + + val conn = CassandraConnector(defaultConf) + val sqlContext: SQLContext = new SQLContext(sc) + + def dataFrame(timeZone: TimeZone): Unit = { + + TimeZone.setDefault(timeZone) + DateTimeZone.setDefault(DateTimeZone.forTimeZone(timeZone)) + + val readTable = s"date_test_${timeZone.getID.toLowerCase}_read" + val writeTable = s"date_test_${timeZone.getID.toLowerCase}_write" + + conn.withSessionDo { session => + createKeyspace(session) + session.execute(s"create table $ks.$readTable (key int primary key, dd date)") + session.execute(s"insert into $ks.$readTable (key, dd) values (1, '1930-05-31')") + session.execute(s"create table $ks.$writeTable (key int primary key, d0 date)") + } + + it should s"read C* LocalDate columns in ${timeZone.getID} timezone" in { + val df = sqlContext + .read + .format("org.apache.spark.sql.cassandra") + .options(Map("table" -> readTable, "keyspace" -> ks, "cluster" -> "ClusterOne")) + .load + + df.count should be(1) + + val foundDate = df.first.getDate(1) + val foundLocalDate = foundDate.toLocalDate + val foundTuple = (foundLocalDate.getYear, foundLocalDate.getMonthValue, foundLocalDate.getDayOfMonth) + + val expectedTuple = (1930, 5, 31) + + foundTuple should be(expectedTuple) + } + + it should s"write java.sql.Date to C* date columns in ${timeZone.getID} timezone" in { + val schema = StructType(Seq( + StructField("key", DataTypes.IntegerType), + StructField("d0", DataTypes.DateType) + )) + + val rows = sc.parallelize(Seq( + Row(0, Date.valueOf("1986-01-02")), + Row(1, Date.valueOf("1987-01-02")) + )) + + val dataFrame = sqlContext.createDataFrame(rows, schema) + + dataFrame.write + .format("org.apache.spark.sql.cassandra") + .options(Map("table" -> writeTable, "keyspace" -> ks, "cluster" -> "ClusterOne")) + .save + + conn.withSessionDo { session => + val count = session.execute(s"select count(1) from $ks.$writeTable").one().getLong(0) + count should be(2) + + val date = session.execute(s"select d0 from $ks.$writeTable where key = 0").one().getDate(0) + date should be(LocalDate.fromYearMonthDay(1986, 1, 2)) + } + } + } +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala new file mode 100644 index 000000000..fb5d01e59 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala @@ -0,0 +1,15 @@ +package com.datastax.spark.connector.sql + +import java.util.TimeZone + +import org.scalatest.FlatSpec + +/** + * This should be executed in a separate JVM, as Catalyst caches the default time zone + */ +class CassandraDataFrameDateCETSpec extends FlatSpec with
CassandraDataFrameDateBehaviors { + + val centralEuropeanTimeZone = TimeZone.getTimeZone("CET") + + "A DataFrame in CET timezone" should behave like dataFrame(centralEuropeanTimeZone) +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala new file mode 100644 index 000000000..e8a0c0793 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala @@ -0,0 +1,15 @@ +package com.datastax.spark.connector.sql + +import java.util.TimeZone + +import org.scalatest.FlatSpec + +/** + * This should be executed in a separate JVM, as Catalyst caches the default time zone + */ +class CassandraDataFrameDatePSTSpec extends FlatSpec with CassandraDataFrameDateBehaviors { + + val pacificTimeZone = TimeZone.getTimeZone("PST") + + "A DataFrame in PST timezone" should behave like dataFrame(pacificTimeZone) +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala index 97a6e4301..2d545f483 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala @@ -4,12 +4,10 @@ import java.io.IOException import scala.collection.JavaConversions._ import scala.concurrent.Future - import com.datastax.spark.connector._ import com.datastax.spark.connector.SparkCassandraITFlatSpecBase import com.datastax.spark.connector.cql.CassandraConnector import org.apache.spark.sql.SQLContext -import org.joda.time.LocalDate class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { useCassandraConfig(Seq("cassandra-default.yaml.template")) @@ -56,12 +54,6 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { session.execute(s"CREATE TABLE $ks.tuple_test1 (id int, t Tuple, PRIMARY KEY (id))") session.execute(s"CREATE TABLE $ks.tuple_test2 (id int, t Tuple, PRIMARY KEY (id))") session.execute(s"INSERT INTO $ks.tuple_test1 (id, t) VALUES (1, ('xyz', 3))") - }, - - Future { - session.execute(s"create table $ks.date_test (key int primary key, dd date)") - session.execute(s"create table $ks.date_test2 (key int primary key, dd date)") - session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')") } ) } @@ -200,26 +192,4 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase { session.execute(s"select count(1) from $ks.tuple_test2").one().getLong(0) should be (1) } } - - it should "read and write C* LocalDate columns" in { - val df = sqlContext - .read - .format("org.apache.spark.sql.cassandra") - .options(Map("table" -> "date_test", "keyspace" -> ks, "cluster" -> "ClusterOne")) - .load - - df.count should be (1) - df.first.getDate(1) should be (new LocalDate(1930, 5, 31).toDate) - - df.write - .format("org.apache.spark.sql.cassandra") - .options(Map("table" -> "date_test2", "keyspace" -> ks, "cluster" -> "ClusterOne")) - .save - - conn.withSessionDo { session => - session.execute(s"select count(1) from $ks.date_test2").one().getLong(0) should be (1) - } - } - - } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala
b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala index 3f38e0db3..94ceaac58 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala @@ -8,6 +8,8 @@ import com.datastax.driver.core.{LocalDate, Row, TypeCodec, TupleValue => Driver import com.datastax.spark.connector.types.TypeConverter.StringConverter import com.datastax.spark.connector.util.ByteBufferUtil +import org.joda.time.DateTimeZone.UTC + trait GettableData extends GettableByIndexData { def metaData: CassandraRowMetadata @@ -77,7 +79,8 @@ object GettableData { case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue) case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue) - case localDate: LocalDate => new org.joda.time.LocalDate(localDate.getMillisSinceEpoch) + case localDate: LocalDate => + new org.joda.time.LocalDate(localDate.getYear, localDate.getMonth, localDate.getDay) case other => other.asInstanceOf[AnyRef] } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index 9121c91f5..0af12a82b 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -11,7 +11,7 @@ import scala.reflect.runtime.universe._ import org.apache.commons.lang3.tuple import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock -import org.joda.time.{DateTime, DateTimeZone} +import org.joda.time.{DateTime, LocalDate => JodaLocalDate} import com.datastax.driver.core.LocalDate import com.datastax.spark.connector.TupleValue @@ -78,6 +78,7 @@ object TypeConverter { implicit object AnyConverter extends TypeConverter[Any] { def targetTypeTag = AnyTypeTag + def convertPF = { case obj => obj } @@ -89,6 +90,7 @@ object TypeConverter { implicit object AnyRefConverter extends TypeConverter[AnyRef] { def targetTypeTag = AnyRefTypeTag + def convertPF = { case obj => obj.asInstanceOf[AnyRef] } @@ -100,6 +102,7 @@ object TypeConverter { implicit object BooleanConverter extends TypeConverter[Boolean] { def targetTypeTag = BooleanTypeTag + def convertPF = { case x: java.lang.Boolean => x case x: java.lang.Integer => x != 0 @@ -115,6 +118,7 @@ object TypeConverter { implicit object JavaBooleanConverter extends NullableTypeConverter[java.lang.Boolean] { def targetTypeTag = JavaBooleanTypeTag + def convertPF = BooleanConverter.convertPF.andThen(_.asInstanceOf[java.lang.Boolean]) } @@ -124,6 +128,7 @@ object TypeConverter { implicit object ByteConverter extends TypeConverter[Byte] { def targetTypeTag = ByteTypeTag + def convertPF = { case x: Number => x.byteValue case x: String => x.toByte @@ -136,6 +141,7 @@ object TypeConverter { implicit object JavaByteConverter extends NullableTypeConverter[java.lang.Byte] { def targetTypeTag = JavaByteTypeTag + def convertPF = ByteConverter.convertPF.andThen(_.asInstanceOf[java.lang.Byte]) } @@ -145,6 +151,7 @@ object TypeConverter { implicit object ShortConverter extends TypeConverter[Short] { def targetTypeTag = ShortTypeTag + def convertPF = { case x: Number => x.shortValue case x: String => x.toShort @@ 
-157,6 +164,7 @@ object TypeConverter { implicit object JavaShortConverter extends NullableTypeConverter[java.lang.Short] { def targetTypeTag = JavaShortTypeTag + def convertPF = ShortConverter.convertPF.andThen(_.asInstanceOf[java.lang.Short]) } @@ -166,6 +174,7 @@ object TypeConverter { implicit object IntConverter extends TypeConverter[Int] { def targetTypeTag = IntTypeTag + def convertPF = { case x: Number => x.intValue case x: String => x.toInt @@ -178,6 +187,7 @@ object TypeConverter { implicit object JavaIntConverter extends NullableTypeConverter[java.lang.Integer] { def targetTypeTag = JavaIntTypeTag + def convertPF = IntConverter.convertPF.andThen(_.asInstanceOf[java.lang.Integer]) } @@ -187,6 +197,7 @@ object TypeConverter { implicit object LongConverter extends TypeConverter[Long] { def targetTypeTag = LongTypeTag + def convertPF = { case x: Number => x.longValue case x: Date => x.getTime @@ -202,6 +213,7 @@ object TypeConverter { implicit object JavaLongConverter extends NullableTypeConverter[java.lang.Long] { def targetTypeTag = JavaLongTypeTag + def convertPF = LongConverter.convertPF.andThen(_.asInstanceOf[java.lang.Long]) } @@ -211,6 +223,7 @@ object TypeConverter { implicit object FloatConverter extends TypeConverter[Float] { def targetTypeTag = FloatTypeTag + def convertPF = { case x: Number => x.floatValue case x: String => x.toFloat @@ -223,6 +236,7 @@ object TypeConverter { implicit object JavaFloatConverter extends NullableTypeConverter[java.lang.Float] { def targetTypeTag = JavaFloatTypeTag + def convertPF = FloatConverter.convertPF.andThen(_.asInstanceOf[java.lang.Float]) } @@ -232,6 +246,7 @@ object TypeConverter { implicit object DoubleConverter extends TypeConverter[Double] { def targetTypeTag = DoubleTypeTag + def convertPF = { case x: Number => x.doubleValue case x: String => x.toDouble @@ -244,6 +259,7 @@ object TypeConverter { implicit object JavaDoubleConverter extends NullableTypeConverter[java.lang.Double] { def targetTypeTag = JavaDoubleTypeTag + def convertPF = DoubleConverter.convertPF.andThen(_.asInstanceOf[java.lang.Double]) } @@ -253,13 +269,14 @@ object TypeConverter { implicit object StringConverter extends NullableTypeConverter[String] { def targetTypeTag = StringTypeTag + def convertPF = { case x: Date => TimestampFormatter.format(x) case x: Array[Byte] => "0x" + x.map("%02x" format _).mkString case x: Map[_, _] => x.map(kv => convert(kv._1) + ": " + convert(kv._2)).mkString("{", ",", "}") case x: Set[_] => x.map(convert).mkString("{", ",", "}") case x: Seq[_] => x.map(convert).mkString("[", ",", "]") - case x: Any => x.toString + case x: Any => x.toString } } @@ -269,6 +286,7 @@ object TypeConverter { implicit object ByteBufferConverter extends NullableTypeConverter[ByteBuffer] { def targetTypeTag = ByteBufferTypeTag + def convertPF = { case x: ByteBuffer => x case x: Array[Byte] => ByteBuffer.wrap(x) @@ -281,6 +299,7 @@ object TypeConverter { implicit object ByteArrayConverter extends NullableTypeConverter[Array[Byte]] { def targetTypeTag = ByteArrayTypeTag + def convertPF = { case x: Array[Byte] => x case x: ByteBuffer => ByteBufferUtil.toArray(x) @@ -293,15 +312,16 @@ object TypeConverter { implicit object DateConverter extends NullableTypeConverter[Date] { def targetTypeTag = DateTypeTag + def convertPF = { case x: Date => x case x: DateTime => x.toDate case x: Calendar => x.getTime case x: Long => new Date(x) case x: UUID if x.version() == 1 => new Date(x.timestamp()) - case x: LocalDate => new Date(x.getMillisSinceEpoch) + case x: 
LocalDate => DateConverter.convert(JodaLocalDateConverter.convert(x)) case x: String => TimestampParser.parse(x) - case x: org.joda.time.LocalDate => x.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate + case x: JodaLocalDate => x.toDateTimeAtStartOfDay.toDate } } @@ -310,22 +330,13 @@ object TypeConverter { } implicit object SqlDateConverter extends NullableTypeConverter[java.sql.Date] { - - /* java.sql.Date assume that the internal timestamp is offset by the local timezone. This means - a direct conversion from LocalDate to java.sql.Date will actually change the Year-Month-Day - stored. - */ - def subtractTimeZoneOffset(millis: Long) = millis - defaultTimezone.getOffset(millis) - def targetTypeTag = SqlDateTypeTag - val shiftLocalDate: PartialFunction[Any, java.sql.Date] = { - case x: LocalDate => new java.sql.Date(subtractTimeZoneOffset(x.getMillisSinceEpoch)) - case x: org.joda.time.LocalDate => shiftLocalDate(LocalDateConverter.convertPF(x)) + def convertPF = { + case x: Date => new java.sql.Date(x.getTime) + case x: LocalDate => SqlDateConverter.convert(JodaLocalDateConverter.convert(x)) + case x: JodaLocalDate => new java.sql.Date(x.toDateTimeAtStartOfDay.getMillis) } - - //If there is no Local Date input we will use the normal date converter - def convertPF = shiftLocalDate orElse DateConverter.convertPF.andThen(d => new java.sql.Date(d.getTime)) } private val JodaDateTypeTag = SparkReflectionLock.synchronized { @@ -337,6 +348,19 @@ object TypeConverter { def convertPF = DateConverter.convertPF.andThen(new DateTime(_)) } + private val JodaLocalDateTypeTag = SparkReflectionLock.synchronized { + implicitly[TypeTag[JodaLocalDate]] + } + + implicit object JodaLocalDateConverter extends NullableTypeConverter[JodaLocalDate] { + def targetTypeTag = JodaLocalDateTypeTag + def convertPF = { + case x: JodaLocalDate => x + case x: LocalDate => new JodaLocalDate(x.getYear, x.getMonth, x.getDay) + case x: java.sql.Date => JodaLocalDate.fromDateFields(x) + } + } + private val GregorianCalendarTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[GregorianCalendar]] } @@ -440,20 +464,20 @@ object TypeConverter { def targetTypeTag = LocalDateTypeTag val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r - /* java.sql.Date assume that the internal timestamp is offset by the local timezone. This means - a direct conversion from LocalDate to java.sql.Date will actually change the Year-Month-Day - stored. 
(See the SqlDate converter for the opposite adjustment) - */ - def addTimeZoneOffset(millis: Long) = millis + defaultTimezone.getOffset(millis) - def convertPF = { case x: LocalDate => x case dateRegx(y, m, d) => LocalDate.fromYearMonthDay(y.toInt, m.toInt, d.toInt) case x: Int => LocalDate.fromDaysSinceEpoch(x) - case x: java.sql.Date => LocalDate.fromMillisSinceEpoch(addTimeZoneOffset(x.getTime)) - case x: Date => LocalDate.fromMillisSinceEpoch(x.getTime) - case x: DateTime => LocalDate.fromMillisSinceEpoch(x.getMillis) - case x: org.joda.time.LocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth) + case x: JodaLocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth) + case x: DateTime => { + val ld = x.toLocalDate + LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth) + } + case x: Date => { + val a = JodaLocalDate.fromDateFields(x) + val b = LocalDateConverter.convert(a) + b + } } } @@ -834,6 +858,7 @@ object TypeConverter { DateConverter, SqlDateConverter, JodaDateConverter, + JodaLocalDateConverter, GregorianCalendarConverter, InetAddressConverter, UUIDConverter, diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala index 5ea743956..546b87771 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala @@ -6,13 +6,14 @@ import java.util.{Date, UUID} import java.math.BigInteger import com.datastax.driver.core.Row -import com.datastax.driver.core.LocalDate import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata, GettableData, TupleValue, UDTValue} import com.datastax.spark.connector.rdd.reader.{RowReader, ThisRowReaderAsFactory} import com.datastax.spark.connector.types.TypeConverter + import org.apache.spark.sql.{Row => SparkRow} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.sql.types.Decimal +import org.joda.time.DateTimeZone.UTC final class CassandraSQLRow(val metaData: CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends GettableData with SparkRow with Serializable { @@ -48,13 +49,6 @@ final class CassandraSQLRow(val metaData: CassandraRowMetadata, val columnValues object CassandraSQLRow { - /** SparkSQL assumes all incoming Timestamps will be shifted by the - * local TimeZone, if we do not anticipate this our LocalDate objects will - * go back in time. 
- */ - lazy val defaultTimeZone = java.util.TimeZone.getDefault - def subtractTimeZoneOffset( millis: Long ) = millis - defaultTimeZone.getOffset(millis) - def fromJavaDriverRow(row: Row, metaData:CassandraRowMetadata): CassandraSQLRow = { val data = CassandraRow.dataFromJavaDriverRow(row, metaData) new CassandraSQLRow(metaData, data.map(toSparkSqlType)) @@ -72,8 +66,8 @@ object CassandraSQLRow { private def toSparkSqlType(value: Any): AnyRef = { value match { case date: Date => new Timestamp(date.getTime) - case localDate: LocalDate => new java.sql.Date(subtractTimeZoneOffset(localDate.getMillisSinceEpoch)) - case localDate: org.joda.time.LocalDate => new java.sql.Date(localDate.toDate.getTime) + case localDate: org.joda.time.LocalDate => + new java.sql.Date(localDate.toDateTimeAtStartOfDay().getMillis) case str: String => UTF8String.fromString(str) case bigInteger: BigInteger => Decimal(bigInteger.toString) case inetAddress: InetAddress => UTF8String.fromString(inetAddress.getHostAddress) diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala new file mode 100644 index 000000000..3b2f62467 --- /dev/null +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala @@ -0,0 +1,45 @@ +package com.datastax.spark.connector + +import scala.util.Random + +import org.joda.time.{DateTimeZone, LocalDate => JodaLocalDate} +import org.scalatest.{FlatSpec, Matchers} + +import com.datastax.driver.core.LocalDate + +class GettableDataSpec extends FlatSpec with Matchers { + + private def getLocalDates(count: Int): Seq[LocalDate] = for (t <- 1 to count) yield { + Random.setSeed(10 + t) // distinct per-iteration seed; a constant seed here would make every generated date identical + val year = Random.nextInt(2000) + val month = Random.nextInt(12) + 1 + val day = Random.nextInt(28) + 1 + LocalDate.fromYearMonthDay(year, month, day) + } + + "GettableData" should "convert Driver LocalDates to Joda LocalDate" in { + val zones = Seq( + DateTimeZone.forID("America/New_York"), + DateTimeZone.forID("Europe/Warsaw"), + DateTimeZone.forID("+00:55"), + DateTimeZone.forID("-00:33"), + DateTimeZone.forID("UTC") + ) + val dates = getLocalDates(count = 10000) + + val default = DateTimeZone.getDefault + try { + for (zone <- zones; + date <- dates) { + DateTimeZone.setDefault(zone) + val jodaLocalDate = GettableData.convert(date).asInstanceOf[JodaLocalDate] + val ld = (date.getYear, date.getMonth, date.getDay) + val jd = (jodaLocalDate.getYear, jodaLocalDate.getMonthOfYear, jodaLocalDate.getDayOfMonth) + + withClue(s"LocalDate conversion failed for $zone, $date") {ld should be(jd)} + } + } finally { + DateTimeZone.setDefault(default) + } + } +} diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala index 3263b6d96..7fdf1a150 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala @@ -133,13 +133,12 @@ def testDate() { val c = TypeConverter.forType[Date] val dateStr = "2014-04-23 11:21:32+0100" - val dayOnlyStr = "2014-04-23 0:0:0+0000" - val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ") - val localDate = LocalDate.fromYearMonthDay(2014,4,23) + val dayOnlyStr = "2014-04-23" + val
localDate = LocalDate.fromYearMonthDay(2014, 4, 23) val jodaLocalDate = new org.joda.time.LocalDate(2014, 4, 23) - val date = dateFormat.parse(dateStr) - val dateDayOnly = dateFormat.parse(dayOnlyStr) + val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ").parse(dateStr) + val dateDayOnly = new SimpleDateFormat("yyyy-MM-dd").parse(dayOnlyStr) assertEquals(dateDayOnly, c.convert(localDate)) assertEquals(dateDayOnly, c.convert(jodaLocalDate)) @@ -220,8 +219,8 @@ class TypeConverterTest { def testLocalDate(): Unit = { val c = TypeConverter.forType[LocalDate] val testDate = LocalDate.fromYearMonthDay(1985, 8, 3) - val dateFormat = new SimpleDateFormat("yyyy-MM-ddZ") - val date = dateFormat.parse("1985-08-03-0000") + val dateFormat = new SimpleDateFormat("yyyy-MM-dd") + val date = dateFormat.parse("1985-08-03") assertEquals(testDate, c.convert("1985-08-03")) assertEquals(testDate, c.convert(5693)) assertEquals(testDate, c.convert(date))