SPARKC-405: Disallow TimeUUID Predicate Pushdown
TimeUUIDs are compared differently in Spark and in Cassandra. In
Cassandra, TimeUUIDs are compared based on the time portion of the binary
data. In Spark they are compared lexically (in byte order). In Spark <
1.6, all filters on DataFrames are applied twice: once at the DataSource
and once again in Spark. This double application uses both of
the above comparators, leading to incorrect results. We now throw
an exception if we detect that an end user is trying to perform one of these
impossible comparisons.
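
As a quick illustration of the mismatch (this sketch is not part of the commit), here are two hand-built version-1 UUIDs whose lexical order and embedded-timestamp order disagree. Cassandra's timeuuid comparator follows the timestamp; a raw byte/string comparison follows the lexical order:

import java.util.UUID

object TimeUuidOrderingSketch extends App {
  // Hand-crafted version-1 (time-based) UUIDs. In the textual layout the
  // low 32 bits of the timestamp come first, so lexical order and
  // timestamp order can point in opposite directions.
  val a = UUID.fromString("00000001-ffff-1fff-8000-000000000000")
  val b = UUID.fromString("ffffffff-0000-1000-8000-000000000000")

  // Lexical (byte/string) comparison, the kind Spark applies when it
  // re-evaluates the filter: a sorts before b.
  val lexical = a.toString.compareTo(b.toString)                         // < 0

  // Comparison on the 60-bit timestamp embedded in the UUID, which is what
  // Cassandra's timeuuid ordering is based on: a sorts after b.
  val byTimestamp = java.lang.Long.compare(a.timestamp(), b.timestamp()) // > 0

  println(s"lexical = $lexical, byTimestamp = $byTimestamp")
  // Because the two comparators disagree, a range predicate evaluated once
  // by Cassandra and once again by Spark can drop rows that actually match.
}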
RussellSpitzer committed Jul 26, 2016
1 parent 154bb52 commit 57976a0
Showing 2 changed files with 52 additions and 3 deletions.
@@ -1,6 +1,7 @@
package org.apache.spark.sql.cassandra

import com.datastax.spark.connector.cql.TableDef
import com.datastax.spark.connector.types.TimeUUIDType

/**
* Determines which filter predicates can be pushed down to Cassandra.
@@ -61,6 +62,31 @@ class PredicatePushDown[Predicate : PredicateOps](predicates: Set[Predicate], ta
private def firstNonEmptySet[T](sets: Set[T]*): Set[T] =
sets.find(_.nonEmpty).getOrElse(Set.empty[T])


/** All non-equality predicates on a TimeUUID column are going to fail, and fail
* silently. The basic issue here is that when you use a comparison on
* a TimeUUID column in C* it compares based on the time portion of the UUID. When
* Spark repeats this filter (default behavior) it compares lexically, which
* leads to results being incorrectly filtered out of the set. Because
* there is no way to do this computation correctly in Spark < 1.6, we simply
* throw an error to let the user know they cannot use the DataFrames API
* for this call.
*/
val timeUUIDNonEqual = {
val timeUUIDCols = table.columns.filter( x => x.columnType == TimeUUIDType)
timeUUIDCols.flatMap( col => rangePredicatesByName.get(col.columnName))
}
require(timeUUIDNonEqual.isEmpty,
s"""
| You are attempting to do a non-equality comparison on a TimeUUID column in Cassandra.
| Spark can only compare TimeUUIDs lexically, which means the comparison will differ
| from the comparison done in C*, which is based on the time portion of the TimeUUID.
| This will in almost all cases lead to incorrect results. If you are able to push
| down your entire predicate to C*, use the RDD interface with ".where()" to
| accomplish this. For more info see
| https://datastax-oss.atlassian.net/browse/SPARKC-405.
""".stripMargin)

/**
* Selects partition key predicates for pushdown:
* 1. Partition key predicates must be equality or IN predicates.
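
The error message above points users at the RDD API as the workaround, because .where() hands the whole predicate to Cassandra and nothing is re-evaluated in Spark. A rough sketch of that usage (keyspace "ks", table "events" and the timeuuid clustering column "c1" are made-up names for illustration):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object TimeUuidRangeViaRdd extends App {
  val conf = new SparkConf()
    .setAppName("timeuuid-range")
    .set("spark.cassandra.connection.host", "127.0.0.1") // assumed local C* node
  val sc = new SparkContext(conf)

  val lowerBound = java.util.UUID.fromString("550e8400-e29b-11d4-a716-446655440000")

  // The CQL predicate runs entirely inside Cassandra, using its own timeuuid
  // comparator, and Spark does not re-apply it afterwards. Depending on the
  // schema, Cassandra may additionally require the partition key to be
  // restricted (or ALLOW FILTERING) for a clustering-column range like this.
  val rows = sc.cassandraTable("ks", "events").where("c1 > ?", lowerBound)

  rows.collect().foreach(println)
}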
@@ -1,9 +1,8 @@
package org.apache.spark.sql.cassandra

-import org.scalatest.{Matchers, FlatSpec}
-
+import org.scalatest.{FlatSpec, Matchers}
import com.datastax.spark.connector.cql._
-import com.datastax.spark.connector.types.IntType
+import com.datastax.spark.connector.types.{IntType, TimeUUIDType}

class PredicatePushDownSpec extends FlatSpec with Matchers {

@@ -40,6 +39,8 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
val r1 = ColumnDef("r1", RegularColumn, IntType)
val r2 = ColumnDef("r2", RegularColumn, IntType)

val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType)

val table = TableDef(
keyspaceName = "test",
tableName = "test",
@@ -48,6 +49,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
regularColumns = Seq(i1, i2, r1, r2)
)

val timeUUIDTable = TableDef(
keyspaceName = "test",
tableName = "uuidtab",
partitionKey = Seq(pk1, pk2),
clusteringColumns = Seq(timeUUIDc1),
regularColumns = Seq(i1, i2, r1, r2)
)

"PredicatePushDown" should "push down all equality predicates restricting partition key columns" in {
val f1 = EqFilter("pk1")
val f2 = EqFilter("pk2")
@@ -56,6 +65,20 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
ppd.predicatesToPreserve shouldBe empty
}

it should " break if the user tries to use a TimeUUID column in a non-eq predicate" in {
val f1 = GtFilter("c1")
val ex = intercept[IllegalArgumentException] {
val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable)
}
}

it should " work if the user tries to use a TimeUUID column in a eq predicate" in {
val f1 = EqFilter("c1")
val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable)
ppd.predicatesToPushDown should contain (f1)
ppd.predicatesToPreserve shouldBe empty
}

it should "not push down a partition key predicate for a part of the partition key" in {
val f1 = EqFilter("pk1")
val ppd1 = new PredicatePushDown(Set[Filter](f1), table)
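
For completeness, the kind of DataFrame query that now fails fast instead of silently returning wrong results would look roughly like this (Spark 1.x style; keyspace, table and column names are again illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TimeUuidDataFrameSketch extends App {
  val conf = new SparkConf()
    .setAppName("timeuuid-df")
    .set("spark.cassandra.connection.host", "127.0.0.1") // assumed local C* node
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val df = sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "ks", "table" -> "events"))
    .load()

  // A non-equality (range) predicate on the timeuuid clustering column c1.
  // With this change, PredicatePushDown throws IllegalArgumentException while
  // the scan is being planned, rather than letting Spark silently re-filter
  // the rows with a lexical comparison.
  df.filter(df("c1") > "550e8400-e29b-11d4-a716-446655440000").show()
}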
