Skip to content

Commit

Permalink
chore(sequencer_infra): local client send returns client result
Browse files Browse the repository at this point in the history
commit-id:90fa2e32
  • Loading branch information
Itay-Tsabary-Starkware committed Nov 16, 2024
1 parent 65afb9e commit b36421d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 25 deletions.
12 changes: 6 additions & 6 deletions crates/starknet_batcher_types/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub enum BatcherClientError {
impl BatcherClient for LocalBatcherClient {
async fn build_proposal(&self, input: BuildProposalInput) -> BatcherClientResult<()> {
let request = BatcherRequest::BuildProposal(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(BatcherResponse, BuildProposal, BatcherClientError, BatcherError)
}

Expand All @@ -109,7 +109,7 @@ impl BatcherClient for LocalBatcherClient {
input: GetProposalContentInput,
) -> BatcherClientResult<GetProposalContentResponse> {
let request = BatcherRequest::GetProposalContent(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
BatcherResponse,
GetProposalContent,
Expand All @@ -120,7 +120,7 @@ impl BatcherClient for LocalBatcherClient {

async fn validate_proposal(&self, input: ValidateProposalInput) -> BatcherClientResult<()> {
let request = BatcherRequest::ValidateProposal(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
BatcherResponse,
ValidateProposal,
Expand All @@ -134,7 +134,7 @@ impl BatcherClient for LocalBatcherClient {
input: SendProposalContentInput,
) -> BatcherClientResult<SendProposalContentResponse> {
let request = BatcherRequest::SendProposalContent(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
BatcherResponse,
SendProposalContent,
Expand All @@ -145,13 +145,13 @@ impl BatcherClient for LocalBatcherClient {

async fn start_height(&self, input: StartHeightInput) -> BatcherClientResult<()> {
let request = BatcherRequest::StartHeight(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(BatcherResponse, StartHeight, BatcherClientError, BatcherError)
}

async fn decision_reached(&self, input: DecisionReachedInput) -> BatcherClientResult<()> {
let request = BatcherRequest::DecisionReached(input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
BatcherResponse,
DecisionReached,
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_gateway_types/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl GatewayClient for LocalGatewayClient {
#[instrument(skip(self))]
async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult<TransactionHash> {
let request = GatewayRequest::AddTransaction(gateway_input);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(GatewayResponse, AddTransaction, GatewayClientError, GatewayError)
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/starknet_mempool_p2p_types/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient {
transaction: RpcTransaction,
) -> MempoolP2pPropagatorClientResult<()> {
let request = MempoolP2pPropagatorRequest::AddTransaction(transaction);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
MempoolP2pPropagatorResponse,
AddTransaction,
Expand All @@ -83,7 +83,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient {
propagation_metadata: BroadcastedMessageMetadata,
) -> MempoolP2pPropagatorClientResult<()> {
let request = MempoolP2pPropagatorRequest::ContinuePropagation(propagation_metadata);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
MempoolP2pPropagatorResponse,
ContinuePropagation,
Expand Down
6 changes: 3 additions & 3 deletions crates/starknet_mempool_types/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,19 @@ pub enum MempoolClientError {
impl MempoolClient for LocalMempoolClient {
async fn add_tx(&self, args: AddTransactionArgsWrapper) -> MempoolClientResult<()> {
let request = MempoolRequest::AddTransaction(args);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(MempoolResponse, AddTransaction, MempoolClientError, MempoolError)
}

async fn commit_block(&self, args: CommitBlockArgs) -> MempoolClientResult<()> {
let request = MempoolRequest::CommitBlock(args);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(MempoolResponse, CommitBlock, MempoolClientError, MempoolError)
}

async fn get_txs(&self, n_txs: usize) -> MempoolClientResult<Vec<AccountTransaction>> {
let request = MempoolRequest::GetTransactions(n_txs);
let response = self.send(request).await;
let response = self.send(request).await?;
handle_response_variants!(
MempoolResponse,
GetTransactions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::any::type_name;
use tokio::sync::mpsc::{channel, Sender};
use tracing::info;

use crate::component_client::ClientResult;
use crate::component_definitions::ComponentRequestAndResponseSender;

/// The `LocalComponentClient` struct is a generic client for sending component requests and
Expand Down Expand Up @@ -71,11 +72,11 @@ where
Self { tx }
}

pub async fn send(&self, request: Request) -> Response {
pub async fn send(&self, request: Request) -> ClientResult<Response> {
let (res_tx, mut res_rx) = channel::<Response>(1);
let request_and_res_tx = ComponentRequestAndResponseSender { request, tx: res_tx };
self.tx.send(request_and_res_tx).await.expect("Outbound connection should be open.");
res_rx.recv().await.expect("Inbound connection should be open.")
Ok(res_rx.recv().await.expect("Inbound connection should be open."))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,22 @@ where
{
Ok(request) => {
let response = local_client.send(request).await;
HyperResponse::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from(
BincodeSerdeWrapper::new(response)
.to_bincode()
.expect("Response serialization should succeed"),
))
match response {
Ok(response) => HyperResponse::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from(
BincodeSerdeWrapper::new(response)
.to_bincode()
.expect("Response serialization should succeed"),
)),
Err(error) => {
panic!(
"Remote server failed sending with its local client. Error: {:?}",
error
);
}
}
}
Err(error) => {
let server_error = ServerError::RequestDeserializationFailure(error.to_string());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ComponentBClient = LocalComponentClient<ComponentBRequest, ComponentBRespon
impl ComponentAClientTrait for LocalComponentClient<ComponentARequest, ComponentAResponse> {
async fn a_get_value(&self) -> ResultA {
let res = self.send(ComponentARequest::AGetValue).await;
match res {
match res? {
ComponentAResponse::AGetValue(value) => Ok(value),
}
}
Expand All @@ -39,7 +39,7 @@ impl ComponentAClientTrait for LocalComponentClient<ComponentARequest, Component
impl ComponentBClientTrait for LocalComponentClient<ComponentBRequest, ComponentBResponse> {
async fn b_get_value(&self) -> ResultB {
let res = self.send(ComponentBRequest::BGetValue).await;
match res {
match res? {
ComponentBResponse::BGetValue(value) => Ok(value),
unexpected_response => {
Err(ClientError::UnexpectedResponse(format!("{unexpected_response:?}")))
Expand All @@ -48,7 +48,8 @@ impl ComponentBClientTrait for LocalComponentClient<ComponentBRequest, Component
}

async fn b_set_value(&self, value: ValueB) -> ClientResult<()> {
match self.send(ComponentBRequest::BSetValue(value)).await {
let res = self.send(ComponentBRequest::BSetValue(value)).await;
match res? {
ComponentBResponse::BSetValue => Ok(()),
unexpected_response => {
Err(ClientError::UnexpectedResponse(format!("{unexpected_response:?}")))
Expand Down

0 comments on commit b36421d

Please sign in to comment.