Skip to content

Commit

Permalink
Merge branch 'b1.4' into b1.5
Browse files Browse the repository at this point in the history
  • Loading branch information
RussellSpitzer committed Aug 17, 2016
2 parents 14ae351 + 48bd879 commit 5aa9efd
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.spark.sql.cassandra

import com.datastax.spark.connector.cql.TableDef
import com.datastax.spark.connector.types.TimeUUIDType

/**
* Determines which filter predicates can be pushed down to Cassandra.
Expand Down Expand Up @@ -61,6 +62,31 @@ class PredicatePushDown[Predicate : PredicateOps](predicates: Set[Predicate], ta
/** Returns the first of the given sets that is non-empty, or the empty set when none is. */
private def firstNonEmptySet[T](sets: Set[T]*): Set[T] =
sets.collectFirst { case s if s.nonEmpty => s }.getOrElse(Set.empty[T])


/** All non-equal predicates on a TimeUUID column are going to fail, and fail
* silently. The basic issue here is that when you use a comparison on
* a TimeUUID column in C* it compares based on the Time portion of the UUID. When
* Spark repeats this filter (default behavior) it will compare lexically; this
* will lead to results being incorrectly filtered out of the set. Because
* there is no way to correctly do this computation in Spark < 1.6 we simply
* throw an error to let the user know they cannot use the DataFrames API
* for this call.
*/
val timeUUIDNonEqual = {
// Any range (non-equality) predicate on a TimeUUID column is unsafe to push down.
val timeUUIDCols = table.columns.filter(_.columnType == TimeUUIDType)
timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName))
}
require(timeUUIDNonEqual.isEmpty,
s"""
| You are attempting to do a non-equality comparison on a TimeUUID column in Cassandra.
| Spark can only compare TimeUUIDs Lexically which means that the comparison will be
| different than the comparison done in C* which is done based on the Time Portion of
| TimeUUID. This will in almost all cases lead to incorrect results. If you are able
| to pushdown your entire predicate to C* use the RDD interface with ".where()" to
| accomplish this. For more info see
| https://datastax-oss.atlassian.net/browse/SPARKC-405.
""".stripMargin)

/**
* Selects partition key predicates for pushdown:
* 1. Partition key predicates must be equality or IN predicates.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package org.apache.spark.sql.cassandra

import org.scalatest.{Matchers, FlatSpec}

import org.scalatest.{FlatSpec, Matchers}
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types.IntType
import com.datastax.spark.connector.types.{IntType, TimeUUIDType}

class PredicatePushDownSpec extends FlatSpec with Matchers {

Expand Down Expand Up @@ -40,6 +39,8 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
// Regular (non-key) int-typed columns shared by the test tables.
val r1 = ColumnDef("r1", RegularColumn, IntType)
val r2 = ColumnDef("r2", RegularColumn, IntType)

// Clustering column of TimeUUID type — exercises the TimeUUID
// non-equality pushdown restriction.
val timeUUIDc1 = ColumnDef("c1", ClusteringColumn(0), TimeUUIDType)

val table = TableDef(
keyspaceName = "test",
tableName = "test",
Expand All @@ -48,6 +49,14 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
regularColumns = Seq(i1, i2, r1, r2)
)

// Test table whose clustering column ("c1") is TimeUUID-typed, used to
// verify that non-equality predicates on TimeUUID columns are rejected.
val timeUUIDTable = TableDef(
keyspaceName = "test",
tableName = "uuidtab",
partitionKey = Seq(pk1, pk2),
clusteringColumns = Seq(timeUUIDc1),
regularColumns = Seq(i1, i2, r1, r2)
)

"PredicatePushDown" should "push down all equality predicates restricting partition key columns" in {
val f1 = EqFilter("pk1")
val f2 = EqFilter("pk2")
Expand All @@ -56,6 +65,20 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
ppd.predicatesToPreserve shouldBe empty
}

it should "break if the user tries to use a TimeUUID column in a non-eq predicate" in {
// A range (>) predicate on a TimeUUID clustering column must be rejected:
// Spark's lexical UUID comparison disagrees with C*'s time-based comparison.
val f1 = GtFilter("c1")
intercept[IllegalArgumentException] {
new PredicatePushDown(Set[Filter](f1), timeUUIDTable)
}
}

it should "work if the user tries to use a TimeUUID column in an eq predicate" in {
// Equality comparisons are safe: both Spark and C* agree on TimeUUID equality,
// so the predicate should be pushed down and nothing preserved for Spark.
val f1 = EqFilter("c1")
val ppd = new PredicatePushDown(Set[Filter](f1), timeUUIDTable)
ppd.predicatesToPushDown should contain (f1)
ppd.predicatesToPreserve shouldBe empty
}

it should "not push down a partition key predicate for a part of the partition key" in {
val f1 = EqFilter("pk1")
val ppd1 = new PredicatePushDown(Set[Filter](f1), table)
Expand Down

0 comments on commit 5aa9efd

Please sign in to comment.