From 35c07116fcb48a0eb6bf740fb9177ba159902c30 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Tue, 13 Feb 2024 18:10:02 -0800
Subject: [PATCH 1/3] Refactor query input

Signed-off-by: Louis Chu
---
 .../spark/sql/flint/config/FlintSparkConf.scala      |  4 ++++
 .../org/apache/spark/sql/FlintREPLITSuite.scala      |  2 +-
 .../main/scala/org/apache/spark/sql/FlintJob.scala   | 12 +++++++++---
 .../main/scala/org/apache/spark/sql/FlintREPL.scala  |  8 +++++++-
 4 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 359994c56..fbbea9176 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -150,6 +150,10 @@ object FlintSparkConf {
     FlintConfig(s"spark.flint.datasource.name")
       .doc("data source name")
       .createOptional()
+  val QUERY =
+    FlintConfig("spark.flint.job.query")
+      .doc("Flint query for batch and streaming job")
+      .createOptional()
   val JOB_TYPE =
     FlintConfig(s"spark.flint.job.type")
       .doc("Flint job type. Including interactive and streaming")
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
index 9a2afc71e..f2d4be911 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
@@ -168,7 +168,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
         Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))
       FlintREPL.enableHiveSupport = false
       FlintREPL.terminateJVM = false
-      FlintREPL.main(Array("select 1", resultIndex))
+      FlintREPL.main(Array(resultIndex))
     }
     futureResult.onComplete {
       case Success(result) => logInfo(s"Success result: $result")
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index 32747e20f..2686556b4 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -14,6 +14,7 @@ import org.opensearch.cluster.metadata.MappingMetadata
 import org.opensearch.common.settings.Settings
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
@@ -37,12 +38,13 @@ import org.apache.spark.sql.types.{StructField, _}
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
+    CustomLogging.logInfo("Spark Job is Launching...")
     // Validate command line arguments
-    if (args.length != 2) {
-      throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Usage: FlintJob <resultIndex>")
     }
-    val Array(query, resultIndex) = args
+    val Array(resultIndex) = args
 
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
@@ -50,6 +52,10 @@ object FlintJob extends Logging with FlintJobExecutor {
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
+    val query = conf.get(FlintSparkConf.QUERY.key, "")
+    if (query.isEmpty) {
+      throw new IllegalArgumentException("Query undefined for the batch job.")
+    }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
      * To execute queries such as `CREATE SKIPPING INDEX ON my_glue1.default.http_logs_plain (`@timestamp` VALUE_SET) WITH (auto_refresh = true)`,
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index d30669cca..d21770cc4 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -21,6 +21,7 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
@@ -68,7 +69,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
   private val statementRunningCount = new AtomicInteger(0)
 
   def main(args: Array[String]) {
-    val Array(query, resultIndex) = args
+    CustomLogging.logInfo("Spark Job is Launching...")
+    val Array(resultIndex) = args
     if (Strings.isNullOrEmpty(resultIndex)) {
       throw new IllegalArgumentException("resultIndex is not set")
     }
@@ -91,6 +93,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     if (jobType.equalsIgnoreCase("streaming")) {
+      val query = conf.get(FlintSparkConf.QUERY.key, "")
+      if (query.isEmpty) {
+        throw new IllegalArgumentException("Query undefined for the streaming job.")
+      }
       logInfo(s"""streaming query ${query}""")
       val streamingRunningCount = new AtomicInteger(0)
       val jobOperator =

From 82a28218b658be72edf3fece73852e120e310de9 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Tue, 12 Mar 2024 20:18:26 -0700
Subject: [PATCH 2/3] Support backward compatibility

Signed-off-by: Louis Chu
---
 .../scala/org/apache/spark/sql/FlintJob.scala | 22 ++++++++------
 .../org/apache/spark/sql/FlintREPL.scala      | 30 ++++++++++++++-----
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index 2686556b4..8b4bdeeaf 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -14,7 +14,6 @@ import org.opensearch.cluster.metadata.MappingMetadata
 import org.opensearch.common.settings.Settings
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
-import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
@@ -37,23 +37,28 @@ import org.apache.spark.sql.types.{StructField, _}
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
-    CustomLogging.logInfo("Spark Job is Launching...")
-    // Validate command line arguments
-    if (args.length != 1) {
-      throw new IllegalArgumentException("Usage: FlintJob <resultIndex>")
+    val (queryOption, resultIndex) = args.length match {
+      case 1 =>
+        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
+      case 2 =>
+        (
+          Some(args(0)),
+          args(1)
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ =>
+        throw new IllegalArgumentException(
+          "Unsupported number of arguments. Expected 1 or 2 arguments.")
     }
 
-    val Array(resultIndex) = args
-
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
     logInfo(s"""Job type is: ${jobType}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
-    val query = conf.get(FlintSparkConf.QUERY.key, "")
+    val query = queryOption.getOrElse(conf.get(FlintSparkConf.QUERY.key, ""))
     if (query.isEmpty) {
-      throw new IllegalArgumentException("Query undefined for the batch job.")
+      throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
     }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index d21770cc4..256910e94 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -21,7 +21,6 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
@@ -69,8 +68,19 @@ object FlintREPL extends Logging with FlintJobExecutor {
   private val statementRunningCount = new AtomicInteger(0)
 
   def main(args: Array[String]) {
-    CustomLogging.logInfo("Spark Job is Launching...")
-    val Array(resultIndex) = args
+    val (queryOption, resultIndex) = args.length match {
+      case 1 =>
+        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
+      case 2 =>
+        (
+          Some(args(0)),
+          args(1)
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ =>
+        throw new IllegalArgumentException(
+          "Unsupported number of arguments. Expected 1 or 2 arguments.")
+    }
+
     if (Strings.isNullOrEmpty(resultIndex)) {
       throw new IllegalArgumentException("resultIndex is not set")
     }
@@ -92,11 +102,17 @@ object FlintREPL extends Logging with FlintJobExecutor {
     logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
+    val query = queryOption.getOrElse {
+      if (jobType.equalsIgnoreCase("streaming")) {
+        val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
+        if (defaultQuery.isEmpty) {
+          throw new IllegalArgumentException("Query undefined for the streaming job.")
+        }
+        defaultQuery
+      } else ""
+    }
+
     if (jobType.equalsIgnoreCase("streaming")) {
-      val query = conf.get(FlintSparkConf.QUERY.key, "")
-      if (query.isEmpty) {
-        throw new IllegalArgumentException("Query undefined for the streaming job.")
-      }
       logInfo(s"""streaming query ${query}""")
       val streamingRunningCount = new AtomicInteger(0)
       val jobOperator =

From 86aa1f3b718113b301658513d6f2287f96c447be Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Wed, 13 Mar 2024 11:26:44 -0700
Subject: [PATCH 3/3] Add more UTs

Signed-off-by: Louis Chu
---
 .../org/apache/spark/sql/FlintREPL.scala     | 50 +++++++------
 .../org/apache/spark/sql/FlintREPLTest.scala | 73 +++++++++++++++++++
 2 files changed, 102 insertions(+), 21 deletions(-)

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 256910e94..78314a68b 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -68,18 +68,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
   private val statementRunningCount = new AtomicInteger(0)
 
   def main(args: Array[String]) {
-    val (queryOption, resultIndex) = args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
+    val (queryOption, resultIndex) = parseArgs(args)
 
     if (Strings.isNullOrEmpty(resultIndex)) {
       throw new IllegalArgumentException("resultIndex is not set")
@@ -102,15 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
-    val query = queryOption.getOrElse {
-      if (jobType.equalsIgnoreCase("streaming")) {
-        val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
-        if (defaultQuery.isEmpty) {
-          throw new IllegalArgumentException("Query undefined for the streaming job.")
-        }
-        defaultQuery
-      } else ""
-    }
+    val query = getQuery(queryOption, jobType, conf)
 
     if (jobType.equalsIgnoreCase("streaming")) {
       logInfo(s"""streaming query ${query}""")
       val streamingRunningCount = new AtomicInteger(0)
       val jobOperator =
@@ -250,6 +231,33 @@ object FlintREPL extends Logging with FlintJobExecutor {
     }
   }
 
+  def parseArgs(args: Array[String]): (Option[String], String) = {
+    args.length match {
+      case 1 =>
+        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
+      case 2 =>
+        (
+          Some(args(0)),
+          args(1)
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ =>
+        throw new IllegalArgumentException(
+          "Unsupported number of arguments. Expected 1 or 2 arguments.")
Expected 1 or 2 arguments.") + } + } + + def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = { + queryOption.getOrElse { + if (jobType.equalsIgnoreCase("streaming")) { + val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "") + if (defaultQuery.isEmpty) { + throw new IllegalArgumentException("Query undefined for the streaming job.") + } + defaultQuery + } else "" + } + } + /** * Sets up a Flint job with exclusion checks based on the job configuration. * diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala index abae546b6..421457c4e 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala @@ -30,6 +30,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types.{LongType, NullType, StringType, StructField, StructType} import org.apache.spark.sql.util.{DefaultThreadPoolFactory, MockThreadPoolFactory, MockTimeProvider, RealTimeProvider, ShutdownHookManagerTrait} import org.apache.spark.util.ThreadUtils @@ -42,6 +43,78 @@ class FlintREPLTest // By using a type alias and casting, I can bypass the type checking error. type AnyScheduledFuture = ScheduledFuture[_] + test( + "parseArgs with one argument should return None for query and the argument as resultIndex") { + val args = Array("resultIndexName") + val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + queryOption shouldBe None + resultIndex shouldBe "resultIndexName" + } + + test( + "parseArgs with two arguments should return the first argument as query and the second as resultIndex") { + val args = Array("SELECT * FROM table", "resultIndexName") + val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + queryOption shouldBe Some("SELECT * FROM table") + resultIndex shouldBe "resultIndexName" + } + + test( + "parseArgs with no arguments should throw IllegalArgumentException with specific message") { + val args = Array.empty[String] + val exception = intercept[IllegalArgumentException] { + FlintREPL.parseArgs(args) + } + exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." + } + + test( + "parseArgs with more than two arguments should throw IllegalArgumentException with specific message") { + val args = Array("arg1", "arg2", "arg3") + val exception = intercept[IllegalArgumentException] { + FlintREPL.parseArgs(args) + } + exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." 
+ } + + test("getQuery should return query from queryOption if present") { + val queryOption = Some("SELECT * FROM table") + val jobType = "streaming" + val conf = new SparkConf() + + val query = FlintREPL.getQuery(queryOption, jobType, conf) + query shouldBe "SELECT * FROM table" + } + + test("getQuery should return default query for streaming job if queryOption is None") { + val queryOption = None + val jobType = "streaming" + val conf = new SparkConf().set(FlintSparkConf.QUERY.key, "SELECT * FROM table") + + val query = FlintREPL.getQuery(queryOption, jobType, conf) + query shouldBe "SELECT * FROM table" + } + + test( + "getQuery should throw IllegalArgumentException if queryOption is None and default query is not defined for streaming job") { + val queryOption = None + val jobType = "streaming" + val conf = new SparkConf() // Default query not set + + intercept[IllegalArgumentException] { + FlintREPL.getQuery(queryOption, jobType, conf) + }.getMessage shouldBe "Query undefined for the streaming job." + } + + test("getQuery should return empty string for non-streaming job if queryOption is None") { + val queryOption = None + val jobType = "interactive" + val conf = new SparkConf() // Default query not needed + + val query = FlintREPL.getQuery(queryOption, jobType, conf) + query shouldBe "" + } + test("createHeartBeatUpdater should update heartbeat correctly") { // Mocks val flintSessionUpdater = mock[OpenSearchUpdater]