Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
dusantism-db committed Nov 19, 2024
1 parent f4e9684 commit 446fc05
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ case class LoopStatement(

/**
* Logical operator for FOR statement.
* @param query Query which is executed once, then it's result is iterated on, row by row
* @param variableName Name of variable which is used to access the current row during iteration
* @param body Compound body is a collection of statements that are executed once for each row in
* the result set of the query
* @param query Query which is executed once, then its result set is iterated on, row by row.
* @param variableName Name of variable which is used to access the current row during iteration.
* @param body Compound body is a collection of statements that are executed for each row in
* the result set of the query.
* @param label An optional label for the loop which is unique amongst all labels for statements
* within which the FOR statement is contained.
* If an end label is specified it must match the beginning label.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.contains("lbl"))
}

test("for statement no label") {
test("for statement - no label") {
val sqlScriptText =
"""
|BEGIN
Expand All @@ -1655,7 +1655,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.isDefined)
}

test("for statement with complex subquery") {
test("for statement - with complex subquery") {
val sqlScriptText =
"""
|BEGIN
Expand Down Expand Up @@ -1683,7 +1683,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.contains("lbl"))
}

test("nested for statement") {
test("for statement - nested") {
val sqlScriptText =
"""
|BEGIN
Expand Down Expand Up @@ -1720,7 +1720,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
head.asInstanceOf[SingleStatement].getText == "SELECT i + j")
}

test("for statement no variable") {
test("for statement - no variable") {
val sqlScriptText =
"""
|BEGIN
Expand All @@ -1745,7 +1745,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.contains("lbl"))
}

test("for statement no label no variable") {
test("for statement - no variable - no label") {
val sqlScriptText =
"""
|BEGIN
Expand All @@ -1771,7 +1771,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.isDefined)
}

test("for statement with complex subquery no variable") {
test("for statement - no variable - with complex subquery") {
val sqlScriptText =
"""
|BEGIN
Expand Down Expand Up @@ -1799,7 +1799,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(forStmt.label.contains("lbl"))
}

test("nested for statement no variable") {
test("for statement - no variable - nested") {
val sqlScriptText =
"""
|BEGIN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class SingleStatementExec(
var isExecuted = false

/**
* Builds a DataFrame from the parsedPlan of this SingleStatementExec
* @param session The SparkSession on which the parsedPlan is built
* Builds a DataFrame from the parsedPlan of this SingleStatementExec.
* @param session The SparkSession used.
* @return
* The DataFrame.
*/
Expand Down Expand Up @@ -689,7 +689,7 @@ class ForStatementExec(
// (variableName -> variableExpression)
private var variablesMap: Map[String, Expression] = Map()

// compound body used for dropping variables
// compound body used for dropping variables while in ForState.VariableCleanup
private var dropVariablesExec: CompoundBodyExec = null

private var queryResult: Array[Row] = null
Expand All @@ -703,22 +703,23 @@ class ForStatementExec(
}

/**
* Loop can be interrupted by LeaveStatementExec
* The FOR loop can be interrupted by LeaveStatementExec.
*/
private var interrupted: Boolean = false

private lazy val treeIterator: Iterator[CompoundStatementExec] =
new Iterator[CompoundStatementExec] {

override def hasNext: Boolean = {
val resultSize = cachedQueryResult().length
state == ForState.VariableCleanup ||
(state == ForState.VariableCleanup && dropVariablesExec.getTreeIterator.hasNext) ||
(!interrupted && resultSize > 0 && currRow < resultSize)
}

override def next(): CompoundStatementExec = state match {

case ForState.VariableAssignment =>
variablesMap = createVariablesMapFromRow(currRow)
variablesMap = createVariablesMapFromRow(cachedQueryResult()(currRow))

if (!areVariablesDeclared) {
// create and execute declare var statements
Expand All @@ -727,9 +728,12 @@ class ForStatementExec(
.foreach(declareVarExec => declareVarExec.buildDataFrame(session).collect())
areVariablesDeclared = true
}

// create and execute set var statements
variablesMap.keys.toSeq
.map(colName => createSetVarExec(colName, variablesMap(colName)))
.foreach(setVarExec => setVarExec.buildDataFrame(session).collect())

state = ForState.Body
body.reset()
next()
Expand Down Expand Up @@ -771,17 +775,12 @@ class ForStatementExec(
retStmt

case ForState.VariableCleanup =>
val ret = dropVariablesExec.getTreeIterator.next()
if (!dropVariablesExec.getTreeIterator.hasNext) {
// stops execution, as at this point currRow == resultSize
state = ForState.VariableAssignment
}
ret
dropVariablesExec.getTreeIterator.next()
}
}

/**
* Creates a Catalyst expression from Scala value.<br>
* Recursively creates a Catalyst expression from Scala value.<br>
* See https://spark.apache.org/docs/latest/sql-ref-datatypes.html for Spark -> Scala mappings
*/
private def createExpressionFromValue(value: Any): Expression = value match {
Expand All @@ -792,7 +791,7 @@ class ForStatementExec(
}
CreateMap(mapArgs, useStringTypeWhenEmpty = false)

// structs match this case
// structs and rows match this case
case s: Row =>
// arguments of CreateNamedStruct are in the format: (name1, val1, name2, val2, ...)
val namedStructArgs = s.schema.names.toSeq.flatMap { colName =>
Expand All @@ -808,8 +807,7 @@ class ForStatementExec(
case _ => Literal(value)
}

private def createVariablesMapFromRow(rowIndex: Int): Map[String, Expression] = {
val row = cachedQueryResult()(rowIndex)
private def createVariablesMapFromRow(row: Row): Map[String, Expression] = {
var variablesMap = row.schema.names.toSeq.map { colName =>
colName -> createExpressionFromValue(row.getAs(colName))
}.toMap
Expand All @@ -824,6 +822,9 @@ class ForStatementExec(
variablesMap
}

/**
* Create and immediately execute dropVariable exec nodes for all variables in variablesMap.
*/
private def dropVars(): Unit = {
variablesMap.keys.toSeq
.map(colName => createDropVarExec(colName))
Expand All @@ -835,6 +836,7 @@ class ForStatementExec(
currRow += 1
state = if (currRow < cachedQueryResult().length) ForState.VariableAssignment
else {
// all rows are processed: create a compound body of drop-variable nodes to clean up the variables
dropVariablesExec = new CompoundBodyExec(
variablesMap.keys.toSeq.map(colName => createDropVarExec(colName))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
statement: LeafStatementExec): Boolean = evaluator.evaluateLoopBooleanCondition()
}

case class TestForStatementQuery(numberOfRows: Int, columnName: String, description: String)
case class MockQuery(numberOfRows: Int, columnName: String, description: String)
extends SingleStatementExec(
DummyLogicalPlan(),
Origin(startIndex = Some(0), stopIndex = Some(description.length)),
Expand Down Expand Up @@ -709,7 +709,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - enters body once") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(1, "intCol", "query1"),
query = MockQuery(1, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(TestLeafStatement("body"))),
label = Some("for1"),
Expand All @@ -727,7 +727,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - enters body with multiple statements multiple times") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -750,7 +750,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - empty result") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(0, "intCol", "query1"),
query = MockQuery(0, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
label = Some("for1"),
Expand All @@ -764,11 +764,11 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - nested") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol1", "query2"),
query = MockQuery(2, "intCol1", "query2"),
variableName = Some("y"),
body = new CompoundBodyExec(Seq(TestLeafStatement("body"))),
label = Some("for2"),
Expand Down Expand Up @@ -797,7 +797,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - enters body once") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(1, "intCol", "query1"),
query = MockQuery(1, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(TestLeafStatement("body"))),
label = Some("for1"),
Expand All @@ -814,7 +814,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - enters body with multiple statements multiple times") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -833,7 +833,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - empty result") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(0, "intCol", "query1"),
query = MockQuery(0, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
label = Some("for1"),
Expand All @@ -847,11 +847,11 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - nested") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol1", "query2"),
query = MockQuery(2, "intCol1", "query2"),
variableName = None,
body = new CompoundBodyExec(Seq(TestLeafStatement("body"))),
label = Some("for2"),
Expand All @@ -875,7 +875,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - iterate") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -899,7 +899,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - leave") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -919,12 +919,12 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - nested - iterate outer loop") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("outer_body"),
new ForStatementExec(
query = TestForStatementQuery(2, "intCol1", "query2"),
query = MockQuery(2, "intCol1", "query2"),
variableName = Some("y"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("body1"),
Expand Down Expand Up @@ -954,11 +954,11 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement - nested - leave outer loop") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = Some("x"),
body = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query2"),
query = MockQuery(2, "intCol", "query2"),
variableName = Some("y"),
body = new CompoundBodyExec(Seq(
TestLeafStatement("body1"),
Expand All @@ -982,7 +982,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - iterate") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -1002,7 +1002,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - leave") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("statement1"),
Expand All @@ -1019,12 +1019,12 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - nested - iterate outer loop") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("outer_body"),
new ForStatementExec(
query = TestForStatementQuery(2, "intCol1", "query2"),
query = MockQuery(2, "intCol1", "query2"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("body1"),
Expand All @@ -1048,11 +1048,11 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite with SharedSparkSessi
test("for statement no variable - nested - leave outer loop") {
val iter = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol", "query1"),
query = MockQuery(2, "intCol", "query1"),
variableName = None,
body = new CompoundBodyExec(Seq(
new ForStatementExec(
query = TestForStatementQuery(2, "intCol1", "query2"),
query = MockQuery(2, "intCol1", "query2"),
variableName = None,
body = new CompoundBodyExec(Seq(
TestLeafStatement("body1"),
Expand Down

0 comments on commit 446fc05

Please sign in to comment.