Skip to content

Commit

Permalink
Support UnrecoverableException
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Nov 14, 2024
1 parent 439cf3e commit b66aac0
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql.exception

/**
* Represents an unrecoverable exception in session management and statement execution. This
* exception is used for errors that cannot be handled or recovered from.
*/
class UnrecoverableException private (message: String, cause: Throwable)
extends RuntimeException(message, cause) {

def this(cause: Throwable) =
this(cause.getMessage, cause)
}

object UnrecoverableException {
def apply(cause: Throwable): UnrecoverableException =
new UnrecoverableException(cause)

def apply(message: String, cause: Throwable): UnrecoverableException =
new UnrecoverableException(message, cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkConfConstants.{DEFAULT_SQL_EXTENSIONS, SQL_EXTENSIONS_KEY}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.exception.UnrecoverableException
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.types._
Expand All @@ -44,12 +45,13 @@ trait FlintJobExecutor {
this: Logging =>

val mapper = new ObjectMapper()
val throwableHandler = new ThrowableHandler()

var currentTimeProvider: TimeProvider = new RealTimeProvider()
var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
var environmentProvider: EnvironmentProvider = new RealEnvironment()
var enableHiveSupport: Boolean = true
// termiante JVM in the presence non-deamon thread before exiting
// terminate JVM in the presence non-daemon thread before exiting
var terminateJVM = true

// The enabled setting, which can be applied only to the top-level mapping definition and to object fields,
Expand Down Expand Up @@ -435,11 +437,13 @@ trait FlintJobExecutor {
}

private def handleQueryException(
e: Exception,
t: Throwable,
messagePrefix: String,
errorSource: Option[String] = None,
statusCode: Option[Int] = None): String = {
val errorMessage = s"$messagePrefix: ${e.getMessage}"
throwableHandler.setThrowable(t)

val errorMessage = s"$messagePrefix: ${t.getMessage}"
val errorDetails = new java.util.LinkedHashMap[String, String]()
errorDetails.put("Message", errorMessage)
errorSource.foreach(es => errorDetails.put("ErrorSource", es))
Expand All @@ -450,25 +454,25 @@ trait FlintJobExecutor {
// CustomLogging will call log4j logger.error() underneath
statusCode match {
case Some(code) =>
CustomLogging.logError(new OperationMessage(errorMessage, code), e)
CustomLogging.logError(new OperationMessage(errorMessage, code), t)
case None =>
CustomLogging.logError(errorMessage, e)
CustomLogging.logError(errorMessage, t)
}

errorJson
}

def getRootCause(e: Throwable): Throwable = {
if (e.getCause == null) e
else getRootCause(e.getCause)
def getRootCause(t: Throwable): Throwable = {
if (t.getCause == null) t
else getRootCause(t.getCause)
}

/**
* This method converts query exception into error string, which then persist to query result
* metadata
*/
def processQueryException(ex: Exception): String = {
getRootCause(ex) match {
def processQueryException(throwable: Throwable): String = {
getRootCause(throwable) match {
case r: ParseException =>
handleQueryException(r, ExceptionMessages.SyntaxErrorPrefix)
case r: AmazonS3Exception =>
Expand All @@ -495,15 +499,15 @@ trait FlintJobExecutor {
handleQueryException(r, ExceptionMessages.QueryAnalysisErrorPrefix)
case r: SparkException =>
handleQueryException(r, ExceptionMessages.SparkExceptionErrorPrefix)
case r: Exception =>
val rootCauseClassName = r.getClass.getName
val errMsg = r.getMessage
case t: Throwable =>
val rootCauseClassName = t.getClass.getName
val errMsg = t.getMessage
if (rootCauseClassName == "org.apache.hadoop.hive.metastore.api.MetaException" &&
errMsg.contains("com.amazonaws.services.glue.model.AccessDeniedException")) {
val e = new SecurityException(ExceptionMessages.GlueAccessDeniedMessage)
handleQueryException(e, ExceptionMessages.QueryRunErrorPrefix)
} else {
handleQueryException(r, ExceptionMessages.QueryRunErrorPrefix)
handleQueryException(t, ExceptionMessages.QueryRunErrorPrefix)
}
}
}
Expand Down Expand Up @@ -532,6 +536,14 @@ trait FlintJobExecutor {
throw t
}

def checkAndThrowUnrecoverableExceptions(): Unit = {
throwableHandler.exceptionThrown.foreach {
case e: UnrecoverableException =>
throw e
case _ => // Do nothing for other types of exceptions
}
}

def instantiate[T](defaultConstructor: => T, className: String, args: Any*): T = {
if (Strings.isNullOrEmpty(className)) {
defaultConstructor
Expand All @@ -551,5 +563,4 @@ trait FlintJobExecutor {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
private val statementRunningCount = new AtomicInteger(0)
private var queryCountMetric = 0

// After handling any exceptions from stopping the Spark session,
// check if there's a stored exception and throw it if it's an UnrecoverableException
sys.addShutdownHook(checkAndThrowUnrecoverableExceptions())

def main(args: Array[String]) {
val (queryOption, resultIndexOption) = parseArgs(args)

Expand Down Expand Up @@ -187,9 +191,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
}
recordSessionSuccess(sessionTimerContext)
} catch {
case e: Exception =>
case t: Throwable =>
handleSessionError(
e,
t,
applicationId,
jobId,
sessionId,
Expand Down Expand Up @@ -420,24 +424,29 @@ object FlintREPL extends Logging with FlintJobExecutor {
}

def handleSessionError(
e: Exception,
t: Throwable,
applicationId: String,
jobId: String,
sessionId: String,
sessionManager: SessionManager,
jobStartTime: Long,
sessionTimerContext: Timer.Context): Unit = {
val error = s"Session error: ${e.getMessage}"
CustomLogging.logError(error, e)
throwableHandler.recordThrowable(s"Session error: ${t.getMessage}", t)

try {
refreshSessionState(
applicationId,
jobId,
sessionId,
sessionManager,
jobStartTime,
SessionStates.FAIL,
Some(t.getMessage))
} catch {
case t: Throwable =>
throwableHandler.recordThrowable(s"Failed to update session state.", t)
}

refreshSessionState(
applicationId,
jobId,
sessionId,
sessionManager,
jobStartTime,
SessionStates.FAIL,
Some(e.getMessage))
recordSessionFailed(sessionTimerContext)
}

Expand Down Expand Up @@ -485,8 +494,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
startTime)
}

def processQueryException(ex: Exception, flintStatement: FlintStatement): String = {
val error = super.processQueryException(ex)
def processQueryException(t: Throwable, flintStatement: FlintStatement): String = {
val error = super.processQueryException(t)
flintStatement.fail()
flintStatement.error = Some(error)
error
Expand Down Expand Up @@ -581,11 +590,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
} catch {
// e.g., maybe due to authentication service connection issue
// or invalid catalog (e.g., we are operating on data not defined in provided data source)
case e: Exception =>
val error = s"""Fail to write result of ${flintStatement}, cause: ${e.getMessage}"""
CustomLogging.logError(error, e)
case e: Throwable =>
throwableHandler.recordThrowable(
s"""Fail to write result of ${flintStatement}, cause: ${e.getMessage}""",
e)
flintStatement.fail()
} finally {
logInfo(s"command complete: $flintStatement")
statementExecutionManager.updateStatement(flintStatement)
recordStatementStateChange(flintStatement, statementTimerContext)
}
Expand Down Expand Up @@ -671,8 +682,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintStatement,
sessionId,
startTime))
case e: Exception =>
val error = processQueryException(e, flintStatement)
case t: Throwable =>
val error = processQueryException(t, flintStatement)
Some(
handleCommandFailureAndGetFailedData(
applicationId,
Expand Down Expand Up @@ -747,7 +758,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
startTime))
case NonFatal(e) =>
val error = s"An unexpected error occurred: ${e.getMessage}"
CustomLogging.logError(error, e)
throwableHandler.recordThrowable(error, e)
dataToWrite = Some(
handleCommandFailureAndGetFailedData(
applicationId,
Expand Down Expand Up @@ -786,7 +797,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
queryWaitTimeMillis)
}

logInfo(s"command complete: $flintStatement")
(dataToWrite, verificationResult)
}

Expand Down Expand Up @@ -858,7 +868,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
}
}
} catch {
case e: Exception => logError(s"Failed to update session state for $sessionId", e)
case t: Throwable =>
throwableHandler.recordThrowable(s"Failed to update session state for $sessionId", t)
}
}
}
Expand Down Expand Up @@ -897,10 +908,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
) // Record heartbeat failure metric
// maybe due to invalid sequence number or primary term
case e: Exception =>
CustomLogging.logWarning(
case t: Throwable =>
throwableHandler.recordThrowable(
s"""Fail to update the last update time of the flint instance ${sessionId}""",
e)
t)
incrementCounter(
MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
) // Record heartbeat failure metric
Expand Down Expand Up @@ -948,8 +959,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
}
} catch {
// still proceed since we are not sure what happened (e.g., OpenSearch cluster may be unresponsive)
case e: Exception =>
CustomLogging.logError(s"""Fail to find id ${sessionId} from session index.""", e)
case t: Throwable =>
throwableHandler.recordThrowable(
s"""Fail to find id ${sessionId} from session index.""",
t)
true
}
}
Expand Down
Loading

0 comments on commit b66aac0

Please sign in to comment.