From 800faf0abfa368ad0a5ef1e0fa44b74dbaab724e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 6 Nov 2024 17:12:11 +0800
Subject: [PATCH] [SPARK-50235][SQL] Clean up ColumnVector resource after
 processing all rows in ColumnarToRowExec

### What changes were proposed in this pull request?

This patch cleans up ColumnVector resources after processing all rows in ColumnarToRowExec.

This patch only covers the codegen implementation of ColumnarToRowExec. The non-codegen path should be relatively rare in practice, and no good approach has been proposed for it yet, so it is left to a follow-up. (An illustrative usage sketch follows the diff below.)

### Why are the changes needed?

Currently we only assign null to the ColumnarBatch object, but that does not release the resources held by the vectors in the batch. For OnHeapColumnVector, the Java arrays may be collected automatically by the JVM, but for OffHeapColumnVector the allocated off-heap memory is leaked. For custom ColumnVector implementations, such as Arrow-based ones, this can also cause memory-safety issues if the underlying buffers are reused across batches, because when ColumnarToRowExec begins to fill values for the next batch, the arrays of the previous batch are still held.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48767 from viirya/close_if_not_writable.

Authored-by: Liang-Chi Hsieh
Signed-off-by: Kent Yao
---
 .../apache/spark/sql/vectorized/ColumnVector.java  | 12 ++++++++++++
 .../apache/spark/sql/vectorized/ColumnarBatch.java | 10 ++++++++++
 .../execution/vectorized/WritableColumnVector.java |  5 +++++
 .../org/apache/spark/sql/execution/Columnar.scala  |  5 +++++
 4 files changed, 32 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index cd3c30fa69335..bfb1833b731a7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -68,6 +68,18 @@ public abstract class ColumnVector implements AutoCloseable {
   @Override
   public abstract void close();
 
+  /**
+   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
+   * after this.
+   *
+   * If this is a writable column vector, it is a no-op.
+   */
+  public void closeIfNotWritable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable, it
+    // should override this method and do nothing.
+    close();
+  }
+
   /**
    * Returns true if this column vector contains any null values.
    */
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9e859e77644ac..52e4115af336a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -45,6 +45,16 @@ public void close() {
     }
   }
 
+  /**
+   * Called to close all the columns if they are not writable. This is used to clean up memory
+   * allocated during columnar processing.
+   */
+  public void closeIfNotWritable() {
+    for (ColumnVector c: columns) {
+      c.closeIfNotWritable();
+    }
+  }
+
   /**
    * Returns an iterator over the rows in this batch.
   */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 10594d6c5d340..696e20525cdac 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -96,6 +96,11 @@ public void close() {
     releaseMemory();
   }
 
+  @Override
+  public void closeIfNotWritable() {
+    // no-op
+  }
+
   public void reserveAdditional(int additionalCapacity) {
     reserve(elementsAppended + additionalCapacity);
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 111851094a69b..64163da50e13a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
        |    $shouldStop
        |  }
        |  $idx = $numRows;
+       |  $batch.closeIfNotWritable();
        |  $batch = null;
        |  $nextBatchFuncName();
        |}
+       |// clean up resources
+       |if ($batch != null) {
+       |  $batch.close();
+       |}
      """.stripMargin
   }
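
For reference, here is a minimal sketch (not part of this patch) of how a ColumnarBatch consumer would use the new hook once all rows of a batch have been materialized. `BatchConsumer` and `processRow` are hypothetical names for illustration; `rowIterator()` is existing ColumnarBatch API, and `closeIfNotWritable()` is the method added above.

```java
import java.util.Iterator;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Illustrative sketch only: not code from this patch.
class BatchConsumer {
  void consume(ColumnarBatch batch) {
    Iterator<InternalRow> rows = batch.rowIterator();
    while (rows.hasNext()) {
      processRow(rows.next());  // hypothetical per-row handler
    }
    // After all rows are consumed, release the vectors' memory. Writable
    // column vectors, which are reused across batches, treat this as a no-op.
    batch.closeIfNotWritable();
  }

  private void processRow(InternalRow row) {
    // placeholder for downstream row processing
  }
}
```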