Skip to content

Commit

Permalink
Support backward compatibility
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 13, 2024
1 parent 35c0711 commit 82a2821
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.logging.CustomLogging
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
Expand All @@ -38,23 +37,28 @@ import org.apache.spark.sql.types.{StructField, _}
*/
object FlintJob extends Logging with FlintJobExecutor {
def main(args: Array[String]): Unit = {
CustomLogging.logInfo("Spark Job is Launching...")
// Validate command line arguments
if (args.length != 1) {
throw new IllegalArgumentException("Usage: FlintJob <resultIndex>")
val (queryOption, resultIndex) = args.length match {
case 1 =>
(None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
case 2 =>
(
Some(args(0)),
args(1)
) // Before OS 2.13, there are two arguments, the second one is resultIndex
case _ =>
throw new IllegalArgumentException(
"Unsupported number of arguments. Expected 1 or 2 arguments.")
}

val Array(resultIndex) = args

val conf = createSparkConf()
val jobType = conf.get("spark.flint.job.type", "batch")
logInfo(s"""Job type is: ${jobType}""")
conf.set(FlintSparkConf.JOB_TYPE.key, jobType)

val dataSource = conf.get("spark.flint.datasource.name", "")
val query = conf.get(FlintSparkConf.QUERY.key, "")
val query = queryOption.getOrElse(conf.get(FlintSparkConf.QUERY.key, ""))
if (query.isEmpty) {
throw new IllegalArgumentException("Query undefined for the batch job.")
throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
}
// https://github.com/opensearch-project/opensearch-spark/issues/138
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.logging.CustomLogging
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter, registerGauge, stopTimer}
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
Expand Down Expand Up @@ -69,8 +68,19 @@ object FlintREPL extends Logging with FlintJobExecutor {
private val statementRunningCount = new AtomicInteger(0)

def main(args: Array[String]) {
CustomLogging.logInfo("Spark Job is Launching...")
val Array(resultIndex) = args
val (queryOption, resultIndex) = args.length match {
case 1 =>
(None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
case 2 =>
(
Some(args(0)),
args(1)
) // Before OS 2.13, there are two arguments, the second one is resultIndex
case _ =>
throw new IllegalArgumentException(
"Unsupported number of arguments. Expected 1 or 2 arguments.")
}

if (Strings.isNullOrEmpty(resultIndex)) {
throw new IllegalArgumentException("resultIndex is not set")
}
Expand All @@ -92,11 +102,17 @@ object FlintREPL extends Logging with FlintJobExecutor {
logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
conf.set(FlintSparkConf.JOB_TYPE.key, jobType)

val query = queryOption.getOrElse {
if (jobType.equalsIgnoreCase("streaming")) {
val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
if (defaultQuery.isEmpty) {
throw new IllegalArgumentException("Query undefined for the streaming job.")
}
defaultQuery
} else ""
}

if (jobType.equalsIgnoreCase("streaming")) {
val query = conf.get(FlintSparkConf.QUERY.key, "")
if (query.isEmpty) {
throw new IllegalArgumentException("Query undefined for the streaming job.")
}
logInfo(s"""streaming query ${query}""")
val streamingRunningCount = new AtomicInteger(0)
val jobOperator =
Expand Down

0 comments on commit 82a2821

Please sign in to comment.