Commit ae48ce8: Improve flint error handling

Signed-off-by: Louis Chu <[email protected]>
noCharger committed May 9, 2024
1 parent d9c0ba8, commit ae48ce8
Showing 6 changed files with 42 additions and 60 deletions.
build.sbt (2 additions, 0 deletions)

@@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
   libraryDependencies ++= deps(sparkVersion),
   libraryDependencies ++= Seq(
     "com.typesafe.play" %% "play-json" % "2.9.2",
+    "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
+      exclude ("com.fasterxml.jackson.core", "jackson-databind"),
     // handle AmazonS3Exception
     "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
     // the transitive jackson.core dependency conflicts with existing scala
MetricConstants.java

@@ -27,6 +27,11 @@ public final class MetricConstants {
    */
   public static final String S3_ERR_CNT_METRIC = "s3.error.count";
 
+  /**
+   * Metric name for counting the errors encountered with Amazon Glue operations.
+   */
+  public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";
+
   /**
    * Metric name for counting the number of sessions currently running.
    */
FlintJobExecutor.scala

@@ -7,6 +7,7 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import com.amazonaws.services.glue.model.AWSGlueException
 import com.amazonaws.services.s3.model.AmazonS3Exception
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
@@ -17,6 +18,7 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
@@ -65,6 +67,9 @@ trait FlintJobExecutor {
       "sessionId": {
         "type": "keyword"
       },
+      "jobType": {
+        "type": "keyword"
+      },
       "updateTime": {
         "type": "date",
         "format": "strict_date_time||epoch_millis"
@@ -190,6 +195,7 @@ trait FlintJobExecutor {
       StructField("queryId", StringType, nullable = true),
       StructField("queryText", StringType, nullable = true),
       StructField("sessionId", StringType, nullable = true),
+      StructField("jobType", StringType, nullable = true),
       // number is not nullable
       StructField("updateTime", LongType, nullable = false),
       StructField("queryRunTime", LongType, nullable = true)))
@@ -218,6 +224,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
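For context, a minimal sketch (not part of the diff; the conf key string is an assumption) of where the new jobType column value comes from:

    // FlintSparkConf.JOB_TYPE.key is assumed to resolve to something like
    // "spark.flint.job.type", set when the job is submitted.
    val jobType = spark.conf.get(FlintSparkConf.JOB_TYPE.key)
    // The result row then carries it alongside the query metadata:
    // Row(..., queryId, query, sessionId, jobType, endTime, endTime - startTime)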
@@ -248,6 +255,7 @@ trait FlintJobExecutor {
       StructField("queryId", StringType, nullable = true),
       StructField("queryText", StringType, nullable = true),
       StructField("sessionId", StringType, nullable = true),
+      StructField("jobType", StringType, nullable = true),
       // number is not nullable
       StructField("updateTime", LongType, nullable = false),
       StructField("queryRunTime", LongType, nullable = true)))
@@ -267,6 +275,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -411,12 +420,12 @@ trait FlintJobExecutor {
   private def handleQueryException(
       e: Exception,
       message: String,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
-    val error = s"$message: ${e.getMessage}"
+      errorSource: Option[String] = None,
+      statusCode: Option[String] = None): String = {
+    val sourcePrefix = errorSource.map(src => s"$src: ").getOrElse("") + statusCode
+      .map(st => s"Status $st: ")
+      .getOrElse("")
+    val error = s"${sourcePrefix}$message: ${e.getMessage}"
     logError(error, e)
     error
   }
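For clarity, a minimal standalone sketch (not part of the commit; the option values are illustrative assumptions) of how errorSource and statusCode compose into the message prefix:

    // Both options present; with both None the prefix is empty and the
    // message is unchanged.
    val errorSource: Option[String] = Some("S3")
    val statusCode: Option[String] = Some("AccessDenied")
    val sourcePrefix =
      errorSource.map(src => s"$src: ").getOrElse("") +
        statusCode.map(st => s"Status $st: ").getOrElse("")
    // sourcePrefix == "S3: Status AccessDenied: "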
@@ -426,53 +435,30 @@
     else getRootCause(e.getCause)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
+  /**
+   * Converts a query exception into an error string, which is then persisted to the query
+   * result metadata.
+   */
+  def processQueryException(ex: Exception): String = {
     getRootCause(ex) match {
       case r: ParseException =>
-        handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+        handleQueryException(r, "Syntax error")
       case r: AmazonS3Exception =>
         incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
+        handleQueryException(r, "Failed to read data from S3.", Some("S3"), Some(r.getErrorCode))
+      case r: AWSGlueException =>
+        incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
         handleQueryException(
           r,
-          "Fail to read data from S3. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+          "Failed to read data from Glue.",
+          Some("Glue"),
+          Some(r.getErrorCode))
       case r: AnalysisException =>
-        handleQueryException(
-          r,
-          "Fail to analyze query. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Failed to analyze query.")
       case r: SparkException =>
-        handleQueryException(
-          r,
-          "Spark exception. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Spark exception encountered.")
       case r: Exception =>
-        handleQueryException(
-          r,
-          "Fail to run query, cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Failed to run query.")
     }
   }
 }
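As an illustration of the resulting error strings (not from the commit; the exception message and error code below are assumed values):

    import com.amazonaws.services.s3.model.AmazonS3Exception

    val s3Error = new AmazonS3Exception("Access Denied")
    s3Error.setErrorCode("AccessDenied")
    // processQueryException(s3Error) increments s3.error.count and returns
    // something like:
    //   "S3: Status AccessDenied: Failed to read data from S3.: Access Denied ..."
    // An AWSGlueException takes the analogous path with the "Glue" prefix and
    // increments glue.error.count.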
FlintREPL.scala

@@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
       currentTimeProvider)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      flintCommand: FlintCommand,
-      sessionId: String): String = {
-    val error = super.processQueryException(
-      ex,
-      spark,
-      dataSource,
-      flintCommand.query,
-      flintCommand.queryId,
-      sessionId)
+  def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
+    val error = super.processQueryException(ex)
     flintCommand.fail()
     flintCommand.error = Some(error)
     error
@@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
           sessionId,
           startTime)
       case e: Exception =>
-        val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
+        val error = processQueryException(e, flintCommand)
         Some(
           handleCommandFailureAndGetFailedData(
             spark,
JobOperator.scala

@@ -66,7 +66,7 @@ case class JobOperator(
       dataToWrite = Some(
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     case e: Exception =>
-      val error = processQueryException(e, spark, dataSource, query, "", "")
+      val error = processQueryException(e)
       dataToWrite = Some(
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
   } finally {
FlintJobTest.scala

@@ -93,7 +93,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
     """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
       |"jobRunId":{"type":"keyword"},"applicationId":{
       |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
-      |"error":{"type":"text"}}}
+      |"error":{"type":"text"}, "jobType":{"type":"keyword"}}}
       |""".stripMargin
     assert(FlintJob.isSuperset(input, mapping))
   }
