Skip to content

Commit

Permalink
Implement Update Op (#915)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexis Batyk <[email protected]>
Co-authored-by: Ignacio Duart <[email protected]>
  • Loading branch information
3 people authored Feb 18, 2024
1 parent 1ba0a2e commit 64cf41b
Show file tree
Hide file tree
Showing 13 changed files with 848 additions and 54 deletions.
1 change: 0 additions & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
[submodule "stdlib"]
path = stdlib
url = https://github.com/freenet/freenet-stdlib

6 changes: 4 additions & 2 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,16 @@ pub(crate) mod test {
}
}
val if (35..80).contains(&val) => {
let new_state = UpdateData::State(State::from(self.random_byte_vec()));
if let Some(contract) = self.choose(&state.existing_contracts) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
// TODO: It will be used when the delta updates are available
// let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
continue;
}
let request = ContractRequest::Update {
key: contract.key().clone(),
data: delta,
data: new_state,
};
if state.owns_contracts.contains(&contract.key()) {
return Some(request.into());
Expand Down
30 changes: 30 additions & 0 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,36 @@ where
error
})?;
}
ContractHandlerEvent::UpdateQuery {
key,
state,
related_contracts,
} => {
let update_result = contract_handler
.executor()
.upsert_contract_state(
key.clone(),
Either::Left(state.clone()),
related_contracts,
None,
)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await;

contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::UpdateResponse {
new_value: update_result.map_err(Into::into),
},
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
_ => unreachable!(),
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,17 @@ struct UpdateContract {

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::update::UpdateOp {
fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
let UpdateContract { key, new_state } = self;
operations::update::start_op(key, new_state, op_manager.ring.max_hops_to_live)
let related_contracts = RelatedContracts::default();
operations::update::start_op(key, new_state, related_contracts)
}

async fn resume_op(
op: operations::update::UpdateOp,
op_manager: &OpManager,
) -> Result<(), OpError> {
operations::update::request_update(op_manager, op, None).await
operations::update::request_update(op_manager, op).await
}
}

Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ impl ContractExecutor for Executor<MockRuntime> {
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
// (Either::Left(_), None) => {
// return Err(ExecutorError::request(RequestError::from(
// StdContractError::Get {
// key: key.clone(),
// cause: "Missing contract or parameters".into(),
// },
// )));
// }
(Either::Left(incoming_state), None) => {
// update case

self.state_store
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
}
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,16 @@ pub(crate) enum ContractHandlerEvent {
key: ContractKey,
response: Result<StoreResponse, ExecutorError>,
},
/// Updates a supposedly existing contract in this node
UpdateQuery {
key: ContractKey,
state: WrappedState,
related_contracts: RelatedContracts<'static>,
},
/// The response to an update query
UpdateResponse {
new_value: Result<WrappedState, ExecutorError>,
},
}

impl std::fmt::Display for ContractHandlerEvent {
Expand Down Expand Up @@ -399,6 +409,17 @@ impl std::fmt::Display for ContractHandlerEvent {
write!(f, "get query failed {{ {key} }}",)
}
},
ContractHandlerEvent::UpdateQuery { key, .. } => {
write!(f, "update query {{ {key} }}")
}
ContractHandlerEvent::UpdateResponse { new_value } => match new_value {
Ok(v) => {
write!(f, "update query response {{ {v} }}",)
}
Err(e) => {
write!(f, "update query failed {{ {e} }}",)
}
},
}
}
}
Expand Down
46 changes: 40 additions & 6 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
prelude::ContractKey,
prelude::{ContractKey, RelatedContracts, WrappedState},
};
use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId as Libp2pPeerId};

Expand All @@ -40,7 +40,7 @@ use crate::{
message::{NetMessage, NodeEvent, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
get, put, subscribe, OpEnum, OpError, OpOutcome,
get, put, subscribe, update, OpEnum, OpError, OpOutcome,
},
ring::{Location, PeerKeyLocation},
router::{RouteEvent, RouteOutcome},
Expand Down Expand Up @@ -396,15 +396,33 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
tracing::error!("{}", err);
}
}
ContractRequest::Update {
key: _key,
data: _delta,
} => {
ContractRequest::Update { key, data } => {
// FIXME: perform updates
tracing::debug!(
this_peer = %op_manager.ring.peer_key,
"Received update from user event",
);
let state = match data {
freenet_stdlib::prelude::UpdateData::State(s) => s,
_ => {
unreachable!();
}
};

let wrapped_state = WrappedState::from(state.into_bytes());

let related_contracts = RelatedContracts::default();

let op = update::start_op(key, wrapped_state, related_contracts);

let _ = op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.await;

if let Err(err) = update::request_update(&op_manager, op).await {
tracing::error!("request update error {}", err)
}
}
ContractRequest::Get {
key,
Expand Down Expand Up @@ -648,6 +666,22 @@ async fn process_message<CB>(
)
.await;
}
NetMessage::Update(op) => {
let op_result =
handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
.await;
handle_op_not_available!(op_result);
break report_result(
tx,
op_result,
&op_manager,
executor_callback,
cli_req,
&mut *event_listener,
)
.await;
}

NetMessage::Unsubscribed { key, .. } => {
subscribe(op_manager, key.clone(), None).await;
break;
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<ER> Builder<ER> {
contract::contract_handling(contract_handler)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);

let mut config = super::RunnerConfig {
peer_key: self.peer_key,
gateways,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/testing_impl/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl NetworkPeer {
#[cfg(feature = "trace-ot")]
{
use crate::tracing::{CombinedRegister, OTEventRegister};
crate::tracing::CombinedRegister::new([
CombinedRegister::new([
Box::new(EventRegister::new(
crate::config::Config::conf().event_log(),
)),
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ where
sender = s;
op.process_message(network_bridge, op_manager, msg).await
};

handle_op_result(op_manager, network_bridge, result, tx, sender).await
}

Expand All @@ -90,6 +91,7 @@ where
match result {
Err(OpError::StatePushed) => {
// do nothing and continue, the operation will just continue later on
tracing::debug!("entered in state pushed to continue with op");
return Ok(None);
}
Err(err) => {
Expand Down Expand Up @@ -134,6 +136,7 @@ where
}) => {
op_manager.completed(tx_id);
// finished the operation at this node, informing back

if let Some(target) = msg.target().cloned() {
network_bridge.send(&target.peer, msg).await?;
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ impl Operation for PutOp {
};

let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer);

match try_to_broadcast(
*id,
last_hop,
Expand Down
Loading

0 comments on commit 64cf41b

Please sign in to comment.