Skip to content

Commit

Permalink
Address comments v1
Browse files Browse the repository at this point in the history
  • Loading branch information
miland-db committed Aug 12, 2024
1 parent 5ad3474 commit e8d9506
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 52 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,12 @@
],
"sqlState" : "42614"
},
"DUPLICATE_CONDITION_NAME_FOR_DIFFERENT_SQL_STATE" : {
"message" : [
"Found duplicate condition name <conditionName> for different SQL states. Please, remove one of them."
],
"sqlState" : "42710"
},
"DUPLICATE_HANDLER_FOR_SAME_SQL_STATE" : {
"message" : [
"Found duplicate handlers for the same SQL state <sqlState>. Please, remove one of them."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ class AstBuilder extends DataTypeAstBuilder
stmt match {
case handler: ErrorHandler => handlers += handler
case condition: ErrorCondition =>
assert(!conditions.contains(condition.conditionName)) // Check for duplicate names.
assert(!sqlStates.contains(condition.value)) // Check for duplicate sqlStates.
if (conditions.contains(condition.conditionName)) {
throw SqlScriptingErrors.duplicateConditionNameForDifferentSqlState(
CurrentOrigin.get, condition.conditionName)
}
conditions += condition.conditionName -> condition.value
sqlStates += condition.value
case s => buff += s
Expand Down Expand Up @@ -229,11 +231,7 @@ class AstBuilder extends DataTypeAstBuilder
.map { s =>
SingleStatement(parsedPlan = visit(s).asInstanceOf[LogicalPlan])
}.getOrElse {
val stmt = Option(ctx.beginEndCompoundBlock()).
getOrElse(Option(ctx.declareHandler()).
getOrElse(Option(ctx.declareCondition()).
getOrElse(ctx.ifElseStatement())))
visit(stmt).asInstanceOf[CompoundPlanStatement]
visitChildren(ctx).asInstanceOf[CompoundPlanStatement]
}
}

Expand All @@ -245,15 +243,13 @@ class AstBuilder extends DataTypeAstBuilder
override def visitConditionValueList(ctx: ConditionValueListContext): Seq[String] = {
Option(ctx.SQLEXCEPTION()).map(_ => Seq("SQLEXCEPTION")).getOrElse {
Option(ctx.NOT()).map(_ => Seq("NOT FOUND")).getOrElse {
val buff = ListBuffer[String]()
val seen = scala.collection.mutable.Set[String]()
val buff = scala.collection.mutable.Set[String]()
ctx.conditionValues.forEach { conditionValue =>
val elem = visit(conditionValue).asInstanceOf[String]
if (seen(elem)) {
if (buff(elem)) {
throw SqlScriptingErrors.duplicateSqlStateForSameHandler(CurrentOrigin.get, elem)
}
buff += elem
seen += elem
}
buff.toSeq
}
Expand All @@ -275,8 +271,7 @@ class AstBuilder extends DataTypeAstBuilder

override def visitDeclareCondition(ctx: DeclareConditionContext): ErrorCondition = {
val conditionName = ctx.multipartIdentifier().getText
val conditionValue = Option(ctx.stringLit()).map(_.getText).getOrElse("'45000'").
replace("'", "")
val conditionValue = Option(ctx.stringLit()).map(_.getText.replace("'", "")).getOrElse("45000")

val sqlStateRegex = "^[A-Za-z0-9]{5}$".r
assert(sqlStateRegex.findFirstIn(conditionValue).isDefined)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ case class SingleStatement(parsedPlan: LogicalPlan)
* @param label Label set to CompoundBody by user or UUID otherwise.
* It can be None in case when CompoundBody is not part of BeginEndCompoundBlock
* for example when CompoundBody is inside loop or conditional block.
* @param handlers Collection of handlers defined in the compound body.
* @param conditions Map of Condition Name - Sql State values declared in the compound body.
*/
case class CompoundBody(
collection: Seq[CompoundPlanStatement],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ private[sql] object SqlScriptingErrors {
messageParameters = Map("sqlState" -> sqlState))
}

def duplicateConditionNameForDifferentSqlState(
origin: Origin,
conditionName: String): Throwable = {
new SqlScriptingException(
origin = origin,
errorClass = "DUPLICATE_CONDITION_NAME_FOR_DIFFERENT_SQL_STATE",
cause = null,
messageParameters = Map("conditionName" -> conditionName))
}

def variableDeclarationNotAllowedInScope(
origin: Origin,
varName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,25 @@ sealed trait CompoundStatementExec extends Logging {
*/
trait LeafStatementExec extends CompoundStatementExec {

/**
* Execute the statement.
* @param session Spark session.
*/
def execute(session: SparkSession): Unit

/**
* Whether an error was raised during the execution of this statement.
*/
/** Whether an error was raised during the execution of this statement. */
var raisedError: Boolean = false

/**
* Error state of the statement.
*/
var errorState: Option[String] = None

/**
* Error raised during statement execution.
*/
/** Error raised during statement execution. */
var error: Option[SparkThrowable] = None

/** Throwable to rethrow after the statement execution if the error is not handled. */
var rethrow: Option[Throwable] = None

/**
* Throwable to rethrow after the statement execution if the error is not handled.
* Execute the statement.
* @param session Spark session.
*/
var rethrow: Option[Throwable] = None
def execute(session: SparkSession): Unit
}

/**
Expand Down Expand Up @@ -141,9 +135,7 @@ class SingleStatementExec(
val shouldCollectResult: Boolean = false)
extends LeafStatementExec with WithOrigin {

/**
* Data returned after execution.
*/
/** Data returned after execution. */
var result: Option[Array[Row]] = None

/**
Expand All @@ -164,7 +156,7 @@ class SingleStatementExec(
result = None // Should we do this?
}

def execute(session: SparkSession): Unit = {
override def execute(session: SparkSession): Unit = {
try {
val rows = Some(Dataset.ofRows(session, parsedPlan).collect())
if (shouldCollectResult) {
Expand Down Expand Up @@ -340,18 +332,6 @@ class LeaveStatementExec(val label: String) extends LeafStatementExec {
override def reset(): Unit = used = false
}

/**
* Executable node for Continue statement.
*/
class ContinueStatementExec() extends LeafStatementExec {

var used: Boolean = false

override def execute(session: SparkSession): Unit = ()

override def reset(): Unit = used = false
}

/**
* Executable node for IfElseStatement.
* @param conditions Collection of executable conditions. First condition corresponds to IF clause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.scripting

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
Expand All @@ -39,12 +38,10 @@ case class SqlScriptingInterpreter(session: SparkSession) {
*
* @param compound
* CompoundBody for which to build the plan.
* @param session
* Spark session that SQL script is executed within.
* @return
* Iterator through collection of statements to be executed.
*/
def buildExecutionPlan(compound: CompoundBody): Iterator[CompoundStatementExec] = {
private def buildExecutionPlan(compound: CompoundBody): Iterator[CompoundStatementExec] = {
transformTreeIntoExecutable(compound).asInstanceOf[CompoundBodyExec].getTreeIterator
}

Expand All @@ -61,10 +58,22 @@ case class SqlScriptingInterpreter(session: SparkSession) {
case _ => None
}

/**
* Fetch the name of the Create Variable plan.
* @param compoundBody
* CompoundBody to be transformed into CompoundBodyExec.
* @param isExitHandler
* Flag to indicate if the body is an exit handler body to add leave statement at the end.
* @param exitHandlerLabel
* If body is an exit handler body, this is the label of surrounding CompoundBody
* that should be exited.
* @return
* Executable version of the CompoundBody .
*/
private def transformBodyIntoExec(
compoundBody: CompoundBody,
isExitHandler: Boolean = false,
label: String = ""): CompoundBodyExec = {
exitHandlerLabel: String = ""): CompoundBodyExec = {
val variables = compoundBody.collection.flatMap {
case st: SingleStatement => getDeclareVarNameFromPlan(st.parsedPlan)
case _ => None
Expand All @@ -75,7 +84,6 @@ case class SqlScriptingInterpreter(session: SparkSession) {
.reverse

val conditionHandlerMap = mutable.HashMap[String, ErrorHandlerExec]()
val handlers = ListBuffer[ErrorHandlerExec]()
compoundBody.handlers.foreach(handler => {
val handlerBodyExec =
transformBodyIntoExec(handler.body,
Expand All @@ -92,12 +100,11 @@ case class SqlScriptingInterpreter(session: SparkSession) {
case None => conditionHandlerMap.put(conditionValue, handlerExec)
}
})

handlers += handlerExec
})

if (isExitHandler) {
val leave = new LeaveStatementExec(label)
// Create leave statement to exit the surrounding CompoundBody after handler execution.
val leave = new LeaveStatementExec(exitHandlerLabel)
val statements = compoundBody.collection.map(st => transformTreeIntoExecutable(st)) ++
dropVariables :+ leave

Expand Down

0 comments on commit e8d9506

Please sign in to comment.