Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): add StateSync that sends requests to StateSyncRunner #2072

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ members = [
"crates/starknet_sequencer_infra",
"crates/starknet_sequencer_node",
"crates/starknet_sierra_compile",
"crates/starknet_state_sync",
"crates/starknet_state_sync_types",
"crates/starknet_task_executor",
"workspace_tests",
Expand Down Expand Up @@ -228,6 +229,8 @@ starknet_patricia = { path = "crates/starknet_patricia", version = "0.0.0" }
starknet_sequencer_infra = { path = "crates/starknet_sequencer_infra", version = "0.0.0" }
starknet_sequencer_node = { path = "crates/starknet_sequencer_node", version = "0.0.0" }
starknet_sierra_compile = { path = "crates/starknet_sierra_compile", version = "0.0.0" }
starknet_state_sync = { path = "crates/starknet_state_sync", version = "0.0.0" }
starknet_state_sync_types = { path = "crates/starknet_state_sync_types", version = "0.0.0" }
starknet_task_executor = { path = "crates/starknet_task_executor", version = "0.0.0" }
static_assertions = "1.1.0"
statistical = "1.0.0"
Expand Down
15 changes: 15 additions & 0 deletions crates/starknet_state_sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "starknet_state_sync"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
async-trait.workspace = true
futures.workspace = true
starknet_sequencer_infra.workspace = true
starknet_state_sync_types.workspace = true
37 changes: 37 additions & 0 deletions crates/starknet_state_sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
pub mod runner;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use starknet_sequencer_infra::component_definitions::ComponentRequestHandler;
use starknet_state_sync_types::communication::{StateSyncRequest, StateSyncResponse};
use starknet_state_sync_types::errors::StateSyncError;

use crate::runner::StateSyncRunner;

// TODO: consider adding to config
const BUFFER_SIZE: usize = 100000;

pub fn create_state_sync_and_runner() -> (StateSync, StateSyncRunner) {
let (request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE);
(StateSync { request_sender }, StateSyncRunner { request_receiver })
}

pub struct StateSync {
pub request_sender: mpsc::Sender<(StateSyncRequest, oneshot::Sender<StateSyncResponse>)>,
}

// TODO: Have StateSyncRunner call StateSync instead of the opposite once we stop supporting
// papyrus executable and can move the storage into StateSync.
#[async_trait]
impl ComponentRequestHandler<StateSyncRequest, StateSyncResponse> for StateSync {
async fn handle_request(&mut self, request: StateSyncRequest) -> StateSyncResponse {
let (response_sender, response_receiver) = oneshot::channel();
if self.request_sender.send((request, response_sender)).await.is_err() {
return StateSyncResponse::GetBlock(Err(StateSyncError::RunnerCommunicationError));
}
response_receiver.await.unwrap_or_else(|_| {
StateSyncResponse::GetBlock(Err(StateSyncError::RunnerCommunicationError))
})
}
}
16 changes: 16 additions & 0 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_state_sync_types::communication::{StateSyncRequest, StateSyncResponse};

pub struct StateSyncRunner {
pub request_receiver: mpsc::Receiver<(StateSyncRequest, oneshot::Sender<StateSyncResponse>)>,
}

#[async_trait]
impl ComponentStarter for StateSyncRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
unimplemented!()
}
}
1 change: 1 addition & 0 deletions crates/starknet_state_sync_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
async-trait.workspace = true
papyrus_proc_macros.workspace = true
serde = { workspace = true, features = ["derive"] }
starknet_api.workspace = true
starknet_sequencer_infra.workspace = true
Expand Down
53 changes: 51 additions & 2 deletions crates/starknet_state_sync_types/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use std::sync::Arc;

use async_trait::async_trait;
use papyrus_proc_macros::handle_response_variants;
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_sequencer_infra::component_client::ClientError;
use starknet_sequencer_infra::component_client::{
ClientError,
LocalComponentClient,
RemoteComponentClient,
};
use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender;
use thiserror::Error;

use crate::errors::StateSyncError;
Expand All @@ -18,6 +27,8 @@ pub trait StateSyncClient: Send + Sync {
// TODO: Add state reader methods for gateway.
}

pub type StateSyncResult<T> = Result<T, StateSyncError>;

#[derive(Clone, Debug, Error)]
pub enum StateSyncClientError {
#[error(transparent)]
Expand All @@ -27,4 +38,42 @@ pub enum StateSyncClientError {
}
pub type StateSyncClientResult<T> = Result<T, StateSyncClientError>;

// TODO: Add client types and request/response enums
pub type LocalStateSyncClient = LocalComponentClient<StateSyncRequest, StateSyncResponse>;
pub type RemoteStateSyncClient = RemoteComponentClient<StateSyncRequest, StateSyncResponse>;
pub type SharedStateSyncClient = Arc<dyn StateSyncClient>;
pub type StateSyncRequestAndResponseSender =
ComponentRequestAndResponseSender<StateSyncRequest, StateSyncResponse>;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StateSyncRequest {
GetBlock(BlockNumber),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StateSyncResponse {
GetBlock(StateSyncResult<Option<SyncBlock>>),
}

#[async_trait]
impl StateSyncClient for LocalStateSyncClient {
async fn get_block(
&self,
block_number: BlockNumber,
) -> StateSyncClientResult<Option<SyncBlock>> {
let request = StateSyncRequest::GetBlock(block_number);
let response = self.send(request).await;
handle_response_variants!(StateSyncResponse, GetBlock, StateSyncClientError, StateSyncError)
}
}

#[async_trait]
impl StateSyncClient for RemoteStateSyncClient {
async fn get_block(
&self,
block_number: BlockNumber,
) -> StateSyncClientResult<Option<SyncBlock>> {
let request = StateSyncRequest::GetBlock(block_number);
let response = self.send(request).await?;
handle_response_variants!(StateSyncResponse, GetBlock, StateSyncClientError, StateSyncError)
}
}
6 changes: 4 additions & 2 deletions crates/starknet_state_sync_types/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;

// This error is defined even though it's empty to be compatible with the other components.
#[derive(Debug, Error, Serialize, Deserialize, Clone)]
pub enum StateSyncError {}
pub enum StateSyncError {
#[error("Communication error between StateSync and StateSyncRunner")]
RunnerCommunicationError,
}
Loading