From 29cd9e81963fb6f5aec55a8991cf3f4e0cf4ed15 Mon Sep 17 00:00:00 2001 From: Grigory Date: Mon, 11 Apr 2022 22:17:58 -0400 Subject: [PATCH] Replace transformDown with transform for the better DataBricks compat (#10) --- build.sbt | 2 +- .../analyticstoolbox/core/ST_Intersects.scala | 2 +- .../rules/SpatialFilterPushdownRules.scala | 19 +++++++++++++------ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index 198115e..2d49bd3 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x val scalaTestVersion = "3.2.11" val jtsVersion = "1.18.1" val geomesaVersion = "3.3.0" -val hivelessVersion = "0.0.10" +val hivelessVersion = "0.0.11" val geotrellisVersion = "3.6.2" // GeoTrellis depends on Shapeless 2.3.7 diff --git a/core/src/main/scala/com/carto/analyticstoolbox/core/ST_Intersects.scala b/core/src/main/scala/com/carto/analyticstoolbox/core/ST_Intersects.scala index a061375..39da9bb 100644 --- a/core/src/main/scala/com/carto/analyticstoolbox/core/ST_Intersects.scala +++ b/core/src/main/scala/com/carto/analyticstoolbox/core/ST_Intersects.scala @@ -38,7 +38,7 @@ object ST_Intersects { def parseGeometry(a: Arg): Option[Geometry] = a.select[Geometry].orElse(a.select[Extent].map(_.toPolygon())) private def parseGeometryUnsafe(a: Arg, aname: String): Geometry = - parseGeometry(a).getOrElse(throw ProductDeserializationError[Arg](classOf[ST_Intersects], aname)) + parseGeometry(a).getOrElse(throw ProductDeserializationError[ST_Intersects, Arg](aname)) def function(left: Arg, right: Arg): Boolean = { val (l, r) = (parseGeometryUnsafe(left, "first"), parseGeometryUnsafe(right, "second")) diff --git a/core/src/main/scala/com/carto/analyticstoolbox/spark/rules/SpatialFilterPushdownRules.scala b/core/src/main/scala/com/carto/analyticstoolbox/spark/rules/SpatialFilterPushdownRules.scala index 19930a0..e686dd0 100644 --- 
a/core/src/main/scala/com/carto/analyticstoolbox/spark/rules/SpatialFilterPushdownRules.scala +++ b/core/src/main/scala/com/carto/analyticstoolbox/spark/rules/SpatialFilterPushdownRules.scala @@ -37,7 +37,14 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] { @transient private[this] lazy val logger = getLogger def apply(plan: LogicalPlan): LogicalPlan = - plan.transformDown { + // format: off + /** + * transform is an alias for transformDown. + * Calling transformDown directly causes the following error on Databricks 9.1: + * java.lang.NoClassDefFoundError: LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan; + */ + // format: on + plan.transform { case f @ Filter(condition: HiveGenericUDF, plan) if condition.of[ST_Intersects] => try { val Seq(extentExpr, geometryExpr) = condition.children @@ -46,7 +53,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] { // Optimization is done only when the first argument is Extent if (!extentExpr.dataType.conformsToSchema(extentEncoder.schema)) throw new UnsupportedOperationException( - s"${classOf[ST_Intersects]} push-down optimization works on the Extent column data type only." + s"${classOf[ST_Intersects]} push-down optimization works on the ${classOf[Extent]} column data type only." 
) // transform expression @@ -58,7 +65,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] { // The second argument can be Geometry or Extent val (extent, isGeometry) = Try(g.convert[Geometry].extent -> true) .orElse(Try(g.convert[Extent] -> false)) - .getOrElse(throw ProductDeserializationError[ST_Intersects.Arg](classOf[ST_Intersects], "second")) + .getOrElse(throw ProductDeserializationError[ST_Intersects, ST_Intersects.Arg]("second")) // transform expression AndList( @@ -97,12 +104,12 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] { ) } else { throw new UnsupportedOperationException( - "Geometry Envelope values extraction is not supported by the internal Geometry representation.".stripMargin + s"${classOf[Geometry]} Envelope values extraction is not supported by the internal ${classOf[Geometry]} representation.".stripMargin ) }*/ throw new UnsupportedOperationException( - s"${classOf[ST_Intersects]} push-down optimization works with Geometry and Extent Literals only." + s"${classOf[ST_Intersects]} push-down optimization works with ${classOf[Geometry]} and ${classOf[Extent]} Literals only." ) } @@ -112,7 +119,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] { case e: Throwable => logger.warn( s""" - |${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed. + |${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed, using the original plan. |StackTrace: ${ExceptionUtils.getStackTrace(e)} |""".stripMargin )