Skip to content

Commit

Permalink
Implement simple durable Raft storage based on RocksDB
Browse files Browse the repository at this point in the history
This commit adds RocksDbStorage, which implements raft::Storage.
RocksDbStorage is a durable storage implementation that the
RaftMetadataStore uses to persist the Raft state.

This fixes restatedev#1791.
  • Loading branch information
tillrohrmann committed Nov 28, 2024
1 parent f37a81d commit f1d03fa
Show file tree
Hide file tree
Showing 7 changed files with 539 additions and 78 deletions.
1 change: 1 addition & 0 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod grpc;
mod grpc_svc;
pub mod local;
pub mod raft;
mod util;

use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
Expand Down
64 changes: 14 additions & 50 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use crate::{
MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender,
util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender,
};
use bytes::BytesMut;
use bytestring::ByteString;
Expand All @@ -23,7 +23,7 @@ use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::live::BoxedLiveLoad;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB};
use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, trace};
Expand Down Expand Up @@ -56,14 +56,18 @@ impl LocalMetadataStore {
let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options))
.add_cf_pattern(
CfPrefixPattern::ANY,
cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build()
.expect("valid spec");
let db_spec = DbSpecBuilder::new(
db_name.clone(),
options.data_dir(),
util::db_options(options),
)
.add_cf_pattern(
CfPrefixPattern::ANY,
util::cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build()
.expect("valid spec");

let db = db_manager
.open_db(updateable_rocksdb_options.clone(), db_spec)
Expand Down Expand Up @@ -296,43 +300,3 @@ impl LocalMetadataStore {
}
}
}

/// Produces the database-level RocksDB options for the metadata store.
///
/// The supplied store options are accepted but not consulted; the result is
/// simply the RocksDB default option set.
fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options {
    Default::default()
}

fn cf_options(
memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
move |mut opts| {
set_memory_related_opts(&mut opts, memory_budget);
opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
opts.set_num_levels(3);

opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::None,
DBCompressionType::Zstd,
]);

//
opts
}
}

/// Applies memtable- and level-sizing settings derived from `memtables_budget`.
///
/// `opts` is mutated in place; nothing is returned.
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
    // The budget is split four ways: 1 mutable + 3 immutable memtables.
    let write_buffer_size = memtables_budget / 4;
    let budget_bytes = memtables_budget as u64;

    opts.set_write_buffer_size(write_buffer_size);

    // Two memtables are merged whenever we flush to L0.
    opts.set_min_write_buffer_number_to_merge(2);
    opts.set_max_write_buffer_number(4);

    // Kick off L0->L1 compaction as early as possible. Every L0 file is
    // (memtables_budget / 2) in size, so two of them means level 0 has grown
    // to the full memtable budget.
    opts.set_level_zero_file_num_compaction_trigger(2);

    // Not critical, but keeps the number of files from growing too large.
    opts.set_target_file_size_base(budget_bytes / 8);

    // Level 1 is sized like level 0 so that L0->L1 compactions stay fast.
    opts.set_max_bytes_for_level_base(budget_bytes);
}
1 change: 1 addition & 0 deletions crates/metadata-store/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
// by the Apache License, Version 2.0.

pub mod service;
mod storage;
mod store;
2 changes: 1 addition & 1 deletion crates/metadata-store/src/raft/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl RaftMetadataStoreService {
impl MetadataStoreService for RaftMetadataStoreService {
async fn run(mut self) -> Result<(), Error> {
let store_options = self.options.live_load();
let store = RaftMetadataStore::new().map_err(Error::generic)?;
let store = RaftMetadataStore::create().await.map_err(Error::generic)?;

let mut builder = GrpcServiceBuilder::default();

Expand Down
Loading

0 comments on commit f1d03fa

Please sign in to comment.