From 5c7bcf9db6903fbeda184891e006f199d0f9d5ff Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Tue, 3 Oct 2023 23:43:16 -0700
Subject: [PATCH] address comments

Signed-off-by: Kaituo Li
---
 build.sbt                                     |   1 -
 spark-sql-application/README.md               |   4 +-
 .../scala/org/apache/spark/sql/FlintJob.scala | 133 +++++++++---------
 .../org/apache/spark/sql/FlintJobTest.scala   |  20 +--
 4 files changed, 83 insertions(+), 75 deletions(-)

diff --git a/build.sbt b/build.sbt
index 7bb01226d..bc018c265 100644
--- a/build.sbt
+++ b/build.sbt
@@ -114,7 +114,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
       "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
       "com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
     libraryDependencies ++= deps(sparkVersion),
-    libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
     // ANTLR settings
     Antlr4 / antlr4Version := "4.8",
     Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md
index 533ee81cb..bd12ee933 100644
--- a/spark-sql-application/README.md
+++ b/spark-sql-application/README.md
@@ -129,7 +129,9 @@ For FlintJob, OpenSearch index document will look like
     ],
     "jobRunId" : "s-JZSB1139WIVU",
     "applicationId" : "application_1687726870985_0003",
-    "dataSourceName": "myS3Glue"
+    "dataSourceName": "myS3Glue",
+    "status": "SUCCESS",
+    "error": ""
   }
 }
 ```
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index 9f4a464e7..8da7d2072 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -18,7 +18,7 @@ import scala.concurrent.duration.{Duration, MINUTES}
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.flint.config.FlintSparkConf
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{StructField, _}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -28,26 +28,27 @@ import org.apache.spark.util.ThreadUtils
  *   (0) sql query
  * @param args
  *   (1) opensearch index name
- * @param args
- *   (2) opensearch data source name
  * @return
  *   write sql query result to given opensearch index
  */
 object FlintJob extends Logging {
   def main(args: Array[String]): Unit = {
     // Validate command line arguments
-    if (args.length != 3) {
-      throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex> <dataSource>")
+    if (args.length != 2) {
+      throw new IllegalArgumentException("Usage: FlintJob <query> <resultIndex>")
     }
 
-    val Array(query, resultIndex, dataSource) = args
+    val Array(query, resultIndex) = args
 
     val conf = createSparkConf()
+    val wait = conf.get("spark.flint.job.type", "continue")
+    val dataSource = conf.get("spark.flint.datasource.name", "")
     val spark = createSparkSession(conf)
 
     val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "check-create-index")
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
 
+    var dataToWrite: Option[DataFrame] = None
     try {
       // flintClient needs spark session to be created first. Otherwise, we will have connection
       // exception from EMR-S to OS.
@@ -57,19 +58,26 @@ object FlintJob extends Logging {
       }
 
       val data = executeQuery(spark, query, dataSource)
 
-      val correctMapping = ThreadUtils.awaitResult(futureMappingCheck, Duration(10, MINUTES))
-      writeData(spark, data, resultIndex, correctMapping, dataSource)
-
+      val (correctMapping, error) = ThreadUtils.awaitResult(futureMappingCheck, Duration(1, MINUTES))
+      dataToWrite = Some(if (correctMapping) data else getFailedData(spark, dataSource, error))
     } catch {
       case e: TimeoutException =>
-        logError("Future operations timed out", e)
-        throw e
+        val error = "Future operations timed out"
+        logError(error, e)
+        dataToWrite = Some(getFailedData(spark, dataSource, error))
       case e: Exception =>
-        logError("Fail to verify existing mapping or write result", e)
-        throw e
+        val error = "Fail to verify existing mapping or write result"
+        logError(error, e)
+        dataToWrite = Some(getFailedData(spark, dataSource, error))
     } finally {
-      // Stop SparkSession
-      spark.stop()
+      dataToWrite.foreach(df => writeData(df, resultIndex))
+      // Stop SparkSession if this is not a streaming job
+      if (wait.equalsIgnoreCase("streaming")) {
+        spark.streams.awaitAnyTermination()
+      } else {
+        spark.stop()
+      }
+      threadPool.shutdown()
     }
   }
@@ -84,9 +92,7 @@ object FlintJob extends Logging {
     SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
   }
 
-  def writeData(spark: SparkSession, data: DataFrame, resultIndex: String, correctMapping: Boolean,
-      dataSource: String): Unit = {
-    val resultData = if (correctMapping) data else getFailedData(spark, dataSource)
+  def writeData(resultData: DataFrame, resultIndex: String): Unit = {
     resultData.write
       .format("flint")
       .mode("append")
@@ -138,7 +144,8 @@ object FlintJob extends Logging {
         StructField("jobRunId", StringType, nullable = true),
         StructField("applicationId", StringType, nullable = true),
         StructField("dataSourceName", StringType, nullable = true),
-        StructField("status", StringType, nullable = true)
+        StructField("status", StringType, nullable = true),
+        StructField("error", StringType, nullable = true)
       )
     )
 
@@ -151,7 +158,8 @@ object FlintJob extends Logging {
         sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown"),
         sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"),
         dataSource,
-        "SUCCESS"
+        "SUCCESS",
+        ""
       )
     )
 
@@ -159,7 +167,7 @@ object FlintJob extends Logging {
     spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
   }
 
-  def getFailedData(spark: SparkSession, dataSource: String): DataFrame = {
+  def getFailedData(spark: SparkSession, dataSource: String, error: String): DataFrame = {
 
     // Define the data schema
     val schema = StructType(
@@ -177,7 +185,8 @@ object FlintJob extends Logging {
         StructField("jobRunId", StringType, nullable = true),
         StructField("applicationId", StringType, nullable = true),
         StructField("dataSourceName", StringType, nullable = true),
-        StructField("status", StringType, nullable = true)
+        StructField("status", StringType, nullable = true),
+        StructField("error", StringType, nullable = true)
       )
     )
 
@@ -189,7 +198,8 @@ object FlintJob extends Logging {
         sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown"),
        sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"),
         dataSource,
-        "FAILED"
+        "FAILED",
+        error
       )
     )
 
@@ -258,39 +268,26 @@ object FlintJob extends Logging {
     val inputJson = Json.parse(input)
     val mappingJson = Json.parse(mapping)
 
-    logInfo(s"inputJson $inputJson")
-    logInfo(s"mappingJson $mappingJson")
     compareJson(inputJson, mappingJson)
   }
 
   def checkAndCreateIndex(
-      flintClient: FlintClient,
-      resultIndex: String
-  ): Boolean = {
-    var correctMapping = false
-
+      flintClient: FlintClient,
+      resultIndex: String
+  ): (Boolean, String) = {
+    // The "enabled" setting, which can be applied only to the top-level mapping definition and to object fields, causes OpenSearch to skip parsing of the field contents entirely.
     val mapping =
       """{
           "dynamic": false,
           "properties": {
             "result": {
-              "type": "text",
-              "fields": {
-                "keyword": {
-                  "type": "keyword",
-                  "ignore_above": 256
-                }
-              }
+              "type": "object",
+              "enabled": false
             },
             "schema": {
-              "type": "text",
-              "fields": {
-                "keyword": {
-                  "type": "keyword",
-                  "ignore_above": 256
-                }
-              }
+              "type": "object",
+              "enabled": false
             },
             "jobRunId": {
               "type": "keyword"
@@ -303,6 +300,9 @@ object FlintJob extends Logging {
             },
             "status": {
               "type": "keyword"
+            },
+            "error": {
+              "type": "text"
             }
           }
         }""".stripMargin
@@ -310,32 +310,37 @@ object FlintJob extends Logging {
     try {
       val existingSchema = flintClient.getIndexMetadata(resultIndex).getContent
       if (!isSuperset(existingSchema, mapping)) {
-        logError(s"The mapping of $resultIndex is incorrect.")
+        (false, s"The mapping of $resultIndex is incorrect.")
       } else {
-        correctMapping = true
+        (true, "")
       }
     } catch {
-      case e: IllegalStateException =>
-        logInfo("get mapping exception", e)
-        val cause = ExceptionsHelper.unwrapCause(e.getCause())
-        logInfo("cause", cause)
-        logInfo("cause2", cause.getCause())
-        if (cause.getMessage().contains("index_not_found_exception")) {
-          try {
-            logInfo(s"create $resultIndex")
-            flintClient.createIndex(resultIndex, new FlintMetadata(mapping))
-            logInfo(s"create $resultIndex successfully")
-            correctMapping = true
-          } catch {
-            case _: Exception =>
-              logError(s"Fail to create result index $resultIndex")
-          }
-        }
-      case e: Exception => logError("Fail to verify existing mapping", e);
+      case e: IllegalStateException if e.getCause().getMessage().contains("index_not_found_exception") =>
+        handleIndexNotFoundException(flintClient, resultIndex, mapping)
+      case e: Exception =>
+        val error = "Failed to verify existing mapping"
+        logError(error, e)
+        (false, error)
     }
-    correctMapping
   }
 
+  def handleIndexNotFoundException(
+      flintClient: FlintClient,
+      resultIndex: String,
+      mapping: String
+  ): (Boolean, String) = {
+    try {
+      logInfo(s"create $resultIndex")
+      flintClient.createIndex(resultIndex, new FlintMetadata(mapping))
+      logInfo(s"create $resultIndex successfully")
+      (true, "")
+    } catch {
+      case e: Exception =>
+        val error = s"Failed to create result index $resultIndex"
+        logError(error, e)
+        (false, error)
+    }
+  }
+
   def executeQuery(
       spark: SparkSession,
       query: String,
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
index 56ac45c2f..c32e63194 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
@@ -34,7 +34,8 @@ class FlintJobTest extends SparkFunSuite with Matchers {
       StructField("jobRunId", StringType, nullable = true),
       StructField("applicationId", StringType, nullable = true),
       StructField("dataSourceName", StringType, nullable = true),
-      StructField("status", StringType, nullable = true)
+      StructField("status", StringType, nullable = true),
+      StructField("error", StringType, nullable = true)
     ))
     val expectedRows = Seq(
       Row(
@@ -48,7 +49,8 @@ class FlintJobTest extends SparkFunSuite with Matchers {
         "unknown",
         "unknown",
         dataSourceName,
-        "SUCCESS"
+        "SUCCESS",
+        ""
       ))
     val expected: DataFrame =
       spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
 
@@ -66,16 +68,16 @@ class FlintJobTest extends SparkFunSuite with Matchers {
   test("test isSuperset") {
     // note in input false has enclosed double quotes, while mapping just has false
     val input =
-      """{"dynamic":"false","properties":{"result":{"type":"text","fields":{"keyword":{
-        |"ignore_above":256,"type":"keyword"}}},"schema":{"type":"text","fields":{"keyword":{
-        |"ignore_above":256,"type":"keyword"}}},"applicationId":{"type":"keyword"},"jobRunId":{
-        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
+      """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
+        |"applicationId":{"type":"keyword"},"jobRunId":{
+        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
+        |"error":{"type":"text"}}}
         |""".stripMargin
     val mapping =
-      """{"dynamic":false,"properties":{"result":{"type":"text","fields":{"keyword":{
-        |"type":"keyword","ignore_above":256}}},"schema":{"type":"text","fields":{"keyword":{
-        |"type":"keyword","ignore_above":256}}},"jobRunId":{"type":"keyword"},"applicationId":{
-        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
+      """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
+        |"jobRunId":{"type":"keyword"},"applicationId":{
+        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
+        |"error":{"type":"text"}}}
         |""".stripMargin
     assert(FlintJob.isSuperset(input, mapping))
   }
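
Not part of the patch: a minimal sketch of how the new failure path could be exercised locally. The three-argument `getFailedData(spark, dataSource, error)` signature and the `status`/`error`/`dataSourceName` fields are taken from the diff above; the sketch object, the local-mode SparkSession, and the sample error/data-source values are illustrative assumptions, and selecting columns by name assumes the returned DataFrame keeps the StructField names defined in the schema.

```scala
import org.apache.spark.sql.{FlintJob, SparkSession}

// Hypothetical driver, only to illustrate the new error-reporting surface.
object FailedResultSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .appName("flint-job-failure-sketch")
      .getOrCreate()
    try {
      // Mirrors the failure branch in main(): instead of rethrowing, the job
      // turns the problem into a result document for the result index.
      val error = "The mapping of query_execution_result is incorrect."
      val failed = FlintJob.getFailedData(spark, "myS3Glue", error)

      // Expect one row with status "FAILED" and the error message populated;
      // result and schema columns are left null on this path.
      failed.select("dataSourceName", "status", "error").show(truncate = false)
    } finally {
      spark.stop()
    }
  }
}
```

This mirrors why main() now defers `writeData` to the finally block: even when the mapping check times out or fails, the result index still receives a document whose `status` is FAILED and whose `error` field explains what happened.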