From 0ebe17aa8669f5fb3af21a928ee1f40547e0d5e0 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Fri, 18 Aug 2023 12:47:04 -0700
Subject: [PATCH 1/2] [SPARK-44878] Disable strict limit for RocksDB write
 manager to avoid insertion exception on cache full

---
 .../execution/streaming/state/RocksDBMemoryManager.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 4766dcd0b0cca..38b9dc56838ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -50,7 +50,13 @@ object RocksDBMemoryManager extends Logging {
         logInfo(s"Creating RocksDB state store LRU cache with " +
           s"total_size=$totalMemoryUsageInBytes")
 
-        cache = new LRUCache(totalMemoryUsageInBytes, -1, true, conf.highPriorityPoolRatio)
+        // SPARK-44878 - avoid using strict limit to prevent insertion exception on cache full.
+        // Please refer to RocksDB issue here - https://github.com/facebook/rocksdb/issues/8670
+        cache = new LRUCache(totalMemoryUsageInBytes,
+          -1,
+          /* strictCapacityLimit = */false,
+          conf.highPriorityPoolRatio)
+
         writeBufferManager = new WriteBufferManager(
           (totalMemoryUsageInBytes * conf.writeBufferCacheRatio).toLong,
           cache)

From 6e33f1cdf97b16c125d76866b36422c8092eb4d1 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Tue, 22 Aug 2023 11:01:06 -0700
Subject: [PATCH 2/2] Change to address Jungtaek's comments

---
 docs/structured-streaming-programming-guide.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 53d5919d4dcd6..de0085b5376c3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2394,6 +2394,10 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
 
+Note that the `boundedMemoryUsage` config will enable a soft limit on the total memory usage for RocksDB.
+So the total memory used by RocksDB can temporarily exceed this value if all blocks allocated to higher level readers are in use.
+Enabling a strict limit is not possible at this time since it will cause query failures and we do not support re-balancing of the state across additional nodes.
+
 ##### RocksDB State Store Changelog Checkpointing
 In newer version of Spark, changelog checkpointing is introduced for RocksDB state store. The traditional checkpointing mechanism for RocksDB State Store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to a durable storage.
 Instead of uploading data files of RocksDB instances, changelog checkpointing uploads changes made to the state since the last checkpoint for durability.
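
For readers of this patch, here is a minimal sketch of how the bounded-memory settings documented above could be applied to a session. It is not part of the patch: the app name, local master, and the concrete values (500 MB cap, 64 MB write buffers, 3 write buffers) are illustrative assumptions, and the full `boundedMemoryUsage` key is assumed to share the `spark.sql.streaming.stateStore.rocksdb.` prefix used by the other settings in the doc change.

```scala
// Hypothetical usage sketch (not part of this patch): enabling RocksDB as the
// state store with the soft memory bound described in the doc change above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rocksdb-bounded-memory-example") // illustrative name
  .master("local[2]")                        // local master, just for the example
  // Use RocksDB for stateful streaming operators.
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  // Soft cap on total RocksDB memory on an executor; per the doc change, usage
  // may temporarily exceed this value.
  .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
  .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "500")
  // Optional per-instance limits; RocksDB internal defaults apply when unset.
  .config("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")
  .config("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "3")
  .getOrCreate()
```

Because the limit is soft (strictCapacityLimit is false after this patch), a query whose cache fills keeps running rather than failing with an insertion exception.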