Skip to content

Commit

Permalink
Merge pull request #847 from iduartgomez/join-cli-events-with-net-events
Browse files Browse the repository at this point in the history
Join executor to network ops
  • Loading branch information
iduartgomez authored Sep 26, 2023
2 parents 13bf2a8 + 743b6ad commit d005177
Show file tree
Hide file tree
Showing 28 changed files with 1,009 additions and 585 deletions.
153 changes: 76 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl From<String> for AuthToken {

#[non_exhaustive]
pub struct OpenRequest<'a> {
pub id: ClientId,
pub client_id: ClientId,
pub request: Box<ClientRequest<'a>>,
pub notification_channel: Option<UnboundedSender<HostResult>>,
pub token: Option<AuthToken>,
Expand All @@ -109,7 +109,7 @@ impl<'a> OpenRequest<'a> {

pub fn new(id: ClientId, request: Box<ClientRequest<'a>>) -> Self {
Self {
id,
client_id: id,
request,
notification_channel: None,
token: None,
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/client_events/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
.map(|res| {
match res {
Ok(OpenRequest {
id: external,
client_id: external,
request,
notification_channel,
token,
Expand All @@ -103,7 +103,7 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
});

Ok(OpenRequest {
id,
client_id: id,
request,
notification_channel,
token,
Expand Down Expand Up @@ -162,9 +162,9 @@ async fn client_fn(
}
client_msg = client.recv() => {
match client_msg {
Ok(OpenRequest {id, request, notification_channel, token}) => {
tracing::debug!("received msg @ combinator from external id {id}, msg: {request}");
if tx_host.send(Ok(OpenRequest { id, request, notification_channel, token })).await.is_err() {
Ok(OpenRequest { client_id, request, notification_channel, token }) => {
tracing::debug!("received msg @ combinator from external id {client_id}, msg: {request}");
if tx_host.send(Ok(OpenRequest { client_id, request, notification_channel, token })).await.is_err() {
break;
}
}
Expand Down Expand Up @@ -308,7 +308,7 @@ mod test {
.unwrap();

for i in 0..3 {
let OpenRequest { id, .. } = combinator.recv().await.unwrap();
let OpenRequest { client_id: id, .. } = combinator.recv().await.unwrap();
eprintln!("received: {id:?}");
assert_eq!(ClientId::new(i), id);
}
Expand Down
51 changes: 25 additions & 26 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ mod handler;
mod in_memory;
pub mod storages;

pub(crate) use executor::{
executor_channel, ExecutorToEventLoopChannel, NetworkEventListenerHalve,
};
pub(crate) use handler::{
contract_handler_channel, CHSenderHalve, ContractHandler, ContractHandlerChannel,
ContractHandlerEvent, NetworkContractHandler, StoreResponse,
contract_handler_channel, ClientResponses, ClientResponsesSender, ContractHandler,
ContractHandlerEvent, ContractHandlerToEventLoopChannel, EventId, NetEventListener,
NetworkContractHandler, StoreResponse,
};
#[cfg(test)]
pub(crate) use in_memory::{MemoryContractHandler, MockRuntime};

use executor::ContractExecutor;
pub use executor::{Executor, ExecutorError, OperationMode};

use executor::ContractExecutor;

pub(crate) async fn contract_handling<'a, CH>(mut contract_handler: CH) -> Result<(), ContractError>
where
CH: ContractHandler + Send + 'static,
{
loop {
let res = contract_handler.channel().recv_from_listener().await?;
match res {
(
_id,
ContractHandlerEvent::GetQuery {
key,
fetch_contract,
},
) => {
let (id, event) = contract_handler.channel().recv_from_event_loop().await?;
match event {
ContractHandlerEvent::GetQuery {
key,
fetch_contract,
} => {
match contract_handler
.executor()
.fetch_contract(key.clone(), fetch_contract)
Expand All @@ -39,8 +41,8 @@ where
Ok((state, contract)) => {
contract_handler
.channel()
.send_to_listener(
_id,
.send_to_event_loop(
id,
ContractHandlerEvent::GetResponse {
key,
response: Ok(StoreResponse {
Expand All @@ -55,8 +57,8 @@ where
tracing::warn!("error while executing get contract query: {err}");
contract_handler
.channel()
.send_to_listener(
_id,
.send_to_event_loop(
id,
ContractHandlerEvent::GetResponse {
key,
response: Err(err.into()),
Expand All @@ -66,30 +68,27 @@ where
}
}
}
(id, ContractHandlerEvent::Cache(contract)) => {
ContractHandlerEvent::Cache(contract) => {
match contract_handler.executor().store_contract(contract).await {
Ok(_) => {
contract_handler
.channel()
.send_to_listener(id, ContractHandlerEvent::CacheResult(Ok(())))
.send_to_event_loop(id, ContractHandlerEvent::CacheResult(Ok(())))
.await?;
}
Err(err) => {
let err = ContractError::ContractRuntimeError(err);
contract_handler
.channel()
.send_to_listener(id, ContractHandlerEvent::CacheResult(Err(err)))
.send_to_event_loop(id, ContractHandlerEvent::CacheResult(Err(err)))
.await?;
}
}
}
(
_id,
ContractHandlerEvent::PutQuery {
key: _key,
state: _state,
},
) => {
ContractHandlerEvent::PutQuery {
key: _key,
state: _state,
} => {
// let _put_result = contract_handler
// .handle_request(ClientRequest::Put {
// contract: todo!(),
Expand Down
Loading

0 comments on commit d005177

Please sign in to comment.