Skip to content

Commit

Permalink
feat: state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielides committed Jan 8, 2025
1 parent cfd9c4d commit 2d60300
Show file tree
Hide file tree
Showing 24 changed files with 2,059 additions and 689 deletions.
1,475 changes: 812 additions & 663 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ transport = "routed"
#]
grpc-concurrency = [
{ "check_tx" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "list_snapshots" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "load_snapshot_chunk" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "offer_snapshot" = 1 },
{ "apply_snapshot_chunk" = 1 },
]


Expand Down Expand Up @@ -414,7 +418,7 @@ ttl-num-blocks = {{=it.platform.drive.tenderdash.mempool.ttlNumBlocks}}
# the network to take and serve state machine snapshots. State sync is not attempted if the node
# has any local state (LastBlockHeight > 0). The node will have a truncated block history,
# starting from the height of the snapshot.
enable = false
enable = true

# State sync uses light client verification to verify state. This can be done either through the
# P2P layer or RPC layer. Set this to true to use the P2P layer. If false (default), RPC layer
Expand Down
6 changes: 6 additions & 0 deletions packages/rs-drive-abci/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ ABCI_LOG_STDOUT_FORMAT=pretty
ABCI_LOG_STDOUT_COLOR=true

DB_PATH=/tmp/db

CHECKPOINTS_PATH=${DB_PATH}/checkpoints

# GroveDB database file
GROVEDB_LATEST_FILE=${DB_PATH}/latest_state

REJECTIONS_PATH=/tmp/rejected

# Cache size for Data Contracts
Expand Down
76 changes: 64 additions & 12 deletions packages/rs-drive-abci/src/abci/app/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
Expand All @@ -16,46 +20,80 @@ use tenderdash_abci::proto::abci as proto;
///
/// AbciApp implements logic that should be triggered when Tenderdash performs various operations, like
/// creating new proposal or finalizing new block.
pub struct ConsensusAbciApplication<'a, C> {
/// 'p: 'tx, means that Platform must outlive the transaction
pub struct ConsensusAbciApplication<'p, C> {
/// Platform
platform: &'a Platform<C>,
platform: &'p Platform<C>,
/// The current GroveDb transaction
transaction: RwLock<Option<Transaction<'a>>>,
transaction: RwLock<Option<Transaction<'p>>>,
/// The current block execution context
block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'p>>>,
/// The snapshot manager
snapshot_manager: SnapshotManager,
}

impl<'a, C> ConsensusAbciApplication<'a, C> {
impl<'p, C> ConsensusAbciApplication<'p, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
pub fn new(platform: &'p Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path
.to_str()
.unwrap()
.to_string(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}

impl<'a, C> PlatformApplication<C> for ConsensusAbciApplication<'a, C> {
impl<'p, C> PlatformApplication<C> for ConsensusAbciApplication<'p, C> {
fn platform(&self) -> &Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for ConsensusAbciApplication<'a, C> {
impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> {
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'p, C> SnapshotFetchingApplication<'p, C> for ConsensusAbciApplication<'p, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'p Platform<C> {
self.platform
}
}

impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> {
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
}
}

impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> {
impl<'p, C> TransactionalApplication<'p> for ConsensusAbciApplication<'p, C> {
/// create and store a new transaction
fn start_transaction(&self) {
let transaction = self.platform.drive.grove.start_transaction();
self.transaction.write().unwrap().replace(transaction);
}

fn transaction(&self) -> &RwLock<Option<Transaction<'a>>> {
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>> {
&self.transaction
}

Expand All @@ -77,13 +115,13 @@ impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> {
}
}

impl<'a, C> Debug for ConsensusAbciApplication<'a, C> {
impl<'p, C> Debug for ConsensusAbciApplication<'p, C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<ConsensusAbciApplication>")
}
}

impl<'a, C> tenderdash_abci::Application for ConsensusAbciApplication<'a, C>
impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C>
where
C: CoreRPCLike,
{
Expand Down Expand Up @@ -149,4 +187,18 @@ where
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
67 changes: 66 additions & 1 deletion packages/rs-drive-abci/src/abci/app/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
Expand All @@ -23,15 +27,32 @@ pub struct FullAbciApplication<'a, C> {
pub transaction: RwLock<Option<Transaction<'a>>>,
/// The current block execution context
pub block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
pub snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'a>>>,
/// The snapshot manager
pub snapshot_manager: SnapshotManager,
}

impl<'a, C> FullAbciApplication<'a, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path
.to_str()
.unwrap()
.to_string(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}
Expand All @@ -42,6 +63,22 @@ impl<'a, C> PlatformApplication<C> for FullAbciApplication<'a, C> {
}
}

impl<'a, C> SnapshotManagerApplication for FullAbciApplication<'a, C> {
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'a, C> SnapshotFetchingApplication<'a, C> for FullAbciApplication<'a, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'a>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'a Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for FullAbciApplication<'a, C> {
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
Expand Down Expand Up @@ -150,4 +187,32 @@ where
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}

fn list_snapshots(
&self,
request: proto::RequestListSnapshots,
) -> Result<proto::ResponseListSnapshots, proto::ResponseException> {
handler::list_snapshots(self, request).map_err(error_into_exception)
}

fn load_snapshot_chunk(
&self,
request: proto::RequestLoadSnapshotChunk,
) -> Result<proto::ResponseLoadSnapshotChunk, proto::ResponseException> {
handler::load_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
22 changes: 20 additions & 2 deletions packages/rs-drive-abci/src/abci/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,36 @@ mod consensus;
/// Convert state transition execution result into ABCI response
pub mod execution_result;
mod full;
mod state_source;

use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::DefaultCoreRPC;
pub use check_tx::CheckTxAbciApplication;
pub use consensus::ConsensusAbciApplication;
use dpp::version::PlatformVersion;
pub use full::FullAbciApplication;
pub use state_source::StateSourceAbciApplication;

/// Platform-based ABCI application
pub trait PlatformApplication<C = DefaultCoreRPC> {
/// Returns Platform
fn platform(&self) -> &Platform<C>;
}

/// Platform-based ABCI application
pub trait SnapshotManagerApplication {
/// Returns Platform
fn snapshot_manager(&self) -> &SnapshotManager;
}

/// Transactional ABCI application
pub trait TransactionalApplication<'a> {
pub trait TransactionalApplication<'p> {
/// Creates and keeps a new transaction
fn start_transaction(&self);

/// Returns the current transaction
fn transaction(&self) -> &RwLock<Option<Transaction<'a>>>;
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>>;

/// Commits created transaction
fn commit_transaction(&self, platform_version: &PlatformVersion) -> Result<(), Error>;
Expand All @@ -39,3 +48,12 @@ pub trait BlockExecutionApplication {
/// Returns the current block execution context
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>>;
}

/// Application that can maintain state sync
pub trait SnapshotFetchingApplication<'p, C> {
/// Returns the current snapshot fetching session
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>>;

/// Returns platform reference
fn platform(&self) -> &'p Platform<C>;
}
Loading

0 comments on commit 2d60300

Please sign in to comment.