Improve flint error handling
Signed-off-by: Louis Chu <[email protected]>
noCharger committed May 10, 2024
1 parent d9c0ba8 commit e24c398
Showing 7 changed files with 62 additions and 64 deletions.
2 changes: 2 additions & 0 deletions build.sbt
@@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
libraryDependencies ++= deps(sparkVersion),
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-json" % "2.9.2",
"com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
// handle AmazonS3Exception
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
// the transitive jackson.core dependency conflicts with existing scala
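The new Glue artifact mirrors the existing S3 one: provided scope, with the transitive jackson-databind excluded because, as the build comment notes, it conflicts with dependencies already on the classpath. A minimal sketch of how the full declaration reads in an sbt build (coordinates and the exclusion are taken from the hunk above; the surrounding project settings are assumed):

```scala
// Sketch only: the added Glue SDK dependency in a standard sbt libraryDependencies block.
// "provided" keeps the jar out of the assembly (the runtime is expected to supply it), and
// the exclude drops the transitive jackson-databind flagged as conflicting in the comment above.
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
    exclude ("com.fasterxml.jackson.core", "jackson-databind"))
```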
5 changes: 5 additions & 0 deletions MetricConstants.java
@@ -27,6 +27,11 @@ public final class MetricConstants {
*/
public static final String S3_ERR_CNT_METRIC = "s3.error.count";

/**
* Metric name for counting the errors encountered with Amazon Glue operations.
*/
public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";

/**
* Metric name for counting the number of sessions currently running.
*/
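The new constant is consumed in the FlintJobExecutor hunk further down, where the query-exception handler increments it whenever the root cause is a Glue exception. A self-contained sketch of that pattern; the counter registry here is a stand-in for illustration, not the repository's metrics plumbing:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

import com.amazonaws.services.glue.model.AWSGlueException
import com.amazonaws.services.s3.model.AmazonS3Exception

object ErrorCounterSketch {
  // Stand-in registry: one named counter per error source.
  private val counters = new ConcurrentHashMap[String, LongAdder]()

  def incrementCounter(name: String): Unit =
    counters.computeIfAbsent(name, _ => new LongAdder()).increment()

  // Mirrors the classification added to processQueryException below: bump the counter
  // whose name matches the failing AWS service.
  def recordFailure(rootCause: Throwable): Unit = rootCause match {
    case _: AmazonS3Exception => incrementCounter("s3.error.count") // S3_ERR_CNT_METRIC
    case _: AWSGlueException => incrementCounter("glue.error.count") // GLUE_ERR_CNT_METRIC
    case _ => ()
  }
}
```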
74 changes: 30 additions & 44 deletions FlintJobExecutor.scala
@@ -7,6 +7,7 @@ package org.apache.spark.sql

import java.util.Locale

import com.amazonaws.services.glue.model.AWSGlueException
import com.amazonaws.services.s3.model.AmazonS3Exception
import org.apache.commons.text.StringEscapeUtils.unescapeJava
import org.opensearch.flint.core.IRestHighLevelClient
@@ -17,6 +18,7 @@ import play.api.libs.json._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.types._
import org.apache.spark.sql.util._
@@ -65,6 +67,9 @@ trait FlintJobExecutor {
"sessionId": {
"type": "keyword"
},
"jobType": {
"type": "keyword"
},
"updateTime": {
"type": "date",
"format": "strict_date_time||epoch_millis"
@@ -190,6 +195,7 @@
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
// number is not nullable
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = true)))
@@ -218,6 +224,7 @@
queryId,
query,
sessionId,
spark.conf.get(FlintSparkConf.JOB_TYPE.key),
endTime,
endTime - startTime))

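The result rows now carry a jobType value read straight from the Spark conf, so whatever FlintSparkConf.JOB_TYPE was set to when the job started is what lands in the new column (the tests below use "streaming" for FlintJob and the conf default, "interactive", for the REPL). A small sketch of that lookup, assuming a local SparkSession and the conf key shown in this diff:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.flint.config.FlintSparkConf

// Hedged sketch: read the job type the same way the result-building code above does.
val spark = SparkSession.builder().appName("job-type-sketch").master("local").getOrCreate()
spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming") // "interactive" is the default per the REPL test below
val jobType = spark.conf.get(FlintSparkConf.JOB_TYPE.key) // value written to the new "jobType" column
```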
@@ -248,6 +255,7 @@
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
// number is not nullable
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = true)))
@@ -267,6 +275,7 @@
queryId,
query,
sessionId,
spark.conf.get(FlintSparkConf.JOB_TYPE.key),
endTime,
endTime - startTime))

@@ -411,12 +420,12 @@
private def handleQueryException(
e: Exception,
message: String,
- spark: SparkSession,
- dataSource: String,
- query: String,
- queryId: String,
- sessionId: String): String = {
- val error = s"$message: ${e.getMessage}"
+ errorSource: Option[String] = None,
+ statusCode: Option[String] = None): String = {
+ val sourcePrefix = errorSource.map(src => s"$src: ").getOrElse("") + statusCode
+ .map(st => s"Status $st: ")
+ .getOrElse("")
+ val error = s"${sourcePrefix}$message: ${e.getMessage}"
logError(error, e)
error
}
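The refactored handler drops the Spark and query arguments (the old body never used them) and instead takes an optional error source and status code, which it prepends to the message. A standalone sketch of the resulting string format, using made-up values in place of a real AmazonS3Exception:

```scala
// Reproduces the prefix assembly above with hypothetical inputs.
val errorSource: Option[String] = Some("S3")
val statusCode: Option[String] = Some("AccessDenied") // e.g. AmazonS3Exception.getErrorCode
val message = "Failed to read data from S3."
val exceptionMessage = "Access Denied (Service: Amazon S3)"

val sourcePrefix = errorSource.map(src => s"$src: ").getOrElse("") +
  statusCode.map(st => s"Status $st: ").getOrElse("")
val error = s"${sourcePrefix}$message: $exceptionMessage"
// error == "S3: Status AccessDenied: Failed to read data from S3.: Access Denied (Service: Amazon S3)"
```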
@@ -426,53 +435,30 @@
else getRootCause(e.getCause)
}

- def processQueryException(
- ex: Exception,
- spark: SparkSession,
- dataSource: String,
- query: String,
- queryId: String,
- sessionId: String): String = {
+ /**
+ * This method converts query exception into error string, which then persist to query result
+ * metadata
+ */
+ def processQueryException(ex: Exception): String = {
getRootCause(ex) match {
case r: ParseException =>
- handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+ handleQueryException(r, "Syntax error")
case r: AmazonS3Exception =>
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
+ handleQueryException(r, "Failed to read data from S3.", Some("S3"), Some(r.getErrorCode))
+ case r: AWSGlueException =>
+ incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
handleQueryException(
r,
"Fail to read data from S3. Cause",
spark,
dataSource,
query,
queryId,
sessionId)
"Failed to read data from Glue.",
Some("Glue"),
Some(r.getErrorCode))
case r: AnalysisException =>
- handleQueryException(
- r,
- "Fail to analyze query. Cause",
- spark,
- dataSource,
- query,
- queryId,
- sessionId)
+ handleQueryException(r, "Failed to analyze query.")
case r: SparkException =>
- handleQueryException(
- r,
- "Spark exception. Cause",
- spark,
- dataSource,
- query,
- queryId,
- sessionId)
+ handleQueryException(r, "Spark exception encountered.")
case r: Exception =>
- handleQueryException(
- r,
- "Fail to run query, cause",
- spark,
- dataSource,
- query,
- queryId,
- sessionId)
+ handleQueryException(r, "Failed to run query.")
}
}
}
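Taken together, the trait now maps each root cause to a fixed message, with a source and status prefix for AWS service errors, and callers simply forward the exception (see the JobOperator and FlintREPL hunks below). A condensed, self-contained sketch of that mapping; describe here stands in for the real processQueryException/handleQueryException pair and omits the metric increments:

```scala
import com.amazonaws.services.glue.model.AWSGlueException
import com.amazonaws.services.s3.model.AmazonS3Exception
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.ParseException

// Condensed stand-in for the refactored classification logic above.
def describe(rootCause: Throwable): String = rootCause match {
  case e: ParseException => s"Syntax error: ${e.getMessage}"
  case e: AmazonS3Exception => s"S3: Status ${e.getErrorCode}: Failed to read data from S3.: ${e.getMessage}"
  case e: AWSGlueException => s"Glue: Status ${e.getErrorCode}: Failed to read data from Glue.: ${e.getMessage}"
  case e: AnalysisException => s"Failed to analyze query.: ${e.getMessage}"
  case e: SparkException => s"Spark exception encountered.: ${e.getMessage}"
  case e => s"Failed to run query.: ${e.getMessage}"
}
```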
17 changes: 3 additions & 14 deletions FlintREPL.scala
@@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
currentTimeProvider)
}

- def processQueryException(
- ex: Exception,
- spark: SparkSession,
- dataSource: String,
- flintCommand: FlintCommand,
- sessionId: String): String = {
- val error = super.processQueryException(
- ex,
- spark,
- dataSource,
- flintCommand.query,
- flintCommand.queryId,
- sessionId)
+ def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
+ val error = super.processQueryException(ex)
flintCommand.fail()
flintCommand.error = Some(error)
error
@@ -724,7 +713,7 @@
sessionId,
startTime)
case e: Exception =>
- val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
+ val error = processQueryException(e, flintCommand)
Some(
handleCommandFailureAndGetFailedData(
spark,
2 changes: 1 addition & 1 deletion JobOperator.scala
@@ -66,7 +66,7 @@ case class JobOperator(
dataToWrite = Some(
getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
case e: Exception =>
- val error = processQueryException(e, spark, dataSource, query, "", "")
+ val error = processQueryException(e)
dataToWrite = Some(
getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
} finally {
7 changes: 5 additions & 2 deletions FlintJobTest.scala
@@ -6,14 +6,15 @@
package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}

class FlintJobTest extends SparkFunSuite with JobMatchers {

val spark =
SparkSession.builder().appName("Test").master("local").getOrCreate()

spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
// Define input dataframe
val inputSchema = StructType(
Seq(
@@ -38,6 +39,7 @@
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = false)))

@@ -61,6 +63,7 @@
"10",
"select 1",
"20",
"streaming",
currentTime,
queryRunTime))
val expected: DataFrame =
@@ -93,7 +96,7 @@
"""{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
|"jobRunId":{"type":"keyword"},"applicationId":{
|"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
|"error":{"type":"text"}}}
|"error":{"type":"text"}, "jobType":{"type":"keyword"}}}
|""".stripMargin
assert(FlintJob.isSuperset(input, mapping))
}
19 changes: 16 additions & 3 deletions FlintREPLTest.scala
@@ -216,6 +216,7 @@ class FlintREPLTest
StructField("queryId", StringType, nullable = true),
StructField("queryText", StringType, nullable = true),
StructField("sessionId", StringType, nullable = true),
StructField("jobType", StringType, nullable = true),
StructField("updateTime", LongType, nullable = false),
StructField("queryRunTime", LongType, nullable = false)))

@@ -235,10 +236,11 @@
"10",
"select 1",
"20",
"interactive",
currentTime,
queryRunTime))
val spark = SparkSession.builder().appName("Test").master("local").getOrCreate()

spark.conf.set(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
val expected =
spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)

@@ -547,10 +549,13 @@
test("executeAndHandle should handle TimeoutException properly") {
val mockSparkSession = mock[SparkSession]
val mockFlintCommand = mock[FlintCommand]
val mockConf = mock[RuntimeConfig]
when(mockSparkSession.conf).thenReturn(mockConf)
when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
.thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
// val mockExecutionContextExecutor: ExecutionContextExecutor = mock[ExecutionContextExecutor]
val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

try {
val dataSource = "someDataSource"
val sessionId = "someSessionId"
@@ -596,6 +601,10 @@

test("executeAndHandle should handle ParseException properly") {
val mockSparkSession = mock[SparkSession]
val mockConf = mock[RuntimeConfig]
when(mockSparkSession.conf).thenReturn(mockConf)
when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
.thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)
val flintCommand =
new FlintCommand(
"Running",
@@ -606,7 +615,6 @@
None)
val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1)
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

try {
val dataSource = "someDataSource"
val sessionId = "someSessionId"
@@ -1020,6 +1028,11 @@
val sparkContext = mock[SparkContext]
when(mockSparkSession.sparkContext).thenReturn(sparkContext)

val mockConf = mock[RuntimeConfig]
when(mockSparkSession.conf).thenReturn(mockConf)
when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key))
.thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get)

when(expectedDataFrame.toDF(any[Seq[String]]: _*)).thenReturn(expectedDataFrame)

val flintSessionIndexUpdater = mock[OpenSearchUpdater]
