Skip to content

Commit

Permalink
refactor collect() to toLocalIterator()
Browse files Browse the repository at this point in the history
  • Loading branch information
dusantism-db committed Nov 26, 2024
1 parent 54271d0 commit 223612e
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}
import org.apache.spark.sql.errors.SqlScriptingErrors
import org.apache.spark.sql.types.BooleanType

import java.util

/**
* Trait for all SQL scripting execution nodes used during interpretation phase.
*/
Expand Down Expand Up @@ -668,7 +670,6 @@ class ForStatementExec(
val VariableAssignment, Body, VariableCleanup = Value
}
private var state = ForState.VariableAssignment
private var currRow = 0
private var areVariablesDeclared = false

// map of all variables created internally by the for statement
Expand All @@ -678,11 +679,11 @@ class ForStatementExec(
// compound body used for dropping variables while in ForState.VariableAssignment
private var dropVariablesExec: CompoundBodyExec = null

private var queryResult: Array[Row] = null
private var queryResult: util.Iterator[Row] = _
private var isResultCacheValid = false
private def cachedQueryResult(): Array[Row] = {
private def cachedQueryResult(): util.Iterator[Row] = {
if (!isResultCacheValid) {
queryResult = query.buildDataFrame(session).collect()
queryResult = query.buildDataFrame(session).toLocalIterator()
query.isExecuted = true
isResultCacheValid = true
}
Expand All @@ -697,16 +698,16 @@ class ForStatementExec(
private lazy val treeIterator: Iterator[CompoundStatementExec] =
new Iterator[CompoundStatementExec] {

override def hasNext: Boolean = {
val resultSize = cachedQueryResult().length
(state == ForState.VariableCleanup && dropVariablesExec.getTreeIterator.hasNext) ||
(!interrupted && resultSize > 0 && currRow < resultSize)
}
override def hasNext: Boolean = !interrupted && (state match {
case ForState.VariableAssignment => cachedQueryResult().hasNext
case ForState.Body => true
case ForState.VariableCleanup => dropVariablesExec.getTreeIterator.hasNext
})

override def next(): CompoundStatementExec = state match {

case ForState.VariableAssignment =>
variablesMap = createVariablesMapFromRow(cachedQueryResult()(currRow))
variablesMap = createVariablesMapFromRow(cachedQueryResult().next())

if (!areVariablesDeclared) {
// create and execute declare var statements
Expand Down Expand Up @@ -821,8 +822,7 @@ class ForStatementExec(
}

private def switchStateFromBody(): Unit = {
currRow += 1
state = if (currRow < cachedQueryResult().length) ForState.VariableAssignment
state = if (cachedQueryResult().hasNext) ForState.VariableAssignment
else {
// create compound body for dropping nodes after execution is complete
dropVariablesExec = new CompoundBodyExec(
Expand Down Expand Up @@ -862,7 +862,6 @@ class ForStatementExec(
override def reset(): Unit = {
state = ForState.VariableAssignment
isResultCacheValid = false
currRow = 0
variablesMap = Map()
areVariablesDeclared = false
dropVariablesExec = null
Expand Down

0 comments on commit 223612e

Please sign in to comment.