Skip to content

Commit

Permalink
Add session to be constructor argument for SqlScriptingInterpreter
Browse files Browse the repository at this point in the history
  • Loading branch information
miland-db committed Nov 25, 2024
1 parent c7f79b6 commit b25ee02
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SqlScriptingExecution(

// Build the execution plan for the script
private val executionPlan: Iterator[CompoundStatementExec] =
SqlScriptingInterpreter().buildExecutionPlan(sqlScript, session, args)
SqlScriptingInterpreter(session).buildExecutionPlan(sqlScript, args)

private var current = getNextResult

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@ import org.apache.spark.sql.catalyst.trees.Origin

/**
* SQL scripting interpreter - builds SQL script execution plan.
*
* @param session
* Spark session that SQL script is executed within.
*/
case class SqlScriptingInterpreter() {
case class SqlScriptingInterpreter(session: SparkSession) {

/**
* Build execution plan and return statements that need to be executed,
* wrapped in the execution node.
*
* @param compound
* CompoundBody for which to build the plan.
* @param session
* Spark session that SQL script is executed within.
* @return
* Iterator through collection of statements to be executed.
*/
def buildExecutionPlan(
compound: CompoundBody,
session: SparkSession,
args: Map[String, Expression]): Iterator[CompoundStatementExec] = {
transformTreeIntoExecutable(compound, session, args)
transformTreeIntoExecutable(compound, args)
.asInstanceOf[CompoundBodyExec].getTreeIterator
}

Expand All @@ -65,14 +65,11 @@ case class SqlScriptingInterpreter() {
*
* @param node
* Root node of the parsed tree.
* @param session
* Spark session that SQL script is executed within.
* @return
* Executable statement.
*/
private def transformTreeIntoExecutable(
node: CompoundPlanStatement,
session: SparkSession,
args: Map[String, Expression]): CompoundStatementExec =
node match {
case CompoundBody(collection, label) =>
Expand All @@ -86,7 +83,7 @@ case class SqlScriptingInterpreter() {
.map(new SingleStatementExec(_, Origin(), args, isInternal = true))
.reverse
new CompoundBodyExec(
collection.map(st => transformTreeIntoExecutable(st, session, args)) ++ dropVariables,
collection.map(st => transformTreeIntoExecutable(st, args)) ++ dropVariables,
label)

case IfElseStatement(conditions, conditionalBodies, elseBody) =>
Expand All @@ -97,9 +94,9 @@ case class SqlScriptingInterpreter() {
args,
isInternal = false))
val conditionalBodiesExec = conditionalBodies.map(body =>
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec])
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec])
val unconditionalBodiesExec = elseBody.map(body =>
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec])
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec])
new IfElseStatementExec(
conditionsExec, conditionalBodiesExec, unconditionalBodiesExec, session)

Expand All @@ -112,9 +109,9 @@ case class SqlScriptingInterpreter() {
args,
isInternal = false))
val conditionalBodiesExec = conditionalBodies.map(body =>
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec])
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec])
val unconditionalBodiesExec = elseBody.map(body =>
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec])
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec])
new CaseStatementExec(
conditionsExec, conditionalBodiesExec, unconditionalBodiesExec, session)

Expand All @@ -126,7 +123,7 @@ case class SqlScriptingInterpreter() {
args,
isInternal = false)
val bodyExec =
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec]
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec]
new WhileStatementExec(conditionExec, bodyExec, label, session)

case RepeatStatement(condition, body, label) =>
Expand All @@ -137,11 +134,11 @@ case class SqlScriptingInterpreter() {
args,
isInternal = false)
val bodyExec =
transformTreeIntoExecutable(body, session, args).asInstanceOf[CompoundBodyExec]
transformTreeIntoExecutable(body, args).asInstanceOf[CompoundBodyExec]
new RepeatStatementExec(conditionExec, bodyExec, label, session)

case LoopStatement(body, label) =>
val bodyExec = transformTreeIntoExecutable(body, session, args)
val bodyExec = transformTreeIntoExecutable(body, args)
.asInstanceOf[CompoundBodyExec]
new LoopStatementExec(bodyExec, label)

Expand Down
Loading

0 comments on commit b25ee02

Please sign in to comment.