Skip to content

Commit

Permalink
[SPARK-38277][SS] Clear write batch after RocksDB state store's commit
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to clear the write batch (and also corresponding prefix iterators) after commit has succeeded on RocksDB state store. This PR also fixes the test case as side effect, as it had been relying on the "sort of bug" that we didn't clean up write batch till either rollback or load has been called.

### Why are the changes needed?

Without this, the memory usage of WriteBatch for RocksDB state store is "accumulated" over the partitions in the same executor. Say, 10 partitions in stateful operator are assigned to an executor and run sequentially. Given that we didn't clear write batch after commit, when the executor processes the last partition assigned to it, 10 WriteBatch instances contain all writes being performed in this microbatch.

### Does this PR introduce _any_ user-facing change?

No. This is a sort of bugfix.

### How was this patch tested?

Existing tests, with fixing the test case.

Closes apache#38880 from HeartSaVioR/SPARK-38277.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Yun Tang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR and Myasuka committed Dec 8, 2022
1 parent 5b99c49 commit 9eabe67
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class RocksDB(
if (conf.resetStatsOnLoad) {
nativeStats.reset
}
// reset resources to prevent side-effects from previous loaded version
// reset resources to prevent side-effects from previous loaded version if it was not cleaned
// up correctly
closePrefixScanIterators()
resetWriteBatch()
logInfo(s"Loaded $version")
Expand Down Expand Up @@ -319,6 +320,10 @@ class RocksDB(
} finally {
db.continueBackgroundWork()
silentDeleteRecursively(checkpointDir, s"committing $newVersion")
// reset resources as either 1) we already pushed the changes and it has been committed or
// 2) commit has failed and the current version is "invalidated".
closePrefixScanIterators()
resetWriteBatch()
release()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ class RocksDBSuite extends SparkFunSuite {
withDB(remoteDir, conf = conf) { db =>
// Generate versions without cleaning up
for (version <- 1 to 50) {
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
if (version > 1) {
// remove keys we wrote in previous iteration to ensure compaction happens
db.remove((version - 1).toString)
}
db.put(version.toString, version.toString)
db.commit()
}

Expand All @@ -132,7 +136,7 @@ class RocksDBSuite extends SparkFunSuite {
versionsPresent.foreach { version =>
db.load(version)
val data = db.iterator().map(toStr).toSet
assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
assert(data === Set((version.toString, version.toString)))
}
}
}
Expand Down

0 comments on commit 9eabe67

Please sign in to comment.