Skip to content

Commit

Permalink
Address comments v2
Browse files Browse the repository at this point in the history
  • Loading branch information
miland-db committed Aug 12, 2024
1 parent e8d9506 commit dc7f521
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ trait LeafStatementExec extends CompoundStatementExec {
/** Whether an error was raised during the execution of this statement. */
var raisedError: Boolean = false

/**
* Error state of the statement.
*/
/** Error state of the statement. */
var errorState: Option[String] = None

/** Error raised during statement execution. */
Expand All @@ -68,6 +66,13 @@ trait LeafStatementExec extends CompoundStatementExec {
* @param session Spark session.
*/
def execute(session: SparkSession): Unit

override def reset(): Unit = {
raisedError = false
errorState = None
error = None
rethrow = None
}
}

/**
Expand Down Expand Up @@ -149,11 +154,8 @@ class SingleStatementExec(
}

override def reset(): Unit = {
raisedError = false
errorState = None
error = None
rethrow = None
result = None // Should we do this?
super.reset()
result = None
}

override def execute(session: SparkSession): Unit = {
Expand Down Expand Up @@ -188,12 +190,17 @@ class SingleStatementExec(
* Spark session.
*/
class CompoundBodyExec(
label: Option[String] = None,
statements: Seq[CompoundStatementExec],
conditionHandlerMap: mutable.HashMap[String, ErrorHandlerExec] = mutable.HashMap(),
session: SparkSession)
statements: Seq[CompoundStatementExec],
session: SparkSession,
label: Option[String] = None,
conditionHandlerMap: mutable.HashMap[String, ErrorHandlerExec] = mutable.HashMap())
extends NonLeafStatementExec {

/**
* Get handler to handle error given by condition.
* @param condition SqlState of the error raised during statement execution.
* @return Corresponding error handler executable node.
*/
private def getHandler(condition: String): Option[ErrorHandlerExec] = {
conditionHandlerMap.get(condition)
.orElse(conditionHandlerMap.get("NOT FOUND") match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,27 @@ case class SqlScriptingInterpreter(session: SparkSession) {
.map(new SingleStatementExec(_, Origin(), isInternal = true))
.reverse

// Create a map of conditions (SqlStates) to their respective handlers.
val conditionHandlerMap = mutable.HashMap[String, ErrorHandlerExec]()
compoundBody.handlers.foreach(handler => {
val handlerBodyExec =
transformBodyIntoExec(handler.body,
handler.handlerType == HandlerType.EXIT,
compoundBody.label.get)

// Execution node of handler.
val handlerExec = new ErrorHandlerExec(handlerBodyExec)

// For each condition handler is defined for, add corresponding key value pair
// to the conditionHandlerMap.
handler.conditions.foreach(condition => {
// Condition can either be the key in conditions map or SqlState.
val conditionValue = compoundBody.conditions.getOrElse(condition, condition)
conditionHandlerMap.get(conditionValue) match {
case Some(_) =>
throw SqlScriptingErrors.duplicateHandlerForSameSqlState(
CurrentOrigin.get, conditionValue)
case None => conditionHandlerMap.put(conditionValue, handlerExec)
if (conditionHandlerMap.contains(conditionValue)) {
throw SqlScriptingErrors.duplicateHandlerForSameSqlState(
CurrentOrigin.get, conditionValue)
} else {
conditionHandlerMap.put(conditionValue, handlerExec)
}
})
})
Expand All @@ -108,18 +114,14 @@ case class SqlScriptingInterpreter(session: SparkSession) {
val statements = compoundBody.collection.map(st => transformTreeIntoExecutable(st)) ++
dropVariables :+ leave

return new CompoundBodyExec(
compoundBody.label,
statements,
conditionHandlerMap,
session)
return new CompoundBodyExec(statements, session, compoundBody.label, conditionHandlerMap)
}

new CompoundBodyExec(
compoundBody.label,
compoundBody.collection.map(st => transformTreeIntoExecutable(st)) ++ dropVariables,
conditionHandlerMap,
session)
session,
compoundBody.label,
conditionHandlerMap)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
}

case class TestBody(statements: Seq[CompoundStatementExec])
extends CompoundBodyExec(None, statements, mutable.HashMap(), null)
extends CompoundBodyExec(statements, null, None, mutable.HashMap())

case class TestSparkStatementWithPlan(testVal: String)
case class TestIfElseCondition(condVal: Boolean, description: String)
Expand Down

0 comments on commit dc7f521

Please sign in to comment.