From b7362b34e6ed1c23ceafaaabb199f7fdb80ba839 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 10 May 2024 14:35:43 -0700 Subject: [PATCH] Error field in json format Signed-off-by: Louis Chu --- .../apache/spark/sql/FlintJobExecutor.scala | 19 +++++++++++++------ .../org/apache/spark/sql/FlintREPLTest.scala | 8 ++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 26ebdfc3b..e6385af1d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -9,6 +9,8 @@ import java.util.Locale import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException} import com.amazonaws.services.s3.model.AmazonS3Exception +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.text.StringEscapeUtils.unescapeJava import org.opensearch.flint.core.IRestHighLevelClient import org.opensearch.flint.core.metrics.MetricConstants @@ -26,6 +28,9 @@ import org.apache.spark.sql.util._ trait FlintJobExecutor { this: Logging => + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + var currentTimeProvider: TimeProvider = new RealTimeProvider() var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory() var envinromentProvider: EnvironmentProvider = new RealEnvironment() @@ -422,12 +427,14 @@ trait FlintJobExecutor { message: String, errorSource: Option[String] = None, statusCode: Option[Int] = None): String = { - val sourcePrefix = errorSource.map(src => s"##ErrorSource: $src ").getOrElse("") + statusCode - .map(st => s"##StatusCode: $st ") - .getOrElse("") - val error = s"${sourcePrefix}##$message: ${e.getMessage}" - logError(error, e) - error + + val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++ + errorSource.map("ErrorSource" -> _) ++ + statusCode.map(code => "StatusCode" -> code.toString) + + val errorJson = mapper.writeValueAsString(errorDetails) + logError(errorJson, e) + errorJson } def getRootCause(e: Throwable): Throwable = { diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala index 726ce7bf2..1a6aea4f4 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala @@ -15,7 +15,7 @@ import scala.concurrent.duration._ import scala.concurrent.duration.{Duration, MINUTES} import scala.reflect.runtime.universe.TypeTag -import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException} +import com.amazonaws.services.glue.model.AccessDeniedException import com.codahale.metrics.Timer import org.mockito.ArgumentMatchersSugar import org.mockito.Mockito._ @@ -450,9 +450,9 @@ class FlintREPLTest val mockFlintCommand = mock[FlintCommand] val expectedError = ( - "##ErrorSource: AWSGlue ##StatusCode: 400 " + - "##Fail to read data from Glue. Cause: Access denied in AWS Glue service. Please check permissions. " + - "(Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: null; Proxy: null)" + """{"Message":"Fail to read data from Glue. Cause: Access denied in AWS Glue service. Please check permissions. (Service: AWSGlue; """ + + """Status Code: 400; Error Code: AccessDeniedException; Request ID: null; Proxy: null)",""" + + """"ErrorSource":"AWSGlue","StatusCode":"400"}""" ) val result = FlintREPL.processQueryException(exception, mockFlintCommand)