Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#185908936 - Connect router to ring manager #868

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ config = { version = "0.13.0", features = [ "toml" ] }
crossbeam = "0.8.2"
ctrlc = { version = "3.4", features = ["termination"] }
dashmap = "^5.5"
delegate = "0.10"
directories = "5"
either = { workspace = true , features = ["serde"] }
futures = "0.3.21"
Expand Down
32 changes: 29 additions & 3 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct ConfigPaths {
secrets_dir: PathBuf,
db_dir: PathBuf,
app_data_dir: PathBuf,
event_log: PathBuf,
}

impl ConfigPaths {
Expand Down Expand Up @@ -122,12 +123,21 @@ impl ConfigPaths {
fs::create_dir_all(db_dir.join("local"))?;
}

let event_log = app_data_dir.join("_EVENT_LOG");
iduartgomez marked this conversation as resolved.
Show resolved Hide resolved
if !event_log.exists() {
fs::write(&event_log, [])?;
let mut local_file = event_log.clone();
local_file.set_file_name("_EVENT_LOG_LOCAL");
fs::write(local_file, [])?;
}

Ok(Self {
contracts_dir,
delegates_dir,
secrets_dir,
db_dir,
app_data_dir,
event_log,
})
}
}
Expand All @@ -139,7 +149,7 @@ impl Config {

pub fn set_op_mode(mode: OperationMode) {
let local_mode = matches!(mode, OperationMode::Local);
Self::get_static_conf()
Self::conf()
.local_mode
.store(local_mode, std::sync::atomic::Ordering::SeqCst);
}
Expand Down Expand Up @@ -176,8 +186,24 @@ impl Config {
}
}

pub fn get_static_conf() -> &'static Config {
CONFIG.get_or_init(|| Config::load_conf().expect("Failed to load configuration"))
pub fn event_log(&self) -> PathBuf {
if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) {
let mut local_file = self.config_paths.event_log.clone();
local_file.set_file_name("_EVENT_LOG_LOCAL");
local_file
} else {
self.config_paths.event_log.to_owned()
}
}

pub fn conf() -> &'static Config {
CONFIG.get_or_init(|| match Config::load_conf() {
Ok(config) => config,
Err(err) => {
tracing::error!("failed while loading configuration: {err}");
panic!("Failed while loading configuration")
}
})
}

fn load_conf() -> std::io::Result<Config> {
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::runtime::{
};
use crate::{
client_events::{ClientId, HostResult},
node::{NodeConfig, P2pBridge},
node::NodeConfig,
operations::{self, op_trait::Operation},
DynError,
};
Expand Down Expand Up @@ -136,7 +136,7 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
async fn send_to_event_loop<Op, T>(&mut self, message: T) -> Result<(), DynError>
where
T: ComposeNetworkMessage<Op>,
Op: Operation<P2pBridge> + Send + 'static,
Op: Operation + Send + 'static,
{
let op = message.initiate_op(&self.op_manager);
self.end.sender.send(*op.id()).await?;
Expand Down Expand Up @@ -178,7 +178,7 @@ mod sealed {
trait ComposeNetworkMessage<Op>
where
Self: Sized,
Op: Operation<P2pBridge> + Send + 'static,
Op: Operation + Send + 'static,
{
fn initiate_op(self, op_manager: &OpManager) -> Op {
todo!()
Expand All @@ -205,7 +205,7 @@ impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
op: operations::get::GetOp,
op_manager: &OpManager,
) -> Result<Transaction, OpError> {
let id = *<operations::get::GetOp as Operation<P2pBridge>>::id(&op);
let id = *op.id();
operations::get::request_get(op_manager, op, None).await?;
Ok(id)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ impl Executor<Runtime> {
// dependencies to be resolved
async fn op_request<Op, M>(&mut self, request: M) -> Result<Op, DynError>
where
Op: Operation<P2pBridge> + Send + 'static,
Op: Operation + Send + 'static,
M: ComposeNetworkMessage<Op>,
{
debug_assert!(self.event_loop_channel.is_some());
Expand All @@ -342,7 +342,7 @@ impl Executor<Runtime> {
pub async fn from_config(config: NodeConfig) -> Result<Self, DynError> {
const MAX_SIZE: i64 = 10 * 1024 * 1024;
const MAX_MEM_CACHE: u32 = 10_000_000;
let static_conf = crate::config::Config::get_static_conf();
let static_conf = crate::config::Config::conf();

let state_store = StateStore::new(Storage::new().await?, MAX_MEM_CACHE).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ where
_kv_store: StateStore::new(kv_store, 10_000_000).unwrap(),
_runtime: MockRuntime {
contract_store: ContractStore::new(
Config::get_static_conf().contracts_dir(),
Config::conf().contracts_dir(),
Self::MAX_MEM_CACHE,
)
.unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/storages/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct RocksDb(DB);
impl RocksDb {
#[cfg_attr(feature = "sqlite", allow(unused))]
pub async fn new() -> Result<Self, rocksdb::Error> {
let path = Config::get_static_conf().db_dir().join("freenet.db");
let path = Config::conf().db_dir().join("freenet.db");
tracing::info!("loading contract store from {path:?}");

let mut opts = Options::default();
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/storages/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ static POOL: Lazy<SqlitePool> = Lazy::new(|| {
let opts = if cfg!(test) {
SqliteConnectOptions::from_str("sqlite::memory:").unwrap()
} else {
let conn_str = Config::get_static_conf().db_dir().join("freenet.db");
let conn_str = Config::conf().db_dir().join("freenet.db");
tracing::info!("loading contract store from {conn_str:?}");
SqliteConnectOptions::new()
.create_if_missing(true)
Expand Down
22 changes: 9 additions & 13 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! Main message type which encapsulated all the messaging between nodes.

use std::{
fmt::Display,
time::{Duration, SystemTime},
};
use std::{fmt::Display, time::Duration};

use serde::{Deserialize, Serialize};
use uuid::{
Expand Down Expand Up @@ -45,13 +42,7 @@ static UUID_CONTEXT: Context = Context::new(14);
impl Transaction {
pub fn new(ty: TransactionTypeId, initial_peer: &PeerKey) -> Transaction {
// using v1 UUID to keep to keep track of the creation ts
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("infallible");
let now_secs = now.as_secs();
let now_nanos = now.as_nanos();
let now_nanos = now_nanos - (now_secs as u128 * 1_000_000_000);
let ts = Timestamp::from_unix(&UUID_CONTEXT, now_secs, now_nanos as u32);
let ts: Timestamp = uuid::timestamp::Timestamp::now(&UUID_CONTEXT);

// event in the net this UUID should be unique since peer keys are unique
// however some id collision may be theoretically possible if two transactions
Expand All @@ -61,8 +52,8 @@ impl Transaction {
let b = &mut [0; 6];
b.copy_from_slice(&initial_peer.to_bytes()[0..6]);
let id = Uuid::new_v1(ts, b);
// 2 word size for 64-bits platforms most likely since msg type
// probably will be aligned to 64 bytes

// 3 word size for 64-bits platforms
Self { id, ty }
}

Expand Down Expand Up @@ -231,6 +222,11 @@ impl Message {
Canceled(_) => true,
}
}

pub fn track_stats(&self) -> bool {
use Message::*;
!matches!(self, JoinRing(_) | Subscribe(_) | Canceled(_))
}
}

impl Display for Message {
Expand Down
Loading