Skip to content

Commit

Permalink
Improve flint error handling (#335)
Browse files Browse the repository at this point in the history
* Improve flint error handling

Signed-off-by: Louis Chu <[email protected]>

* Error field in json format

Signed-off-by: Louis Chu <[email protected]>

* Superset update

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored May 11, 2024
1 parent d9c0ba8 commit 45835cb
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 68 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
libraryDependencies ++= deps(sparkVersion),
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-json" % "2.9.2",
"com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
// handle AmazonS3Exception
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
// the transitive jackson.core dependency conflicts with existing scala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public final class MetricConstants {
*/
public static final String S3_ERR_CNT_METRIC = "s3.error.count";

/**
* Metric name for counting the errors encountered with Amazon Glue operations.
*/
public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";

/**
* Metric name for counting the number of sessions currently running.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ package org.apache.spark.sql

import java.util.Locale

import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.text.StringEscapeUtils.unescapeJava
import org.opensearch.flint.core.IRestHighLevelClient
import org.opensearch.flint.core.metrics.MetricConstants
Expand All @@ -17,13 +20,17 @@ import play.api.libs.json._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.types._
import org.apache.spark.sql.util._

trait FlintJobExecutor {
this: Logging =>

val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)

var currentTimeProvider: TimeProvider = new RealTimeProvider()
var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
var envinromentProvider: EnvironmentProvider = new RealEnvironment()
Expand Down Expand Up @@ -65,6 +72,9 @@ trait FlintJobExecutor {
"sessionId": {
"type": "keyword"
},
"jobType": {
"type": "keyword"
},
"updateTime": {
"type": "date",
"format": "strict_date_time||epoch_millis"
Expand Down Expand Up @@ -190,6 +200,7 @@ trait FlintJobExecutor {
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
// number is not nullable
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = true)))
Expand Down Expand Up @@ -218,6 +229,7 @@ trait FlintJobExecutor {
queryId,
query,
sessionId,
spark.conf.get(FlintSparkConf.JOB_TYPE.key),
endTime,
endTime - startTime))

Expand Down Expand Up @@ -248,6 +260,7 @@ trait FlintJobExecutor {
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
// number is not nullable
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = true)))
Expand All @@ -267,6 +280,7 @@ trait FlintJobExecutor {
queryId,
query,
sessionId,
spark.conf.get(FlintSparkConf.JOB_TYPE.key),
endTime,
endTime - startTime))

Expand Down Expand Up @@ -330,7 +344,7 @@ trait FlintJobExecutor {
val inputJson = Json.parse(input)
val mappingJson = Json.parse(mapping)

compareJson(inputJson, mappingJson)
compareJson(inputJson, mappingJson) || compareJson(mappingJson, inputJson)
}

def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
Expand Down Expand Up @@ -411,68 +425,58 @@ trait FlintJobExecutor {
private def handleQueryException(
e: Exception,
message: String,
spark: SparkSession,
dataSource: String,
query: String,
queryId: String,
sessionId: String): String = {
val error = s"$message: ${e.getMessage}"
logError(error, e)
error
errorSource: Option[String] = None,
statusCode: Option[Int] = None): String = {

val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++
errorSource.map("ErrorSource" -> _) ++
statusCode.map(code => "StatusCode" -> code.toString)

val errorJson = mapper.writeValueAsString(errorDetails)
logError(errorJson, e)
errorJson
}

def getRootCause(e: Throwable): Throwable = {
if (e.getCause == null) e
else getRootCause(e.getCause)
}

def processQueryException(
ex: Exception,
spark: SparkSession,
dataSource: String,
query: String,
queryId: String,
sessionId: String): String = {
/**
* This method converts query exception into error string, which then persist to query result
* metadata
*/
def processQueryException(ex: Exception): String = {
getRootCause(ex) match {
case r: ParseException =>
handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
handleQueryException(r, "Syntax error")
case r: AmazonS3Exception =>
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
handleQueryException(
r,
"Fail to read data from S3. Cause",
spark,
dataSource,
query,
queryId,
sessionId)
case r: AnalysisException =>
Some(r.getServiceName),
Some(r.getStatusCode))
case r: AWSGlueException =>
incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
// Redact Access denied in AWS Glue service
r match {
case accessDenied: AccessDeniedException =>
accessDenied.setErrorMessage(
"Access denied in AWS Glue service. Please check permissions.")
case _ => // No additional action for other types of AWSGlueException
}
handleQueryException(
r,
"Fail to analyze query. Cause",
spark,
dataSource,
query,
queryId,
sessionId)
"Fail to read data from Glue. Cause",
Some(r.getServiceName),
Some(r.getStatusCode))
case r: AnalysisException =>
handleQueryException(r, "Fail to analyze query. Cause")
case r: SparkException =>
handleQueryException(
r,
"Spark exception. Cause",
spark,
dataSource,
query,
queryId,
sessionId)
handleQueryException(r, "Spark exception. Cause")
case r: Exception =>
handleQueryException(
r,
"Fail to run query, cause",
spark,
dataSource,
query,
queryId,
sessionId)
handleQueryException(r, "Fail to run query. Cause")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
currentTimeProvider)
}

def processQueryException(
ex: Exception,
spark: SparkSession,
dataSource: String,
flintCommand: FlintCommand,
sessionId: String): String = {
val error = super.processQueryException(
ex,
spark,
dataSource,
flintCommand.query,
flintCommand.queryId,
sessionId)
def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
val error = super.processQueryException(ex)
flintCommand.fail()
flintCommand.error = Some(error)
error
Expand Down Expand Up @@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
sessionId,
startTime)
case e: Exception =>
val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
val error = processQueryException(e, flintCommand)
Some(
handleCommandFailureAndGetFailedData(
spark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class JobOperator(
dataToWrite = Some(
getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
case e: Exception =>
val error = processQueryException(e, spark, dataSource, query, "", "")
val error = processQueryException(e)
dataToWrite = Some(
getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}

class FlintJobTest extends SparkFunSuite with JobMatchers {

val spark =
SparkSession.builder().appName("Test").master("local").getOrCreate()

spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
// Define input dataframe
val inputSchema = StructType(
Seq(
Expand All @@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = false)))

Expand All @@ -61,6 +63,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
"10",
"select 1",
"20",
"streaming",
currentTime,
queryRunTime))
val expected: DataFrame =
Expand All @@ -82,20 +85,25 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
}

test("test isSuperset") {
// note in input false has enclosed double quotes, while mapping just has false
// Note in input false has enclosed double quotes, while mapping just has false
val input =
"""{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
|"applicationId":{"type":"keyword"},"jobRunId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
|"error":{"type":"text"}}}
|""".stripMargin
val mapping =
"""{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
|"jobRunId":{"type":"keyword"},"applicationId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
"""{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"}, "jobType":{"type": "keyword"},
|"applicationId":{"type":"keyword"},"jobRunId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
|"error":{"type":"text"}}}
|""".stripMargin

// Assert that input is a superset of mapping
assert(FlintJob.isSuperset(input, mapping))

// Assert that mapping is a superset of input
assert(FlintJob.isSuperset(mapping, input))
}

test("default streaming query maxExecutors is 10") {
Expand Down
Loading

0 comments on commit 45835cb

Please sign in to comment.