Skip to content

Commit

Permalink
Merge pull request datastax#1010 from datastax/SPARKC-405-b1.6
Browse files Browse the repository at this point in the history
SPARKC-405: B1.6
  • Loading branch information
RussellSpitzer authored Aug 17, 2016
2 parents 0303005 + 8459ca0 commit 5eebabf
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.spark.sql.cassandra

import com.datastax.spark.connector.cql.TableDef
import com.datastax.spark.connector.types.TimeUUIDType

/**
* Determines which filter predicates can be pushed down to Cassandra.
Expand Down Expand Up @@ -61,6 +62,22 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](predicates: Set[
private def firstNonEmptySet[T](sets: Set[T]*): Set[T] =
sets.find(_.nonEmpty).getOrElse(Set.empty[T])


/** All non-equal predicates on a TimeUUID column are going to fail and fail
* in silent way. The basic issue here is that when you use a comparison on
* a time UUID column in C* it compares based on the Time portion of the UUID. When
* Spark executes this filter (unhandled behavior) it will compare lexically, this
* will lead to results being incorrectly filtered out of the set. As long as the
* range predicate is handled completely by the connector the correct result
* will be obtained.
*/
val timeUUIDNonEqual = {
val timeUUIDCols = table.columns.filter(x => x.columnType == TimeUUIDType)
timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName)).flatten
}



/**
* Selects partition key predicates for pushdown:
* 1. Partition key predicates must be equality or IN predicates.
Expand Down Expand Up @@ -137,4 +154,21 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](predicates: Set[
* so they must be applied by Spark */
val predicatesToPreserve: Set[Predicate] =
predicates -- predicatesToPushDown


val unhandledTimeUUIDNonEqual = {
timeUUIDNonEqual.toSet -- predicatesToPushDown
}

require(unhandledTimeUUIDNonEqual.isEmpty,
s"""
| You are attempting to do a non-equality comparison on a TimeUUID column in Spark.
| Spark can only compare TimeUUIDs Lexically which means that the comparison will be
| different than the comparison done in C* which is done based on the Time Portion of
| TimeUUID. This will in almost all cases lead to incorrect results. If possible restrict
| doing a TimeUUID comparison only to columns which can be pushed down to Cassandra.
| https://datastax-oss.atlassian.net/browse/SPARKC-405.
|
| $unhandledTimeUUIDNonEqual
""".stripMargin)
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package org.apache.spark.sql.cassandra

import org.scalatest.{Matchers, FlatSpec}

import org.scalatest.{FlatSpec, Matchers}
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types.IntType
import com.datastax.spark.connector.types.{IntType, TimeUUIDType}

class PredicatePushDownSpec extends FlatSpec with Matchers {

Expand Down Expand Up @@ -39,6 +38,9 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
val i2 = ColumnDef("i2", RegularColumn, IntType)
val r1 = ColumnDef("r1", RegularColumn, IntType)
val r2 = ColumnDef("r2", RegularColumn, IntType)
val t1 = ColumnDef("t1", RegularColumn, TimeUUIDType)

val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType)

val table = TableDef(
keyspaceName = "test",
Expand All @@ -51,6 +53,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
IndexDef("DummyIndex", "i2", "IndexTwo", Map.empty))
)

val timeUUIDTable = TableDef(
keyspaceName = "test",
tableName = "uuidtab",
partitionKey = Seq(pk1, pk2),
clusteringColumns = Seq(timeUUIDc1),
regularColumns = Seq(i1, i2, r1, r2, t1)
)

"BasicCassandraPredicatePushDown" should "push down all equality predicates restricting partition key columns" in {
val f1 = EqFilter("pk1")
val f2 = EqFilter("pk2")
Expand All @@ -59,6 +69,28 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
ppd.predicatesToPreserve shouldBe empty
}

it should " break if the user tries to use a TimeUUID on a fully unhandled predicate" in {
val f1 = GtFilter("t1")

val ex = intercept[IllegalArgumentException] {
val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
}
}

it should " work if the user tries to use a TimeUUID on a fully handled predicate" in {
val f1 = GtFilter("c1")
val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
ppd.predicatesToPushDown should contain (f1)
ppd.predicatesToPreserve shouldBe empty
}

it should " work if the user tries to use a TimeUUID column in a eq predicate" in {
val f1 = EqFilter("c1")
val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
ppd.predicatesToPushDown should contain (f1)
ppd.predicatesToPreserve shouldBe empty
}

it should "not push down a partition key predicate for a part of the partition key" in {
val f1 = EqFilter("pk1")
val ppd1 = new BasicCassandraPredicatePushDown(Set[Filter](f1), table)
Expand Down

0 comments on commit 5eebabf

Please sign in to comment.