[SPARK-45133][CONNECT] Make Spark Connect queries be FINISHED when last result task is finished

### What changes were proposed in this pull request?

Previously, a query was only marked FINISHED once all results had been pushed into the output buffers (not necessarily received by the client, but pushed out of the server).

For LocalTableScanExec, post FINISHED before sending the result batches, because nothing is executed; only cached local results are returned. For regular execution, post FINISHED once all task results have been returned from Spark, not once they have been processed and sent out.
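
To illustrate the regular-execution case, here is a minimal, self-contained sketch of the completion-tracking pattern (a sketch only: `Batch`, `postFinished` and the handler wiring are simplified stand-ins for the Spark Connect internals shown in the diff below):

```scala
object FinishOnLastResultSketch {
  // (serialized Arrow batch, row count), mirroring the Batch alias used by the real code.
  type Batch = (Array[Byte], Long)

  // Stand-in for executePlan.eventsManager.postFinished(...); for illustration only.
  def postFinished(totalNumRows: Option[Long]): Unit =
    println(s"query FINISHED, ${totalNumRows.getOrElse(0L)} rows produced")

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    val partitions = new Array[Array[Batch]](numPartitions)
    val signal = new Object
    var numFinishedPartitions = 0
    var totalNumRows: Long = 0

    // Stand-in for the DAGScheduler result handler: invoked once per partition,
    // possibly out of order, as task results arrive.
    val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
      signal.synchronized {
        partitions(partitionId) = partition
        totalNumRows += partition.map(_._2).sum
        numFinishedPartitions += 1
        if (numFinishedPartitions == numPartitions) {
          // The query is posted as FINISHED as soon as the last partition's
          // results are back from Spark, before anything is sent to the client.
          postFinished(Some(totalNumRows))
        }
        signal.notify()
      }
    }

    // Simulate partitions completing out of order.
    Seq(2, 0, 1).foreach { pid =>
      resultHandler(pid, Array((Array.emptyByteArray, 10L)))
    }
    // Batches would be streamed to the client after this point; by then the
    // query has already been posted as FINISHED.
  }
}
```

Because the real result handler runs on the DAGScheduler thread, the counter updates and the FINISHED check happen inside the same lock that is used to signal the sending loop.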

### Why are the changes needed?

Currently, even after a query has finished running in Spark, it stays RUNNING until all results have been sent, which leaves very little difference between the FINISHED and CLOSED states. This change makes the behavior more similar to, e.g., the Thrift server.

### Does this PR introduce _any_ user-facing change?

Yes. Queries will be posted as FINISHED when they finish executing, not when they finish sending results.

### How was this patch tested?

A test will be added in #42560.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42889 from juliuszsompolski/SPARK-45133.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
juliuszsompolski authored and HyukjinKwon committed Sep 13, 2023
1 parent ef89b27 commit c119b8a
Showing 1 changed file with 10 additions and 5 deletions.
@@ -110,7 +110,6 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       errorOnDuplicatedFieldNames = false)

     var numSent = 0
-    var totalNumRows: Long = 0
     def sendBatch(bytes: Array[Byte], count: Long): Unit = {
       val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
       val batch = proto.ExecutePlanResponse.ArrowBatch
@@ -121,15 +120,14 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       response.setArrowBatch(batch)
       responseObserver.onNext(response.build())
       numSent += 1
-      totalNumRows += count
     }

     dataframe.queryExecution.executedPlan match {
       case LocalTableScanExec(_, rows) =>
+        executePlan.eventsManager.postFinished(Some(rows.length))
         converter(rows.iterator).foreach { case (bytes, count) =>
           sendBatch(bytes, count)
         }
-        executePlan.eventsManager.postFinished(Some(totalNumRows))
       case _ =>
         SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
           val rows = dataframe.queryExecution.executedPlan.execute()
@@ -142,6 +140,8 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)

           val signal = new Object
           val partitions = new Array[Array[Batch]](numPartitions)
+          var numFinishedPartitions = 0
+          var totalNumRows: Long = 0
           var error: Option[Throwable] = None

           // This callback is executed by the DAGScheduler thread.
@@ -150,6 +150,12 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
           val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
             signal.synchronized {
               partitions(partitionId) = partition
+              totalNumRows += partition.map(_._2).sum
+              numFinishedPartitions += 1
+              if (numFinishedPartitions == numPartitions) {
+                // Execution is finished, when all partitions returned results.
+                executePlan.eventsManager.postFinished(Some(totalNumRows))
+              }
               signal.notify()
             }
             ()
@@ -201,9 +207,8 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
               currentPartitionId += 1
             }
             ThreadUtils.awaitReady(future, Duration.Inf)
-            executePlan.eventsManager.postFinished(Some(totalNumRows))
           } else {
-            executePlan.eventsManager.postFinished(Some(totalNumRows))
+            executePlan.eventsManager.postFinished(Some(0))
           }
         }
       }
