[SPARK-44878][SS] Disable strict limit for RocksDB write manager to avoid insertion exception on cache full #42567

docs/structured-streaming-programming-guide.md (4 additions, 0 deletions)

@@ -2394,6 +2394,10 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deployment
You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
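As a sketch of how these settings fit together, the following `spark-defaults.conf` fragment enables bounded memory usage with illustrative values (the numbers here are assumptions for the example, not recommendations):

```
# Use the RocksDB state store provider
spark.sql.streaming.stateStore.providerClass  org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# Enable bounded memory usage across all RocksDB instances on a node
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage  true
# Cap total RocksDB memory (illustrative value)
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB  500
# Optional per-instance write buffer limits (illustrative values)
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB  64
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber  4
```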

Note that the `boundedMemoryUsage` config enables a soft limit on the total memory usage for RocksDB.
The total memory used by RocksDB can temporarily exceed this value if all blocks allocated to higher-level readers are in use.
Enabling a strict limit is not possible at this time, since it would cause query failures, and re-balancing state across additional nodes is not supported.

##### RocksDB State Store Changelog Checkpointing
In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB state store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to a durable storage.
Instead of uploading data files of RocksDB instances, changelog checkpointing uploads changes made to the state since the last checkpoint for durability.
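As a hedged example, changelog checkpointing is typically switched on with a single config flag (the key below follows the naming used elsewhere in this guide; verify it against the Spark version you run):

```
# Enable changelog checkpointing for the RocksDB state store (Spark 3.4+)
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled  true
```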
@@ -50,7 +50,13 @@ object RocksDBMemoryManager extends Logging {
     logInfo(s"Creating RocksDB state store LRU cache with " +
       s"total_size=$totalMemoryUsageInBytes")

-    cache = new LRUCache(totalMemoryUsageInBytes, -1, true, conf.highPriorityPoolRatio)
+    // SPARK-44878 - avoid using a strict limit to prevent an insertion exception when the cache is full.
+    // See the RocksDB issue: https://github.com/facebook/rocksdb/issues/8670
+    cache = new LRUCache(totalMemoryUsageInBytes,
+      -1,
+      /* strictCapacityLimit = */ false,
+      conf.highPriorityPoolRatio)

     writeBufferManager = new WriteBufferManager(
       (totalMemoryUsageInBytes * conf.writeBufferCacheRatio).toLong,
       cache)