Skip to content

Commit

Permalink
replace the static spark extension with input args
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Oct 4, 2023
1 parent 3b63b12 commit a046696
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ import org.apache.spark.util.ThreadUtils
* Spark SQL Application entrypoint
*
* @param args
* (0) sql query
* (0) spark extensions (Flint, PPL)
* (1) sql query
* @param args
* (1) opensearch index name
* (2) opensearch index name
* @return
* write sql query result to given opensearch index
*/
object FlintJob extends Logging {
def main(args: Array[String]): Unit = {
// Validate command line arguments
if (args.length != 2) {
throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
if (args.length != 3) {
throw new IllegalArgumentException("Usage: FlintJob <spark extensions> <query> <resultIndex>")
}

val Array(query, resultIndex) = args
val Array(sparkExtensions, query, resultIndex) = args

val conf = createSparkConf()
val conf = createSparkConf(sparkExtensions)
val wait = conf.get("spark.flint.job.type", "continue")
val dataSource = conf.get("spark.flint.datasource.name", "")
val spark = createSparkSession(conf)
Expand Down Expand Up @@ -82,10 +83,10 @@ object FlintJob extends Logging {
}
}

def createSparkConf(): SparkConf = {
def createSparkConf(sparkExtensions: String): SparkConf = {
new SparkConf()
.setAppName("FlintJob")
.set("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
.set("spark.sql.extensions", sparkExtensions)
}

def createSparkSession(conf: SparkConf): SparkSession = {
Expand Down

0 comments on commit a046696

Please sign in to comment.