diff --git a/build.sbt b/build.sbt
index 4cf923fc2..c8c94ad1c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
     libraryDependencies ++= deps(sparkVersion),
     libraryDependencies ++= Seq(
       "com.typesafe.play" %% "play-json" % "2.9.2",
+      "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
+        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       // handle AmazonS3Exception
       "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
       // the transitive jackson.core dependency conflicts with existing scala
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index 6a081a740..4cdfcee01 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -27,6 +27,11 @@ public final class MetricConstants {
    */
   public static final String S3_ERR_CNT_METRIC = "s3.error.count";
 
+  /**
+   * Metric name for counting the errors encountered with Amazon Glue operations.
+   */
+  public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";
+
   /**
    * Metric name for counting the number of sessions currently running.
    */
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index 1e5df21e1..70272cbf5 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -7,6 +7,7 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import com.amazonaws.services.glue.model.AWSGlueException
 import com.amazonaws.services.s3.model.AmazonS3Exception
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
@@ -17,6 +18,7 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
@@ -65,6 +67,9 @@ trait FlintJobExecutor {
       "sessionId": {
         "type": "keyword"
       },
+      "jobType": {
+        "type": "keyword"
+      },
       "updateTime": {
         "type": "date",
         "format": "strict_date_time||epoch_millis"
@@ -190,6 +195,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -218,6 +224,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -248,6 +255,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -267,6 +275,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -411,12 +420,12 @@ trait FlintJobExecutor {
   private def handleQueryException(
       e: Exception,
       message: String,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
-    val error = s"$message: ${e.getMessage}"
+      errorSource: Option[String] = None,
+      statusCode: Option[String] = None): String = {
+    val sourcePrefix = errorSource.map(src => s"$src: ").getOrElse("") + statusCode
+      .map(st => s"Status $st: ")
+      .getOrElse("")
+    val error = s"${sourcePrefix}$message: ${e.getMessage}"
     logError(error, e)
     error
   }
@@ -426,53 +435,30 @@ trait FlintJobExecutor {
     else getRootCause(e.getCause)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
+  /**
+   * This method converts query exception into error string, which then persist to query result
+   * metadata
+   */
+  def processQueryException(ex: Exception): String = {
     getRootCause(ex) match {
       case r: ParseException =>
-        handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+        handleQueryException(r, "Syntax error")
       case r: AmazonS3Exception =>
         incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
+        handleQueryException(r, "Failed to read data from S3.", Some("S3"), Some(r.getErrorCode))
+      case r: AWSGlueException =>
+        incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
         handleQueryException(
           r,
-          "Fail to read data from S3. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+          "Failed to read data from Glue.",
+          Some("Glue"),
+          Some(r.getErrorCode))
       case r: AnalysisException =>
-        handleQueryException(
-          r,
-          "Fail to analyze query. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Failed to analyze query.")
      case r: SparkException =>
-        handleQueryException(
-          r,
-          "Spark exception. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Spark exception encountered.")
       case r: Exception =>
-        handleQueryException(
-          r,
-          "Fail to run query, cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Failed to run query.")
     }
   }
 }
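Aside (not part of the patch): the error string is now assembled from an optional source and status prefix instead of the query context. A minimal standalone sketch of the prefix logic applied by `handleQueryException`; the `composeError` name and the sample cause message are hypothetical:

```scala
// Sketch only: mirrors the prefix composition in handleQueryException above,
// minus the logError side effect. composeError is a hypothetical name.
def composeError(
    message: String,
    cause: String,
    errorSource: Option[String] = None,
    statusCode: Option[String] = None): String = {
  val sourcePrefix =
    errorSource.map(src => s"$src: ").getOrElse("") +
      statusCode.map(st => s"Status $st: ").getOrElse("")
  s"$sourcePrefix$message: $cause"
}

// For an AWSGlueException whose getErrorCode is "AccessDeniedException":
composeError(
  "Failed to read data from Glue.",
  "User is not authorized to perform glue:GetTable",
  Some("Glue"),
  Some("AccessDeniedException"))
// => "Glue: Status AccessDeniedException: Failed to read data from Glue.: User is not authorized to perform glue:GetTable"
```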
Cause", - spark, - dataSource, - query, - queryId, - sessionId) + handleQueryException(r, "Spark exception encountered.") case r: Exception => - handleQueryException( - r, - "Fail to run query, cause", - spark, - dataSource, - query, - queryId, - sessionId) + handleQueryException(r, "Failed to run query.") } } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index b96163693..1adb592c8 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor { currentTimeProvider) } - def processQueryException( - ex: Exception, - spark: SparkSession, - dataSource: String, - flintCommand: FlintCommand, - sessionId: String): String = { - val error = super.processQueryException( - ex, - spark, - dataSource, - flintCommand.query, - flintCommand.queryId, - sessionId) + def processQueryException(ex: Exception, flintCommand: FlintCommand): String = { + val error = super.processQueryException(ex) flintCommand.fail() flintCommand.error = Some(error) error @@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor { sessionId, startTime) case e: Exception => - val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "") + val error = processQueryException(e, flintCommand) Some( handleCommandFailureAndGetFailedData( spark, diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index 5969f0573..6421c7d57 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -66,7 +66,7 @@ case class JobOperator( dataToWrite = Some( getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider)) case e: Exception => - val error = processQueryException(e, spark, dataSource, query, "", "") + val error = processQueryException(e) dataToWrite = Some( getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider)) } finally { diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala index aceb9468f..11c8644d0 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala @@ -6,6 +6,7 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider} @@ -13,7 +14,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers { val spark = SparkSession.builder().appName("Test").master("local").getOrCreate() - + spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming") // Define input dataframe val inputSchema = StructType( Seq( @@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers { StructField("queryId", StringType, nullable = true), StructField("queryText", StringType, nullable = true), StructField("sessionId", StringType, nullable = true), + StructField("jobType", StringType, nullable = true), StructField("updateTime", LongType, 
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
index aceb9468f..11c8644d0 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
@@ -6,6 +6,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}
 
@@ -13,7 +14,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
 
   val spark =
     SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+  spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
   // Define input dataframe
   val inputSchema = StructType(
     Seq(
@@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = false)))
 
@@ -61,6 +63,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         "10",
         "select 1",
         "20",
+        "streaming",
         currentTime,
         queryRunTime))
     val expected: DataFrame =
@@ -93,7 +96,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
         |"jobRunId":{"type":"keyword"},"applicationId":{
         |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
-        |"error":{"type":"text"}}}
+        |"error":{"type":"text"}, "jobType":{"type":"keyword"}}}
         |""".stripMargin
     assert(FlintJob.isSuperset(input, mapping))
   }
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
index ea789c161..7321988e3 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
@@ -216,6 +216,7 @@ class FlintREPLTest
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = false)))
 
@@ -235,10 +236,11 @@ class FlintREPLTest
         "10",
         "select 1",
         "20",
+        "interactive",
         currentTime,
         queryRunTime))
 
     val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+    spark.conf.set(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
     val expected =
       spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
@@ -547,10 +549,13 @@ class FlintREPLTest
   test("executeAndHandle should handle TimeoutException properly") {
     val mockSparkSession = mock[SparkSession]
     val mockFlintCommand = mock[FlintCommand]
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
     // val mockExecutionContextExecutor: ExecutionContextExecutor = mock[ExecutionContextExecutor]
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-
     try {
       val dataSource = "someDataSource"
       val sessionId = "someSessionId"
@@ -596,6 +601,10 @@ class FlintREPLTest
 
   test("executeAndHandle should handle ParseException properly") {
     val mockSparkSession = mock[SparkSession]
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
 
     val flintCommand = new FlintCommand(
       "Running",
@@ -606,7 +615,6 @@ class FlintREPLTest
       None)
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-
     try {
       val dataSource = "someDataSource"
       val sessionId = "someSessionId"
@@ -1020,6 +1028,11 @@ class FlintREPLTest
     val sparkContext = mock[SparkContext]
     when(mockSparkSession.sparkContext).thenReturn(sparkContext)
 
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
+
     when(expectedDataFrame.toDF(any[Seq[String]]: _*)).thenReturn(expectedDataFrame)
 
     val flintSessionIndexUpdater = mock[OpenSearchUpdater]
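Aside (not part of the patch): the tests above repeat the same `RuntimeConfig` stubbing. If more tests end up needing it, a helper along these lines could consolidate the setup; it is hypothetical and assumes the Mockito sugar FlintREPLTest already uses:

```scala
// Hypothetical test helper: a mocked SparkSession whose conf answers
// FlintSparkConf.JOB_TYPE, as the new "jobType" result column requires.
def mockSparkSessionWithJobType(
    jobType: String = FlintSparkConf.JOB_TYPE.defaultValue.get): SparkSession = {
  val session = mock[SparkSession]
  val conf = mock[RuntimeConfig]
  when(session.conf).thenReturn(conf)
  when(conf.get(FlintSparkConf.JOB_TYPE.key)).thenReturn(jobType)
  session
}
```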