Introduce pattern object for LogicalRelation to avoid coupling with all params (#603)
HeartSaVioR authored Nov 7, 2024
1 parent f8d5720 commit fc04e3b
Showing 1 changed file with 18 additions and 2 deletions.
@@ -17,10 +17,12 @@
 package io.delta.sharing.spark.perf
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
 
 import io.delta.sharing.client.util.ConfUtils
 import io.delta.sharing.spark.RemoteDeltaSnapshotFileIndex
@@ -38,9 +40,9 @@ object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {
       p transform {
         case localLimit @ LocalLimit(
           literalExpr @ IntegerLiteral(limit),
-          l @ LogicalRelation(
+          l @ LogicalRelationWithTable(
             r @ HadoopFsRelation(remoteIndex: RemoteDeltaSnapshotFileIndex, _, _, _, _, _),
-            _, _, _)
+            _)
         ) =>
           if (remoteIndex.limitHint.isEmpty) {
             val spark = SparkSession.active
@@ -59,3 +61,17 @@ object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Extract the [[BaseRelation]] and [[CatalogTable]] from [[LogicalRelation]]. You can also
+ * retrieve the instance of LogicalRelation like following:
+ *
+ * case l @ LogicalRelationWithTable(relation, catalogTable) => ...
+ *
+ * NOTE: This is copied from Spark 4.0 codebase - license: Apache-2.0.
+ */
+object LogicalRelationWithTable {
+  def unapply(plan: LogicalRelation): Option[(BaseRelation, Option[CatalogTable])] = {
+    Some(plan.relation, plan.catalogTable)
+  }
+}
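
For illustration only (not part of the commit): a minimal sketch of a rule that matches through the new extractor. ExampleRule and its log message are hypothetical; the point is that the pattern binds only the BaseRelation and the optional CatalogTable, so the match keeps compiling even if LogicalRelation's constructor gains or loses parameters across Spark versions.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ExampleRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Bind only the fields we need; the pattern does not depend on how many
    // constructor parameters LogicalRelation has in this Spark version.
    case l @ LogicalRelationWithTable(relation, catalogTable) =>
      logInfo(s"Matched ${relation.getClass.getSimpleName}, " +
        s"catalog table: ${catalogTable.map(_.identifier.unquotedString).getOrElse("<none>")}")
      l
  }
}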
