Skip to content

Commit

Permalink
Replace transformDown with transform for the better DataBricks compat (
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin authored Apr 12, 2022
1 parent be8c74e commit 29cd9e8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x
val scalaTestVersion = "3.2.11"
val jtsVersion = "1.18.1"
val geomesaVersion = "3.3.0"
val hivelessVersion = "0.0.10"
val hivelessVersion = "0.0.11"
val geotrellisVersion = "3.6.2"

// GeoTrellis depends on Shapeless 2.3.7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ST_Intersects {
def parseGeometry(a: Arg): Option[Geometry] = a.select[Geometry].orElse(a.select[Extent].map(_.toPolygon()))

private def parseGeometryUnsafe(a: Arg, aname: String): Geometry =
parseGeometry(a).getOrElse(throw ProductDeserializationError[Arg](classOf[ST_Intersects], aname))
parseGeometry(a).getOrElse(throw ProductDeserializationError[ST_Intersects, Arg](aname))

def function(left: Arg, right: Arg): Boolean = {
val (l, r) = (parseGeometryUnsafe(left, "first"), parseGeometryUnsafe(right, "second"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
@transient private[this] lazy val logger = getLogger

def apply(plan: LogicalPlan): LogicalPlan =
plan.transformDown {
// format: off
/**
* transform is an alias to transformDown
* The transformDown usage causes the following error on DataBricks 9.1:
* java.lang.NoClassDefFoundError: LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
*/
// format: on
plan.transform {
case f @ Filter(condition: HiveGenericUDF, plan) if condition.of[ST_Intersects] =>
try {
val Seq(extentExpr, geometryExpr) = condition.children
Expand All @@ -46,7 +53,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
// Optimization is done only when the first argument is Extent
if (!extentExpr.dataType.conformsToSchema(extentEncoder.schema))
throw new UnsupportedOperationException(
s"${classOf[ST_Intersects]} push-down optimization works on the Extent column data type only."
s"${classOf[ST_Intersects]} push-down optimization works on the ${classOf[Extent]} column data type only."
)

// transform expression
Expand All @@ -58,7 +65,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
// The second argument can be Geometry or Extent
val (extent, isGeometry) = Try(g.convert[Geometry].extent -> true)
.orElse(Try(g.convert[Extent] -> false))
.getOrElse(throw ProductDeserializationError[ST_Intersects.Arg](classOf[ST_Intersects], "second"))
.getOrElse(throw ProductDeserializationError[ST_Intersects, ST_Intersects.Arg]("second"))

// transform expression
AndList(
Expand Down Expand Up @@ -97,12 +104,12 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
)
} else {
throw new UnsupportedOperationException(
"Geometry Envelope values extraction is not supported by the internal Geometry representation.".stripMargin
s"${classOf[Geometry]} Envelope values extraction is not supported by the internal ${classOf[Geometry]} representation.".stripMargin
)
}*/

throw new UnsupportedOperationException(
s"${classOf[ST_Intersects]} push-down optimization works with Geometry and Extent Literals only."
s"${classOf[ST_Intersects]} push-down optimization works with ${classOf[Geometry]} and ${classOf[Extent]} Literals only."
)
}

Expand All @@ -112,7 +119,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
case e: Throwable =>
logger.warn(
s"""
|${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed.
|${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed, using the original plan.
|StackTrace: ${ExceptionUtils.getStackTrace(e)}
|""".stripMargin
)
Expand Down

0 comments on commit 29cd9e8

Please sign in to comment.