From 9eabe67a693c28509dda25b3e5998eb7ca7a3aa9 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 8 Dec 2022 15:51:40 +0900 Subject: [PATCH] [SPARK-38277][SS] Clear write batch after RocksDB state store's commit ### What changes were proposed in this pull request? This PR proposes to clear the write batch (and also corresponding prefix iterators) after commit has succeeded on RocksDB state store. This PR also fixes the test case as side effect, as it had been relying on the "sort of bug" that we didn't clean up write batch till either rollback or load has been called. ### Why are the changes needed? Without this, the memory usage of WriteBatch for RocksDB state store is "accumulated" over the partitions in the same executor. Say, 10 partitions in stateful operator are assigned to an executor and run sequentially. Given that we didn't clear write batch after commit, when the executor processes the last partition assigned to it, 10 WriteBatch instances contain all writes being performed in this microbatch. ### Does this PR introduce _any_ user-facing change? No. This is a sort of bugfix. ### How was this patch tested? Existing tests, with fixing the test case. Closes #38880 from HeartSaVioR/SPARK-38277. Lead-authored-by: Jungtaek Lim Co-authored-by: Yun Tang Signed-off-by: Jungtaek Lim --- .../spark/sql/execution/streaming/state/RocksDB.scala | 7 ++++++- .../sql/execution/streaming/state/RocksDBSuite.scala | 8 ++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 5acd20f49dca5..425fc02e315e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -133,7 +133,8 @@ class RocksDB( if (conf.resetStatsOnLoad) { nativeStats.reset } - // reset resources to prevent side-effects from previous loaded version + // reset resources to prevent side-effects from previous loaded version if it was not cleaned + // up correctly closePrefixScanIterators() resetWriteBatch() logInfo(s"Loaded $version") @@ -319,6 +320,10 @@ class RocksDB( } finally { db.continueBackgroundWork() silentDeleteRecursively(checkpointDir, s"committing $newVersion") + // reset resources as either 1) we already pushed the changes and it has been committed or + // 2) commit has failed and the current version is "invalidated". + closePrefixScanIterators() + resetWriteBatch() release() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 00f9c7b8c001e..dd426b8e92bb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -116,7 +116,11 @@ class RocksDBSuite extends SparkFunSuite { withDB(remoteDir, conf = conf) { db => // Generate versions without cleaning up for (version <- 1 to 50) { - db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... + if (version > 1) { + // remove keys we wrote in previous iteration to ensure compaction happens + db.remove((version - 1).toString) + } + db.put(version.toString, version.toString) db.commit() } @@ -132,7 +136,7 @@ class RocksDBSuite extends SparkFunSuite { versionsPresent.foreach { version => db.load(version) val data = db.iterator().map(toStr).toSet - assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet) + assert(data === Set((version.toString, version.toString))) } } }