Support backward compatibility
Some(args(0)),
+ args(1) + ) // Before OS 2.13, there are two arguments, the second one is resultIndex + case _ => + throw new IllegalArgumentException( + "Unsupported number of arguments. Expected 1 or 2 arguments.") + } + if (Strings.isNullOrEmpty(resultIndex)) { throw new IllegalArgumentException("resultIndex is not set") } @@ -92,11 +102,17 @@ object FlintREPL extends Logging with FlintJobExecutor { logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""") conf.set(FlintSparkConf.JOB_TYPE.key, jobType) + val query = queryOption.getOrElse { + if (jobType.equalsIgnoreCase("streaming")) { + val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "") + if (defaultQuery.isEmpty) { + throw new IllegalArgumentException("Query undefined for the streaming job.") + } + defaultQuery + } else "" + } + if (jobType.equalsIgnoreCase("streaming")) { - val query = conf.get(FlintSparkConf.QUERY.key, "") - if (query.isEmpty) { - throw new IllegalArgumentException("Query undefined for the streaming job.") - } logInfo(s"""streaming query ${query}""") val streamingRunningCount = new AtomicInteger(0) val jobOperator =