Skip to content

Commit

Permalink
Refactor query input
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Feb 14, 2024
1 parent 9c15194 commit fb530e5
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val QUERY =
FlintConfig("spark.flint.job.query")
.doc("Flint query for batch and streaming job")
.createOptional()
val JOB_TYPE =
FlintConfig(s"spark.flint.job.type")
.doc("Flint job type. Including interactive and streaming")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))
FlintREPL.enableHiveSupport = false
FlintREPL.terminateJVM = false
FlintREPL.main(Array("select 1", resultIndex))
FlintREPL.main(Array(resultIndex))
}
futureResult.onComplete {
case Success(result) => logInfo(s"Success result: $result")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import play.api.libs.json._
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types.{StructField, _}

/**
Expand All @@ -34,15 +35,16 @@ import org.apache.spark.sql.types.{StructField, _}
object FlintJob extends Logging with FlintJobExecutor {
def main(args: Array[String]): Unit = {
// Validate command line arguments
if (args.length != 2) {
throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
if (args.length != 1) {
throw new IllegalArgumentException("Usage: FlintJob <resultIndex>")
}

val Array(query, resultIndex) = args
val Array(resultIndex) = args

val conf = createSparkConf()
val wait = conf.get("spark.flint.job.type", "continue")
val dataSource = conf.get("spark.flint.datasource.name", "")
val query = conf.get(FlintSparkConf.QUERY.key, "")
// https://github.com/opensearch-project/opensearch-spark/issues/138
/*
* To execute queries such as `CREATE SKIPPING INDEX ON my_glue1.default.http_logs_plain (`@timestamp` VALUE_SET) WITH (auto_refresh = true)`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ object FlintREPL extends Logging with FlintJobExecutor {
}

def main(args: Array[String]) {
val Array(query, resultIndex) = args
val Array(resultIndex) = args
if (Strings.isNullOrEmpty(resultIndex)) {
throw new IllegalArgumentException("resultIndex is not set")
}

// init SparkContext
val conf: SparkConf = createSparkConf()
val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "unknown")
val query = conf.get(FlintSparkConf.QUERY.key, "")
// https://github.com/opensearch-project/opensearch-spark/issues/138
/*
* To execute queries such as `CREATE SKIPPING INDEX ON my_glue1.default.http_logs_plain (`@timestamp` VALUE_SET) WITH (auto_refresh = true)`,
Expand Down

0 comments on commit fb530e5

Please sign in to comment.