Skip to content

Commit

Permalink
Fix iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
miland-db committed Dec 3, 2024
1 parent 8018fec commit 09c132c
Show file tree
Hide file tree
Showing 13 changed files with 707 additions and 56 deletions.
18 changes: 18 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,18 @@
],
"sqlState" : "42614"
},
"DUPLICATE_CONDITION_NAME_FOR_DIFFERENT_SQL_STATE" : {
"message" : [
"Found duplicate condition name <conditionName> for different SQL states. Please, remove one of them."
],
"sqlState" : "42710"
},
"DUPLICATE_HANDLER_FOR_SAME_SQL_STATE" : {
"message" : [
"Found duplicate handlers for the same SQL state <sqlState>. Please, remove one of them."
],
"sqlState" : "42710"
},
"DUPLICATE_KEY" : {
"message" : [
"Found duplicate keys <keyColumn>."
Expand All @@ -1218,6 +1230,12 @@
},
"sqlState" : "4274K"
},
"DUPLICATE_SQL_STATE_FOR_SAME_HANDLER" : {
"message" : [
"Found duplicate SQL state <sqlState> for the same handler. Please, remove one of them."
],
"sqlState" : "42710"
},
"EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
"message" : [
"Previous node emitted a row with eventTime=<emittedRowEventTime> which is older than current_watermark_value=<currentWatermark>",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,10 @@ COMPACTIONS: 'COMPACTIONS';
COMPENSATION: 'COMPENSATION';
COMPUTE: 'COMPUTE';
CONCATENATE: 'CONCATENATE';
CONDITION: 'CONDITION';
CONSTRAINT: 'CONSTRAINT';
CONTAINS: 'CONTAINS';
CONTINUE: 'CONTINUE';
COST: 'COST';
CREATE: 'CREATE';
CROSS: 'CROSS';
Expand Down Expand Up @@ -226,6 +228,7 @@ EXCEPT: 'EXCEPT';
EXCHANGE: 'EXCHANGE';
EXCLUDE: 'EXCLUDE';
EXISTS: 'EXISTS';
EXIT: 'EXIT';
EXPLAIN: 'EXPLAIN';
EXPORT: 'EXPORT';
EXTEND: 'EXTEND';
Expand All @@ -244,6 +247,7 @@ FOR: 'FOR';
FOREIGN: 'FOREIGN';
FORMAT: 'FORMAT';
FORMATTED: 'FORMATTED';
FOUND: 'FOUND';
FROM: 'FROM';
FULL: 'FULL';
FUNCTION: 'FUNCTION';
Expand All @@ -253,6 +257,7 @@ GLOBAL: 'GLOBAL';
GRANT: 'GRANT';
GROUP: 'GROUP';
GROUPING: 'GROUPING';
HANDLER: 'HANDLER';
HAVING: 'HAVING';
BINARY_HEX: 'X';
HOUR: 'HOUR';
Expand Down Expand Up @@ -412,6 +417,7 @@ SORTED: 'SORTED';
SOURCE: 'SOURCE';
SPECIFIC: 'SPECIFIC';
SQL: 'SQL';
SQLEXCEPTION: 'SQLEXCEPTION';
START: 'START';
STATISTICS: 'STATISTICS';
STORED: 'STORED';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ compoundBody
;

compoundStatement
: statement
: declareCondition
| declareHandler
| statement
| setStatementWithOptionalVarKeyword
| beginEndCompoundBlock
| ifElseStatement
Expand All @@ -73,6 +75,23 @@ compoundStatement
| forStatement
;

conditionValue
: stringLit
| multipartIdentifier
;

conditionValueList
: ((conditionValues+=conditionValue (COMMA conditionValues+=conditionValue)*) | SQLEXCEPTION | NOT FOUND)
;

declareCondition
: DECLARE multipartIdentifier CONDITION (FOR stringLit)?
;

declareHandler
: DECLARE (CONTINUE | EXIT) HANDLER FOR conditionValueList (BEGIN compoundBody END | statement | setStatementWithOptionalVarKeyword)
;

setStatementWithOptionalVarKeyword
: SET variable? assignmentList #setVariableWithOptionalKeyword
| SET variable? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
Expand Down Expand Up @@ -1592,6 +1611,7 @@ ansiNonReserved
| COMPUTE
| CONCATENATE
| CONTAINS
| CONTINUE
| COST
| CUBE
| CURRENT
Expand Down Expand Up @@ -1631,6 +1651,7 @@ ansiNonReserved
| EXCHANGE
| EXCLUDE
| EXISTS
| EXIT
| EXPLAIN
| EXPORT
| EXTEND
Expand All @@ -1644,11 +1665,13 @@ ansiNonReserved
| FOLLOWING
| FORMAT
| FORMATTED
| FOUND
| FUNCTION
| FUNCTIONS
| GENERATED
| GLOBAL
| GROUPING
| HANDLER
| HOUR
| HOURS
| IDENTIFIER_KW
Expand Down Expand Up @@ -1780,6 +1803,7 @@ ansiNonReserved
| SORTED
| SOURCE
| SPECIFIC
| SQLEXCEPTION
| START
| STATISTICS
| STORED
Expand Down Expand Up @@ -1927,8 +1951,10 @@ nonReserved
| COMPENSATION
| COMPUTE
| CONCATENATE
| CONDITION
| CONSTRAINT
| CONTAINS
| CONTINUE
| COST
| CREATE
| CUBE
Expand Down Expand Up @@ -1978,6 +2004,7 @@ nonReserved
| EXCLUDE
| EXECUTE
| EXISTS
| EXIT
| EXPLAIN
| EXPORT
| EXTEND
Expand All @@ -1996,6 +2023,7 @@ nonReserved
| FOREIGN
| FORMAT
| FORMATTED
| FOUND
| FROM
| FUNCTION
| FUNCTIONS
Expand All @@ -2004,6 +2032,7 @@ nonReserved
| GRANT
| GROUP
| GROUPING
| HANDLER
| HAVING
| HOUR
| HOURS
Expand Down Expand Up @@ -2153,6 +2182,7 @@ nonReserved
| SOURCE
| SPECIFIC
| SQL
| SQLEXCEPTION
| START
| STATISTICS
| STORED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.mutable.{ArrayBuffer, ListBuffer, Set}
import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Set}
import scala.jdk.CollectionConverters._
import scala.util.{Left, Right}

