diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala index b42a23716..b27247c46 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala @@ -1,6 +1,7 @@ package org.apache.spark.sql.cassandra import com.datastax.spark.connector.cql.TableDef +import com.datastax.spark.connector.types.TimeUUIDType /** * Determines which filter predicates can be pushed down to Cassandra. @@ -61,6 +62,31 @@ class PredicatePushDown[Predicate : PredicateOps](predicates: Set[Predicate], ta private def firstNonEmptySet[T](sets: Set[T]*): Set[T] = sets.find(_.nonEmpty).getOrElse(Set.empty[T]) + + /** All non-equal predicates on a TimeUUID column are going to fail, and fail + * in a silent way. The basic issue here is that when you use a comparison on + * a time UUID column in C*, it compares based on the time portion of the UUID. When + * Spark repeats this filter (default behavior) it will compare lexically, which + * will lead to results being incorrectly filtered out of the set. Because + * there is no way to correctly do this computation in Spark < 1.6, we will simply + * throw an error to let the user know they cannot use the DataFrames API + * for this call. + */ + val timeUUIDNonEqual = { + val timeUUIDCols = table.columns.filter(x => x.columnType == TimeUUIDType) + timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName)) + } + require(timeUUIDNonEqual.isEmpty, + s""" + | You are attempting to do a non-equality comparison on a TimeUUID column in Cassandra. + | Spark can only compare TimeUUIDs lexically, which means that the comparison will be + | different than the comparison done in C*, which is done based on the time portion of + | TimeUUID. 
This will in almost all cases lead to incorrect results. If you are able + | to push down your entire predicate to C*, use the RDD interface with ".where()" to + | accomplish this. For more info see + | https://datastax-oss.atlassian.net/browse/SPARKC-405. + """.stripMargin) + /** * Selects partition key predicates for pushdown: * 1. Partition key predicates must be equality or IN predicates. diff --git a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala index 7f8ba5666..480a40704 100644 --- a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala +++ b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala @@ -1,9 +1,8 @@ package org.apache.spark.sql.cassandra -import org.scalatest.{Matchers, FlatSpec} - +import org.scalatest.{FlatSpec, Matchers} import com.datastax.spark.connector.cql._ -import com.datastax.spark.connector.types.IntType +import com.datastax.spark.connector.types.{IntType, TimeUUIDType} class PredicatePushDownSpec extends FlatSpec with Matchers { @@ -40,6 +39,8 @@ class PredicatePushDownSpec extends FlatSpec with Matchers { val r1 = ColumnDef("r1", RegularColumn, IntType) val r2 = ColumnDef("r2", RegularColumn, IntType) + val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType) + val table = TableDef( keyspaceName = "test", tableName = "test", @@ -48,6 +49,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers { regularColumns = Seq(i1, i2, r1, r2) ) + val timeUUIDTable = TableDef( + keyspaceName = "test", + tableName = "uuidtab", + partitionKey = Seq(pk1, pk2), + clusteringColumns = Seq(timeUUIDc1), + regularColumns = Seq(i1, i2, r1, r2) + ) + "PredicatePushDown" should "push down all equality predicates restricting partition key columns" in { val f1 = EqFilter("pk1") val f2 = 
EqFilter("pk2") @@ -56,6 +65,20 @@ ppd.predicatesToPreserve shouldBe empty } + it should "break if the user tries to use a TimeUUID column in a non-eq predicate" in { + val f1 = GtFilter("c1") + val ex = intercept[IllegalArgumentException] { + val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable) + } + } + + it should "work if the user tries to use a TimeUUID column in an eq predicate" in { + val f1 = EqFilter("c1") + val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable) + ppd.predicatesToPushDown should contain (f1) + ppd.predicatesToPreserve shouldBe empty + } + it should "not push down a partition key predicate for a part of the partition key" in { val f1 = EqFilter("pk1") val ppd1 = new PredicatePushDown(Set[Filter](f1), table)