From ca37c4dd83520e4e1160298827625ee1bf8415b2 Mon Sep 17 00:00:00 2001 From: xavierzyxue Date: Thu, 1 Aug 2024 21:55:29 +0800 Subject: [PATCH 1/5] change to 9 parameters constructor --- .../plugin/embedded/BigQueryPlugin.scala | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala index 53a31261..f544ce48 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala @@ -66,13 +66,15 @@ class BigQueryPlugin(spark: SparkSession) def bigQueryConfig = { SparkBigQueryConfig.from( - ImmutableMap.copyOf(cmd.options.asJava), + cmd.options.asJava, ImmutableMap.copyOf(spark.conf.getAll.asJava), spark.sparkContext.hadoopConfiguration, + ImmutableMap.of, 0, spark.sessionState.conf, spark.version, - Optional.empty() + Optional.empty(), + true ) } @@ -145,9 +147,9 @@ object BigQueryPlugin { .find( m => m.getName == "from" && isStatic(m.getModifiers) - && m.getParameterTypes.length == 7 + && m.getParameterTypes.length == 9 && m.getReturnType.getSimpleName == "SparkBigQueryConfig" - ).getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {7 args} ...)` in the class `$clazz`")) + ).getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {9 args} ...)` in the class `$clazz`")) object ImmutableMap { type ImmutableMap = AnyRef @@ -157,11 +159,16 @@ object BigQueryPlugin { .getMethod("copyOf", classOf[java.util.Map[_, _]]) .invoke(imClass, _) .asInstanceOf[ImmutableMap] + val of: ImmutableMap = + imClass + .getMethod("of") + .invoke(imClass) + .asInstanceOf[ImmutableMap] } - val from: (ImmutableMap.ImmutableMap, ImmutableMap.ImmutableMap, Configuration, Integer, SQLConf, String, Optional[StructType]) => SparkBigQueryConfig = + val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = methodFrom - .invoke(clazz, _, _, _, _, _, _, _) + .invoke(clazz, _, _, _, _, _, _, _, _, _) .asInstanceOf[SparkBigQueryConfig] } From 198414420e69cc58e35ba58edceb715a43582257 Mon Sep 17 00:00:00 2001 From: xavierzyxue Date: Mon, 5 Aug 2024 14:56:27 +0800 Subject: [PATCH 2/5] add 7|8|9 parameters separately to be compatible with different version --- .../plugin/embedded/BigQueryPlugin.scala | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala index f544ce48..90635585 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala @@ -16,7 +16,12 @@ package za.co.absa.spline.harvester.plugin.embedded +import java.lang.reflect.Method +import java.lang.reflect.Modifier.isStatic +import java.util.Optional + import io.github.classgraph.ClassGraph +import javax.annotation.Priority import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{LogicalRelation, SaveIntoDataSourceCommand} @@ -31,10 +36,6 @@ import za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin.SparkBigQueryC import za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin._ import za.co.absa.spline.harvester.plugin.{BaseRelationProcessing, Plugin, RelationProviderProcessing} -import java.lang.reflect.Method -import java.lang.reflect.Modifier.isStatic -import java.util.Optional -import javax.annotation.Priority import scala.collection.JavaConverters._ import scala.language.reflectiveCalls @@ -142,14 +143,39 @@ object BigQueryPlugin { def getParentProjectId: String } private val clazz = findPossiblyShadedClass("com.google.cloud", "com.google.cloud.spark.bigquery.SparkBigQueryConfig") - private val methodFrom: Method = clazz + val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { + case (a, b, c, d, e, f, g, h, i) => + if (methodFrom7.isDefined) + methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] + else if (methodFrom8.isDefined) + methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + else + methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + } + private val methodFrom7: Option[Method] = clazz + .getMethods + .find( + m => m.getName == "from" + && isStatic(m.getModifiers) + && m.getParameterTypes.length == 7 + && m.getReturnType.getSimpleName == "SparkBigQueryConfig" + ) + private val methodFrom8: Option[Method] = clazz + .getMethods + .find( + m => m.getName == "from" + && isStatic(m.getModifiers) + && m.getParameterTypes.length == 8 + && m.getReturnType.getSimpleName == "SparkBigQueryConfig" + ) + private val methodFrom9: Option[Method] = clazz .getMethods .find( m => m.getName == "from" && isStatic(m.getModifiers) && m.getParameterTypes.length == 9 && m.getReturnType.getSimpleName == "SparkBigQueryConfig" - ).getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {9 args} ...)` in the class `$clazz`")) + ) object ImmutableMap { type ImmutableMap = AnyRef @@ -166,10 +192,10 @@ object BigQueryPlugin { .asInstanceOf[ImmutableMap] } - val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = - methodFrom - .invoke(clazz, _, _, _, _, _, _, _, _, _) - .asInstanceOf[SparkBigQueryConfig] + private val methodFrom: Method = methodFrom7 + .orElse(methodFrom8) + .orElse(methodFrom9) + .getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {7|8|9 args} ...)` in the class `$clazz`")) } private object `_: DirectBigQueryRelation` extends SafeTypeMatchingExtractor[AnyRef]("com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation") From b5278decd432005ba0e259aa294dad07f38eda59 Mon Sep 17 00:00:00 2001 From: xavierzyxue Date: Mon, 5 Aug 2024 15:04:31 +0800 Subject: [PATCH 3/5] update --- .../plugin/embedded/BigQueryPlugin.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala index 90635585..502b1e81 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala @@ -143,15 +143,6 @@ object BigQueryPlugin { def getParentProjectId: String } private val clazz = findPossiblyShadedClass("com.google.cloud", "com.google.cloud.spark.bigquery.SparkBigQueryConfig") - val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { - case (a, b, c, d, e, f, g, h, i) => - if (methodFrom7.isDefined) - methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] - else if (methodFrom8.isDefined) - methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] - else - methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] - } private val methodFrom7: Option[Method] = clazz .getMethods .find( @@ -176,6 +167,15 @@ object BigQueryPlugin { && m.getParameterTypes.length == 9 && m.getReturnType.getSimpleName == "SparkBigQueryConfig" ) + val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { + case (a, b, c, d, e, f, g, h, i) => + if (methodFrom7.isDefined) + methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] + else if (methodFrom8.isDefined) + methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + else + methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + } object ImmutableMap { type ImmutableMap = AnyRef @@ -191,7 +191,6 @@ object BigQueryPlugin { .invoke(imClass) .asInstanceOf[ImmutableMap] } - private val methodFrom: Method = methodFrom7 .orElse(methodFrom8) .orElse(methodFrom9) From 36c4f7b2fa0ff66015b7b32f3aad63387bc3e88f Mon Sep 17 00:00:00 2001 From: xavierzyxue Date: Mon, 5 Aug 2024 15:05:13 +0800 Subject: [PATCH 4/5] change to 9 parameters constructor --- .../plugin/embedded/BigQueryPlugin.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala index 502b1e81..59d16322 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala @@ -167,15 +167,10 @@ object BigQueryPlugin { && m.getParameterTypes.length == 9 && m.getReturnType.getSimpleName == "SparkBigQueryConfig" ) - val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { - case (a, b, c, d, e, f, g, h, i) => - if (methodFrom7.isDefined) - methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] - else if (methodFrom8.isDefined) - methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] - else - methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] - } + private val methodFrom: Method = methodFrom7 + .orElse(methodFrom8) + .orElse(methodFrom9) + .getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {7|8|9 args} ...)` in the class `$clazz`")) object ImmutableMap { type ImmutableMap = AnyRef @@ -191,10 +186,16 @@ object BigQueryPlugin { .invoke(imClass) .asInstanceOf[ImmutableMap] } - private val methodFrom: Method = methodFrom7 - .orElse(methodFrom8) - .orElse(methodFrom9) - .getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {7|8|9 args} ...)` in the class `$clazz`")) + + val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { + case (a, b, c, d, e, f, g, h, i) => + if (methodFrom7.isDefined) + methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] + else if (methodFrom8.isDefined) + methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + else + methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + } } private object `_: DirectBigQueryRelation` extends SafeTypeMatchingExtractor[AnyRef]("com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation") From 4925c68d036db36ef6d8ccf7d053ff8ee23cbc96 Mon Sep 17 00:00:00 2001 From: xavierzyxue Date: Tue, 13 Aug 2024 09:31:46 +0800 Subject: [PATCH 5/5] change to match parameter type length --- .../plugin/embedded/BigQueryPlugin.scala | 46 ++++++------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala index 59d16322..9b566053 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/BigQueryPlugin.scala @@ -16,12 +16,7 @@ package za.co.absa.spline.harvester.plugin.embedded -import java.lang.reflect.Method -import java.lang.reflect.Modifier.isStatic -import java.util.Optional - import io.github.classgraph.ClassGraph -import javax.annotation.Priority import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{LogicalRelation, SaveIntoDataSourceCommand} @@ -36,6 +31,10 @@ import za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin.SparkBigQueryC import za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin._ import za.co.absa.spline.harvester.plugin.{BaseRelationProcessing, Plugin, RelationProviderProcessing} +import java.lang.reflect.Method +import java.lang.reflect.Modifier.isStatic +import java.util.Optional +import javax.annotation.Priority import scala.collection.JavaConverters._ import scala.language.reflectiveCalls @@ -143,33 +142,14 @@ object BigQueryPlugin { def getParentProjectId: String } private val clazz = findPossiblyShadedClass("com.google.cloud", "com.google.cloud.spark.bigquery.SparkBigQueryConfig") - private val methodFrom7: Option[Method] = clazz - .getMethods - .find( - m => m.getName == "from" - && isStatic(m.getModifiers) - && m.getParameterTypes.length == 7 - && m.getReturnType.getSimpleName == "SparkBigQueryConfig" - ) - private val methodFrom8: Option[Method] = clazz + private val methodFrom: Method = clazz .getMethods .find( m => m.getName == "from" && isStatic(m.getModifiers) - && m.getParameterTypes.length == 8 + && (m.getParameterTypes.length == 7 || m.getParameterTypes.length == 8 || m.getParameterTypes.length == 9) && m.getReturnType.getSimpleName == "SparkBigQueryConfig" ) - private val methodFrom9: Option[Method] = clazz - .getMethods - .find( - m => m.getName == "from" - && isStatic(m.getModifiers) - && m.getParameterTypes.length == 9 - && m.getReturnType.getSimpleName == "SparkBigQueryConfig" - ) - private val methodFrom: Method = methodFrom7 - .orElse(methodFrom8) - .orElse(methodFrom9) .getOrElse(sys.error(s"Cannot find method `public static SparkBigQueryConfig from(... {7|8|9 args} ...)` in the class `$clazz`")) object ImmutableMap { @@ -188,13 +168,13 @@ object BigQueryPlugin { } val from: (java.util.Map[_, _], ImmutableMap.ImmutableMap, Configuration, ImmutableMap.ImmutableMap, Integer, SQLConf, String, Optional[StructType], java.lang.Boolean) => SparkBigQueryConfig = { - case (a, b, c, d, e, f, g, h, i) => - if (methodFrom7.isDefined) - methodFrom7.get.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] - else if (methodFrom8.isDefined) - methodFrom8.get.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] - else - methodFrom9.get.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + case (a, b, c, d, e, f, g, h, i) => { + methodFrom.getParameterTypes.length match { + case 7 => methodFrom.invoke(clazz, a, b, c, e, f, g, h).asInstanceOf[SparkBigQueryConfig] + case 8 => methodFrom.invoke(clazz, a, b, c, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + case 9 => methodFrom.invoke(clazz, a, b, c, d, e, f, g, h, i).asInstanceOf[SparkBigQueryConfig] + } + } } }