diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
index e783901c6..1232f10fc 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
@@ -1,6 +1,7 @@
 package org.apache.spark.sql.cassandra
 
 import com.datastax.spark.connector.cql.TableDef
+import com.datastax.spark.connector.types.TimeUUIDType
 
 /**
  * Determines which filter predicates can be pushed down to Cassandra.
@@ -61,6 +62,22 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](predicates: Set[
   private def firstNonEmptySet[T](sets: Set[T]*): Set[T] =
     sets.find(_.nonEmpty).getOrElse(Set.empty[T])
 
+
+  /** All non-equality predicates on a TimeUUID column are going to fail, and fail
+    * silently. The basic issue here is that a comparison on
+    * a TimeUUID column in C* compares based on the time portion of the UUID. When
+    * Spark executes such a filter itself (unhandled behavior) it compares lexically; this
+    * will lead to results being incorrectly filtered out of the set. As long as the
+    * range predicate is handled completely by the connector the correct result
+    * will be obtained.
+    */
+  val timeUUIDNonEqual = {
+    val timeUUIDCols = table.columns.filter(x => x.columnType == TimeUUIDType)
+    timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName)).flatten
+  }
+
+
+
   /**
    * Selects partition key predicates for pushdown:
    * 1. Partition key predicates must be equality or IN predicates.
@@ -137,4 +154,21 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](predicates: Set[
    * so they must be applied by Spark
    */
   val predicatesToPreserve: Set[Predicate] = predicates -- predicatesToPushDown
+
+
+  val unhandledTimeUUIDNonEqual = {
+    timeUUIDNonEqual.toSet -- predicatesToPushDown
+  }
+
+  require(unhandledTimeUUIDNonEqual.isEmpty,
+    s"""
+      | You are attempting to do a non-equality comparison on a TimeUUID column in Spark.
+      | Spark can only compare TimeUUIDs lexically, which means that the comparison will be
+      | different from the comparison done in C*, which is based on the time portion of the
+      | TimeUUID. This will in almost all cases lead to incorrect results. If possible, restrict
+      | TimeUUID comparisons to columns which can be pushed down to Cassandra.
+      | https://datastax-oss.atlassian.net/browse/SPARKC-405.
+      |
+      | $unhandledTimeUUIDNonEqual
+    """.stripMargin)
 }
diff --git a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala
index 39ca44bd0..eabdc1ea5 100644
--- a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala
@@ -1,9 +1,8 @@
 package org.apache.spark.sql.cassandra
 
-import org.scalatest.{Matchers, FlatSpec}
-
+import org.scalatest.{FlatSpec, Matchers}
 import com.datastax.spark.connector.cql._
-import com.datastax.spark.connector.types.IntType
+import com.datastax.spark.connector.types.{IntType, TimeUUIDType}
 
 class PredicatePushDownSpec extends FlatSpec with Matchers {
 
@@ -39,6 +38,9 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
   val i2 = ColumnDef("i2", RegularColumn, IntType)
   val r1 = ColumnDef("r1", RegularColumn, IntType)
   val r2 = ColumnDef("r2", RegularColumn, IntType)
+  val t1 = ColumnDef("t1", RegularColumn, TimeUUIDType)
+
+  val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType)
 
   val table = TableDef(
     keyspaceName = "test",
@@ -51,6 +53,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
     IndexDef("DummyIndex", "i2", "IndexTwo", Map.empty))
   )
 
+  val timeUUIDTable = TableDef(
+    keyspaceName = "test",
+    tableName = "uuidtab",
+    partitionKey = Seq(pk1, pk2),
+    clusteringColumns = Seq(timeUUIDc1),
+    regularColumns = Seq(i1, i2, r1, r2, t1)
+  )
+
   "BasicCassandraPredicatePushDown" should "push down all equality predicates restricting partition key columns" in {
     val f1 = EqFilter("pk1")
     val f2 = EqFilter("pk2")
@@ -59,6 +69,28 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
     ppd.predicatesToPreserve shouldBe empty
   }
 
+  it should "break if the user tries to use a TimeUUID on a fully unhandled predicate" in {
+    val f1 = GtFilter("t1")
+
+    intercept[IllegalArgumentException] {
+      new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
+    }
+  }
+
+  it should "work if the user tries to use a TimeUUID on a fully handled predicate" in {
+    val f1 = GtFilter("c1")
+    val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
+    ppd.predicatesToPushDown should contain (f1)
+    ppd.predicatesToPreserve shouldBe empty
+  }
+
+  it should "work if the user tries to use a TimeUUID column in an eq predicate" in {
+    val f1 = EqFilter("c1")
+    val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
+    ppd.predicatesToPushDown should contain (f1)
+    ppd.predicatesToPreserve shouldBe empty
+  }
+
   it should "not push down a partition key predicate for a part of the partition key" in {
     val f1 = EqFilter("pk1")
     val ppd1 = new BasicCassandraPredicatePushDown(Set[Filter](f1), table)