Skip to content

Commit

Permalink
186724394 - Fix contract seeding (#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Jan 12, 2024
1 parent 6d969bd commit 8b04584
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 277 deletions.
296 changes: 148 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
time::{Duration, SystemTime},
};

use freenet_stdlib::prelude::ContractKey;
use serde::{Deserialize, Serialize};
use ulid::Ulid;

Expand Down Expand Up @@ -228,6 +229,11 @@ pub(crate) enum NetMessage {
Put(PutMsg),
Get(GetMsg),
Subscribe(SubscribeMsg),
Unsubscribed {
transaction: Transaction,
key: ContractKey,
from: PeerId,
},
Update(UpdateMsg),
/// Failed a transaction, informing of abortion.
Aborted(Transaction),
Expand Down Expand Up @@ -294,6 +300,7 @@ impl NetMessage {
Subscribe(op) => op.id(),
Update(op) => op.id(),
Aborted(tx) => tx,
Unsubscribed { transaction, .. } => transaction,
}
}

Expand All @@ -306,6 +313,7 @@ impl NetMessage {
Subscribe(op) => op.target(),
Update(op) => op.target(),
Aborted(_) => None,
Unsubscribed { .. } => None,
}
}

Expand All @@ -319,6 +327,7 @@ impl NetMessage {
Subscribe(op) => op.terminal(),
Update(op) => op.terminal(),
Aborted(_) => true,
Unsubscribed { .. } => true,
}
}

Expand All @@ -331,6 +340,7 @@ impl NetMessage {
Subscribe(op) => op.requested_location(),
Update(op) => op.requested_location(),
Aborted(_) => None,
Unsubscribed { .. } => None,
}
}

Expand All @@ -351,6 +361,9 @@ impl Display for NetMessage {
Subscribe(msg) => msg.fmt(f)?,
Update(msg) => msg.fmt(f)?,
Aborted(msg) => msg.fmt(f)?,
Unsubscribed { key, from, .. } => {
write!(f, "Unsubscribed {{ key: {}, from: {} }}", key, from)?;
}
};
write!(f, "}}")
}
Expand Down
125 changes: 69 additions & 56 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::{
};

use either::Either;
use freenet_stdlib::client_api::{ClientRequest, ContractRequest, ErrorKind};
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
prelude::ContractKey,
};
use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId as Libp2pPeerId};
use serde::{Deserialize, Serialize};
use tracing::Instrument;
Expand Down Expand Up @@ -421,61 +424,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
}
}
ContractRequest::Subscribe { key, .. } => {
const TIMEOUT: Duration = Duration::from_secs(30);
let mut missing_contract = false;
let timeout = tokio::time::timeout(TIMEOUT, async {
// Initialize a subscribe op.
loop {
let op = subscribe::start_op(key.clone());
let _ = op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.await;
match subscribe::request_subscribe(&op_manager, op).await {
Err(OpError::ContractError(ContractError::ContractNotFound(
key,
))) if !missing_contract => {
tracing::info!(%key, "Trying to subscribe to a contract not present, requesting it first");
missing_contract = true;
let get_op = get::start_op(key.clone(), true);
if let Err(error) = get::request_get(&op_manager, get_op).await
{
tracing::error!(%key, %error, "Failed getting the contract while previously trying to subscribe; bailing");
break Err(error);
}
continue;
}
Err(OpError::ContractError(ContractError::ContractNotFound(_))) => {
tracing::warn!("Still waiting for {key} contract");
tokio::time::sleep(Duration::from_secs(2)).await
}
Err(err) => {
tracing::error!("{}", err);
break Err(err);
}
Ok(()) => {
if missing_contract {
tracing::debug!(%key,
"Got back the missing contract while subscribing"
);
}
tracing::debug!(%key, "Starting subscribe request");
break Ok(());
}
}
}
});
match timeout.await {
Err(_) => {
tracing::error!(%key, "Timeout while waiting for contract to start subscription");
}
Ok(Err(error)) => {
tracing::error!(%key, %error, "Error while subscribing to contract");
}
Ok(Ok(_)) => {
tracing::debug!(%key, "Started subscription to contract");
}
}
subscribe(op_manager, key, Some(client_id)).await;
}
_ => {
tracing::error!("Op not supported");
Expand Down Expand Up @@ -698,11 +647,75 @@ async fn process_message<CB>(
)
.await;
}
NetMessage::Unsubscribed { key, .. } => {
subscribe(op_manager, key.clone(), None).await;
break;
}
_ => break,
}
}
}

