Skip to content

Commit

Permalink
Make ST_Intersects polymorphic (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Apr 10, 2022
1 parent 76f77bf commit d30e97b
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 75 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x
val scalaTestVersion = "3.2.11"
val jtsVersion = "1.18.1"
val geomesaVersion = "3.3.0"
val hivelessVersion = "0.0.6"
val hivelessVersion = "0.0.7"
val geotrellisVersion = "3.6.2"

// GeoTrellis depends on Shapeless 2.3.7
Expand Down
3 changes: 1 addition & 2 deletions core/sql/createUDFs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ CREATE OR REPLACE FUNCTION st_crsFromText as 'com.carto.analyticstoolbox.index.S
CREATE OR REPLACE FUNCTION st_extentFromGeom as 'com.carto.analyticstoolbox.index.ST_ExtentFromGeom';
CREATE OR REPLACE FUNCTION st_extentToGeom as 'com.carto.analyticstoolbox.index.ST_ExtentToGeom';
CREATE OR REPLACE FUNCTION st_geomReproject as 'com.carto.analyticstoolbox.index.ST_GeomReproject';
CREATE OR REPLACE FUNCTION st_intersectsExtent as 'com.carto.analyticstoolbox.index.ST_IntersectsExtent';
CREATE OR REPLACE FUNCTION st_intersectsExtents as 'com.carto.analyticstoolbox.index.ST_IntersectsExtents';
CREATE OR REPLACE FUNCTION st_makeExtent as 'com.carto.analyticstoolbox.index.ST_MakeExtent';
CREATE OR REPLACE FUNCTION st_partitionCentroid as 'com.carto.analyticstoolbox.index.ST_PartitionCentroid';
CREATE OR REPLACE FUNCTION st_z2LatLon as 'com.carto.analyticstoolbox.index.ST_Z2LatLon';
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,33 @@

package com.carto.analyticstoolbox.core

import com.carto.analyticstoolbox.core._
import com.carto.analyticstoolbox.index._

import com.azavea.hiveless.HUDF
import com.azavea.hiveless.implicits.tupler._
import org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions
import org.locationtech.jts.geom.Geometry
import com.azavea.hiveless.serializers.UnaryDeserializer
import geotrellis.vector._
import shapeless._

/**
 * UDF named "st_intersects" that takes two arguments and returns a Boolean.
 * Each argument is an `ST_Intersects.Arg`, i.e. either an Extent or a Geometry.
 * The actual computation is delegated to `ST_Intersects.function` in the companion object.
 */
class ST_Intersects extends HUDF[(ST_Intersects.Arg, ST_Intersects.Arg), Boolean] {
val name: String = "st_intersects"
// No explicit result type here: the method reference is adapted to whatever HUDF expects.
def function = ST_Intersects.function
}

object ST_Intersects {
import UnaryDeserializer.Errors.ProductDeserializationError

/** Accepted argument type: a shapeless coproduct holding either an Extent or a Geometry. */
type Arg = Extent :+: Geometry :+: CNil

/**
 * Resolves an argument to an Extent.
 *
 * An Extent alternative is returned as is; a Geometry alternative is replaced by its `extent`.
 * The result is None when neither alternative can be selected from the coproduct.
 */
def parseExtent(a: Arg): Option[Extent] =
  a.select[Extent] match {
    case found @ Some(_) => found
    // Only look at the Geometry alternative when no Extent was selected.
    case None            => a.select[Geometry].map(_.extent)
  }

/**
 * Same as `parseExtent`, but throws instead of returning None.
 *
 * @param a     the argument to resolve
 * @param aname the argument label passed to the error (e.g. "first", "second")
 * @throws ProductDeserializationError when no Extent can be derived from the argument
 */
private def parseExtentUnsafe(a: Arg, aname: String): Extent =
  parseExtent(a) match {
    case Some(extent) => extent
    case None         => throw ProductDeserializationError[Arg](classOf[ST_Intersects], aname)
  }

import java.{lang => jl}
/**
 * Resolves both arguments to Extents (throwing when one cannot be resolved) and
 * reports whether the two Extents intersect.
 *
 * @param left  the first argument, labelled "first" in the deserialization error
 * @param right the second argument, labelled "second" in the deserialization error
 */
def function(left: Arg, right: Arg): Boolean = {
val (l, r) = (parseExtentUnsafe(left, "first"), parseExtentUnsafe(right, "second"))

// NOTE(review): the three lines below declare a Geometry-only ST_Intersects class named "st_intersection";
// they look like leftovers of the previous revision interleaved into this body — confirm and drop.
class ST_Intersects extends HUDF[(Geometry, Geometry), jl.Boolean] {
val name: String = "st_intersection"
def function = SpatialRelationFunctions.ST_Intersects
// The comparison is done on the Extents, not on the original geometries.
l.intersects(r)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.carto.analyticstoolbox.core._

import com.azavea.hiveless.HUDF
import geotrellis.vector._
import org.locationtech.jts.geom.Geometry

class ST_ExtentFromGeom extends HUDF[Geometry, Extent] {
val name: String = "st_extentFromGeom"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.carto.analyticstoolbox.core._

import com.azavea.hiveless.HUDF
import geotrellis.vector._
import org.locationtech.jts.geom.Geometry

class ST_ExtentToGeom extends HUDF[Extent, Geometry] {
val name: String = "st_extentToGeom"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.azavea.hiveless.HUDF
import com.azavea.hiveless.implicits.tupler._
import geotrellis.proj4.CRS
import geotrellis.vector._
import org.locationtech.jts.geom.Geometry

class ST_GeomReproject extends HUDF[(Geometry, CRS, CRS), Geometry] {
val name: String = "st_geomReproject"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Azavea
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,10 @@
package com.carto.analyticstoolbox.index

import com.azavea.hiveless.HUDF
import geotrellis.vector.Extent
import com.azavea.hiveless.implicits.tupler._
import geotrellis.vector._

class ST_IntersectsExtents extends HUDF[(Extent, Extent), Boolean] {
val name: String = "st_intersectsExtents"
def function = { (e1: Extent, e2: Extent) => e1.intersects(e2) }
/**
 * UDF named "st_makeExtent" that builds an Extent from four Double arguments
 * by delegating to `Extent.apply`.
 */
class ST_MakeExtent extends HUDF[(Double, Double, Double, Double), Extent] {
val name: String = "st_makeExtent"
// NOTE(review): argument order is whatever Extent.apply takes — presumably xmin, ymin, xmax, ymax; confirm.
def function = Extent.apply
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import geotrellis.layer.{SpatialKey, ZoomedLayoutScheme}
import geotrellis.vector._
import geotrellis.proj4.{CRS, LatLng}
import geotrellis.store.index.zcurve.Z2
import org.locationtech.jts.geom.Geometry

class ST_PartitionCentroid extends HUDF[(Geometry, Int, Option[Int], Option[Int], Option[CRS], Option[Double]), Long] {
val name: String = "st_partitionCentroid"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.azavea.hiveless.HUDF
import com.carto.analyticstoolbox.core._
import com.carto.analyticstoolbox.spark.geotrellis.Z2Index
import geotrellis.store.index.zcurve.Z2
import org.locationtech.jts.geom.Geometry
import geotrellis.vector.Geometry

class ST_Z2LatLon extends HUDF[Geometry, Z2Index] {
val name: String = "st_z2LatLon"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ package object index extends StandardEncoders {
def convert(argument: Any): CRS = CRS.fromString(argument.convert[String])
}

/**
 * Converter that turns a raw argument into an Extent: the argument is first converted
 * to an InternalRow, and that row is then decoded as an Extent.
 */
implicit def extentConverter: HConverter[Extent] = new HConverter[Extent] {
  def convert(argument: Any): Extent = {
    val row = argument.convert[InternalRow]
    row.as[Extent]
  }
}

/**
 * Deserializer for CRS arguments: the argument is deserialized as a String first
 * and the resulting text is then converted into a CRS.
 */
implicit def crsUnaryDeserializer: UnaryDeserializer[Id, CRS] = { (arguments, inspectors) =>
  val text = arguments.deserialize[String](inspectors)
  text.convert[CRS]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,104 @@
package com.carto.analyticstoolbox.spark.rules

import com.carto.analyticstoolbox.core._
import com.carto.analyticstoolbox.index.ST_IntersectsExtent
import com.azavea.hiveless.serializers.syntax._
import com.carto.analyticstoolbox.index._
import com.azavea.hiveless.spark.rules.syntax._
import com.azavea.hiveless.serializers.syntax._
import geotrellis.vector._
import cats.syntax.option._
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.sql.hive.HivelessInternals.HiveGenericUDF
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.log4s.getLogger

import scala.util.{Failure, Success, Try}

object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
// Logger used to report failed optimizations; marked @transient and lazy, so it is created on first use.
@transient private[this] lazy val logger = getLogger

/**
 * Rewrites `Filter` nodes whose condition is a spatial UDF call into plain comparisons
 * on the four fields (xmin, ymin, xmax, ymax) of the Extent struct passed as the first argument.
 *
 * Two conditions are recognised:
 *  - `ST_IntersectsExtent`: the second argument is evaluated as a Geometry and its extent is used.
 *    There is no fallback, any failure propagates.
 *  - `ST_Intersects`: the first argument must conform to the Extent schema and the second one must
 *    evaluate to a Geometry or Extent literal. When the rewrite fails with a non-fatal error,
 *    a warning is logged and the original `Filter` node is returned unchanged.
 *
 * Fatal errors (e.g. OutOfMemoryError, InterruptedException) are not swallowed by the fallback.
 *
 * NOTE(review): the generated predicate (xmin >= e.xmin, ymin >= e.ymin, xmax <= e.xmax, ymax <= e.ymax)
 * holds only when the column extent lies inside the literal extent, which is narrower than the
 * `l.intersects(r)` test performed by the UDF itself — confirm that this is intended.
 *
 * @param plan the logical plan to optimize
 * @return the plan with the supported spatial filters replaced by struct field comparisons
 */
def apply(plan: LogicalPlan): LogicalPlan =
  plan.transformDown {
    // HiveGenericUDF is a private[hive] case class
    case Filter(condition: HiveGenericUDF, plan) if condition.of[ST_IntersectsExtent] =>
      // the UDF arguments: the bbox expression on the left, the geometry expression on the right
      val Seq(bboxExpr, geometryExpr) = condition.children
      // extract extent from the right
      val extent = geometryExpr.eval(null).convert[Geometry].extent

      // transform expression
      val expr = AndList(
        List(
          IsNotNull(bboxExpr),
          GreaterThanOrEqual(GetStructField(bboxExpr, 0, "xmin".some), Literal(extent.xmin)),
          GreaterThanOrEqual(GetStructField(bboxExpr, 1, "ymin".some), Literal(extent.ymin)),
          LessThanOrEqual(GetStructField(bboxExpr, 2, "xmax".some), Literal(extent.xmax)),
          LessThanOrEqual(GetStructField(bboxExpr, 3, "ymax".some), Literal(extent.ymax))
        )
      )

      Filter(expr, plan)
    case f @ Filter(condition: HiveGenericUDF, plan) if condition.of[ST_Intersects] =>
      try {
        val Seq(extentExpr, geometryExpr) = condition.children

        // ST_Intersects is polymorphic by the first argument
        // Optimization is done only when the first argument is Extent
        if (!extentExpr.dataType.conformsToSchema(extentEncoder.schema))
          throw new UnsupportedOperationException(
            s"${classOf[ST_Intersects]} push-down optimization works on the Extent column data type only."
          )

        // transform expression
        val expr = Try {
          // ST_Intersects is polymorphic by the second argument
          // Extract Extent literal from the right
          // The second argument can be Geometry or Extent
          val g = geometryExpr.eval(null)
          Try(g.convert[Geometry].extent).getOrElse(g.convert[Extent])
        } match {
          // Literals push-down support only
          case Success(extent) =>
            // transform expression
            AndList(
              List(
                IsNotNull(extentExpr),
                GreaterThanOrEqual(GetStructField(extentExpr, 0, "xmin".some), Literal(extent.xmin)),
                GreaterThanOrEqual(GetStructField(extentExpr, 1, "ymin".some), Literal(extent.ymin)),
                LessThanOrEqual(GetStructField(extentExpr, 2, "xmax".some), Literal(extent.xmax)),
                LessThanOrEqual(GetStructField(extentExpr, 3, "ymax".some), Literal(extent.ymax))
              )
            )
          // Expression
          case Failure(_) =>
            // When the right side is an Expression rather than a literal, no further optimization is done
            // and such predicates won't be pushed down.
            //
            // When a Geometry is on the right, the Envelope coordinates can't be extracted: that would require
            // a User Defined Expression, which won't be pushed down either.
            //
            // However, it is possible to extract coordinates out of an Extent.
            // In this case GetStructField can be used to extract values and transform the request,
            // though such predicates are not pushed down as well.
            //
            // The rough implementation of the idea above (The transformed plan for Extent, which is not pushed down):
            /*if (geometryExpr.dataType.conformsToSchema(extentEncoder.schema)) {
              AndList(
                List(
                  IsNotNull(extentExpr),
                  GreaterThanOrEqual(GetStructField(extentExpr, 0, "xmin".some), GetStructField(geometryExpr, 0, "xmin".some)),
                  GreaterThanOrEqual(GetStructField(extentExpr, 1, "ymin".some), GetStructField(geometryExpr, 1, "ymin".some)),
                  LessThanOrEqual(GetStructField(extentExpr, 2, "xmax".some), GetStructField(geometryExpr, 2, "xmax".some)),
                  LessThanOrEqual(GetStructField(extentExpr, 3, "ymax".some), GetStructField(geometryExpr, 3, "ymax".some))
                )
              )
            } else {
              throw new UnsupportedOperationException(
                "Geometry Envelope values extraction is not supported by the internal Geometry representation.".stripMargin
              )
            }*/

            throw new UnsupportedOperationException(
              s"${classOf[ST_Intersects]} push-down optimization works with Geometry and Extent Literals only."
            )
        }

        Filter(expr, plan)
      } catch {
        // fallback to the unoptimized node if optimization failed;
        // only non-fatal errors are handled so that OOM, interrupts and control throwables still propagate
        case scala.util.control.NonFatal(e) =>
          logger.warn(
            s"""
               |${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed.
               |StackTrace: ${ExceptionUtils.getStackTrace(e)}
               |""".stripMargin
          )
          f
      }
  }

def registerOptimizations(sqlContext: SQLContext): Unit =
Expand Down
Loading

0 comments on commit d30e97b

Please sign in to comment.