Commit

address comments
Signed-off-by: Kaituo Li <[email protected]>
kaituo committed Oct 4, 2023
1 parent 64cbfd9 commit 5c7bcf9
Showing 4 changed files with 83 additions and 75 deletions.
1 change: 0 additions & 1 deletion build.sbt
@@ -114,7 +114,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
// ANTLR settings
Antlr4 / antlr4Version := "4.8",
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
4 changes: 3 additions & 1 deletion spark-sql-application/README.md
@@ -129,7 +129,9 @@ For FlintJob, OpenSearch index document will look like
],
"jobRunId" : "s-JZSB1139WIVU",
"applicationId" : "application_1687726870985_0003",
"dataSourceName": "myS3Glue"
"dataSourceName": "myS3Glue",
"status": "SUCCESS",
"error": ""
}
}
```
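
When the result-index mapping check or the write fails, FlintJob writes the same document shape with `status` set to `FAILED` and the `error` field populated instead of left empty. A hypothetical failure document (the `result`/`schema` fields are omitted here, and the index name in the error message is illustrative, not taken from a real run):

```
{
  "jobRunId" : "s-JZSB1139WIVU",
  "applicationId" : "application_1687726870985_0003",
  "dataSourceName": "myS3Glue",
  "status": "FAILED",
  "error": "The mapping of query_execution_result is incorrect."
}
```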
@@ -18,7 +18,7 @@ import scala.concurrent.duration.{Duration, MINUTES}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.util.ThreadUtils

/**
@@ -28,26 +28,27 @@ import org.apache.spark.util.ThreadUtils
* (0) sql query
* @param args
* (1) opensearch index name
* @param args
* (2) opensearch data source name
* @return
* write sql query result to given opensearch index
*/
object FlintJob extends Logging {
def main(args: Array[String]): Unit = {
// Validate command line arguments
if (args.length != 3) {
throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex> <dataSource>")
if (args.length != 2) {
throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
}

val Array(query, resultIndex, dataSource) = args
val Array(query, resultIndex) = args

val conf = createSparkConf()
val wait = conf.get("spark.flint.job.type", "continue")
val dataSource = conf.get("spark.flint.datasource.name", "")
val spark = createSparkSession(conf)

val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "check-create-index")
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

var dataToWrite : Option[DataFrame] = None
try {
// flintClient needs spark session to be created first. Otherwise, we will have connection
// exception from EMR-S to OS.
@@ -57,19 +58,26 @@ object FlintJob extends Logging {
}
val data = executeQuery(spark, query, dataSource)

val correctMapping = ThreadUtils.awaitResult(futureMappingCheck, Duration(10, MINUTES))
writeData(spark, data, resultIndex, correctMapping, dataSource)

val (correctMapping, error) = ThreadUtils.awaitResult(futureMappingCheck, Duration(1, MINUTES))
dataToWrite = Some(if (correctMapping) data else getFailedData(spark, dataSource, error))
} catch {
case e: TimeoutException =>
logError("Future operations timed out", e)
throw e
val error = "Future operations timed out"
logError(error, e)
dataToWrite = Some(getFailedData(spark, dataSource, error))
case e: Exception =>
logError("Fail to verify existing mapping or write result", e)
throw e
val error = "Fail to verify existing mapping or write result"
logError(error, e)
dataToWrite = Some(getFailedData(spark, dataSource, error))
} finally {
// Stop SparkSession
spark.stop()
dataToWrite.foreach(df => writeData(df, resultIndex))
// Stop SparkSession if it is not streaming job
if (wait.equalsIgnoreCase("streaming")) {
spark.streams.awaitAnyTermination()
} else {
spark.stop()
}

threadPool.shutdown()
}
}
@@ -84,9 +92,7 @@ object FlintJob extends Logging {
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}

def writeData(spark: SparkSession, data: DataFrame, resultIndex: String, correctMapping: Boolean,
dataSource: String): Unit = {
val resultData = if (correctMapping) data else getFailedData(spark, dataSource)
def writeData(resultData: DataFrame, resultIndex: String): Unit = {
resultData.write
.format("flint")
.mode("append")
@@ -138,7 +144,8 @@ object FlintJob extends Logging {
StructField("jobRunId", StringType, nullable = true),
StructField("applicationId", StringType, nullable = true),
StructField("dataSourceName", StringType, nullable = true),
StructField("status", StringType, nullable = true)
StructField("status", StringType, nullable = true),
StructField("error", StringType, nullable = true)
)
)

@@ -151,15 +158,16 @@ object FlintJob extends Logging {
sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown"),
sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"),
dataSource,
"SUCCESS"
"SUCCESS",
""
)
)

// Create the DataFrame for data
spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
}

def getFailedData(spark: SparkSession, dataSource: String): DataFrame = {
def getFailedData(spark: SparkSession, dataSource: String, error: String): DataFrame = {

// Define the data schema
val schema = StructType(
@@ -177,7 +185,8 @@ object FlintJob extends Logging {
StructField("jobRunId", StringType, nullable = true),
StructField("applicationId", StringType, nullable = true),
StructField("dataSourceName", StringType, nullable = true),
StructField("status", StringType, nullable = true)
StructField("status", StringType, nullable = true),
StructField("error", StringType, nullable = true)
)
)

@@ -189,7 +198,8 @@ object FlintJob extends Logging {
sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown"),
sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"),
dataSource,
"FAILED"
"FAILED",
error
)
)

@@ -258,39 +268,26 @@ object FlintJob extends Logging {

val inputJson = Json.parse(input)
val mappingJson = Json.parse(mapping)
logInfo(s"inputJson $inputJson")
logInfo(s"mappingJson $mappingJson")

compareJson(inputJson, mappingJson)
}

def checkAndCreateIndex(
flintClient: FlintClient,
resultIndex: String
): Boolean = {
var correctMapping = false

flintClient: FlintClient,
resultIndex: String
): (Boolean, String) = {
// The enabled setting, which can be applied only to the top-level mapping definition and to object fields, causes OpenSearch to skip parsing of the contents of the field entirely.
val mapping =
"""{
"dynamic": false,
"properties": {
"result": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
"type": "object",
"enabled": false
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
"type": "object",
"enabled": false
},
"jobRunId": {
"type": "keyword"
Expand All @@ -303,39 +300,47 @@ object FlintJob extends Logging {
},
"status": {
"type": "keyword"
},
"error": {
"type": "text"
}
}
}""".stripMargin

try {
val existingSchema = flintClient.getIndexMetadata(resultIndex).getContent
if (!isSuperset(existingSchema, mapping)) {
logError(s"The mapping of $resultIndex is incorrect.")
(false, s"The mapping of $resultIndex is incorrect.")
} else {
correctMapping = true
(true, "")
}
} catch {
case e: IllegalStateException =>
logInfo("get mapping exception", e)
val cause = ExceptionsHelper.unwrapCause(e.getCause())
logInfo("cause", cause)
logInfo("cause2", cause.getCause())
if (cause.getMessage().contains("index_not_found_exception")) {
try {
logInfo(s"create $resultIndex")
flintClient.createIndex(resultIndex, new FlintMetadata(mapping))
logInfo(s"create $resultIndex successfully")
correctMapping = true
} catch {
case _: Exception =>
logError(s"Fail to create result index $resultIndex")
}
}
case e: Exception => logError("Fail to verify existing mapping", e);
case e: IllegalStateException if e.getCause().getMessage().contains("index_not_found_exception") =>
handleIndexNotFoundException(flintClient, resultIndex, mapping)
case e: Exception =>
val error = "Failed to verify existing mapping"
logError(error, e)
(false, error)
}
correctMapping
}

def handleIndexNotFoundException(
flintClient: FlintClient,
resultIndex: String,
mapping: String
): (Boolean, String) = {
try {
logInfo(s"create $resultIndex")
flintClient.createIndex(resultIndex, new FlintMetadata(mapping))
logInfo(s"create $resultIndex successfully")
(true, "")
} catch {
case e: Exception =>
val error = s"Failed to create result index $resultIndex"
logError(error, e)
(false, error)
}
}
def executeQuery(
spark: SparkSession,
query: String,
@@ -34,7 +34,8 @@ class FlintJobTest extends SparkFunSuite with Matchers {
StructField("jobRunId", StringType, nullable = true),
StructField("applicationId", StringType, nullable = true),
StructField("dataSourceName", StringType, nullable = true),
StructField("status", StringType, nullable = true)
StructField("status", StringType, nullable = true),
StructField("error", StringType, nullable = true)
))
val expectedRows = Seq(
Row(
@@ -48,7 +49,8 @@
"unknown",
"unknown",
dataSourceName,
"SUCCESS"
"SUCCESS",
""
))
val expected: DataFrame =
spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
@@ -66,16 +68,16 @@ class FlintJobTest extends SparkFunSuite with Matchers {
test("test isSuperset") {
// note in input false has enclosed double quotes, while mapping just has false
val input =
"""{"dynamic":"false","properties":{"result":{"type":"text","fields":{"keyword":{
|"ignore_above":256,"type":"keyword"}}},"schema":{"type":"text","fields":{"keyword":{
|"ignore_above":256,"type":"keyword"}}},"applicationId":{"type":"keyword"},"jobRunId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
"""{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
|"applicationId":{"type":"keyword"},"jobRunId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
|"error":{"type":"text"}}}
|""".stripMargin
val mapping =
"""{"dynamic":false,"properties":{"result":{"type":"text","fields":{"keyword":{
|"type":"keyword","ignore_above":256}}},"schema":{"type":"text","fields":{"keyword":{
|"type":"keyword","ignore_above":256}}},"jobRunId":{"type":"keyword"},"applicationId":{
"""{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
|"jobRunId":{"type":"keyword"},"applicationId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
|"error":{"type":"text"}}}
|""".stripMargin
assert(FlintJob.isSuperset(input, mapping))
}
