Skip to content

Commit

Permalink
184207088 integrate resource with topology (#916)
Browse files Browse the repository at this point in the history
Co-authored-by: Ignacio Duart <[email protected]>
  • Loading branch information
sanity and iduartgomez authored Jan 3, 2024
1 parent 70bec60 commit 6392daf
Show file tree
Hide file tree
Showing 24 changed files with 1,848 additions and 1,316 deletions.
597 changes: 288 additions & 309 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ futures = "0.3"
rand = { version = "0.8" }
semver = { version = "1.0.14", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1"
serde_with = "3"
tracing = "0.1"
Expand Down
6 changes: 4 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fastrand = { workspace = true }
flatbuffers = "23.5.26"
futures = "0.3.21"
headers = "0.4"
itertools = "0.11"
itertools = "0.12.0"
libp2p = { default-features = false, features = ["autonat", "dns", "ed25519", "identify", "macros", "noise", "ping", "tcp", "tokio", "yamux"], version = "0.52.3" }
libp2p-identity = { features = ["ed25519", "rand"], version = "0.2.7" }
notify = "6"
Expand All @@ -56,12 +56,13 @@ stretto = { features = ["async", "sync"], version = "0.8" }
tar = { version = "0.4.38" }
thiserror = "1"
tokio = { features = ["fs", "macros", "rt-multi-thread", "sync", "process"], version = "1" }
tokio-tungstenite = "0.20"
tokio-tungstenite = "0.21.0"
tower-http = { features = ["fs", "trace"], version = "0.5" }
ulid = { features = ["serde"], version = "1.1" }
unsigned-varint = "0.7"
wasmer = { features = ["sys"], workspace = true }
xz2 = { version = "0.1" }
# enum-iterator = "1.4.1"

# Tracing deps
opentelemetry = "0.21.0"
Expand All @@ -72,6 +73,7 @@ tracing-subscriber = { optional = true, version = "0.3.16" }

# internal deps
freenet-stdlib = { features = ["net"], workspace = true }
time = "0.3.30"

[dev-dependencies]
arbitrary = { features = ["derive"], version = "1" }
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{client_events::ClientId, node::PeerCliConfig, wasm_runtime::Runtime,

pub(crate) struct ClientResponsesReceiver(UnboundedReceiver<(ClientId, HostResult)>);

pub fn client_responses_channel() -> (ClientResponsesReceiver, ClientResponsesSender) {
pub(crate) fn client_responses_channel() -> (ClientResponsesReceiver, ClientResponsesSender) {
let (tx, rx) = mpsc::unbounded_channel();
(ClientResponsesReceiver(rx), ClientResponsesSender(tx))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/storages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "sqlite")]
pub use sqlite::{Pool as SqlitePool, SqlDbError};
pub use sqlite::Pool as SqlitePool;

#[cfg(feature = "sqlite")]
pub type Storage = SqlitePool;
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ mod message;
mod node;
/// Network operation/transaction state machines.
mod operations;
/// Resource usage tracking.
mod resources;
/// Ring connections and routing.
mod ring;
/// Router implementation.
Expand Down Expand Up @@ -53,3 +51,6 @@ pub mod dev_tool {
pub use ring::Location;
pub use wasm_runtime::{ContractStore, DelegateStore, Runtime, SecretsStore, StateStore};
}

#[cfg(test)]
pub mod test_utils;
4 changes: 2 additions & 2 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ pub(crate) struct Transaction {
impl Transaction {
pub const NULL: &'static Transaction = &Transaction { id: Ulid(0) };

pub fn new<T: TxType>() -> Self {
pub(crate) fn new<T: TxType>() -> Self {
let ty = <T as TxType>::tx_type_id();
let id = Ulid::new();
Self::update(ty.0, id)
// Self { id }
}

pub fn transaction_type(&self) -> TransactionType {
pub(crate) fn transaction_type(&self) -> TransactionType {
let id_byte = (self.id.0 & 0xFFu128) as u8;
match id_byte {
0 => TransactionType::Connect,
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::operations::handle_op_request;
pub use network_bridge::inter_process::InterProcessConnManager;
pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};

use crate::topology::rate::Rate;
pub(crate) use op_state_manager::{OpManager, OpNotAvailable};

mod network_bridge;
Expand Down Expand Up @@ -113,6 +114,8 @@ pub struct NodeConfig {
pub(crate) rnd_if_htl_above: Option<usize>,
pub(crate) max_number_conn: Option<usize>,
pub(crate) min_number_conn: Option<usize>,
pub(crate) max_upstream_bandwidth: Option<Rate>,
pub(crate) max_downstream_bandwidth: Option<Rate>,
}

impl NodeConfig {
Expand All @@ -130,6 +133,8 @@ impl NodeConfig {
rnd_if_htl_above: None,
max_number_conn: None,
min_number_conn: None,
max_upstream_bandwidth: None,
max_downstream_bandwidth: None,
}
}

Expand Down Expand Up @@ -277,10 +282,9 @@ impl InitPeerNode {
/// Will panic if is not a valid representation.
pub fn decode_peer_id<T: AsMut<[u8]>>(mut bytes: T) -> Libp2pPeerId {
Libp2pPeerId::from_public_key(
&identity::Keypair::try_from(
&identity::Keypair::from(
identity::ed25519::Keypair::try_from_bytes(bytes.as_mut()).unwrap(),
)
.unwrap()
.public(),
)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/network_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ impl Clone for ConnectionError {
}
}

pub fn event_loop_notification_channel(
pub(crate) fn event_loop_notification_channel(
) -> (EventLoopNotificationsReceiver, EventLoopNotificationsSender) {
let (notification_tx, notification_rx) = mpsc::channel(100);
(
EventLoopNotificationsReceiver(notification_rx),
EventLoopNotificationsSender(notification_tx),
)
}
pub(super) struct EventLoopNotificationsReceiver(Receiver<Either<NetMessage, NodeEvent>>);
pub(crate) struct EventLoopNotificationsReceiver(Receiver<Either<NetMessage, NodeEvent>>);

impl Deref for EventLoopNotificationsReceiver {
type Target = Receiver<Either<NetMessage, NodeEvent>>;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ impl OpManager {
/// Notify the operation manager that a transaction is being transacted over the network.
pub fn sending_transaction(&self, peer: &PeerId, msg: &NetMessage) {
let transaction = msg.id();
if let Some(loc) = msg.requested_location() {
if let (Some(recipient), Some(target)) = (msg.target(), msg.requested_location()) {
self.ring
.record_request(loc, transaction.transaction_type());
.record_request(*recipient, target, transaction.transaction_type());
}
self.ring
.live_tx_tracker
Expand Down
Loading

0 comments on commit 6392daf

Please sign in to comment.