Skip to content

Commit

Permalink
feat: added version in state sync (#293)
Browse files Browse the repository at this point in the history
* feat: added versioning in state sync

* clippy fixes
  • Loading branch information
ogabrielides authored May 8, 2024
1 parent cda80d5 commit 60037b6
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
42 changes: 40 additions & 2 deletions grovedb/src/replication.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
str::Utf8Error,
};

use grovedb_merk::{
Expand All @@ -19,6 +18,8 @@ use crate::{replication, Error, GroveDb, Transaction, TransactionArg};

pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN];

pub const CURRENT_STATE_SYNC_VERSION: u16 = 1;

// Struct governing state sync
pub struct StateSyncInfo<'db> {
// Current Chunk restorer
Expand All @@ -32,6 +33,8 @@ pub struct StateSyncInfo<'db> {
pub pending_chunks: BTreeSet<Vec<u8>>,
// Number of processed chunks in current prefix (Path digest)
pub num_processed_chunks: usize,
// Version of state sync protocol,
pub version: u16,
}

// Struct containing information about current subtrees found in GroveDB
Expand Down Expand Up @@ -115,6 +118,7 @@ impl GroveDb {
current_prefix: None,
pending_chunks,
num_processed_chunks: 0,
version: CURRENT_STATE_SYNC_VERSION,
}
}

Expand Down Expand Up @@ -204,7 +208,15 @@ impl GroveDb {
&self,
global_chunk_id: &[u8],
tx: TransactionArg,
version: u16,
) -> Result<Vec<Op>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let chunk_prefix_length: usize = 32;
if global_chunk_id.len() < chunk_prefix_length {
return Err(Error::CorruptedData(
Expand Down Expand Up @@ -254,7 +266,7 @@ impl GroveDb {
}
Some(t) => {
let merk = self
.open_transactional_merk_at_path(path.into(), &t, None)
.open_transactional_merk_at_path(path.into(), t, None)
.value?;

if merk.is_empty_tree().unwrap() {
Expand Down Expand Up @@ -295,7 +307,20 @@ impl GroveDb {
mut state_sync_info: StateSyncInfo<'db>,
app_hash: CryptoHash,
tx: &'db Transaction,
version: u16,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}
if version != state_sync_info.version {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let mut res = vec![];

match (
Expand Down Expand Up @@ -343,7 +368,20 @@ impl GroveDb {
mut state_sync_info: StateSyncInfo<'db>,
chunk: (&[u8], Vec<Op>),
tx: &'db Transaction,
version: u16,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}
if version != state_sync_info.version {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let mut res = vec![];

let (global_chunk_id, chunk_data) = chunk;
Expand Down
7 changes: 4 additions & 3 deletions tutorials/src/bin/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Qu
use grovedb::reference_path::ReferencePathType;
use rand::{distributions::Alphanumeric, Rng, };
use grovedb::element::SumValue;
use grovedb::replication::CURRENT_STATE_SYNC_VERSION;
use grovedb_path::{SubtreePath};

const MAIN_ΚΕΥ: &[u8] = b"key_main";
Expand Down Expand Up @@ -226,15 +227,15 @@ fn sync_db_demo(
target_tx: &Transaction,
) -> Result<(), grovedb::Error> {
let app_hash = source_db.root_hash(None).value.unwrap();
let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx)?;
let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?;

let mut chunk_queue : VecDeque<Vec<u8>> = VecDeque::new();

chunk_queue.extend(chunk_ids);

while let Some(chunk_id) = chunk_queue.pop_front() {
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None)?;
let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx)?;
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx, CURRENT_STATE_SYNC_VERSION)?;
state_sync_info = new_state_sync_info;
chunk_queue.extend(more_chunks);
}
Expand Down

0 comments on commit 60037b6

Please sign in to comment.