/// Attempts to subscribe to a contract
async fn subscribe(op_manager: Arc<OpManager>, key: ContractKey, client_id: Option<ClientId>) {
const TIMEOUT: Duration = Duration::from_secs(30);
let mut missing_contract = false;
let timeout = tokio::time::timeout(TIMEOUT, async {
// Initialize a subscribe op.
loop {
let op = subscribe::start_op(key.clone());
if let Some(client_id) = client_id {
let _ = op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.await;
}
match subscribe::request_subscribe(&op_manager, op).await {
Err(OpError::ContractError(ContractError::ContractNotFound(key)))
if !missing_contract =>
{
tracing::info!(%key, "Trying to subscribe to a contract not present, requesting it first");
missing_contract = true;
let get_op = get::start_op(key.clone(), true);
if let Err(error) = get::request_get(&op_manager, get_op).await {
tracing::error!(%key, %error, "Failed getting the contract while previously trying to subscribe; bailing");
break Err(error);
}
continue;
}
Err(OpError::ContractError(ContractError::ContractNotFound(_))) => {
tracing::warn!("Still waiting for {key} contract");
tokio::time::sleep(Duration::from_secs(2)).await
}
Err(err) => {
tracing::error!("{}", err);
break Err(err);
}
Ok(()) => {
if missing_contract {
tracing::debug!(%key,
"Got back the missing contract while subscribing"
);
}
tracing::debug!(%key, "Starting subscribe request");
break Ok(());
}
}
}
});
match timeout.await {
Err(_) => {
tracing::error!(%key, "Timeout while waiting for contract to start subscription");
}
Ok(Err(error)) => {
tracing::error!(%key, %error, "Error while subscribing to contract");
}
Ok(Ok(_)) => {
tracing::debug!(%key, "Started subscription to contract");
}
}
}

async fn handle_aborted_op<CM>(
tx: Transaction,
this_peer: PeerId,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<'a> From<&'a str> for NodeLabel {
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct NodeSpecification {
pub owned_contracts: Vec<(ContractContainer, WrappedState, Option<PeerKeyLocation>)>,
pub owned_contracts: Vec<(ContractContainer, WrappedState, bool)>,
pub events_to_generate: HashMap<EventId, freenet_stdlib::client_api::ClientRequest<'static>>,
pub contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
}
Expand Down Expand Up @@ -283,7 +283,7 @@ pub(super) struct Builder<ER> {
contract_handler_name: String,
add_noise: bool,
event_register: ER,
contracts: Vec<(ContractContainer, WrappedState, Option<PeerKeyLocation>)>,
contracts: Vec<(ContractContainer, WrappedState, bool)>,
contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
}

Expand Down
10 changes: 4 additions & 6 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<ER> Builder<ER> {
#[cfg(test)]
pub fn append_contracts(
&mut self,
contracts: Vec<(ContractContainer, WrappedState, Option<PeerKeyLocation>)>,
contracts: Vec<(ContractContainer, WrappedState, bool)>,
contract_subscribers: std::collections::HashMap<ContractKey, Vec<PeerKeyLocation>>,
) {
self.contracts.extend(contracts);
Expand All @@ -93,7 +93,7 @@ where
{
async fn append_contracts(
&mut self,
contracts: Vec<(ContractContainer, WrappedState, Option<PeerKeyLocation>)>,
contracts: Vec<(ContractContainer, WrappedState, bool)>,
contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
) -> Result<(), anyhow::Error> {
use crate::contract::ContractHandlerEvent;
Expand All @@ -112,10 +112,8 @@ where
key,
self.op_manager.ring.peer_key
);
if let Some(subscription) = subscription {
self.op_manager
.ring
.add_subscription(key.clone(), subscription);
if subscription {
self.op_manager.ring.seed_contract(key.clone());
}
if let Some(subscribers) = contract_subscribers.get(&key) {
// add contract subscribers
Expand Down
10 changes: 4 additions & 6 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,7 @@ impl Operation for GetOp {
..
})
);
let should_subscribe = op_manager
.ring
.within_subscribing_distance(&Location::from(&key));
let should_subscribe = op_manager.ring.should_seed(&key);
let should_put = is_original_requester || should_subscribe;

if should_put {
Expand All @@ -610,7 +608,7 @@ impl Operation for GetOp {
match res {
ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {
let is_subscribed_contract =
op_manager.ring.is_subscribed_to_contract(&key);
op_manager.ring.is_seeding_contract(&key);
if !is_subscribed_contract && should_subscribe {
tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching");
super::start_subscription_request(
Expand Down Expand Up @@ -953,7 +951,7 @@ mod test {
owned_contracts: vec![(
ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract)),
contract_val,
None,
false,
)],
events_to_generate: HashMap::new(),
contract_subscribers: HashMap::new(),
Expand Down Expand Up @@ -1048,7 +1046,7 @@ mod test {
owned_contracts: vec![(
ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract)),
contract_val,
None,
false,
)],
events_to_generate: HashMap::new(),
contract_subscribers: HashMap::new(),
Expand Down
Loading

0 comments on commit 8b04584

Please sign in to comment.