diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala new file mode 100644 index 000000000..caa6b73ab --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging + +// Define constants for common error messages +object ExceptionMessages { + val SyntaxErrorPrefix = "Syntax error" + val S3ErrorPrefix = "Fail to read data from S3. Cause" + val GlueErrorPrefix = "Fail to read data from Glue. Cause" + val QueryAnalysisErrorPrefix = "Fail to analyze query. Cause" + val SparkExceptionErrorPrefix = "Spark exception. Cause" + val QueryRunErrorPrefix = "Fail to run query. Cause" + val GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions." +} diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 665ec5a27..7fbeefd39 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -13,7 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.text.StringEscapeUtils.unescapeJava import org.opensearch.flint.core.IRestHighLevelClient -import org.opensearch.flint.core.logging.{CustomLogging, OperationMessage} +import org.opensearch.flint.core.logging.{CustomLogging, ExceptionMessages, OperationMessage} import org.opensearch.flint.core.metrics.MetricConstants import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter import play.api.libs.json._ @@ -454,12 +454,12 @@ trait FlintJobExecutor { def processQueryException(ex: Exception): String = { getRootCause(ex) match { case r: ParseException => - handleQueryException(r, "Syntax error") + handleQueryException(r, ExceptionMessages.SyntaxErrorPrefix) case r: AmazonS3Exception => incrementCounter(MetricConstants.S3_ERR_CNT_METRIC) handleQueryException( r, - "Fail to read data from S3. Cause", + ExceptionMessages.S3ErrorPrefix, Some(r.getServiceName), Some(r.getStatusCode)) case r: AWSGlueException => @@ -467,21 +467,34 @@ trait FlintJobExecutor { // Redact Access denied in AWS Glue service r match { case accessDenied: AccessDeniedException => - accessDenied.setErrorMessage( - "Access denied in AWS Glue service. Please check permissions.") + accessDenied.setErrorMessage(ExceptionMessages.GlueAccessDeniedMessage) case _ => // No additional action for other types of AWSGlueException } handleQueryException( r, - "Fail to read data from Glue. Cause", + ExceptionMessages.GlueErrorPrefix, Some(r.getServiceName), Some(r.getStatusCode)) case r: AnalysisException => - handleQueryException(r, "Fail to analyze query. Cause") + handleQueryException(r, ExceptionMessages.QueryAnalysisErrorPrefix) case r: SparkException => - handleQueryException(r, "Spark exception. Cause") + handleQueryException(r, ExceptionMessages.SparkExceptionErrorPrefix) case r: Exception => - handleQueryException(r, "Fail to run query. Cause") + handleGeneralException(r) + } + } + + private def handleGeneralException(ex: Exception): String = { + val rootCauseClassName = ex.getClass.getName + val errMsg = ex.getMessage + logDebug(s"Root cause class name: $rootCauseClassName") + logDebug(s"Root cause error message: $errMsg") + if (rootCauseClassName == "org.apache.hadoop.hive.metastore.api.MetaException" && + errMsg.contains("com.amazonaws.services.glue.model.AccessDeniedException")) { + val e = new SecurityException(ExceptionMessages.GlueAccessDeniedMessage) + handleQueryException(e, ExceptionMessages.QueryRunErrorPrefix) + } else { + handleQueryException(ex, ExceptionMessages.QueryRunErrorPrefix) } } diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala index 546cd8e97..1caf6dabf 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala @@ -464,6 +464,23 @@ class FlintREPLTest assert(result == expectedError) } + test("handleGeneralException should handle MetaException with AccessDeniedException properly") { + val mockFlintCommand = mock[FlintCommand] + + // Simulate the root cause being MetaException + val exception = new org.apache.hadoop.hive.metastore.api.MetaException( + "AWSCatalogMetastoreClient: Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: User: ****** is not authorized to perform: ******") + + val result = FlintREPL.processQueryException(exception, mockFlintCommand) + + val expectedError = + """{"Message":"Fail to run query. Cause: Access denied in AWS Glue service. Please check permissions."}""" + + result shouldEqual expectedError + verify(mockFlintCommand).fail() + verify(mockFlintCommand).error = Some(expectedError) + } + test("Doc Exists and excludeJobIds is an ArrayList Containing JobId") { val sessionId = "session123" val jobId = "jobABC"