Skip to content

Commit

Permalink
Use reflection to address the new parameter in LogicalRelation in Spa…
Browse files Browse the repository at this point in the history
…rk 4.0 (#604)
  • Loading branch information
HeartSaVioR authored Nov 11, 2024
1 parent fc04e3b commit 4def0f6
Showing 1 changed file with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

package io.delta.sharing.spark.perf

import scala.reflect.runtime.universe.termNames
import scala.reflect.runtime.universe.typeOf
import scala.reflect.runtime.universe.typeTag

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -47,10 +52,10 @@ object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {
if (remoteIndex.limitHint.isEmpty) {
val spark = SparkSession.active
LocalLimit(literalExpr,
l.copy(
relation = r.copy(
location = remoteIndex.copy(limitHint = Some(limit)))(spark)
)
LogicalRelationShim.copyWithNewRelation(
l,
r.copy(
location = remoteIndex.copy(limitHint = Some(limit)))(spark))
)
} else {
localLimit
Expand All @@ -75,3 +80,34 @@ object LogicalRelationWithTable {
Some(plan.relation, plan.catalogTable)
}
}

/**
* This class helps the codebase to address the differences among multiple Spark versions.
*/
object LogicalRelationShim {
/**
* This method provides the ability of copying LogicalRelation instance across Spark versions,
* when the caller only wants to replace the relation in the LogicalRelation.
*/
def copyWithNewRelation(src: LogicalRelation, newRelation: BaseRelation): LogicalRelation = {
// We assume Spark would not change the order of the existing parameter, but it's even safe
// as long as the first parameter is reserved to the `relation`.
val paramsForPrimaryConstructor = src.productIterator.toArray
paramsForPrimaryConstructor(0) = newRelation

val constructor = typeOf[LogicalRelation]
.decl(termNames.CONSTRUCTOR)
// Getting all the constructors
.alternatives
.map(_.asMethod)
// Picking the primary constructor
.find(_.isPrimaryConstructor)
// A class must always have a primary constructor, so this is safe
.get
val constructorMirror = typeTag[LogicalRelation].mirror
.reflectClass(typeOf[LogicalRelation].typeSymbol.asClass)
.reflectConstructor(constructor)

constructorMirror.apply(paramsForPrimaryConstructor: _*).asInstanceOf[LogicalRelation]
}
}

0 comments on commit 4def0f6

Please sign in to comment.