Expand Down Expand Up @@ -142,6 +142,52 @@ class AstBuilder extends DataTypeAstBuilder
}
}

override def visitConditionValue(ctx: ConditionValueContext): String = {
Option(ctx.multipartIdentifier()).map(_.getText)
.getOrElse(ctx.stringLit().getText).replace("'", "")
}

override def visitConditionValueList(ctx: ConditionValueListContext): Seq[String] = {
Option(ctx.SQLEXCEPTION()).map(_ => Seq("SQLEXCEPTION")).getOrElse {
Option(ctx.NOT()).map(_ => Seq("NOT FOUND")).getOrElse {
val buff = scala.collection.mutable.Set[String]()
ctx.conditionValues.forEach { conditionValue =>
val elem = visit(conditionValue).asInstanceOf[String]
if (buff(elem)) {
throw SqlScriptingErrors.duplicateSqlStateForSameHandler(CurrentOrigin.get, elem)
}
buff += elem
}
buff.toSeq
}
}
}

override def visitDeclareCondition(ctx: DeclareConditionContext): ErrorCondition = {
val conditionName = ctx.multipartIdentifier().getText
val conditionValue = Option(ctx.stringLit()).map(_.getText.replace("'", "")).getOrElse("45000")

val sqlStateRegex = "^[A-Za-z0-9]{5}$".r
assert(sqlStateRegex.findFirstIn(conditionValue).isDefined)

ErrorCondition(conditionName, conditionValue)
}

def visitDeclareHandlerImpl(
ctx: DeclareHandlerContext, labelCtx: SqlScriptingLabelContext): ErrorHandler = {
val conditions = visit(ctx.conditionValueList()).asInstanceOf[Seq[String]]
val handlerType = Option(ctx.EXIT()).map(_ => HandlerType.EXIT).getOrElse(HandlerType.CONTINUE)

val body = if (!ctx.compoundBody().isEmpty) {
visitCompoundBodyImpl(ctx.compoundBody(), None, allowVarDeclare = true, labelCtx)
} else {
val logicalPlan = visitChildren(ctx).asInstanceOf[LogicalPlan]
CompoundBody(Seq(SingleStatement(parsedPlan = logicalPlan)), None)
}.asInstanceOf[CompoundBody]

ErrorHandler(conditions, body, handlerType)
}

