diff --git a/project/Settings.scala b/project/Settings.scala
index b7afaadbf..4fdda3664 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -259,6 +259,8 @@ object Settings extends Build {
     if (test.name.toLowerCase.contains("auth")) "auth"
     else if (test.name.toLowerCase.contains("ssl")) "ssl"
     else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec"
+    else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec"
+    else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec"
     else test.name.reverse.dropWhile(_ != '.').reverse
   }
 
@@ -274,6 +276,8 @@ object Settings extends Build {
     else if (pkgName.contains(".repl")) "repl"
     else if (pkgName.contains(".streaming")) "streaming"
     else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec"
+    else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec"
+    else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec"
     else "other"
   }
 
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
index d2079ec38..60caecfa7 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
@@ -1059,16 +1059,16 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
 
     val row = sc.cassandraTable(ks, "date_test").where("key = 1").first
     row.getInt("key") should be(1)
-    row.getDate("dd") should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate)
-    row.getDateTime("dd").toLocalDate should be(expected)
+    row.getDate("dd") should be(expected.toDateTimeAtStartOfDay.toDate)
+    row.get[LocalDate]("dd") should be(expected)
   }
 
   it should "read LocalDate as tuple value with given type" in {
     val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda
     val date = sc.cassandraTable[(Int, Date)](ks, "date_test").where("key = 1").first._2
-    val dateTime = sc.cassandraTable[(Int, DateTime)](ks, "date_test").where("key = 1").first._2
+    val localDate = sc.cassandraTable[(Int, LocalDate)](ks, "date_test").where("key = 1").first._2
 
-    date should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate)
-    dateTime.toLocalDate should be(expected)
+    date should be(expected.toDateTimeAtStartOfDay.toDate)
+    localDate should be(expected)
   }
 }
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala
new file mode 100644
index 000000000..baee1fb38
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractDateTypeTest.scala
@@ -0,0 +1,85 @@
+package com.datastax.spark.connector.rdd.typeTests
+
+import java.sql.{Date => SqlDate}
+import java.text.SimpleDateFormat
+import java.util.{TimeZone, Date => UtilDate}
+
+import scala.reflect.ClassTag
+
+import org.joda.time.{DateTime, DateTimeZone, LocalDate => JodaLocalDate}
+
+import com.datastax.driver.core.{Row, LocalDate => DriverLocalDate}
+import com.datastax.spark.connector.rdd.reader.RowReaderFactory
+import com.datastax.spark.connector.types.TypeConverter
+import com.datastax.spark.connector.writer.RowWriterFactory
+
+/**
+ * This should be executed in separate JVM, as Catalyst caches default time zone
+ */
+abstract class AbstractDateTypeTest[TestType: ClassTag](
+    val testTimeZone: TimeZone)(
+    implicit
+    tConverter: TypeConverter[TestType],
+    rowReaderNormal: RowReaderFactory[(TestType, TestType, TestType, TestType)],
+    rowReaderCollection: RowReaderFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])],
+    rowReaderNull: RowReaderFactory[(TestType, TestType, Option[TestType], Set[TestType], Map[TestType, TestType], Seq[TestType])],
+    rowWriterNormal: RowWriterFactory[(TestType, TestType, TestType, TestType)],
+    rowWriterCollection: RowWriterFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])],
+    rowWriterNull: RowWriterFactory[(TestType, TestType, Null, Null, Null, Null)])
+  extends AbstractTypeTest[TestType, DriverLocalDate] {
+
+  TimeZone.setDefault(testTimeZone)
+  DateTimeZone.setDefault(DateTimeZone.forTimeZone(testTimeZone))
+
+  private val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d).*""".r
+
+  protected def stringToDate(str: String): TestType
+
+  protected def dateToString(date: TestType): String
+
+  override protected val typeName: String = "date"
+
+  override protected lazy val typeData: Seq[TestType] = Seq(
+    stringToDate("2015-05-01"),
+    stringToDate("2015-05-10"),
+    stringToDate("2015-05-20"),
+    stringToDate("1950-03-05"))
+
+  override protected lazy val addData: Seq[TestType] = Seq(
+    stringToDate("2011-05-01"),
+    stringToDate("2011-05-10"),
+    stringToDate("2011-05-20"),
+    stringToDate("1950-01-01"))
+
+  override def convertToDriverInsertable(testValue: TestType): DriverLocalDate = {
+    dateToString(testValue) match {
+      case dateRegx(year, month, day) =>
+        DriverLocalDate.fromYearMonthDay(year.toInt, month.toInt, day.toInt)
+    }
+  }
+
+  override def getDriverColumn(row: Row, colName: String): TestType = {
+    val ld = row.getDate(colName)
+    stringToDate(ld.toString)
+  }
+}
+
+abstract class DateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[UtilDate](timeZone) {
+  private val format = new SimpleDateFormat("yyyy-MM-dd")
+
+  override def stringToDate(str: String): UtilDate = format.parse(str)
+
+  override def dateToString(date: UtilDate): String = format.format(date)
+}
+
+abstract class DateTimeTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[DateTime](timeZone) {
+  override def stringToDate(str: String): DateTime = JodaLocalDate.parse(str).toDateTimeAtStartOfDay
+
+  override def dateToString(date: DateTime): String = date.toLocalDate.toString
+}
+
+abstract class SqlDateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[SqlDate](timeZone) {
+  override def stringToDate(str: String): SqlDate = SqlDate.valueOf(str)
+
+  override def dateToString(date: SqlDate): String = date.toString
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala
index 1e60dfda1..564c01f2e 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/AbstractTypeTest.scala
@@ -15,7 +15,7 @@ import com.datastax.spark.connector.writer.RowWriterFactory
 
 /**
  * A template class for testing that various CQL Types work with the spark Cassandra Connector
- * When creating an iimplementationof this test you must provide two types,
+ * When creating an implementation of this test you must provide two types,
  * The TestType is used to extract values from Cassandra ie: sc.cassandraTable[TestType]
  * and saving values in cassandra via sc.parallelize(x:TesType).saveToCassandra()
  * The DriverType is used for inserting values into C* via the javaDriver
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala
new file mode 100644
index 000000000..7344d8fe1
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeCETTest.scala
@@ -0,0 +1,15 @@
+package com.datastax.spark.connector.rdd.typeTests
+
+import java.util.TimeZone
+
+/**
+ * Following tests are executed in separate JVM
+ */
+class DateTimeTypeCETTest extends DateTimeTypeTest(TimeZone.getTimeZone("CET")) {
+}
+
+class DateTypeCETTest extends DateTypeTest(TimeZone.getTimeZone("CET")) {
+}
+
+class SqlDateTypeCETTest extends SqlDateTypeTest(TimeZone.getTimeZone("CET")) {
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala
new file mode 100644
index 000000000..aec737bbe
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypePSTTest.scala
@@ -0,0 +1,15 @@
+package com.datastax.spark.connector.rdd.typeTests
+
+import java.util.TimeZone
+
+/**
+ * Following tests are executed in separate JVM
+ */
+class DateTimeTypePSTTest extends DateTimeTypeTest(TimeZone.getTimeZone("PST")) {
+}
+
+class DateTypePSTTest extends DateTypeTest(TimeZone.getTimeZone("PST")) {
+}
+
+class SqlDateTypePSTTest extends SqlDateTypeTest(TimeZone.getTimeZone("PST")) {
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala
deleted file mode 100644
index 5d8c72480..000000000
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/typeTests/DateTypeTest.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.datastax.spark.connector.rdd.typeTests
-
-import java.sql.Date
-
-import com.datastax.driver.core.Row
-import com.datastax.driver.core.LocalDate
-
-class DateTypeTest extends AbstractTypeTest[Date, LocalDate]{
-  override protected val typeName: String = "date"
-
-  override protected val typeData: Seq[Date] = Seq("2015-05-01", "2015-05-10", "2015-05-20","1950-03-05")
-  override protected val addData: Seq[Date] = Seq("2011-05-01", "2011-05-10", "2011-05-20","1950-01-01")
-
-  override def getDriverColumn(row: Row, colName: String): Date = {
-    val ld = row.getDate(colName)
-    Date.valueOf(ld.toString)
-  }
-
-  implicit def strToDate(str: String) : Date = java.sql.Date.valueOf(str)
-
-  val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
-
-  override def convertToDriverInsertable(testValue: Date): LocalDate = {
-    testValue.toString match { case dateRegx(year, month, day) =>
-      LocalDate.fromYearMonthDay(year.toInt, month.toInt, day.toInt)
-    }
-  }
-
-
-}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala
new file mode 100644
index 000000000..97fcc4750
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala
@@ -0,0 +1,84 @@
+package com.datastax.spark.connector.sql
+
+import java.sql.Date
+import java.util.TimeZone
+
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext}
+import org.joda.time.DateTimeZone
+import org.scalatest.FlatSpec
+
+import com.datastax.driver.core.LocalDate
+import com.datastax.spark.connector.SparkCassandraITSpecBase
+import com.datastax.spark.connector.cql.CassandraConnector
+
+trait CassandraDataFrameDateBehaviors extends SparkCassandraITSpecBase {
+  this: FlatSpec =>
+
+  useCassandraConfig(Seq("cassandra-default.yaml.template"))
+  useSparkConf(defaultConf)
+
+  val conn = CassandraConnector(defaultConf)
+  val sqlContext: SQLContext = new SQLContext(sc)
+
+  def dataFrame(timeZone: TimeZone): Unit = {
+
+    TimeZone.setDefault(timeZone)
+    DateTimeZone.setDefault(DateTimeZone.forTimeZone(timeZone))
+
+    val readTable = s"date_test_${timeZone.getID.toLowerCase}_read"
+    val writeTable = s"date_test_${timeZone.getID.toLowerCase}_write"
+
+    conn.withSessionDo { session =>
+      createKeyspace(session)
+      session.execute(s"create table $ks.$readTable (key int primary key, dd date)")
+      session.execute(s"insert into $ks.$readTable (key, dd) values (1, '1930-05-31')")
+      session.execute(s"create table $ks.$writeTable (key int primary key, d0 date)")
+    }
+
+    it should s"read C* LocalDate columns in ${timeZone.getID} timezone" in {
+      val df = sqlContext
+        .read
+        .format("org.apache.spark.sql.cassandra")
+        .options(Map("table" -> readTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
+        .load
+
+      df.count should be(1)
+
+      val foundDate = df.first.getDate(1)
+      val foundLocalDate = foundDate.toLocalDate
+      val foundTuple = (foundLocalDate.getYear, foundLocalDate.getMonthValue, foundLocalDate.getDayOfMonth)
+
+      val expectedTuple = (1930, 5, 31)
+
+      foundTuple should be(expectedTuple)
+    }
+
+    it should s"write java.sql.date to C* date columns in ${timeZone.getID} timezone" in {
+      val schema = StructType(Seq(
+        StructField("key", DataTypes.IntegerType),
+        StructField("d0", DataTypes.DateType)
+      ))
+
+      val rows = sc.parallelize(Seq(
+        Row(0, Date.valueOf("1986-01-02")),
+        Row(1, Date.valueOf("1987-01-02"))
+      ))
+
+      val dataFrame = sqlContext.createDataFrame(rows, schema)
+
+      dataFrame.write
+        .format("org.apache.spark.sql.cassandra")
+        .options(Map("table" -> writeTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
+        .save
+
+      conn.withSessionDo { session =>
+        val count = session.execute(s"select count(1) from $ks.$writeTable").one().getLong(0)
+        count should be(2)
+
+        val date = session.execute(s"select d0 from $ks.$writeTable where key = 0").one().getDate(0)
+        date should be(LocalDate.fromYearMonthDay(1986, 1, 2))
+      }
+    }
+  }
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala
new file mode 100644
index 000000000..fb5d01e59
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateCETSpec.scala
@@ -0,0 +1,15 @@
+package com.datastax.spark.connector.sql
+
+import java.util.TimeZone
+
+import org.scalatest.FlatSpec
+
+/**
+ * This should be executed in separate JVM, as Catalyst caches default time zone
+ */
+class CassandraDataFrameDateCETSpec extends FlatSpec with CassandraDataFrameDateBehaviors {
+
+  val centralEuropeanTimeZone = TimeZone.getTimeZone("CET")
+
+  "A DataFrame in CET timezone" should behave like dataFrame(centralEuropeanTimeZone)
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala
new file mode 100644
index 000000000..e8a0c0793
--- /dev/null
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDatePSTSpec.scala
@@ -0,0 +1,15 @@
+package com.datastax.spark.connector.sql
+
+import java.util.TimeZone
+
+import org.scalatest.FlatSpec
+
+/**
+ * This should be executed in separate JVM, as Catalyst caches default time zone
+ */
+class CassandraDataFrameDatePSTSpec extends FlatSpec with CassandraDataFrameDateBehaviors {
+
+  val pacificTimeZone = TimeZone.getTimeZone("PST")
+
+  "A DataFrame in PST timezone" should behave like dataFrame(pacificTimeZone)
+}
diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
index 97a6e4301..2d545f483 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala
@@ -4,12 +4,10 @@ import java.io.IOException
 
 import scala.collection.JavaConversions._
 import scala.concurrent.Future
-
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.spark.sql.SQLContext
-import org.joda.time.LocalDate
 
 class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
   useCassandraConfig(Seq("cassandra-default.yaml.template"))
@@ -56,12 +54,6 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
         session.execute(s"CREATE TABLE $ks.tuple_test1 (id int, t Tuple, PRIMARY KEY (id))")
         session.execute(s"CREATE TABLE $ks.tuple_test2 (id int, t Tuple, PRIMARY KEY (id))")
         session.execute(s"INSERT INTO $ks.tuple_test1 (id, t) VALUES (1, ('xyz', 3))")
-      },
-
-      Future {
-        session.execute(s"create table $ks.date_test (key int primary key, dd date)")
-        session.execute(s"create table $ks.date_test2 (key int primary key, dd date)")
-        session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')")
       }
     )
   }
@@ -200,26 +192,4 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
       session.execute(s"select count(1) from $ks.tuple_test2").one().getLong(0) should be (1)
     }
   }
-
-  it should "read and write C* LocalDate columns" in {
-    val df = sqlContext
-      .read
-      .format("org.apache.spark.sql.cassandra")
-      .options(Map("table" -> "date_test", "keyspace" -> ks, "cluster" -> "ClusterOne"))
-      .load
-
-    df.count should be (1)
-    df.first.getDate(1) should be (new LocalDate(1930, 5, 31).toDate)
-
-    df.write
-      .format("org.apache.spark.sql.cassandra")
-      .options(Map("table" -> "date_test2", "keyspace" -> ks, "cluster" -> "ClusterOne"))
-      .save
-
-    conn.withSessionDo { session =>
-      session.execute(s"select count(1) from $ks.date_test2").one().getLong(0) should be (1)
-    }
-  }
-
-
 }
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala
index 3f38e0db3..94ceaac58 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala
@@ -8,6 +8,8 @@ import com.datastax.driver.core.{LocalDate, Row, TypeCodec, TupleValue => DriverTupleValue, UDTValue => DriverUDTValue}
 import com.datastax.spark.connector.types.TypeConverter.StringConverter
 import com.datastax.spark.connector.util.ByteBufferUtil
 
+import org.joda.time.DateTimeZone.UTC
+
 trait GettableData extends GettableByIndexData {
 
   def metaData: CassandraRowMetadata
@@ -77,7 +79,8 @@ object GettableData {
     case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap
     case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue)
     case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue)
-    case localDate: LocalDate => new org.joda.time.LocalDate(localDate.getMillisSinceEpoch)
+    case localDate: LocalDate =>
+      new org.joda.time.LocalDate(localDate.getYear, localDate.getMonth, localDate.getDay)
     case other => other.asInstanceOf[AnyRef]
   }
 
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
index 9121c91f5..0af12a82b 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
@@ -11,7 +11,7 @@ import scala.reflect.runtime.universe._
 
 import org.apache.commons.lang3.tuple
 import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock
-import org.joda.time.{DateTime, DateTimeZone}
+import org.joda.time.{DateTime, LocalDate => JodaLocalDate}
 
 import com.datastax.driver.core.LocalDate
 import com.datastax.spark.connector.TupleValue
@@ -78,6 +78,7 @@ object TypeConverter {
 
   implicit object AnyConverter extends TypeConverter[Any] {
     def targetTypeTag = AnyTypeTag
+
     def convertPF = {
       case obj => obj
     }
@@ -89,6 +90,7 @@ object TypeConverter {
 
   implicit object AnyRefConverter extends TypeConverter[AnyRef] {
     def targetTypeTag = AnyRefTypeTag
+
     def convertPF = {
       case obj => obj.asInstanceOf[AnyRef]
     }
@@ -100,6 +102,7 @@ object TypeConverter {
 
   implicit object BooleanConverter extends TypeConverter[Boolean] {
     def targetTypeTag = BooleanTypeTag
+
     def convertPF = {
       case x: java.lang.Boolean => x
       case x: java.lang.Integer => x != 0
@@ -115,6 +118,7 @@ object TypeConverter {
 
   implicit object JavaBooleanConverter extends NullableTypeConverter[java.lang.Boolean] {
     def targetTypeTag = JavaBooleanTypeTag
+
     def convertPF = BooleanConverter.convertPF.andThen(_.asInstanceOf[java.lang.Boolean])
   }
 
@@ -124,6 +128,7 @@ object TypeConverter {
 
   implicit object ByteConverter extends TypeConverter[Byte] {
     def targetTypeTag = ByteTypeTag
+
     def convertPF = {
       case x: Number => x.byteValue
       case x: String => x.toByte
@@ -136,6 +141,7 @@ object TypeConverter {
 
   implicit object JavaByteConverter extends NullableTypeConverter[java.lang.Byte] {
     def targetTypeTag = JavaByteTypeTag
+
     def convertPF = ByteConverter.convertPF.andThen(_.asInstanceOf[java.lang.Byte])
   }
 
@@ -145,6 +151,7 @@ object TypeConverter {
 
   implicit object ShortConverter extends TypeConverter[Short] {
     def targetTypeTag = ShortTypeTag
+
     def convertPF = {
       case x: Number => x.shortValue
       case x: String => x.toShort
@@ -157,6 +164,7 @@ object TypeConverter {
 
   implicit object JavaShortConverter extends NullableTypeConverter[java.lang.Short] {
     def targetTypeTag = JavaShortTypeTag
+
     def convertPF = ShortConverter.convertPF.andThen(_.asInstanceOf[java.lang.Short])
   }
 
@@ -166,6 +174,7 @@ object TypeConverter {
 
   implicit object IntConverter extends TypeConverter[Int] {
     def targetTypeTag = IntTypeTag
+
     def convertPF = {
       case x: Number => x.intValue
       case x: String => x.toInt
@@ -178,6 +187,7 @@ object TypeConverter {
 
   implicit object JavaIntConverter extends NullableTypeConverter[java.lang.Integer] {
     def targetTypeTag = JavaIntTypeTag
+
     def convertPF = IntConverter.convertPF.andThen(_.asInstanceOf[java.lang.Integer])
   }
 
@@ -187,6 +197,7 @@ object TypeConverter {
 
   implicit object LongConverter extends TypeConverter[Long] {
     def targetTypeTag = LongTypeTag
+
     def convertPF = {
       case x: Number => x.longValue
       case x: Date => x.getTime
@@ -202,6 +213,7 @@ object TypeConverter {
 
   implicit object JavaLongConverter extends NullableTypeConverter[java.lang.Long] {
     def targetTypeTag = JavaLongTypeTag
+
     def convertPF = LongConverter.convertPF.andThen(_.asInstanceOf[java.lang.Long])
   }
 
@@ -211,6 +223,7 @@ object TypeConverter {
 
   implicit object FloatConverter extends TypeConverter[Float] {
     def targetTypeTag = FloatTypeTag
+
     def convertPF = {
       case x: Number => x.floatValue
      case x: String => x.toFloat
@@ -223,6 +236,7 @@ object TypeConverter {
 
   implicit object JavaFloatConverter extends NullableTypeConverter[java.lang.Float] {
     def targetTypeTag = JavaFloatTypeTag
+
     def convertPF = FloatConverter.convertPF.andThen(_.asInstanceOf[java.lang.Float])
   }
 
@@ -232,6 +246,7 @@ object TypeConverter {
 
   implicit object DoubleConverter extends TypeConverter[Double] {
     def targetTypeTag = DoubleTypeTag
+
     def convertPF = {
       case x: Number => x.doubleValue
       case x: String => x.toDouble
@@ -244,6 +259,7 @@ object TypeConverter {
 
   implicit object JavaDoubleConverter extends NullableTypeConverter[java.lang.Double] {
     def targetTypeTag = JavaDoubleTypeTag
+
     def convertPF = DoubleConverter.convertPF.andThen(_.asInstanceOf[java.lang.Double])
   }
 
@@ -253,13 +269,14 @@ object TypeConverter {
 
   implicit object StringConverter extends NullableTypeConverter[String] {
     def targetTypeTag = StringTypeTag
+
     def convertPF = {
       case x: Date => TimestampFormatter.format(x)
       case x: Array[Byte] => "0x" + x.map("%02x" format _).mkString
       case x: Map[_, _] => x.map(kv => convert(kv._1) + ": " + convert(kv._2)).mkString("{", ",", "}")
       case x: Set[_] => x.map(convert).mkString("{", ",", "}")
       case x: Seq[_] => x.map(convert).mkString("[", ",", "]")
-      case x: Any => x.toString
+      case x: Any => x.toString
     }
   }
 
@@ -269,6 +286,7 @@ object TypeConverter {
 
   implicit object ByteBufferConverter extends NullableTypeConverter[ByteBuffer] {
     def targetTypeTag = ByteBufferTypeTag
+
     def convertPF = {
       case x: ByteBuffer => x
       case x: Array[Byte] => ByteBuffer.wrap(x)
@@ -281,6 +299,7 @@ object TypeConverter {
 
   implicit object ByteArrayConverter extends NullableTypeConverter[Array[Byte]] {
     def targetTypeTag = ByteArrayTypeTag
+
     def convertPF = {
       case x: Array[Byte] => x
       case x: ByteBuffer => ByteBufferUtil.toArray(x)
@@ -293,15 +312,16 @@ object TypeConverter {
 
   implicit object DateConverter extends NullableTypeConverter[Date] {
     def targetTypeTag = DateTypeTag
+
     def convertPF = {
       case x: Date => x
       case x: DateTime => x.toDate
       case x: Calendar => x.getTime
       case x: Long => new Date(x)
       case x: UUID if x.version() == 1 => new Date(x.timestamp())
-      case x: LocalDate => new Date(x.getMillisSinceEpoch)
+      case x: LocalDate => DateConverter.convert(JodaLocalDateConverter.convert(x))
       case x: String => TimestampParser.parse(x)
-      case x: org.joda.time.LocalDate => x.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate
+      case x: JodaLocalDate => x.toDateTimeAtStartOfDay.toDate
     }
   }
 
@@ -310,22 +330,13 @@ object TypeConverter {
   }
 
   implicit object SqlDateConverter extends NullableTypeConverter[java.sql.Date] {
-
-    /* java.sql.Date assume that the internal timestamp is offset by the local timezone. This means
-    a direct conversion from LocalDate to java.sql.Date will actually change the Year-Month-Day
-    stored.
-    */
-    def subtractTimeZoneOffset(millis: Long) = millis - defaultTimezone.getOffset(millis)
-
     def targetTypeTag = SqlDateTypeTag
 
-    val shiftLocalDate: PartialFunction[Any, java.sql.Date] = {
-      case x: LocalDate => new java.sql.Date(subtractTimeZoneOffset(x.getMillisSinceEpoch))
-      case x: org.joda.time.LocalDate => shiftLocalDate(LocalDateConverter.convertPF(x))
+    def convertPF = {
+      case x: Date => new java.sql.Date(x.getTime)
+      case x: LocalDate => SqlDateConverter.convert(JodaLocalDateConverter.convert(x))
+      case x: JodaLocalDate => new java.sql.Date(x.toDateTimeAtStartOfDay.getMillis)
     }
-
-    //If there is no Local Date input we will use the normal date converter
-    def convertPF = shiftLocalDate orElse DateConverter.convertPF.andThen(d => new java.sql.Date(d.getTime))
   }
 
   private val JodaDateTypeTag = SparkReflectionLock.synchronized {
@@ -337,6 +348,19 @@ object TypeConverter {
     def convertPF = DateConverter.convertPF.andThen(new DateTime(_))
   }
 
+  private val JodaLocalDateTypeTag = SparkReflectionLock.synchronized {
+    implicitly[TypeTag[JodaLocalDate]]
+  }
+
+  implicit object JodaLocalDateConverter extends NullableTypeConverter[JodaLocalDate] {
+    def targetTypeTag = JodaLocalDateTypeTag
+    def convertPF = {
+      case x: JodaLocalDate => x
+      case x: LocalDate => new JodaLocalDate(x.getYear, x.getMonth, x.getDay)
+      case x: java.sql.Date => JodaLocalDate.fromDateFields(x)
+    }
+  }
+
   private val GregorianCalendarTypeTag = SparkReflectionLock.synchronized {
     implicitly[TypeTag[GregorianCalendar]]
   }
@@ -440,20 +464,20 @@ object TypeConverter {
     def targetTypeTag = LocalDateTypeTag
     val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
 
-    /* java.sql.Date assume that the internal timestamp is offset by the local timezone. This means
-    a direct conversion from LocalDate to java.sql.Date will actually change the Year-Month-Day
-    stored. (See the SqlDate converter for the opposite adjustment)
-    */
-    def addTimeZoneOffset(millis: Long) = millis + defaultTimezone.getOffset(millis)
-
     def convertPF = {
       case x: LocalDate => x
       case dateRegx(y, m, d) => LocalDate.fromYearMonthDay(y.toInt, m.toInt, d.toInt)
       case x: Int => LocalDate.fromDaysSinceEpoch(x)
-      case x: java.sql.Date => LocalDate.fromMillisSinceEpoch(addTimeZoneOffset(x.getTime))
-      case x: Date => LocalDate.fromMillisSinceEpoch(x.getTime)
-      case x: DateTime => LocalDate.fromMillisSinceEpoch(x.getMillis)
-      case x: org.joda.time.LocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth)
+      case x: JodaLocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth)
+      case x: DateTime => {
+        val ld = x.toLocalDate
+        LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth)
+      }
+      case x: Date => {
+        val a = JodaLocalDate.fromDateFields(x)
+        val b = LocalDateConverter.convert(a)
+        b
+      }
     }
   }
 
@@ -834,6 +858,7 @@ object TypeConverter {
     DateConverter,
     SqlDateConverter,
     JodaDateConverter,
+    JodaLocalDateConverter,
     GregorianCalendarConverter,
     InetAddressConverter,
     UUIDConverter,
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
index 5ea743956..546b87771 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
@@ -6,13 +6,14 @@ import java.util.{Date, UUID}
 import java.math.BigInteger
 
 import com.datastax.driver.core.Row
-import com.datastax.driver.core.LocalDate
 import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata, GettableData, TupleValue, UDTValue}
 import com.datastax.spark.connector.rdd.reader.{RowReader, ThisRowReaderAsFactory}
 import com.datastax.spark.connector.types.TypeConverter
+
 import org.apache.spark.sql.{Row => SparkRow}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.sql.types.Decimal
+import org.joda.time.DateTimeZone.UTC
 
 final class CassandraSQLRow(val metaData: CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef])
   extends GettableData with SparkRow with Serializable {
@@ -48,13 +49,6 @@ final class CassandraSQLRow(val metaData: CassandraRowMetadata, val columnValues
 
 object CassandraSQLRow {
 
-  /** SparkSQL assumes all incoming Timestamps will be shifted by the
-    * local TimeZone, if we do not anticipate this our LocalDate objects will
-    * go back in time.
-    */
-  lazy val defaultTimeZone = java.util.TimeZone.getDefault
-  def subtractTimeZoneOffset( millis: Long ) = millis - defaultTimeZone.getOffset(millis)
-
   def fromJavaDriverRow(row: Row, metaData:CassandraRowMetadata): CassandraSQLRow = {
     val data = CassandraRow.dataFromJavaDriverRow(row, metaData)
     new CassandraSQLRow(metaData, data.map(toSparkSqlType))
@@ -72,8 +66,8 @@ object CassandraSQLRow {
   private def toSparkSqlType(value: Any): AnyRef = {
     value match {
       case date: Date => new Timestamp(date.getTime)
-      case localDate: LocalDate => new java.sql.Date(subtractTimeZoneOffset(localDate.getMillisSinceEpoch))
-      case localDate: org.joda.time.LocalDate => new java.sql.Date(localDate.toDate.getTime)
+      case localDate: org.joda.time.LocalDate =>
+        new java.sql.Date(localDate.toDateTimeAtStartOfDay().getMillis)
       case str: String => UTF8String.fromString(str)
       case bigInteger: BigInteger => Decimal(bigInteger.toString)
       case inetAddress: InetAddress => UTF8String.fromString(inetAddress.getHostAddress)
diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala
new file mode 100644
index 000000000..3b2f62467
--- /dev/null
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/GettableDataSpec.scala
@@ -0,0 +1,45 @@
+package com.datastax.spark.connector
+
+import scala.util.Random
+
+import org.joda.time.{DateTimeZone, LocalDate => JodaLocalDate}
+import org.scalatest.{FlatSpec, Matchers}
+
+import com.datastax.driver.core.LocalDate
+
+class GettableDataSpec extends FlatSpec with Matchers {
+
+  private def getLocalDates(count: Int): Seq[LocalDate] = for (t <- 1 to count) yield {
+    Random.setSeed(10)
+    val year = Random.nextInt(2000)
+    val month = Random.nextInt(12) + 1
+    val day = Random.nextInt(28) + 1
+    LocalDate.fromYearMonthDay(year, month, day)
+  }
+
+  "GettableData" should "convert Driver LocalDates to Joda LocalDate" in {
+    val zones = Seq(
+      DateTimeZone.forID("America/New_York"),
+      DateTimeZone.forID("Europe/Warsaw"),
+      DateTimeZone.forID("+00:55"),
+      DateTimeZone.forID("-00:33"),
+      DateTimeZone.forID("UTC")
+    )
+    val dates = getLocalDates(count = 10000)
+
+    val default = DateTimeZone.getDefault
+    try {
+      for (zone <- zones;
+           date <- dates) {
+        DateTimeZone.setDefault(zone)
+        val jodaLocalDate = GettableData.convert(date).asInstanceOf[JodaLocalDate]
+        val ld = (date.getYear, date.getMonth, date.getDay)
+        val jd = (jodaLocalDate.getYear, jodaLocalDate.getMonthOfYear, jodaLocalDate.getDayOfMonth)
+
+        withClue(s"LocalDate conversion failed for $zone, $date") {ld should be(jd)}
+      }
+    } finally {
+      DateTimeZone.setDefault(default)
+    }
+  }
+}
diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
index 3263b6d96..7fdf1a150 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
@@ -133,13 +133,12 @@ class TypeConverterTest {
   def testDate() {
     val c = TypeConverter.forType[Date]
     val dateStr = "2014-04-23 11:21:32+0100"
-    val dayOnlyStr = "2014-04-23 0:0:0+0000"
-    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
-    val localDate = LocalDate.fromYearMonthDay(2014,4,23)
+    val dayOnlyStr = "2014-04-23"
+    val localDate = LocalDate.fromYearMonthDay(2014, 4, 23)
     val jodaLocalDate = new org.joda.time.LocalDate(2014, 4, 23)
-    val date = dateFormat.parse(dateStr)
-    val dateDayOnly = dateFormat.parse(dayOnlyStr)
+    val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ").parse(dateStr)
+    val dateDayOnly = new SimpleDateFormat("yyyy-MM-dd").parse(dayOnlyStr)
 
     assertEquals(dateDayOnly, c.convert(localDate))
     assertEquals(dateDayOnly, c.convert(jodaLocalDate))
@@ -220,8 +219,8 @@ class TypeConverterTest {
   def testLocalDate(): Unit = {
     val c = TypeConverter.forType[LocalDate]
     val testDate = LocalDate.fromYearMonthDay(1985, 8, 3)
-    val dateFormat = new SimpleDateFormat("yyyy-MM-ddZ")
-    val date = dateFormat.parse("1985-08-03-0000")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1985-08-03")
 
     assertEquals(testDate, c.convert("1985-08-03"))
     assertEquals(testDate, c.convert(5693))
     assertEquals(testDate, c.convert(date))