SPARKC-385 add driver.LocalDate conversion
The conversion has to be added because the driver's LocalDate is not
serializable.

GettableData now converts the driver's LocalDate to Joda's LocalDate.
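
For illustration, a minimal sketch of the mapping this commit introduces (not part of the diff itself; the sample values are hypothetical):

import com.datastax.driver.core.{LocalDate => DriverLocalDate}

val driverDate = DriverLocalDate.fromYearMonthDay(1930, 5, 31)
// getMillisSinceEpoch gives the millis of that date's midnight at UTC;
// Joda's LocalDate(long) evaluates that instant in the default time zone.
val jodaDate = new org.joda.time.LocalDate(driverDate.getMillisSinceEpoch)
// org.joda.time.LocalDate is Serializable, so it is safe in Spark closures.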
jtgrabowski committed May 25, 2016
1 parent df26b2d commit b9d5abb
Showing 5 changed files with 69 additions and 2 deletions.
CassandraRDDSpec.scala
@@ -7,7 +7,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.reflect.runtime.universe.typeTag

import org.joda.time.DateTime
import org.joda.time.{DateTime, LocalDate}

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
@@ -184,6 +184,11 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"insert into $ks.tuple_test3 (id, t) VALUES (0, (1, ('foo', 2.3)))")
session.execute(nestedTupleTable("tuple_test4"))
session.execute(nestedTupleTable("tuple_test5"))
},

Future {
session.execute(s"create table $ks.date_test (key int primary key, dd date)")
session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')")
}
)
}
@@ -1032,4 +1037,36 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"select count(1) from $ks.tuple_test5").one().getLong(0) should be (2)
}
}

it should "write Java dates as C* date type" in {
val dateRow = (7, new Date())
val dateTimeRow = (8, DateTime.now())

sc.parallelize(List(dateRow, dateTimeRow))
.saveToCassandra(ks, "date_test")

val resultSet = conn.withSessionDo { session =>
session.execute(
s"select count(1) from $ks.date_test where key in (${dateRow._1}, ${dateTimeRow._1})")
}
resultSet.one().getLong(0) should be(2)
}

it should "read C* row with dates as Java dates" in {
val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda
val row = sc.cassandraTable(ks, "date_test").where("key = 1").first

row.getInt("key") should be(1)
row.getDate("dd") should be(expected.toDate)
row.getDateTime("dd").toLocalDate should be(expected)
}

it should "read LocalDate as tuple value with given type" in {
val expected: LocalDate = new LocalDate(1930, 5, 31) // note this is Joda
val date = sc.cassandraTable[(Int, Date)](ks, "date_test").where("key = 1").first._2
val dateTime = sc.cassandraTable[(Int, DateTime)](ks, "date_test").where("key = 1").first._2

date should be(expected.toDate)
dateTime.toLocalDate should be(expected)
}
}
CassandraDataFrameSpec.scala
@@ -9,6 +9,7 @@ import com.datastax.spark.connector._
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SQLContext
import org.joda.time.LocalDate

class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
  useCassandraConfig(Seq("cassandra-default.yaml.template"))
@@ -55,6 +56,12 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"CREATE TABLE $ks.tuple_test1 (id int, t Tuple<text, int>, PRIMARY KEY (id))")
session.execute(s"CREATE TABLE $ks.tuple_test2 (id int, t Tuple<text, int>, PRIMARY KEY (id))")
session.execute(s"INSERT INTO $ks.tuple_test1 (id, t) VALUES (1, ('xyz', 3))")
},

Future {
session.execute(s"create table $ks.date_test (key int primary key, dd date)")
session.execute(s"create table $ks.date_test2 (key int primary key, dd date)")
session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')")
}
)
}
@@ -185,5 +192,25 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase {
    }
  }

  it should "read and write C* LocalDate columns" in {
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "date_test", "keyspace" -> ks, "cluster" -> "ClusterOne"))
      .load

    df.count should be (1)
    df.first.getDate(1) should be (new LocalDate(1930, 5, 31).toDate)

    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "date_test2", "keyspace" -> ks, "cluster" -> "ClusterOne"))
      .save

    conn.withSessionDo { session =>
      session.execute(s"select count(1) from $ks.date_test2").one().getLong(0) should be (1)
    }
  }
}
GettableData.scala
@@ -4,7 +4,7 @@ import java.nio.ByteBuffer

import scala.collection.JavaConversions._

import com.datastax.driver.core.{Row, UDTValue => DriverUDTValue, TupleValue => DriverTupleValue}
import com.datastax.driver.core.{LocalDate, Row, TupleValue => DriverTupleValue, UDTValue => DriverUDTValue}
import com.datastax.spark.connector.types.TypeConverter.StringConverter
import com.datastax.spark.connector.util.ByteBufferUtil

@@ -80,6 +80,7 @@ object GettableData {
    case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v)) }.toMap
    case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue)
    case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue)
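    // added in this commit: the driver's LocalDate is not serializable,
    // so expose it as Joda's (serializable) LocalDate instead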
    case localDate: LocalDate => new org.joda.time.LocalDate(localDate.getMillisSinceEpoch)
    case other => other.asInstanceOf[AnyRef]
  }
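
With this case in place, a C* date column surfaces internally as Joda's LocalDate while the existing row getters keep working; a short sketch mirroring the tests above (same $ks.date_test table):

val row = sc.cassandraTable(ks, "date_test").where("key = 1").first
val asDate: java.util.Date = row.getDate("dd")  // via the Date TypeConverter below
val asDateTime = row.getDateTime("dd")          // Joda DateTime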
TypeConverter.scala
@@ -302,6 +302,7 @@ object TypeConverter {
      case x: UUID if x.version() == 1 => new Date(x.timestamp())
      case x: LocalDate => new Date(x.getMillisSinceEpoch)
      case x: String => TimestampParser.parse(x)
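      // added in this commit: Joda's LocalDate converts via toDate,
      // i.e. midnight of that date in the default time zone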
      case x: org.joda.time.LocalDate => x.toDate
    }
  }

CassandraSQLRow.scala
@@ -78,6 +78,7 @@ object CassandraSQLRow {
    value match {
      case date: Date => new Timestamp(date.getTime)
      case localDate: LocalDate => new java.sql.Date(subtractTimeZoneOffset(localDate.getMillisSinceEpoch))
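      // added in this commit: Joda's LocalDate.toDate already yields local
      // midnight, so no extra time-zone offset subtraction is applied here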
      case localDate: org.joda.time.LocalDate => new java.sql.Date(localDate.toDate.getTime)
      case str: String => UTF8String.fromString(str)
      case bigInteger: BigInteger => Decimal(bigInteger.toString)
      case inetAddress: InetAddress => UTF8String.fromString(inetAddress.getHostAddress)