override def visitSingleCompoundStatement(ctx: SingleCompoundStatementContext): CompoundBody = {
val labelCtx = new SqlScriptingLabelContext()
visitCompoundBodyImpl(ctx.compoundBody(), Some("root"), allowVarDeclare = true, labelCtx)
Expand All @@ -153,8 +199,27 @@ class AstBuilder extends DataTypeAstBuilder
allowVarDeclare: Boolean,
labelCtx: SqlScriptingLabelContext): CompoundBody = {
val buff = ListBuffer[CompoundPlanStatement]()
ctx.compoundStatements.forEach(
compoundStatement => buff += visitCompoundStatementImpl(compoundStatement, labelCtx))
val handlers = ListBuffer[ErrorHandler]()
val conditions = HashMap[String, String]()
val sqlStates = Set[String]()

ctx.compoundStatements.forEach(compoundStatement => {
val stmt = visitCompoundStatementImpl(compoundStatement, labelCtx)
stmt match {
case handler: ErrorHandler => handlers += handler
case condition: ErrorCondition =>
if (conditions.contains(condition.conditionName)) {
throw SqlScriptingErrors.duplicateConditionNameForDifferentSqlState(
CurrentOrigin.get, condition.conditionName)
}
conditions += condition.conditionName -> condition.value
sqlStates += condition.value
case s => buff += s
}
})

// ctx.compoundStatements.forEach(
// compoundStatement => buff += visitCompoundStatementImpl(compoundStatement, labelCtx))

val compoundStatements = buff.toList

Expand Down Expand Up @@ -183,7 +248,7 @@ class AstBuilder extends DataTypeAstBuilder
case _ =>
}

CompoundBody(buff.toSeq, label)
CompoundBody(buff.toSeq, label, handlers.toSeq, conditions)
}

private def visitBeginEndCompoundBlockImpl(
Expand Down Expand Up @@ -228,6 +293,8 @@ class AstBuilder extends DataTypeAstBuilder
visitSimpleCaseStatementImpl(simpleCaseContext, labelCtx)
case forStatementContext: ForStatementContext =>
visitForStatementImpl(forStatementContext, labelCtx)
case declareHandlerContext: DeclareHandlerContext =>
visitDeclareHandlerImpl(declareHandlerContext, labelCtx)
case stmt => visit(stmt).asInstanceOf[CompoundPlanStatement]
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.HandlerType.HandlerType
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}


Expand Down Expand Up @@ -65,7 +68,9 @@ case class SingleStatement(parsedPlan: LogicalPlan)
*/
case class CompoundBody(
collection: Seq[CompoundPlanStatement],
label: Option[String]) extends Command with CompoundPlanStatement {
label: Option[String],
handlers: Seq[ErrorHandler] = Seq.empty,
conditions: HashMap[String, String] = HashMap()) extends Command with CompoundPlanStatement {

override def children: Seq[LogicalPlan] = collection

Expand Down Expand Up @@ -295,3 +300,53 @@ case class ForStatement(
ForStatement(query, variableName, body, label)
}
}

/**
* Logical operator for an error condition.
* @param conditionName Name of the error condition.
* @param value SQLSTATE or Error Code.
*/
case class ErrorCondition(
conditionName: String,
value: String) extends CompoundPlanStatement {
override def output: Seq[Attribute] = Seq.empty

/**
* Returns a Seq of the children of this node.
* Children should not change. Immutability required for containsChild optimization
*/
override def children: Seq[LogicalPlan] = Seq.empty

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this.copy()
}

object HandlerType extends Enumeration {
type HandlerType = Value
val EXIT, CONTINUE = Value
}

/**
* Logical operator for an error condition.
* @param conditions Name of the error condition variable for which the handler is built.
* @param body CompoundBody of the handler.
* @param handlerType Type of the handler (CONTINUE or EXIT).
*/
case class ErrorHandler(
conditions: Seq[String],
body: CompoundBody,
handlerType: HandlerType) extends CompoundPlanStatement {
override def output: Seq[Attribute] = Seq.empty

/**
* Returns a Seq of the children of this node.
* Children should not change. Immutability required for containsChild optimization
*/
override def children: Seq[LogicalPlan] = Seq(body)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = {
assert(newChildren.length == 1)
ErrorHandler(conditions, newChildren(0).asInstanceOf[CompoundBody], handlerType)
}
}
Loading

0 comments on commit 09c132c

Please sign in to comment.