SPARKC-391: Fix conversion of LocalDate to Joda LocalDate
We transform driver LocalDate objects into Joda LocalDate objects so that we
have a serializable AnyRef to move around in Spark. Previously this was done
by just using the millisecond-based constructor, which assumes the timestamp
provided is in the default timezone. This is incorrect at certain times of
the year in certain places, since the driver's LocalDate reports its
timestamp in GMT.
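A minimal sketch of the failure mode (the constructor swap mirrors the
GettableData change below; the PST setup is only for illustration):

import java.util.TimeZone
import org.joda.time.{DateTimeZone, LocalDate => JodaLocalDate}
import com.datastax.driver.core.{LocalDate => DriverLocalDate}

// Suppose the JVM default zone is PST (UTC-8).
TimeZone.setDefault(TimeZone.getTimeZone("PST"))
DateTimeZone.setDefault(DateTimeZone.forTimeZone(TimeZone.getDefault))

val fromDriver = DriverLocalDate.fromYearMonthDay(2016, 12, 25)

// Old approach: the driver reports millis for midnight GMT, but Joda's
// millis constructor interprets them in the default zone, so in PST the
// result lands on the previous day.
new JodaLocalDate(fromDriver.getMillisSinceEpoch)   // 2016-12-24 -- wrong

// New approach: construct from the date fields; no timezone is involved.
new JodaLocalDate(fromDriver.getYear, fromDriver.getMonth, fromDriver.getDay) // 2016-12-25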

Previously we were using some home-grown normalization of timestamps for
converting to and from java.sql.Date. This led to errors and
difficult-to-maintain code. In this commit we replace all previous methods
with a pass-through to Joda's internal normalizing machinery.
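For java.sql.Date the pass-through can look like this sketch (hypothetical
helper names; Joda's fromDateFields and toDateTimeAtStartOfDay do the
normalization):

import java.sql.{Date => SqlDate}
import org.joda.time.{LocalDate => JodaLocalDate}

// Let Joda normalize the date fields instead of hand-adjusting epoch millis.
def sqlDateToJoda(d: SqlDate): JodaLocalDate =
  JodaLocalDate.fromDateFields(d)                   // reads year/month/day fields

def jodaToSqlDate(d: JodaLocalDate): SqlDate =
  new SqlDate(d.toDateTimeAtStartOfDay.getMillis)   // midnight in the default zone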

In addition, we now run all our Date tests in both PST and CET on Jenkins.

This PR fixes the instantiation of various Date objects when reading C*
tables with date-typed columns. When a C* date column is read, its values
are eventually converted to java.sql.Date, java.util.Date, Joda's DateTime,
or other date types.

This PR makes the returned dates consistent: the connector returns date
objects set to midnight in the user's default timezone. For example, if I
choose to get a java.util.Date out of the C* value 2016-12-25, I get (on my
laptop):
java.util.Date = Sun Dec 25 00:00:00 CET 2016

Because the default timezone is cached by Spark, we cannot run tests against
different timezones in a single JVM. This PR introduces tests for two
different timezones, executed in two separate JVMs.
RussellSpitzer authored and jtgrabowski committed Jul 20, 2016
1 parent 873baed commit 8133089
Showing 16 changed files with 350 additions and 111 deletions.
4 changes: 4 additions & 0 deletions project/Settings.scala
@@ -259,6 +259,8 @@ object Settings extends Build {
if (test.name.toLowerCase.contains("auth")) "auth"
else if (test.name.toLowerCase.contains("ssl")) "ssl"
else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec"
else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec"
else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec"
else test.name.reverse.dropWhile(_ != '.').reverse
}

@@ -274,6 +276,8 @@
else if (pkgName.contains(".repl")) "repl"
else if (pkgName.contains(".streaming")) "streaming"
else if (test.name.contains("CustomFromDriverSpec")) "customdriverspec"
else if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec"
else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec"
else "other"
}
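These tags feed sbt's test grouping. A sketch (assuming sbt 0.13's
Tests.Group API) of how each timezone-sensitive group can get its own forked
JVM; groupName here is a stand-in for the tagging functions above:

import sbt._

def groupName(test: TestDefinition): String =
  if (test.name.contains("CETSpec") || test.name.contains("CETTest")) "cetspec"
  else if (test.name.contains("PSTSpec") || test.name.contains("PSTTest")) "pstspec"
  else "other"

// Fork one JVM per tag: the tests call TimeZone.setDefault themselves, and
// Catalyst caches the default zone per JVM, so isolation is the whole point.
def timezoneGroups(tests: Seq[TestDefinition]): Seq[Tests.Group] =
  tests.groupBy(groupName).map { case (tag, groupTests) =>
    new Tests.Group(tag, groupTests, Tests.SubProcess(ForkOptions()))
  }.toSeq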

@@ -1059,16 +1059,16 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
val row = sc.cassandraTable(ks, "date_test").where("key = 1").first

row.getInt("key") should be(1)
row.getDate("dd") should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate)
row.getDateTime("dd").toLocalDate should be(expected)
row.getDate("dd") should be(expected.toDateTimeAtStartOfDay.toDate)
row.get[LocalDate]("dd") should be(expected)
}

it should "read LocalDate as tuple value with given type" in {
val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda
val date = sc.cassandraTable[(Int, Date)](ks, "date_test").where("key = 1").first._2
val dateTime = sc.cassandraTable[(Int, DateTime)](ks, "date_test").where("key = 1").first._2
val localDate = sc.cassandraTable[(Int, LocalDate)](ks, "date_test").where("key = 1").first._2

date should be(expected.toDateTimeAtStartOfDay(DateTimeZone.UTC).toDate)
dateTime.toLocalDate should be(expected)
date should be(expected.toDateTimeAtStartOfDay.toDate)
localDate should be(expected)
}
}
@@ -0,0 +1,85 @@
package com.datastax.spark.connector.rdd.typeTests

import java.sql.{Date => SqlDate}
import java.text.SimpleDateFormat
import java.util.{TimeZone, Date => UtilDate}

import scala.reflect.ClassTag

import org.joda.time.{DateTime, DateTimeZone, LocalDate => JodaLocalDate}

import com.datastax.driver.core.{Row, LocalDate => DriverLocalDate}
import com.datastax.spark.connector.rdd.reader.RowReaderFactory
import com.datastax.spark.connector.types.TypeConverter
import com.datastax.spark.connector.writer.RowWriterFactory

/**
* This should be executed in a separate JVM, as Catalyst caches the default time zone.
*/
abstract class AbstractDateTypeTest[TestType: ClassTag](
val testTimeZone: TimeZone)(
implicit
tConverter: TypeConverter[TestType],
rowReaderNormal: RowReaderFactory[(TestType, TestType, TestType, TestType)],
rowReaderCollection: RowReaderFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])],
rowReaderNull: RowReaderFactory[(TestType, TestType, Option[TestType], Set[TestType], Map[TestType, TestType], Seq[TestType])],
rowWriterNormal: RowWriterFactory[(TestType, TestType, TestType, TestType)],
rowWriterCollection: RowWriterFactory[(TestType, Set[TestType], List[TestType], Map[String, TestType], Map[TestType, String])],
rowWriterNull: RowWriterFactory[(TestType, TestType, Null, Null, Null, Null)])
extends AbstractTypeTest[TestType, DriverLocalDate] {

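// Java and Joda each track their own default time zone; set both so that
// JDK date parsing and Joda date arithmetic agree within this test JVM.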
TimeZone.setDefault(testTimeZone)
DateTimeZone.setDefault(DateTimeZone.forTimeZone(testTimeZone))

private val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d).*""".r

protected def stringToDate(str: String): TestType

protected def dateToString(date: TestType): String

override protected val typeName: String = "date"

override protected lazy val typeData: Seq[TestType] = Seq(
stringToDate("2015-05-01"),
stringToDate("2015-05-10"),
stringToDate("2015-05-20"),
stringToDate("1950-03-05"))

override protected lazy val addData: Seq[TestType] = Seq(
stringToDate("2011-05-01"),
stringToDate("2011-05-10"),
stringToDate("2011-05-20"),
stringToDate("1950-01-01"))

override def convertToDriverInsertable(testValue: TestType): DriverLocalDate = {
dateToString(testValue) match {
case dateRegx(year, month, day) =>
DriverLocalDate.fromYearMonthDay(year.toInt, month.toInt, day.toInt)
}
}

override def getDriverColumn(row: Row, colName: String): TestType = {
val ld = row.getDate(colName)
stringToDate(ld.toString)
}
}

abstract class DateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[UtilDate](timeZone) {
private val format = new SimpleDateFormat("yyyy-MM-dd")

override def stringToDate(str: String): UtilDate = format.parse(str)

override def dateToString(date: UtilDate): String = format.format(date)
}

abstract class DateTimeTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[DateTime](timeZone) {
override def stringToDate(str: String): DateTime = JodaLocalDate.parse(str).toDateTimeAtStartOfDay

override def dateToString(date: DateTime): String = date.toLocalDate.toString
}

abstract class SqlDateTypeTest(timeZone: TimeZone) extends AbstractDateTypeTest[SqlDate](timeZone) {
override def stringToDate(str: String): SqlDate = SqlDate.valueOf(str)

override def dateToString(date: SqlDate): String = date.toString
}
@@ -15,7 +15,7 @@ import com.datastax.spark.connector.writer.RowWriterFactory

/**
* A template class for testing that various CQL Types work with the spark Cassandra Connector
* When creating an iimplementationof this test you must provide two types,
* When creating an implementation of this test you must provide two types,
* The TestType is used to extract values from Cassandra, i.e. sc.cassandraTable[TestType],
* and to save values to Cassandra via sc.parallelize(x: TestType).saveToCassandra().
* The DriverType is used for inserting values into C* via the Java driver.
@@ -0,0 +1,15 @@
package com.datastax.spark.connector.rdd.typeTests

import java.util.TimeZone

/**
* The following tests are executed in a separate JVM.
*/
class DateTimeTypeCETTest extends DateTimeTypeTest(TimeZone.getTimeZone("CET")) {
}

class DateTypeCETTest extends DateTypeTest(TimeZone.getTimeZone("CET")) {
}

class SqlDateTypeCETTest extends SqlDateTypeTest(TimeZone.getTimeZone("CET")) {
}
@@ -0,0 +1,15 @@
package com.datastax.spark.connector.rdd.typeTests

import java.util.TimeZone

/**
* The following tests are executed in a separate JVM.
*/
class DateTimeTypePSTTest extends DateTimeTypeTest(TimeZone.getTimeZone("PST")) {
}

class DateTypePSTTest extends DateTypeTest(TimeZone.getTimeZone("PST")) {
}

class SqlDateTypePSTTest extends SqlDateTypeTest(TimeZone.getTimeZone("PST")) {
}

This file was deleted.

@@ -0,0 +1,84 @@
package com.datastax.spark.connector.sql

import java.sql.Date
import java.util.TimeZone

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.joda.time.DateTimeZone
import org.scalatest.FlatSpec

import com.datastax.driver.core.LocalDate
import com.datastax.spark.connector.SparkCassandraITSpecBase
import com.datastax.spark.connector.cql.CassandraConnector

trait CassandraDataFrameDateBehaviors extends SparkCassandraITSpecBase {
this: FlatSpec =>

useCassandraConfig(Seq("cassandra-default.yaml.template"))
useSparkConf(defaultConf)

val conn = CassandraConnector(defaultConf)
val sqlContext: SQLContext = new SQLContext(sc)

def dataFrame(timeZone: TimeZone): Unit = {

TimeZone.setDefault(timeZone)
DateTimeZone.setDefault(DateTimeZone.forTimeZone(timeZone))

val readTable = s"date_test_${timeZone.getID.toLowerCase}_read"
val writeTable = s"date_test_${timeZone.getID.toLowerCase}_write"

conn.withSessionDo { session =>
createKeyspace(session)
session.execute(s"create table $ks.$readTable (key int primary key, dd date)")
session.execute(s"insert into $ks.$readTable (key, dd) values (1, '1930-05-31')")
session.execute(s"create table $ks.$writeTable (key int primary key, d0 date)")
}

it should s"read C* LocalDate columns in ${timeZone.getID} timezone" in {
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> readTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
.load

df.count should be(1)

val foundDate = df.first.getDate(1)
val foundLocalDate = foundDate.toLocalDate
val foundTuple = (foundLocalDate.getYear, foundLocalDate.getMonthValue, foundLocalDate.getDayOfMonth)

val expectedTuple = (1930, 5, 31)

foundTuple should be(expectedTuple)
}

it should s"write java.sql.date to C* date columns in ${timeZone.getID} timezone" in {
val schema = StructType(Seq(
StructField("key", DataTypes.IntegerType),
StructField("d0", DataTypes.DateType)
))

val rows = sc.parallelize(Seq(
Row(0, Date.valueOf("1986-01-02")),
Row(1, Date.valueOf("1987-01-02"))
))

val dataFrame = sqlContext.createDataFrame(rows, schema)

dataFrame.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> writeTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
.save

conn.withSessionDo { session =>
val count = session.execute(s"select count(1) from $ks.$writeTable").one().getLong(0)
count should be(2)

val date = session.execute(s"select d0 from $ks.$writeTable where key = 0").one().getDate(0)
date should be(LocalDate.fromYearMonthDay(1986, 1, 2))
}
}
}
}
@@ -0,0 +1,15 @@
package com.datastax.spark.connector.sql

import java.util.TimeZone

import org.scalatest.FlatSpec

/**
* This should be executed in a separate JVM, as Catalyst caches the default time zone.
*/
class CassandraDataFrameDateCETSpec extends FlatSpec with CassandraDataFrameDateBehaviors {

val centralEuropeanTimeZone = TimeZone.getTimeZone("CET")

"A DataFrame in CET timezone" should behave like dataFrame(centralEuropeanTimeZone)
}
@@ -0,0 +1,15 @@
package com.datastax.spark.connector.sql

import java.util.TimeZone

import org.scalatest.FlatSpec

/**
* This should be executed in a separate JVM, as Catalyst caches the default time zone.
*/
class CassandraDataFrameDatePSTSpec extends FlatSpec with CassandraDataFrameDateBehaviors {

val pacificTimeZone = TimeZone.getTimeZone("PST")

"A DataFrame in PST timezone" should behave like dataFrame(pacificTimeZone)
}
@@ -4,12 +4,10 @@ import java.io.IOException

import scala.collection.JavaConversions._
import scala.concurrent.Future

import com.datastax.spark.connector._
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SQLContext
import org.joda.time.LocalDate

class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
useCassandraConfig(Seq("cassandra-default.yaml.template"))
@@ -56,12 +54,6 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"CREATE TABLE $ks.tuple_test1 (id int, t Tuple<text, int>, PRIMARY KEY (id))")
session.execute(s"CREATE TABLE $ks.tuple_test2 (id int, t Tuple<text, int>, PRIMARY KEY (id))")
session.execute(s"INSERT INTO $ks.tuple_test1 (id, t) VALUES (1, ('xyz', 3))")
},

Future {
session.execute(s"create table $ks.date_test (key int primary key, dd date)")
session.execute(s"create table $ks.date_test2 (key int primary key, dd date)")
session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')")
}
)
}
@@ -200,26 +192,4 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"select count(1) from $ks.tuple_test2").one().getLong(0) should be (1)
}
}

it should "read and write C* LocalDate columns" in {
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "date_test", "keyspace" -> ks, "cluster" -> "ClusterOne"))
.load

df.count should be (1)
df.first.getDate(1) should be (new LocalDate(1930, 5, 31).toDate)

df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "date_test2", "keyspace" -> ks, "cluster" -> "ClusterOne"))
.save

conn.withSessionDo { session =>
session.execute(s"select count(1) from $ks.date_test2").one().getLong(0) should be (1)
}
}


}
@@ -8,6 +8,8 @@ import com.datastax.driver.core.{LocalDate, Row, TypeCodec, TupleValue => DriverTupleValue}
import com.datastax.spark.connector.types.TypeConverter.StringConverter
import com.datastax.spark.connector.util.ByteBufferUtil

import org.joda.time.DateTimeZone.UTC

trait GettableData extends GettableByIndexData {

def metaData: CassandraRowMetadata
@@ -77,7 +79,8 @@ object GettableData {
case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap
case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue)
case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue)
case localDate: LocalDate => new org.joda.time.LocalDate(localDate.getMillisSinceEpoch)
case localDate: LocalDate =>
new org.joda.time.LocalDate(localDate.getYear, localDate.getMonth, localDate.getDay)
case other => other.asInstanceOf[AnyRef]

}