From fe330f1c16dd389531c0653a0289c495fa13b855 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Tue, 13 Feb 2024 18:10:02 -0800
Subject: [PATCH] Refactor query input

Signed-off-by: Louis Chu
---
 .../apache/spark/sql/flint/config/FlintSparkConf.scala  | 4 ++++
 .../scala/org/apache/spark/sql/FlintREPLITSuite.scala   | 2 +-
 .../src/main/scala/org/apache/spark/sql/FlintJob.scala  | 8 +++++---
 .../src/main/scala/org/apache/spark/sql/FlintREPL.scala | 3 ++-
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 359994c56..fbbea9176 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -150,6 +150,10 @@ object FlintSparkConf {
     FlintConfig(s"spark.flint.datasource.name")
       .doc("data source name")
       .createOptional()
+  val QUERY =
+    FlintConfig("spark.flint.job.query")
+      .doc("Flint query for batch and streaming job")
+      .createOptional()
   val JOB_TYPE =
     FlintConfig(s"spark.flint.job.type")
       .doc("Flint job type. Including interactive and streaming")
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
index 9a2afc71e..f2d4be911 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala
@@ -168,7 +168,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
         Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))
       FlintREPL.enableHiveSupport = false
       FlintREPL.terminateJVM = false
-      FlintREPL.main(Array("select 1", resultIndex))
+      FlintREPL.main(Array(resultIndex))
     }
     futureResult.onComplete {
       case Success(result) => logInfo(s"Success result: $result")
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index df0bf5c4e..c0cfd4164 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -19,6 +19,7 @@ import play.api.libs.json._
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types.{StructField, _}
 
 /**
@@ -34,15 +35,16 @@ import org.apache.spark.sql.types.{StructField, _}
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
     // Validate command line arguments
-    if (args.length != 2) {
-      throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Usage: FlintJob <resultIndex>")
     }
 
-    val Array(query, resultIndex) = args
+    val Array(resultIndex) = args
 
     val conf = createSparkConf()
     val wait = conf.get("spark.flint.job.type", "continue")
     val dataSource = conf.get("spark.flint.datasource.name", "")
+    val query = conf.get(FlintSparkConf.QUERY.key, "")
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
      * To execute queries such as `CREATE SKIPPING INDEX ON my_glue1.default.http_logs_plain (`@timestamp` VALUE_SET) WITH (auto_refresh = true)`,
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 2a63653e3..b1df5379d 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -61,7 +61,7 @@
   }
 
   def main(args: Array[String]) {
-    val Array(query, resultIndex) = args
+    val Array(resultIndex) = args
     if (Strings.isNullOrEmpty(resultIndex)) {
       throw new IllegalArgumentException("resultIndex is not set")
     }
@@ -69,6 +69,7 @@
     // init SparkContext
     val conf: SparkConf = createSparkConf()
     val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "unknown")
+    val query = conf.get(FlintSparkConf.QUERY.key, "")
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
      * To execute queries such as `CREATE SKIPPING INDEX ON my_glue1.default.http_logs_plain (`@timestamp` VALUE_SET) WITH (auto_refresh = true)`,
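
Note on the new invocation: after this patch the SQL statement reaches FlintJob and FlintREPL through the spark.flint.job.query configuration rather than as args(0); only the result index remains a positional argument. Below is a minimal Scala sketch of the pattern, mirroring the updated integ test. The query, data source, and index values are placeholders, and it assumes createSparkConf() builds a SparkConf with defaults enabled so that "spark."-prefixed system properties are picked up:

    import org.apache.spark.sql.flint.config.FlintSparkConf

    // The query now travels via Spark conf; SparkConf(loadDefaults = true)
    // reads any system property whose name starts with "spark.".
    System.setProperty(FlintSparkConf.QUERY.key, "select 1")
    System.setProperty(FlintSparkConf.DATA_SOURCE_NAME.key, "my_glue1")

    // Only the result index is still passed as a program argument.
    FlintREPL.main(Array("query_results_index"))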