Skip to content

Commit

Permalink
Implement simple durable Raft storage based on RocksDB
Browse files Browse the repository at this point in the history
This commit adds RocksDbStorage, which implements raft::Storage.
RocksDbStorage is a RocksDB-backed storage implementation that the
RaftMetadataStore uses to persist the Raft state durably.

This fixes restatedev#1791.
  • Loading branch information
tillrohrmann committed Jan 2, 2025
1 parent 579cbed commit 52b197a
Show file tree
Hide file tree
Showing 6 changed files with 539 additions and 78 deletions.
3 changes: 2 additions & 1 deletion crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod grpc;
mod grpc_svc;
pub mod local;
pub mod raft;
mod util;

use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
Expand Down Expand Up @@ -196,7 +197,7 @@ pub async fn create_metadata_store(
Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed())
}
Kind::Raft => {
let store = RaftMetadataStore::new()?;
let store = RaftMetadataStore::create().await?;
Ok(MetadataStoreRunner::new(store, health_status, server_builder).boxed())
}
}
Expand Down
64 changes: 14 additions & 50 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use crate::{
MetadataStoreBackend, MetadataStoreRequest, PreconditionViolation, RequestError,
util, MetadataStoreBackend, MetadataStoreRequest, PreconditionViolation, RequestError,
RequestReceiver, RequestSender,
};
use bytes::BytesMut;
Expand All @@ -25,7 +25,7 @@ use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::live::BoxedLiveLoad;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB};
use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB};
use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -59,14 +59,18 @@ impl LocalMetadataStore {
let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options))
.add_cf_pattern(
CfPrefixPattern::ANY,
cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build()
.expect("valid spec");
let db_spec = DbSpecBuilder::new(
db_name.clone(),
options.data_dir(),
util::db_options(options),
)
.add_cf_pattern(
CfPrefixPattern::ANY,
util::cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build()
.expect("valid spec");

let db = db_manager
.open_db(updateable_rocksdb_options.clone(), db_spec)
Expand Down Expand Up @@ -300,46 +304,6 @@ impl LocalMetadataStore {
}
}

/// Builds the database-wide RocksDB options for the metadata store.
///
/// The supplied `MetadataStoreOptions` are currently unused (hence the
/// leading underscore on the parameter); the result is always the RocksDB
/// default option set.
fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options {
    Default::default()
}

/// Returns a closure that customizes the options of a column family.
///
/// The closure takes a base `rocksdb::Options`, applies the memtable/level
/// sizing derived from `memory_budget` (see `set_memory_related_opts`),
/// switches to level-style compaction with three levels, and hands the
/// adjusted options back to the caller.
fn cf_options(
    memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
    move |mut cf_opts| {
        // One entry per level: the first two levels stay uncompressed and
        // only the last level is compressed with Zstd.
        let compression_per_level = [
            DBCompressionType::None,
            DBCompressionType::None,
            DBCompressionType::Zstd,
        ];

        set_memory_related_opts(&mut cf_opts, memory_budget);

        cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        cf_opts.set_num_levels(3);
        cf_opts.set_compression_per_level(&compression_per_level);

        cf_opts
    }
}

/// Derives the memtable and level sizing options from a single budget.
///
/// `memtables_budget` is the total number of bytes set aside for memtables;
/// every size configured below is a fixed fraction of that budget.
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
    let budget_bytes = memtables_budget as u64;

    // The budget is split four ways: 1 mutable memtable + 3 immutable ones.
    let write_buffer_size = memtables_budget / 4;
    opts.set_write_buffer_size(write_buffer_size);

    // Two memtables are merged whenever we flush to L0.
    opts.set_min_write_buffer_number_to_merge(2);
    opts.set_max_write_buffer_number(4);

    // Kick off the L0->L1 compaction as early as possible. Since every L0
    // file is (memtables_budget / 2) in size, a trigger of two files means
    // level 0 is compacted once it grows beyond memtables_budget.
    opts.set_level_zero_file_num_compaction_trigger(2);

    // The exact value is not critical; it merely keeps the file count down.
    let target_file_size = budget_bytes / 8;
    opts.set_target_file_size_base(target_file_size);

    // Keep L1 the same size as L0 so that L0->L1 compactions stay fast.
    opts.set_max_bytes_for_level_base(budget_bytes);
}

impl MetadataStoreBackend for LocalMetadataStore {
fn request_sender(&self) -> RequestSender {
self.request_sender()
Expand Down
1 change: 1 addition & 0 deletions crates/metadata-store/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod storage;
mod store;

pub use store::RaftMetadataStore;
Loading

0 comments on commit 52b197a

Please sign in to comment.