From 45835cb27407166669f660cc665436ed6cc93200 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Fri, 10 May 2024 18:16:30 -0700
Subject: [PATCH] Improve flint error handling (#335)

* Improve flint error handling

Signed-off-by: Louis Chu

* Error field in json format

Signed-off-by: Louis Chu

* Superset update

Signed-off-by: Louis Chu

---------

Signed-off-by: Louis Chu
---
 build.sbt                                    |  2 +
 .../flint/core/metrics/MetricConstants.java  |  5 +
 .../apache/spark/sql/FlintJobExecutor.scala  | 94 ++++++++++---------
 .../org/apache/spark/sql/FlintREPL.scala     | 17 +---
 .../org/apache/spark/sql/JobOperator.scala   |  2 +-
 .../org/apache/spark/sql/FlintJobTest.scala  | 18 +++-
 .../org/apache/spark/sql/FlintREPLTest.scala | 45 ++++++++-
 7 files changed, 115 insertions(+), 68 deletions(-)

diff --git a/build.sbt b/build.sbt
index 4cf923fc2..c8c94ad1c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
     libraryDependencies ++= deps(sparkVersion),
     libraryDependencies ++= Seq(
       "com.typesafe.play" %% "play-json" % "2.9.2",
+      "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
+        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       // handle AmazonS3Exception
       "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
       // the transitive jackson.core dependency conflicts with existing scala
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index 6a081a740..4cdfcee01 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -27,6 +27,11 @@ public final class MetricConstants {
    */
   public static final String S3_ERR_CNT_METRIC = "s3.error.count";
 
+  /**
+   * Metric name for counting the errors encountered with Amazon Glue operations.
+   */
+  public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";
+
   /**
    * Metric name for counting the number of sessions currently running.
    */
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index 1e5df21e1..fcb2ab56c 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -7,7 +7,10 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
 import com.amazonaws.services.s3.model.AmazonS3Exception
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
 import org.opensearch.flint.core.metrics.MetricConstants
@@ -17,6 +20,7 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
@@ -24,6 +28,9 @@ import org.apache.spark.sql.util._
 trait FlintJobExecutor {
   this: Logging =>
 
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+
   var currentTimeProvider: TimeProvider = new RealTimeProvider()
   var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
   var envinromentProvider: EnvironmentProvider = new RealEnvironment()
@@ -65,6 +72,9 @@
         "sessionId": {
           "type": "keyword"
         },
+        "jobType": {
+          "type": "keyword"
+        },
         "updateTime": {
           "type": "date",
           "format": "strict_date_time||epoch_millis"
@@ -190,6 +200,7 @@
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -218,6 +229,7 @@
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -248,6 +260,7 @@
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -267,6 +280,7 @@
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -330,7 +344,7 @@
     val inputJson = Json.parse(input)
     val mappingJson = Json.parse(mapping)
 
-    compareJson(inputJson, mappingJson)
+    compareJson(inputJson, mappingJson) || compareJson(mappingJson, inputJson)
   }
 
   def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
@@ -411,14 +425,16 @@
   private def handleQueryException(
       e: Exception,
       message: String,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
-    val error = s"$message: ${e.getMessage}"
-    logError(error, e)
-    error
+      errorSource: Option[String] = None,
+      statusCode: Option[Int] = None): String = {
+
+    val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++
+      errorSource.map("ErrorSource" -> _) ++
+      statusCode.map(code => "StatusCode" -> code.toString)
+
+    val errorJson = mapper.writeValueAsString(errorDetails)
+    logError(errorJson, e)
+    errorJson
   }
 
   def getRootCause(e: Throwable): Throwable = {
@@ -426,53 +442,41 @@
     else getRootCause(e.getCause)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
+  /**
+   * This method converts query exception into error string, which then persist to query result
+   * metadata
+   */
+  def processQueryException(ex: Exception): String = {
     getRootCause(ex) match {
       case r: ParseException =>
-        handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+        handleQueryException(r, "Syntax error")
       case r: AmazonS3Exception =>
         incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
         handleQueryException(
           r,
           "Fail to read data from S3. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
-      case r: AnalysisException =>
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AWSGlueException =>
+        incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
+        // Redact Access denied in AWS Glue service
+        r match {
+          case accessDenied: AccessDeniedException =>
+            accessDenied.setErrorMessage(
+              "Access denied in AWS Glue service. Please check permissions.")
+          case _ => // No additional action for other types of AWSGlueException
+        }
         handleQueryException(
           r,
-          "Fail to analyze query. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+          "Fail to read data from Glue. Cause",
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AnalysisException =>
+        handleQueryException(r, "Fail to analyze query. Cause")
       case r: SparkException =>
-        handleQueryException(
-          r,
-          "Spark exception. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Spark exception. Cause")
       case r: Exception =>
-        handleQueryException(
-          r,
-          "Fail to run query, cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Fail to run query. Cause")
     }
   }
 }
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index b96163693..1adb592c8 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
       currentTimeProvider)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      flintCommand: FlintCommand,
-      sessionId: String): String = {
-    val error = super.processQueryException(
-      ex,
-      spark,
-      dataSource,
-      flintCommand.query,
-      flintCommand.queryId,
-      sessionId)
+  def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
+    val error = super.processQueryException(ex)
     flintCommand.fail()
     flintCommand.error = Some(error)
     error
@@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
           sessionId,
           startTime)
       case e: Exception =>
-        val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
+        val error = processQueryException(e, flintCommand)
         Some(
           handleCommandFailureAndGetFailedData(
             spark,
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index 5969f0573..6421c7d57 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -66,7 +66,7 @@ case class JobOperator(
         dataToWrite = Some(
           getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
       case e: Exception =>
-        val error = processQueryException(e, spark, dataSource, query, "", "")
+        val error = processQueryException(e)
         dataToWrite = Some(
           getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     } finally {
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
index aceb9468f..19f596e31 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
@@ -6,6 +6,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}
 
@@ -13,7 +14,7 @@
 class FlintJobTest extends SparkFunSuite with JobMatchers {
 
   val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+  spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
   // Define input dataframe
   val inputSchema = StructType(
     Seq(
@@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       StructField("queryId", StringType, nullable = true),
       StructField("queryText", StringType, nullable = true),
       StructField("sessionId", StringType, nullable = true),
+      StructField("jobType", StringType, nullable = true),
       StructField("updateTime", LongType, nullable = false),
       StructField("queryRunTime", LongType, nullable = false)))
 
@@ -61,6 +63,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         "10",
         "select 1",
         "20",
+        "streaming",
         currentTime,
         queryRunTime))
     val expected: DataFrame =
@@ -82,7 +85,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
   }
 
   test("test isSuperset") {
-    // note in input false has enclosed double quotes, while mapping just has false
+    // Note in input false has enclosed double quotes, while mapping just has false
     val input =
       """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
         |"applicationId":{"type":"keyword"},"jobRunId":{
@@ -90,12 +93,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         |"error":{"type":"text"}}}
         |""".stripMargin
     val mapping =
-      """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
-        |"jobRunId":{"type":"keyword"},"applicationId":{
-        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
+      """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"}, "jobType":{"type": "keyword"},
+        |"applicationId":{"type":"keyword"},"jobRunId":{
+        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
         |"error":{"type":"text"}}}
         |""".stripMargin
+
+    // Assert that input is a superset of mapping
     assert(FlintJob.isSuperset(input, mapping))
+
+    // Assert that mapping is a superset of input
+    assert(FlintJob.isSuperset(mapping, input))
   }
 
   test("default streaming query maxExecutors is 10") {
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
index ea789c161..1a6aea4f4 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala
@@ -15,6 +15,7 @@ import scala.concurrent.duration._
 import scala.concurrent.duration.{Duration, MINUTES}
 import scala.reflect.runtime.universe.TypeTag
 
+import com.amazonaws.services.glue.model.AccessDeniedException
 import com.codahale.metrics.Timer
 import org.mockito.ArgumentMatchersSugar
 import org.mockito.Mockito._
@@ -216,6 +217,7 @@
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = false)))
 
@@ -235,10 +237,11 @@
         "10",
         "select 1",
         "20",
+        "interactive",
         currentTime,
         queryRunTime))
     val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+    spark.conf.set(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
     val expected =
       spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
 
@@ -436,6 +439,31 @@
     assert(result)
   }
 
+  test("processQueryException should handle exceptions, fail the command, and set the error") {
+    val exception = new AccessDeniedException(
+      "Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: " +
+        "User: ****** is not authorized to perform: glue:GetDatabase on resource: ****** " +
+        "because no identity-based policy allows the glue:GetDatabase action")
+    exception.setStatusCode(400)
+    exception.setErrorCode("AccessDeniedException")
+    exception.setServiceName("AWSGlue")
+
+    val mockFlintCommand = mock[FlintCommand]
+    val expectedError = (
+      """{"Message":"Fail to read data from Glue. Cause: Access denied in AWS Glue service. Please check permissions. (Service: AWSGlue; """ +
+        """Status Code: 400; Error Code: AccessDeniedException; Request ID: null; Proxy: null)",""" +
+        """"ErrorSource":"AWSGlue","StatusCode":"400"}"""
+    )
+
+    val result = FlintREPL.processQueryException(exception, mockFlintCommand)
+
+    result shouldEqual expectedError
+    verify(mockFlintCommand).fail()
+    verify(mockFlintCommand).error = Some(expectedError)
+
+    assert(result == expectedError)
+  }
+
   test("Doc Exists and excludeJobIds is an ArrayList Containing JobId") {
     val sessionId = "session123"
     val jobId = "jobABC"
@@ -547,10 +575,13 @@
   test("executeAndHandle should handle TimeoutException properly") {
     val mockSparkSession = mock[SparkSession]
     val mockFlintCommand = mock[FlintCommand]
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
     // val mockExecutionContextExecutor: ExecutionContextExecutor = mock[ExecutionContextExecutor]
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-
     try {
       val dataSource = "someDataSource"
       val sessionId = "someSessionId"
@@ -596,6 +627,10 @@
 
   test("executeAndHandle should handle ParseException properly") {
     val mockSparkSession = mock[SparkSession]
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
 
     val flintCommand = new FlintCommand(
       "Running",
@@ -606,7 +641,6 @@
       None)
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-
     try {
       val dataSource = "someDataSource"
       val sessionId = "someSessionId"
@@ -1020,6 +1054,11 @@
     val sparkContext = mock[SparkContext]
     when(mockSparkSession.sparkContext).thenReturn(sparkContext)
 
+    val mockConf = mock[RuntimeConfig]
+    when(mockSparkSession.conf).thenReturn(mockConf)
+    when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
+      .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
+
     when(expectedDataFrame.toDF(any[Seq[String]]: _*)).thenReturn(expectedDataFrame)
 
     val flintSessionIndexUpdater = mock[OpenSearchUpdater]