Skip to content

Commit

Permalink
Merge pull request #910 from freenet/186548078-op-logic-fixes-2
Browse files Browse the repository at this point in the history
186548078 - More op logic fixes
  • Loading branch information
iduartgomez authored Dec 19, 2023
2 parents 025b041 + 2c5dfd3 commit c2530c6
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 149 deletions.
27 changes: 9 additions & 18 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::{
use directories::ProjectDirs;
use libp2p::{identity, PeerId};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use tokio::runtime::Runtime;

use crate::local_node::OperationMode;
Expand Down Expand Up @@ -103,7 +102,7 @@ pub struct ConfigPaths {
delegates_dir: PathBuf,
secrets_dir: PathBuf,
db_dir: PathBuf,
event_log: Mutex<PathBuf>,
event_log: PathBuf,
}

impl ConfigPaths {
Expand All @@ -118,8 +117,8 @@ impl ConfigPaths {
Ok(app_data_dir)
}

fn new() -> std::io::Result<ConfigPaths> {
let app_data_dir = Self::app_data_dir()?;
fn new(data_dir: Option<PathBuf>) -> std::io::Result<ConfigPaths> {
let app_data_dir = data_dir.map(Ok).unwrap_or_else(Self::app_data_dir)?;
let contracts_dir = app_data_dir.join("contracts");
let delegates_dir = app_data_dir.join("delegates");
let secrets_dir = app_data_dir.join("secrets");
Expand Down Expand Up @@ -158,7 +157,7 @@ impl ConfigPaths {
delegates_dir,
secrets_dir,
db_dir,
event_log: Mutex::new(event_log),
event_log,
})
}
}
Expand Down Expand Up @@ -205,24 +204,14 @@ impl Config {

pub fn event_log(&self) -> PathBuf {
if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) {
let mut local_file = self.config_paths.event_log.lock().clone();
let mut local_file = self.config_paths.event_log.clone();
local_file.set_file_name("_EVENT_LOG_LOCAL");
local_file
} else {
self.config_paths.event_log.lock().to_owned()
self.config_paths.event_log.to_owned()
}
}

pub fn set_event_log(path: PathBuf) {
tracing::debug!("setting event log file to: {:?}", &path);
fs::OpenOptions::new()
.write(true)
.create(true)
.open(&path)
.expect("couln't create event log file");
*Self::conf().config_paths.event_log.lock() = path;
}

pub fn conf() -> &'static Config {
CONFIG.get_or_init(|| match Config::load_conf() {
Ok(config) => config,
Expand Down Expand Up @@ -264,7 +253,9 @@ impl Config {
.flatten()
.unwrap_or(tracing::log::LevelFilter::Info);
let (bootstrap_ip, bootstrap_port, bootstrap_id) = Config::get_bootstrap_host(&settings)?;
let config_paths = ConfigPaths::new()?;

let data_dir = settings.get_string("data_dir").ok().map(PathBuf::from);
let config_paths = ConfigPaths::new(data_dir)?;

let local_mode = settings.get_string("network_mode").is_err();

Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -475,6 +476,10 @@ impl<R> Executor<R> {
})
}

pub fn test_data_dir(identifier: &str) -> PathBuf {
std::env::temp_dir().join(format!("freenet-executor-{identifier}"))
}

async fn get_stores(
config: &PeerCliConfig,
) -> Result<
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ impl Executor<MockRuntime> {
identifier: &str,
event_loop_channel: ExecutorToEventLoopChannel<ExecutorHalve>,
) -> Result<Self, DynError> {
let data_dir = std::env::temp_dir().join(format!("freenet-executor-{identifier}"));
let data_dir = Self::test_data_dir(identifier);

let contracts_data_dir = data_dir.join("contracts");
std::fs::create_dir_all(&contracts_data_dir).expect("directory created");
let contract_store = ContractStore::new(contracts_data_dir, u16::MAX as i64)?;

let db_path = data_dir.join("db");
std::fs::create_dir_all(&db_path).expect("directory created");
let log_file = data_dir.join("_EVENT_LOG_LOCAL");
crate::config::Config::set_event_log(log_file);
let state_store =
StateStore::new(Storage::new(Some(&db_path)).await?, u16::MAX as u32).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Transaction {
impl<'a> arbitrary::Arbitrary<'a> for Transaction {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let ty: TransactionTypeId = u.arbitrary()?;
let bytes: u128 = u.arbitrary()?;
let bytes: u128 = Ulid::new().0;
Ok(Self::update(ty.0, Ulid(bytes)))
}
}
Expand Down
12 changes: 8 additions & 4 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,15 @@ impl NodeConfig {
{
use super::tracing::{CombinedRegister, OTEventRegister};
CombinedRegister::new([
Box::new(EventRegister::new()),
Box::new(EventRegister::new(
crate::config::Config::conf().event_log(),
)),
Box::new(OTEventRegister::new()),
])
}
#[cfg(not(feature = "trace-ot"))]
{
EventRegister::new()
EventRegister::new(crate::config::Config::conf().event_log())
}
};
let node = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
Expand Down Expand Up @@ -580,8 +582,10 @@ async fn report_result(
second_trace_lines.join("\n")
})
.unwrap_or_default();
let log =
format!("Transaction ({tx}) error trace:\n {trace} \nstate:\n {state:?}\n");
let peer = &op_manager.ring.peer_key;
let log = format!(
"Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
);
std::io::stderr().write_all(log.as_bytes()).unwrap();
}
#[cfg(not(any(debug_assertions, test)))]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ impl SimNetwork {
/// meaning that:
///
/// - at least 50% of the peers have more than the minimum connections
/// -
/// - the average number of connections per peer is above the mean between max and min connections
pub fn network_connectivity_quality(&self) -> Result<(), anyhow::Error> {
const HIGHER_THAN_MIN_THRESHOLD: f64 = 0.5;
let num_nodes = self.number_of_nodes;
Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/node/testing_impl/inter_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ impl SimPeer {
{
use crate::tracing::{CombinedRegister, OTEventRegister};
CombinedRegister::new([
Box::new(EventRegister::new()),
Box::new(EventRegister::new(
crate::config::Config::conf().event_log(),
)),
Box::new(OTEventRegister::new()),
])
}
#[cfg(not(feature = "trace-ot"))]
{
EventRegister::new()
EventRegister::new(crate::config::Config::conf().event_log())
}
};
self.run_node(event_generator, event_register).await
Expand Down
27 changes: 22 additions & 5 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::backtrace::Backtrace as StdTrace;
use std::{pin::Pin, time::Duration};

use freenet_stdlib::prelude::ContractKey;
use futures::{future::BoxFuture, Future};
use tokio::sync::mpsc::error::SendError;

Expand Down Expand Up @@ -300,11 +301,7 @@ impl<T> From<SendError<T>> for OpError {
}

/// If the contract is not found, it will try to get it first if the `try_get` parameter is set.
async fn start_subscription_request(
op_manager: &OpManager,
key: freenet_stdlib::prelude::ContractKey,
try_get: bool,
) {
async fn start_subscription_request(op_manager: &OpManager, key: ContractKey, try_get: bool) {
let sub_op = subscribe::start_op(key.clone());
if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
if !try_get {
Expand All @@ -322,3 +319,23 @@ async fn start_subscription_request(
}
}
}

async fn has_contract(op_manager: &OpManager, key: ContractKey) -> Result<bool, OpError> {
match op_manager
.notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
key,
fetch_contract: false,
})
.await?
{
crate::contract::ContractHandlerEvent::GetResponse {
response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
..
} => Ok(true),
crate::contract::ContractHandlerEvent::GetResponse {
response: Ok(crate::contract::StoreResponse { state: None, .. }),
..
} => Ok(false),
_ => Err(OpError::UnexpectedOpState),
}
}
51 changes: 37 additions & 14 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,26 +1221,46 @@ mod test {

/// Given a network of one node and one gateway test that both are connected.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn one_node_connects_to_gw() {
let mut sim_nodes = SimNetwork::new("join_one_node_connects_to_gw", 1, 1, 1, 1, 2, 2).await;
sim_nodes.start().await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(sim_nodes.connected(&"node-0".into()));
async fn one_node_connects_to_gw() -> Result<(), anyhow::Error> {
const NUM_NODES: usize = 1usize;
const NUM_GW: usize = 1usize;
const MAX_HTL: usize = 1usize;
const RAND_IF_HTL_ABOVE: usize = 1usize;
const MAX_CONNS: usize = 1usize;
const MIN_CONNS: usize = 1usize;
let mut sim_nw = SimNetwork::new(
"join_one_node_connects_to_gw",
NUM_NODES,
NUM_GW,
MAX_HTL,
RAND_IF_HTL_ABOVE,
MAX_CONNS,
MIN_CONNS,
)
.await;
sim_nw.start().await;
sim_nw.check_connectivity(Duration::from_secs(1))?;
assert!(sim_nw.connected(&"node-1".into()));
Ok(())
}

/// Once a gateway is left without remaining open slots, ensure forwarding connects
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn forward_connection_to_node() -> Result<(), anyhow::Error> {
const NUM_NODES: usize = 3usize;
const NUM_GW: usize = 1usize;
const MAX_HTL: usize = 2usize;
const RAND_IF_HTL_ABOVE: usize = 1usize;
const MAX_CONNS: usize = 2usize;
const MIN_CONNS: usize = 1usize;
let mut sim_nw = SimNetwork::new(
"join_forward_connection_to_node",
NUM_GW,
NUM_NODES,
2,
1,
2,
1,
MAX_HTL,
RAND_IF_HTL_ABOVE,
MAX_CONNS,
MIN_CONNS,
)
.await;
// sim_nw.with_start_backoff(Duration::from_millis(100));
Expand All @@ -1264,17 +1284,20 @@ mod test {
// crate::config::set_logger();
const NUM_NODES: usize = 10usize;
const NUM_GW: usize = 2usize;
const MAX_HTL: usize = 5usize;
const RAND_IF_HTL_ABOVE: usize = 3usize;
const MAX_CONNS: usize = 4usize;
const MIN_CONNS: usize = 2usize;
let mut sim_nw = SimNetwork::new(
"join_all_nodes_should_connect",
NUM_GW,
NUM_NODES,
5,
3,
6,
2,
MAX_HTL,
RAND_IF_HTL_ABOVE,
MAX_CONNS,
MIN_CONNS,
)
.await;
sim_nw.with_start_backoff(Duration::from_millis(100));
sim_nw.start().await;
sim_nw.check_connectivity(Duration::from_secs(10))?;
// wait for a bit so peers can acquire more connections
Expand Down
19 changes: 15 additions & 4 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub(crate) async fn request_get(op_manager: &OpManager, get_op: GetOp) -> Result
retries: 0,
fetch_contract,
requester: None,
current_hop: op_manager.ring.max_hops_to_live,
});

let msg = GetMsg::RequestGet {
Expand Down Expand Up @@ -109,6 +110,7 @@ pub(crate) async fn request_get(op_manager: &OpManager, get_op: GetOp) -> Result
Ok(())
}

#[derive(Debug)]
enum GetState {
/// A new petition for a get op.
ReceivedRequest,
Expand All @@ -124,6 +126,7 @@ enum GetState {
requester: Option<PeerKeyLocation>,
fetch_contract: bool,
retries: usize,
current_hop: usize,
},
}

Expand Down Expand Up @@ -465,7 +468,7 @@ impl Operation for GetOp {
fetch_contract,
retries,
requester,
..
current_hop,
}) => {
// todo: register in the stats for the outcome of the op that failed to get a response from this peer
if retries < MAX_RETRIES {
Expand All @@ -484,7 +487,7 @@ impl Operation for GetOp {
target,
sender: *this_peer,
fetch_contract,
htl: op_manager.ring.max_hops_to_live,
htl: current_hop,
skip_list: new_skip_list.clone(),
});
} else {
Expand All @@ -494,6 +497,7 @@ impl Operation for GetOp {
retries: retries + 1,
fetch_contract,
requester,
current_hop,
});
} else {
tracing::error!(
Expand Down Expand Up @@ -706,7 +710,13 @@ impl Operation for GetOp {
skip_list: skip_list.clone(),
});
}
_ => return Err(OpError::invalid_transition(self.id)),
Some(other) => {
return Err(OpError::invalid_transition_with_state(
self.id,
Box::new(other),
))
}
None => return Err(OpError::invalid_transition(self.id)),
};
}
}
Expand Down Expand Up @@ -784,7 +794,7 @@ async fn try_forward_or_return(

let Some(new_target) = op_manager
.ring
.closest_potentially_caching(&key, [&sender.peer].as_slice())
.closest_potentially_caching(&key, new_skip_list.as_slice())
else {
tracing::warn!(
tx = %id,
Expand All @@ -806,6 +816,7 @@ async fn try_forward_or_return(
requester: Some(sender),
retries: 0,
fetch_contract,
current_hop: new_htl,
}),
Some(GetMsg::SeekNode {
id,
Expand Down
Loading

0 comments on commit c2530c6

Please sign in to comment.