From 57976a01ec7747038564c997a5507beb826392be Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Mon, 25 Jul 2016 17:40:52 -0700 Subject: [PATCH 1/2] SPARKC-405: Disallow TimeUUID Predicate Pushdown Timeuuids are compared differently in Spark and in Cassandra. In Cassandra Timeuuids are compared bas on the Time portion of the binary data. In Spark they are compared lexically (in byte order). In Spark < 1.6 all filters in Dataframes are applied twice, once at the DataSource and once in Spark. This double application would be done using both of the above comparitors leading to incorrect results. We will now throw exceptions if we sense an end user is trying to perform one of these impossible comparisons. --- .../sql/cassandra/PredicatePushDown.scala | 26 +++++++++++++++++ .../sql/cassandra/PredicatePushDownSpec.scala | 29 +++++++++++++++++-- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala index b42a23716..46c85ec40 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala @@ -1,6 +1,7 @@ package org.apache.spark.sql.cassandra import com.datastax.spark.connector.cql.TableDef +import com.datastax.spark.connector.types.TimeUUIDType /** * Determines which filter predicates can be pushed down to Cassandra. @@ -61,6 +62,31 @@ class PredicatePushDown[Predicate : PredicateOps](predicates: Set[Predicate], ta private def firstNonEmptySet[T](sets: Set[T]*): Set[T] = sets.find(_.nonEmpty).getOrElse(Set.empty[T]) + + /** All non-equal predicates on a TimeUUID column are going to fail and fail + * in silent way. The basic issue here is that when you use a comparison on + * a time UUID column in C* it compares based on the Time portion of the UUID. When + * Spark repeats this filter (default behavior) it will compare lexically, this + * will lead to results being incorrectly filtered out of the set. Because + * there is no way to correctly do this computation in Spark < 1.6 we will simply + * throw an error to let the user know they cannot use the DataFrames API + * for this call. + */ + val timeUUIDNonEqual = { + val timeUUIDCols = table.columns.filter( x => x.columnType == TimeUUIDType) + timeUUIDCols.flatMap( col => rangePredicatesByName.get(col.columnName)) + } + require(timeUUIDNonEqual.isEmpty, + s""" + | You are attempting to do a non-equality comparison on a TimeUUID column in Cassandra. + | Spark can only compare TimeUUIDs Lexically which means that the comparison will be + | different than the comparison done in C* which is done based on the Time Portion of + | TimeUUID. This will in almost all cases lead to incorrect results. If you are able to + | to pushdown your entire predicate to C* use the RDD interface with ".where()" to + | accomplish this. For more info see + | https://datastax-oss.atlassian.net/browse/SPARKC-405. + """.stripMargin) + /** * Selects partition key predicates for pushdown: * 1. Partition key predicates must be equality or IN predicates. diff --git a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala index 7f8ba5666..480a40704 100644 --- a/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala +++ b/spark-cassandra-connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala @@ -1,9 +1,8 @@ package org.apache.spark.sql.cassandra -import org.scalatest.{Matchers, FlatSpec} - +import org.scalatest.{FlatSpec, Matchers} import com.datastax.spark.connector.cql._ -import com.datastax.spark.connector.types.IntType +import com.datastax.spark.connector.types.{IntType, TimeUUIDType} class PredicatePushDownSpec extends FlatSpec with Matchers { @@ -40,6 +39,8 @@ class PredicatePushDownSpec extends FlatSpec with Matchers { val r1 = ColumnDef("r1", RegularColumn, IntType) val r2 = ColumnDef("r2", RegularColumn, IntType) + val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType) + val table = TableDef( keyspaceName = "test", tableName = "test", @@ -48,6 +49,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers { regularColumns = Seq(i1, i2, r1, r2) ) + val timeUUIDTable = TableDef( + keyspaceName = "test", + tableName = "uuidtab", + partitionKey = Seq(pk1, pk2), + clusteringColumns = Seq(timeUUIDc1), + regularColumns = Seq(i1, i2, r1, r2) + ) + "PredicatePushDown" should "push down all equality predicates restricting partition key columns" in { val f1 = EqFilter("pk1") val f2 = EqFilter("pk2") @@ -56,6 +65,20 @@ class PredicatePushDownSpec extends FlatSpec with Matchers { ppd.predicatesToPreserve shouldBe empty } + it should " break if the user tries to use a TimeUUID column in a non-eq predicate" in { + val f1 = GtFilter("c1") + val ex = intercept[IllegalArgumentException] { + val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable) + } + } + + it should " work if the user tries to use a TimeUUID column in a eq predicate" in { + val f1 = EqFilter("c1") + val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable) + ppd.predicatesToPushDown should contain (f1) + ppd.predicatesToPreserve shouldBe empty + } + it should "not push down a partition key predicate for a part of the partition key" in { val f1 = EqFilter("pk1") val ppd1 = new PredicatePushDown(Set[Filter](f1), table) From 6d743544296fbcbfb366a301dd5764eb67f31c8e Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Mon, 25 Jul 2016 19:32:19 -0700 Subject: [PATCH 2/2] SPARKC-405: Reformat --- .../org/apache/spark/sql/cassandra/PredicatePushDown.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala index 46c85ec40..b27247c46 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/PredicatePushDown.scala @@ -73,8 +73,8 @@ class PredicatePushDown[Predicate : PredicateOps](predicates: Set[Predicate], ta * for this call. */ val timeUUIDNonEqual = { - val timeUUIDCols = table.columns.filter( x => x.columnType == TimeUUIDType) - timeUUIDCols.flatMap( col => rangePredicatesByName.get(col.columnName)) + val timeUUIDCols = table.columns.filter(x => x.columnType == TimeUUIDType) + timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName)) } require(timeUUIDNonEqual.isEmpty, s"""