From 743b6ad6c62227a85c0f7224b9ebc38346ff13a5 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 22 Sep 2023 16:50:36 +0200 Subject: [PATCH] Join executor to network ops --- Cargo.lock | 153 +++++----- crates/core/src/client_events.rs | 4 +- crates/core/src/client_events/combinator.rs | 12 +- crates/core/src/contract.rs | 51 ++-- crates/core/src/contract/executor.rs | 269 +++++++++++++----- crates/core/src/contract/handler.rs | 167 ++++++++--- crates/core/src/contract/in_memory.rs | 15 +- crates/core/src/contract/storages/rocks_db.rs | 6 +- crates/core/src/contract/storages/sqlite.rs | 13 +- crates/core/src/message.rs | 18 +- crates/core/src/node.rs | 268 ++++++++++------- crates/core/src/node/conn_manager.rs | 47 ++- .../core/src/node/conn_manager/p2p_protoc.rs | 63 +++- .../node/{event_listener.rs => event_log.rs} | 12 +- crates/core/src/node/in_memory_impl.rs | 58 ++-- crates/core/src/node/op_state.rs | 60 ++-- crates/core/src/node/p2p_impl.rs | 42 ++- crates/core/src/node/tests.rs | 2 +- crates/core/src/operations.rs | 46 ++- crates/core/src/operations/get.rs | 83 ++++-- crates/core/src/operations/join_ring.rs | 6 + crates/core/src/operations/op_trait.rs | 2 + crates/core/src/operations/put.rs | 57 +++- crates/core/src/operations/state_machine.rs | 110 ------- crates/core/src/operations/subscribe.rs | 20 +- crates/core/src/operations/update.rs | 6 +- crates/core/src/server/mod.rs | 2 +- stdlib | 2 +- 28 files changed, 1009 insertions(+), 585 deletions(-) rename crates/core/src/node/{event_listener.rs => event_log.rs} (96%) delete mode 100644 crates/core/src/operations/state_machine.rs diff --git a/Cargo.lock b/Cargo.lock index a4a7d79f0..1e7b6b08c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,9 +87,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f2135563fb5c609d2b2b87c1e8ce7bc41b0b45430fa9661f457981503dd5bf0" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" dependencies = [ "memchr", ] @@ -313,9 +313,9 @@ checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" [[package]] name = "atomic-waker" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" @@ -481,9 +481,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "199c42ab6972d92c9f8995f086273d25c42fc0f7b2a1fcefba465c1352d25ba5" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" dependencies = [ "arrayref", "arrayvec", @@ -690,9 +690,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.4" +version = "4.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" +checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" dependencies = [ "clap_builder", "clap_derive", @@ -700,9 +700,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.4" +version = "4.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" +checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" dependencies = [ "anstream", "anstyle", @@ -736,9 +736,9 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "concurrent-queue" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" dependencies = [ "crossbeam-utils", ] @@ -1080,9 +1080,9 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.0" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622178105f911d937a42cdb140730ba4a3ed2becd8ae6ce39c7d28b5d75d4588" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" dependencies = [ "cfg-if", "cpufeatures", @@ -1323,7 +1323,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7277392b266383ef8396db7fdeb1e77b6c52fed775f5df15bb24f35b72156980" dependencies = [ - "curve25519-dalek 4.1.0", + "curve25519-dalek 4.1.1", "ed25519", "rand_core 0.6.4", "serde", @@ -1454,9 +1454,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fdev" @@ -2440,9 +2440,9 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.40.0" +version = "0.40.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef7dd7b09e71aac9271c60031d0e558966cdb3253ba0308ab369bb2de80630d0" +checksum = "dd44289ab25e4c9230d9246c475a22241e301b23e8f4061d3bdef304a1a99713" dependencies = [ "either", "fnv", @@ -2575,7 +2575,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71ce70757f2c0d82e9a3ef738fb10ea0723d16cec37f078f719e2c247704c1bb" dependencies = [ "bytes", - "curve25519-dalek 4.1.0", + "curve25519-dalek 4.1.1", "futures", "libp2p-core", "libp2p-identity", @@ -2595,9 +2595,9 @@ dependencies = [ [[package]] name = "libp2p-ping" -version = "0.43.0" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd5ee3270229443a2b34b27ed0cb7470ef6b4a6e45e54e89a8771fa683bab48" +checksum = "e702d75cd0827dfa15f8fd92d15b9932abe38d10d21f47c50438c71dd1b5dae3" dependencies = [ "either", "futures", @@ -2654,9 +2654,9 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.43.3" +version = "0.43.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28016944851bd73526d3c146aabf0fa9bbe27c558f080f9e5447da3a1772c01a" +checksum = "f0cf749abdc5ca1dce6296dc8ea0f012464dfcfd3ddd67ffc0cabd8241c4e1da" dependencies = [ "either", "fnv", @@ -2869,16 +2869,17 @@ checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "md-5" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ + "cfg-if", "digest 0.10.7", ] @@ -3406,9 +3407,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parking" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" [[package]] name = "parking_lot" @@ -3487,9 +3488,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" +checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" dependencies = [ "memchr", "thiserror", @@ -3498,9 +3499,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bee7be22ce7918f641a33f08e3f43388c7656772244e2bbb2477f44cc9021a" +checksum = "35513f630d46400a977c4cb58f78e1bfbe01434316e60c37d27b9ad6139c66d8" dependencies = [ "pest", "pest_generator", @@ -3508,9 +3509,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1511785c5e98d79a05e8a6bc34b4ac2168a0e3e92161862030ad84daa223141" +checksum = "bc9fc1b9e7057baba189b5c626e2d6f40681ae5b6eb064dc7c7834101ec8123a" dependencies = [ "pest", "pest_meta", @@ -3521,9 +3522,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42f0394d3123e33353ca5e1e89092e533d2cc490389f2bd6131c43c634ebc5f" +checksum = "1df74e9e7ec4053ceb980e7c0c8bd3594e977fde1af91daba9c928e8e8c6708d" dependencies = [ "once_cell", "pest", @@ -3802,9 +3803,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.10.4" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13f81c9a9d574310b8351f8666f5a93ac3b0069c45c28ad52c10291389a7cf9" +checksum = "2c78e758510582acc40acb90458401172d41f1016f8c9dde89e49677afb7eec1" dependencies = [ "bytes", "rand", @@ -3886,9 +3887,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" dependencies = [ "either", "rayon-core", @@ -3896,14 +3897,12 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" dependencies = [ - "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "num_cpus", ] [[package]] @@ -4017,9 +4016,9 @@ dependencies = [ [[package]] name = "rend" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab" +checksum = "a2571463863a6bd50c32f94402933f03457a3fbaf697a707c5be741e459f08fd" dependencies = [ "bytecheck", ] @@ -4192,9 +4191,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.13" +version = "0.38.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" +checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" dependencies = [ "bitflags 2.4.0", "errno", @@ -4226,9 +4225,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.5" +version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" +checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ "ring", "untrusted", @@ -4305,9 +4304,9 @@ checksum = "4c309e515543e67811222dbc9e3dd7e1056279b782e1dacffe4242b718734fb6" [[package]] name = "semver" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" dependencies = [ "serde", ] @@ -4426,9 +4425,9 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures", @@ -4524,9 +4523,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "snow" @@ -4537,7 +4536,7 @@ dependencies = [ "aes-gcm", "blake2", "chacha20poly1305 0.9.1", - "curve25519-dalek 4.1.0", + "curve25519-dalek 4.1.1", "rand_core 0.6.4", "ring", "rustc_version", @@ -4944,9 +4943,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand 2.0.0", + "fastrand 2.0.1", "redox_syscall 0.3.5", - "rustix 0.38.13", + "rustix 0.38.14", "windows-sys 0.48.0", ] @@ -5004,9 +5003,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "itoa", @@ -5017,15 +5016,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -5088,9 +5087,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", @@ -5100,9 +5099,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -5361,9 +5360,9 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tungstenite" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" dependencies = [ "byteorder", "bytes", @@ -5546,9 +5545,9 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "waker-fn" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" [[package]] name = "walkdir" @@ -5891,9 +5890,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ "winapi", ] diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 3195bdd42..b8574ed32 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -93,7 +93,7 @@ impl From for AuthToken { #[non_exhaustive] pub struct OpenRequest<'a> { - pub id: ClientId, + pub client_id: ClientId, pub request: Box>, pub notification_channel: Option>, pub token: Option, @@ -109,7 +109,7 @@ impl<'a> OpenRequest<'a> { pub fn new(id: ClientId, request: Box>) -> Self { Self { - id, + client_id: id, request, notification_channel: None, token: None, diff --git a/crates/core/src/client_events/combinator.rs b/crates/core/src/client_events/combinator.rs index 44ea7a2c8..f528c39ca 100644 --- a/crates/core/src/client_events/combinator.rs +++ b/crates/core/src/client_events/combinator.rs @@ -84,7 +84,7 @@ impl super::ClientEventsProxy for ClientEventsCombinator { .map(|res| { match res { Ok(OpenRequest { - id: external, + client_id: external, request, notification_channel, token, @@ -103,7 +103,7 @@ impl super::ClientEventsProxy for ClientEventsCombinator { }); Ok(OpenRequest { - id, + client_id: id, request, notification_channel, token, @@ -162,9 +162,9 @@ async fn client_fn( } client_msg = client.recv() => { match client_msg { - Ok(OpenRequest {id, request, notification_channel, token}) => { - tracing::debug!("received msg @ combinator from external id {id}, msg: {request}"); - if tx_host.send(Ok(OpenRequest { id, request, notification_channel, token })).await.is_err() { + Ok(OpenRequest { client_id, request, notification_channel, token }) => { + tracing::debug!("received msg @ combinator from external id {client_id}, msg: {request}"); + if tx_host.send(Ok(OpenRequest { client_id, request, notification_channel, token })).await.is_err() { break; } } @@ -308,7 +308,7 @@ mod test { .unwrap(); for i in 0..3 { - let OpenRequest { id, .. } = combinator.recv().await.unwrap(); + let OpenRequest { client_id: id, .. } = combinator.recv().await.unwrap(); eprintln!("received: {id:?}"); assert_eq!(ClientId::new(i), id); } diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 2d6b48a9d..ef3d77fdd 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -7,30 +7,32 @@ mod handler; mod in_memory; pub mod storages; +pub(crate) use executor::{ + executor_channel, ExecutorToEventLoopChannel, NetworkEventListenerHalve, +}; pub(crate) use handler::{ - contract_handler_channel, CHSenderHalve, ContractHandler, ContractHandlerChannel, - ContractHandlerEvent, NetworkContractHandler, StoreResponse, + contract_handler_channel, ClientResponses, ClientResponsesSender, ContractHandler, + ContractHandlerEvent, ContractHandlerToEventLoopChannel, EventId, NetEventListener, + NetworkContractHandler, StoreResponse, }; #[cfg(test)] pub(crate) use in_memory::{MemoryContractHandler, MockRuntime}; -use executor::ContractExecutor; pub use executor::{Executor, ExecutorError, OperationMode}; +use executor::ContractExecutor; + pub(crate) async fn contract_handling<'a, CH>(mut contract_handler: CH) -> Result<(), ContractError> where CH: ContractHandler + Send + 'static, { loop { - let res = contract_handler.channel().recv_from_listener().await?; - match res { - ( - _id, - ContractHandlerEvent::GetQuery { - key, - fetch_contract, - }, - ) => { + let (id, event) = contract_handler.channel().recv_from_event_loop().await?; + match event { + ContractHandlerEvent::GetQuery { + key, + fetch_contract, + } => { match contract_handler .executor() .fetch_contract(key.clone(), fetch_contract) @@ -39,8 +41,8 @@ where Ok((state, contract)) => { contract_handler .channel() - .send_to_listener( - _id, + .send_to_event_loop( + id, ContractHandlerEvent::GetResponse { key, response: Ok(StoreResponse { @@ -55,8 +57,8 @@ where tracing::warn!("error while executing get contract query: {err}"); contract_handler .channel() - .send_to_listener( - _id, + .send_to_event_loop( + id, ContractHandlerEvent::GetResponse { key, response: Err(err.into()), @@ -66,30 +68,27 @@ where } } } - (id, ContractHandlerEvent::Cache(contract)) => { + ContractHandlerEvent::Cache(contract) => { match contract_handler.executor().store_contract(contract).await { Ok(_) => { contract_handler .channel() - .send_to_listener(id, ContractHandlerEvent::CacheResult(Ok(()))) + .send_to_event_loop(id, ContractHandlerEvent::CacheResult(Ok(()))) .await?; } Err(err) => { let err = ContractError::ContractRuntimeError(err); contract_handler .channel() - .send_to_listener(id, ContractHandlerEvent::CacheResult(Err(err))) + .send_to_event_loop(id, ContractHandlerEvent::CacheResult(Err(err))) .await?; } } } - ( - _id, - ContractHandlerEvent::PutQuery { - key: _key, - state: _state, - }, - ) => { + ContractHandlerEvent::PutQuery { + key: _key, + state: _state, + } => { // let _put_result = contract_handler // .handle_request(ClientRequest::Put { // contract: todo!(), diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 1c1e73c48..01236c011 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::fmt::Display; -use std::hint::unreachable_unchecked; +use std::sync::Arc; use std::time::{Duration, Instant}; use blake3::traits::digest::generic_array::GenericArray; @@ -14,21 +14,24 @@ use freenet_stdlib::client_api::{ RequestError, }; use freenet_stdlib::prelude::*; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{self}; +use crate::message::Transaction; +use crate::node::OpManager; #[cfg(any( not(feature = "local-mode"), feature = "network-mode", all(not(feature = "local-mode"), not(feature = "network-mode")) ))] use crate::operations::get::GetResult; +use crate::operations::{OpEnum, OpError}; use crate::runtime::{ ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface, DelegateStore, Runtime, SecretsStore, StateStore, StateStoreError, }; use crate::{ client_events::{ClientId, HostResult}, - node::{NodeConfig, OpManager, P2pBridge}, + node::{NodeConfig, P2pBridge}, operations::{self, op_trait::Operation}, DynError, }; @@ -98,27 +101,90 @@ pub enum OperationMode { Network, } -#[cfg(any( - not(feature = "local-mode"), - feature = "network-mode", - all(not(feature = "local-mode"), not(feature = "network-mode")) -))] -// for now just mocking up requests to the network -async fn op_request(_request: M) -> Result -where - Op: Operation, - M: ComposeNetworkMessage, -{ +pub(crate) struct ExecutorToEventLoopChannel { + op_manager: Arc, + end: End, +} + +pub(crate) fn executor_channel( + op_manager: Arc, +) -> ( + ExecutorToEventLoopChannel, + ExecutorToEventLoopChannel, +) { + let (sender, _b) = mpsc::channel(1); + let listener_halve = ExecutorToEventLoopChannel { + op_manager: op_manager.clone(), + end: NetworkEventListenerHalve, + }; + let sender_halve = ExecutorToEventLoopChannel { + op_manager: op_manager.clone(), + end: ExecutorHalve { sender }, + }; + (listener_halve, sender_halve) +} + +#[cfg(test)] +pub(crate) fn executor_channel_test() -> ( + ExecutorToEventLoopChannel, + ExecutorToEventLoopChannel, +) { todo!() } +impl ExecutorToEventLoopChannel { + async fn send_to_event_loop(&mut self, message: T) -> Result<(), DynError> + where + T: ComposeNetworkMessage, + Op: Operation + Send + 'static, + { + let op = message.initiate_op(&self.op_manager); + self.end.sender.send(*op.id()).await?; + >::resume_op(op, &self.op_manager).await?; + Ok(()) + } +} + +impl ExecutorToEventLoopChannel { + pub async fn transaction_from_executor(&mut self) -> Transaction { + todo!() + } + + pub async fn response(&mut self, _result: OpEnum) { + todo!() + } +} + +impl Clone for ExecutorToEventLoopChannel { + fn clone(&self) -> Self { + todo!() + } +} + +pub(crate) struct NetworkEventListenerHalve; +pub(crate) struct ExecutorHalve { + sender: mpsc::Sender, +} + +mod sealed { + use super::{ExecutorHalve, NetworkEventListenerHalve}; + pub(crate) trait ChannelHalve {} + impl ChannelHalve for NetworkEventListenerHalve {} + impl ChannelHalve for ExecutorHalve {} +} + #[allow(unused)] +#[async_trait::async_trait] trait ComposeNetworkMessage where Self: Sized, - Op: Operation, + Op: Operation + Send + 'static, { - fn get_message(self, manager: &OpManager) -> Op::Message { + fn initiate_op(self, op_manager: &OpManager) -> Op { + todo!() + } + + async fn resume_op(op: Op, op_manager: &OpManager) -> Result { todo!() } } @@ -129,7 +195,21 @@ struct GetContract { fetch_contract: bool, } -impl ComposeNetworkMessage for GetContract {} +#[async_trait::async_trait] +impl ComposeNetworkMessage for GetContract { + fn initiate_op(self, op_manager: &OpManager) -> operations::get::GetOp { + operations::get::start_op(self.key, self.fetch_contract, &op_manager.ring.peer_key) + } + + async fn resume_op( + op: operations::get::GetOp, + op_manager: &OpManager, + ) -> Result { + let id = *>::id(&op); + operations::get::request_get(op_manager, op, None).await?; + Ok(id) + } +} #[allow(unused)] struct SubscribeContract { @@ -172,9 +252,90 @@ pub struct Executor { mode: OperationMode, runtime: R, state_store: StateStore, - update_notifications: HashMap)>>, + update_notifications: HashMap)>>, subscriber_summaries: HashMap>>>, delegate_attested_ids: HashMap>, + #[cfg(any( + not(feature = "local-mode"), + feature = "network-mode", + all(not(feature = "local-mode"), not(feature = "network-mode")) + ))] + event_loop_channel: Option>, +} + +#[cfg(any( + not(feature = "local-mode"), + feature = "network-mode", + all(not(feature = "local-mode"), not(feature = "network-mode")) +))] +impl Executor { + pub(crate) fn event_loop_channel( + &mut self, + channel: ExecutorToEventLoopChannel, + ) { + self.event_loop_channel = Some(channel); + } + + async fn subscribe(&mut self, key: ContractKey) -> Result<(), ExecutorError> { + #[cfg(any( + all(not(feature = "local-mode"), not(feature = "network-mode")), + all(feature = "local-mode", feature = "network-mode") + ))] + { + if self.mode == OperationMode::Local { + return Ok(()); + } + } + let request = SubscribeContract { key }; + let op: operations::subscribe::SubscribeOp = self + .op_request(request) + .await + .map_err(ExecutorError::other)?; + let _sub: operations::subscribe::SubscribeResult = + op.try_into().map_err(ExecutorError::other)?; + Ok(()) + } + + #[inline] + async fn local_state_or_from_network( + &mut self, + id: &ContractInstanceId, + ) -> Result, ExecutorError> { + if let Ok(contract) = self.state_store.get(&(*id).into()).await { + return Ok(Either::Left(contract)); + }; + let request: GetContract = GetContract { + key: (*id).into(), + fetch_contract: true, + }; + let op: operations::get::GetOp = self + .op_request(request) + .await + .map_err(ExecutorError::other)?; + let get_result: operations::get::GetResult = op.try_into().map_err(ExecutorError::other)?; + Ok(Either::Right(get_result)) + } + + // FIXME: must add suspension and resuming when doing this, + // otherwise it may be possible to end up in a deadlock waiting for a tree of contract + // dependencies to be resolved + async fn op_request(&mut self, request: M) -> Result + where + Op: Operation + Send + 'static, + M: ComposeNetworkMessage, + { + debug_assert!(self.event_loop_channel.is_some()); + let channel = match self.event_loop_channel.as_mut() { + Some(ch) => ch, + None => { + // Safety: this should be always set if network mode is ambiguous + // or using network mode unequivocally + unsafe { std::hint::unreachable_unchecked() } + } + }; + channel.send_to_event_loop(request).await?; + todo!() + } } impl Executor { @@ -241,6 +402,12 @@ impl Executor { update_notifications: HashMap::default(), subscriber_summaries: HashMap::default(), delegate_attested_ids: HashMap::default(), + #[cfg(any( + not(feature = "local-mode"), + feature = "network-mode", + all(not(feature = "local-mode"), not(feature = "network-mode")) + ))] + event_loop_channel: None, }) } @@ -309,7 +476,7 @@ impl Executor { &mut self, id: ClientId, req: ClientRequest<'a>, - updates: Option>>, + updates: Option>>, ) -> Response { match req { ClientRequest::ContractOp(op) => self.contract_requests(op, id, updates).await, @@ -329,7 +496,7 @@ impl Executor { &mut self, req: ContractRequest<'_>, cli_id: ClientId, - updates: Option>>, + updates: Option>>, ) -> Response { match req { ContractRequest::Put { @@ -594,6 +761,7 @@ impl Executor { let state = match self.local_state_or_from_network(&id).await? { Either::Left(state) => state, Either::Right(GetResult { state, contract }) => { + let contract = contract.unwrap(); // fixme: deal with unwrap self.verify_and_store_contract( state.clone(), contract, @@ -680,8 +848,10 @@ impl Executor { key: key.clone(), new_state, }; - let op: operations::update::UpdateOp = - op_request(request).await.map_err(ExecutorError::other)?; + let op: operations::update::UpdateOp = self + .op_request(request) + .await + .map_err(ExecutorError::other)?; let _update: operations::update::UpdateResult = op.try_into().map_err(ExecutorError::other)?; } @@ -789,6 +959,7 @@ impl Executor { .await? { Either::Right(GetResult { state, contract }) => { + let contract = contract.unwrap(); // fixme: deal with unwrap self.verify_and_store_contract( state, contract.clone(), @@ -803,29 +974,6 @@ impl Executor { } } - #[cfg(any( - not(feature = "local-mode"), - feature = "network-mode", - all(not(feature = "local-mode"), not(feature = "network-mode")) - ))] - async fn subscribe(&self, key: ContractKey) -> Result<(), ExecutorError> { - #[cfg(any( - all(not(feature = "local-mode"), not(feature = "network-mode")), - all(feature = "local-mode", feature = "network-mode") - ))] - { - if self.mode == OperationMode::Local { - return Ok(()); - } - } - let request = SubscribeContract { key }; - let op: operations::subscribe::SubscribeOp = - op_request(request).await.map_err(ExecutorError::other)?; - let _sub: operations::subscribe::SubscribeResult = - op.try_into().map_err(ExecutorError::other)?; - Ok(()) - } - async fn get_local_contract( &self, id: &ContractInstanceId, @@ -908,10 +1056,11 @@ impl Executor { *related = Some(state.into()); } Either::Right(result) => { + let contract = result.contract.unwrap(); // fixme: deal with unwrap trying_key = (*id).into(); - trying_params = result.contract.params(); + trying_params = contract.params(); trying_state = result.state; - trying_contract = Some(result.contract); + trying_contract = Some(contract); continue; } } @@ -1004,28 +1153,6 @@ impl Executor { Ok(()) } - #[cfg(any( - not(feature = "local-mode"), - feature = "network-mode", - all(not(feature = "local-mode"), not(feature = "network-mode")), - ))] - #[inline] - async fn local_state_or_from_network( - &self, - id: &ContractInstanceId, - ) -> Result, ExecutorError> { - if let Ok(contract) = self.state_store.get(&(*id).into()).await { - return Ok(Either::Left(contract)); - }; - let request: GetContract = GetContract { - key: (*id).into(), - fetch_contract: true, - }; - let op: operations::get::GetOp = op_request(request).await.map_err(ExecutorError::other)?; - let get_result: operations::get::GetResult = op.try_into().map_err(ExecutorError::other)?; - Ok(Either::Right(get_result)) - } - async fn get_contract_locally( &self, key: &ContractKey, @@ -1055,7 +1182,7 @@ impl Executor { &mut self, _id: ClientId, _req: ClientRequest<'a>, - _updates: Option>>, + _updates: Option>>, ) -> Response { todo!() } @@ -1077,7 +1204,7 @@ impl ContractExecutor for Executor { Err(err) => Err(err), Ok(_) => { // Safety: check `perform_contract_get` to indeed check this should never happen - unsafe { unreachable_unchecked() } + unsafe { std::hint::unreachable_unchecked() } } } } diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index b56d45f62..311916723 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -1,6 +1,7 @@ #![allow(unused)] // FIXME: remove this -use std::collections::VecDeque; +use std::collections::{BTreeMap, VecDeque}; +use std::hash::Hash; use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering::SeqCst}; use std::time::{Duration, Instant}; @@ -9,12 +10,16 @@ use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse}; use freenet_stdlib::prelude::*; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use super::executor::{ExecutorHalve, ExecutorToEventLoopChannel}; use super::{ executor::{ContractExecutor, Executor}, ContractError, }; +use crate::client_events::HostResult; +use crate::message::Transaction; +use crate::node::OpManager; use crate::{ client_events::ClientId, node::NodeConfig, @@ -24,18 +29,53 @@ use crate::{ pub const MAX_MEM_CACHE: i64 = 10_000_000; +pub(crate) struct ClientResponses(UnboundedReceiver<(ClientId, HostResult)>); + +impl ClientResponses { + pub fn channel() -> (Self, ClientResponsesSender) { + let (tx, rx) = mpsc::unbounded_channel(); + (Self(rx), ClientResponsesSender(tx)) + } +} + +impl std::ops::Deref for ClientResponses { + type Target = UnboundedReceiver<(ClientId, HostResult)>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for ClientResponses { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Clone)] +pub(crate) struct ClientResponsesSender(UnboundedSender<(ClientId, HostResult)>); + +impl std::ops::Deref for ClientResponsesSender { + type Target = UnboundedSender<(ClientId, HostResult)>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + pub(crate) trait ContractHandler { type Builder; type ContractExecutor: ContractExecutor; fn build( - channel: ContractHandlerChannel, + contract_handler_channel: ContractHandlerToEventLoopChannel, + executor_request_sender: ExecutorToEventLoopChannel, builder: Self::Builder, ) -> BoxFuture<'static, Result> where Self: Sized + 'static; - fn channel(&mut self) -> &mut ContractHandlerChannel; + fn channel(&mut self) -> &mut ContractHandlerToEventLoopChannel; /// # Arguments /// - updates: channel to send back updates from contracts to whoever is subscribed to the contract. @@ -51,7 +91,7 @@ pub(crate) trait ContractHandler { pub(crate) struct NetworkContractHandler { executor: Executor, - channel: ContractHandlerChannel, + channel: ContractHandlerToEventLoopChannel, } impl ContractHandler for NetworkContractHandler { @@ -59,20 +99,22 @@ impl ContractHandler for NetworkContractHandler { type ContractExecutor = Executor; fn build( - channel: ContractHandlerChannel, + channel: ContractHandlerToEventLoopChannel, + executor_request_sender: ExecutorToEventLoopChannel, config: Self::Builder, ) -> BoxFuture<'static, Result> where Self: Sized + 'static, { async { - let executor = Executor::from_config(config).await?; + let mut executor = Executor::from_config(config).await?; + executor.event_loop_channel(executor_request_sender); Ok(Self { executor, channel }) } .boxed() } - fn channel(&mut self) -> &mut ContractHandlerChannel { + fn channel(&mut self) -> &mut ContractHandlerToEventLoopChannel { &mut self.channel } @@ -103,7 +145,8 @@ impl ContractHandler for NetworkContractHandler { type ContractExecutor = Executor; fn build( - channel: ContractHandlerChannel, + channel: ContractHandlerToEventLoopChannel, + _executor_request_sender: ExecutorToEventLoopChannel, _builder: Self::Builder, ) -> BoxFuture<'static, Result> where @@ -116,7 +159,7 @@ impl ContractHandler for NetworkContractHandler { .boxed() } - fn channel(&mut self) -> &mut ContractHandlerChannel { + fn channel(&mut self) -> &mut ContractHandlerToEventLoopChannel { &mut self.channel } @@ -141,46 +184,66 @@ impl ContractHandler for NetworkContractHandler { } } -pub struct EventId(u64); +#[derive(Eq)] +pub(crate) struct EventId { + id: u64, + client_id: Option, +} + +impl EventId { + pub fn client_id(&self) -> Option { + self.client_id + } +} + +impl PartialEq for EventId { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Hash for EventId { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} /// A bidirectional channel which keeps track of the initiator half /// and sends the corresponding response to the listener of the operation. -pub(crate) struct ContractHandlerChannel { +pub(crate) struct ContractHandlerToEventLoopChannel { rx: mpsc::UnboundedReceiver, tx: mpsc::UnboundedSender, - //TODO: change queue to btree once pop_first is stabilized - // (https://github.com/rust-lang/rust/issues/62924) - queue: VecDeque<(u64, ContractHandlerEvent)>, + queue: BTreeMap, _halve: PhantomData, } -pub(crate) struct CHListenerHalve; -pub(crate) struct CHSenderHalve; +pub(crate) struct ContractHandlerHalve; +pub(crate) struct NetEventListener; mod sealed { - use super::{CHListenerHalve, CHSenderHalve}; + use super::{ContractHandlerHalve, NetEventListener}; pub(crate) trait ChannelHalve {} - impl ChannelHalve for CHListenerHalve {} - impl ChannelHalve for CHSenderHalve {} + impl ChannelHalve for ContractHandlerHalve {} + impl ChannelHalve for NetEventListener {} } pub(crate) fn contract_handler_channel() -> ( - ContractHandlerChannel, - ContractHandlerChannel, + ContractHandlerToEventLoopChannel, + ContractHandlerToEventLoopChannel, ) { let (notification_tx, notification_channel) = mpsc::unbounded_channel(); let (ch_tx, ch_listener) = mpsc::unbounded_channel(); ( - ContractHandlerChannel { + ContractHandlerToEventLoopChannel { rx: notification_channel, tx: ch_tx, - queue: VecDeque::new(), + queue: BTreeMap::new(), _halve: PhantomData, }, - ContractHandlerChannel { + ContractHandlerToEventLoopChannel { rx: ch_listener, tx: notification_tx, - queue: VecDeque::new(), + queue: BTreeMap::new(), _halve: PhantomData, }, ) @@ -195,18 +258,19 @@ static EV_ID: AtomicU64 = AtomicU64::new(0); // kind of event and can be optimized on a case basis const CH_EV_RESPONSE_TIME_OUT: Duration = Duration::from_secs(300); -impl ContractHandlerChannel { +impl ContractHandlerToEventLoopChannel { /// Send an event to the contract handler and receive a response event if successful. pub async fn send_to_handler( &mut self, ev: ContractHandlerEvent, + client_id: Option, ) -> Result { let id = EV_ID.fetch_add(1, SeqCst); self.tx - .send(InternalCHEvent { ev, id }) + .send(InternalCHEvent { ev, id, client_id }) .map_err(|err| ContractError::ChannelDropped(Box::new(err.0.ev)))?; - if let Ok(pos) = self.queue.binary_search_by_key(&id, |(k, _v)| *k) { - Ok(self.queue.remove(pos).unwrap().1) + if let Some(handler) = self.queue.remove(&id) { + Ok(handler) } else { let started_op = Instant::now(); loop { @@ -217,34 +281,46 @@ impl ContractHandlerChannel { if msg.id == id { return Ok(msg.ev); } else { - self.queue.push_front((id, msg.ev)); // should never be duplicates + self.queue.insert(id, msg.ev); // should never be duplicates } } tokio::time::sleep(Duration::from_nanos(100)).await; } } } + + // todo: use + pub async fn recv_from_handler(&mut self) -> (EventId, ContractHandlerEvent) { + todo!() + } } -impl ContractHandlerChannel { - pub async fn send_to_listener( +impl ContractHandlerToEventLoopChannel { + pub async fn send_to_event_loop( &self, id: EventId, ev: ContractHandlerEvent, ) -> Result<(), ContractError> { self.tx - .send(InternalCHEvent { ev, id: id.0 }) + .send(InternalCHEvent { + ev, + id: id.id, + client_id: id.client_id, + }) .map_err(|err| ContractError::ChannelDropped(Box::new(err.0.ev))) } - pub async fn recv_from_listener( + pub async fn recv_from_event_loop( &mut self, ) -> Result<(EventId, ContractHandlerEvent), ContractError> { - if let Some((id, ev)) = self.queue.pop_front() { - return Ok((EventId(id), ev)); - } if let Some(msg) = self.rx.recv().await { - return Ok((EventId(msg.id), msg.ev)); + return Ok(( + EventId { + id: msg.id, + client_id: msg.client_id, + }, + msg.ev, + )); } Err(ContractError::NoEvHandlerResponse) } @@ -259,6 +335,7 @@ pub(crate) struct StoreResponse { struct InternalCHEvent { ev: ContractHandlerEvent, id: u64, + client_id: Option, } #[derive(Debug)] @@ -288,6 +365,12 @@ pub(crate) enum ContractHandlerEvent { CacheResult(Result<(), ContractError>), } +impl ContractHandlerEvent { + pub async fn into_network_op(self, op_manager: &OpManager) -> Transaction { + todo!() + } +} + #[cfg(test)] pub mod test { use std::sync::Arc; @@ -313,12 +396,12 @@ pub mod test { Parameters::from(vec![]), ))); send_halve - .send_to_handler(ContractHandlerEvent::Cache(contract)) + .send_to_handler(ContractHandlerEvent::Cache(contract), None) .await }); let (id, ev) = - tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_listener()) + tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_event_loop()) .await??; if let ContractHandlerEvent::Cache(contract) = ev { @@ -329,7 +412,7 @@ pub mod test { )); tokio::time::timeout( Duration::from_millis(100), - rcv_halve.send_to_listener(id, ContractHandlerEvent::Cache(contract)), + rcv_halve.send_to_event_loop(id, ContractHandlerEvent::Cache(contract)), ) .await??; } else { diff --git a/crates/core/src/contract/in_memory.rs b/crates/core/src/contract/in_memory.rs index 9e6b6e09e..58fd9a5db 100644 --- a/crates/core/src/contract/in_memory.rs +++ b/crates/core/src/contract/in_memory.rs @@ -10,7 +10,8 @@ use futures::{future::BoxFuture, FutureExt}; use tokio::sync::mpsc::UnboundedSender; use super::{ - handler::{CHListenerHalve, ContractHandler, ContractHandlerChannel}, + executor::{ExecutorHalve, ExecutorToEventLoopChannel}, + handler::{ContractHandler, ContractHandlerHalve, ContractHandlerToEventLoopChannel}, storages::in_memory::MemKVStore, Executor, }; @@ -24,7 +25,7 @@ pub(crate) struct MemoryContractHandler where KVStore: StateStorage, { - channel: ContractHandlerChannel, + channel: ContractHandlerToEventLoopChannel, _kv_store: StateStore, _runtime: MockRuntime, } @@ -36,7 +37,10 @@ where { const MAX_MEM_CACHE: i64 = 10_000_000; - pub fn new(channel: ContractHandlerChannel, kv_store: KVStore) -> Self { + pub fn new( + channel: ContractHandlerToEventLoopChannel, + kv_store: KVStore, + ) -> Self { MemoryContractHandler { channel, _kv_store: StateStore::new(kv_store, 10_000_000).unwrap(), @@ -56,7 +60,8 @@ impl ContractHandler for MemoryContractHandler { type ContractExecutor = Executor; fn build( - channel: ContractHandlerChannel, + channel: ContractHandlerToEventLoopChannel, + _executor_request_sender: ExecutorToEventLoopChannel, _config: Self::Builder, ) -> BoxFuture<'static, Result> where @@ -66,7 +71,7 @@ impl ContractHandler for MemoryContractHandler { async move { Ok(MemoryContractHandler::new(channel, store)) }.boxed() } - fn channel(&mut self) -> &mut ContractHandlerChannel { + fn channel(&mut self) -> &mut ContractHandlerToEventLoopChannel { &mut self.channel } diff --git a/crates/core/src/contract/storages/rocks_db.rs b/crates/core/src/contract/storages/rocks_db.rs index 6a5d458c9..9d58c85d3 100644 --- a/crates/core/src/contract/storages/rocks_db.rs +++ b/crates/core/src/contract/storages/rocks_db.rs @@ -102,7 +102,8 @@ mod test { use crate::{ client_events::ClientId, contract::{ - contract_handler_channel, ContractHandler, MockRuntime, NetworkContractHandler, + contract_handler_channel, executor::executor_channel_test, ContractHandler, + MockRuntime, NetworkContractHandler, }, DynError, }; @@ -110,7 +111,8 @@ mod test { // Prepare and get handler for rocksdb async fn get_handler() -> Result, DynError> { let (_, ch_handler) = contract_handler_channel(); - let handler = NetworkContractHandler::build(ch_handler, ()).await?; + let (_, executor_sender) = executor_channel_test(); + let handler = NetworkContractHandler::build(ch_handler, executor_sender, ()).await?; Ok(handler) } diff --git a/crates/core/src/contract/storages/sqlite.rs b/crates/core/src/contract/storages/sqlite.rs index a674a5566..ec8bfbce5 100644 --- a/crates/core/src/contract/storages/sqlite.rs +++ b/crates/core/src/contract/storages/sqlite.rs @@ -142,15 +142,20 @@ mod test { use freenet_stdlib::client_api::ContractRequest; use freenet_stdlib::prelude::*; - use crate::contract::{ - contract_handler_channel, ContractHandler, MockRuntime, NetworkContractHandler, + use crate::{ + client_events::ClientId, + contract::{ + contract_handler_channel, executor::executor_channel_test, ContractHandler, + MockRuntime, NetworkContractHandler, + }, + DynError, }; - use crate::{client_events::ClientId, DynError}; // Prepare and get handler for an in-memory sqlite db async fn get_handler() -> Result, DynError> { let (_, ch_handler) = contract_handler_channel(); - let handler = NetworkContractHandler::build(ch_handler, ()).await?; + let (_, executor_sender) = executor_channel_test(); + let handler = NetworkContractHandler::build(ch_handler, executor_sender, ()).await?; Ok(handler) } diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index a3194ff4d..35d0acd6a 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -13,7 +13,10 @@ use uuid::{ use crate::{ node::{ConnectionError, PeerKey}, - operations::{get::GetMsg, join_ring::JoinRingMsg, put::PutMsg, subscribe::SubscribeMsg}, + operations::{ + get::GetMsg, join_ring::JoinRingMsg, put::PutMsg, subscribe::SubscribeMsg, + update::UpdateMsg, + }, ring::{Location, PeerKeyLocation}, }; pub(crate) use sealed_msg_type::{TransactionType, TransactionTypeId}; @@ -89,6 +92,8 @@ where } mod sealed_msg_type { + use crate::operations::update::UpdateMsg; + use super::*; pub(crate) trait SealedTxType { @@ -111,6 +116,7 @@ mod sealed_msg_type { Put, Get, Subscribe, + Update, Canceled, } @@ -136,7 +142,8 @@ mod sealed_msg_type { JoinRing -> JoinRingMsg, Put -> PutMsg, Get -> GetMsg, - Subscribe -> SubscribeMsg + Subscribe -> SubscribeMsg, + Update -> UpdateMsg }); } @@ -146,11 +153,12 @@ pub(crate) enum Message { Put(PutMsg), Get(GetMsg), Subscribe(SubscribeMsg), + Update(UpdateMsg), /// Failed a transaction, informing of cancellation. Canceled(Transaction), } -pub(crate) trait InnerMessage { +pub(crate) trait InnerMessage: Into { fn id(&self) -> &Transaction; } @@ -194,6 +202,7 @@ impl Message { Put(op) => op.id(), Get(op) => op.id(), Subscribe(op) => op.id(), + Update(_op) => todo!(), Canceled(tx) => tx, } } @@ -205,6 +214,7 @@ impl Message { Put(op) => op.target(), Get(op) => op.target(), Subscribe(op) => op.target(), + Update(_op) => todo!(), Canceled(_) => None, } } @@ -217,6 +227,7 @@ impl Message { Put(op) => op.terminal(), Get(op) => op.terminal(), Subscribe(op) => op.terminal(), + Update(_op) => todo!(), Canceled(_) => true, } } @@ -231,6 +242,7 @@ impl Display for Message { Put(msg) => msg.fmt(f)?, Get(msg) => msg.fmt(f)?, Subscribe(msg) => msg.fmt(f)?, + Update(_op) => todo!(), Canceled(msg) => msg.fmt(f)?, }; write!(f, "}}") diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 070d1fb9d..489f5db70 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -21,15 +21,18 @@ use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId}; #[cfg(test)] use self::in_memory_impl::NodeInMemory; use self::{ - event_listener::{EventListener, EventLog}, + event_log::{EventLog, EventLogListener}, p2p_impl::NodeP2P, }; use crate::{ - client_events::{BoxedClient, ClientEventsProxy, OpenRequest}, + client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest}, config::Config, config::GlobalExecutor, - contract::{ContractError, NetworkContractHandler, OperationMode}, - message::{InnerMessage, Message, NodeEvent, Transaction, TransactionType, TxType}, + contract::{ + ClientResponses, ClientResponsesSender, ContractError, ExecutorToEventLoopChannel, + NetworkContractHandler, NetworkEventListenerHalve, OperationMode, + }, + message::{InnerMessage, Message, Transaction, TransactionType, TxType}, operations::{ get, join_ring::{self, JoinRingMsg, JoinRingOp}, @@ -44,7 +47,7 @@ pub(crate) use conn_manager::{p2p_protoc::P2pBridge, ConnectionBridge, Connectio pub(crate) use op_state::OpManager; mod conn_manager; -mod event_listener; +mod event_log; #[cfg(test)] mod in_memory_impl; mod op_state; @@ -299,110 +302,132 @@ where } /// Process client events. -async fn client_event_handling(op_storage: Arc, mut client_events: ClientEv) -where +async fn client_event_handling( + op_storage: Arc, + mut client_events: ClientEv, + mut client_responses: ClientResponses, +) where ClientEv: ClientEventsProxy + Send + Sync + 'static, { loop { - // fixme: send back responses to client - let OpenRequest { - id: _id, request, .. - } = client_events.recv().await.unwrap(); // fixme: deal with this unwrap - if let ClientRequest::Disconnect { .. } = *request { - if let Err(err) = op_storage.notify_internal_op(NodeEvent::ShutdownNode).await { - tracing::error!("{}", err); + tokio::select! { + client_request = client_events.recv() => { + let req = match client_request { + Ok(req) => req, + Err(err) => { + tracing::debug!(error = %err, "client error"); + continue; + } + }; + if let ClientRequest::Disconnect { .. } = &*req.request { + // todo: notify executor of disconnect + continue; + } + process_open_request(req, op_storage.clone()).await; + } + res = client_responses.recv() => { + if let Some((cli_id, res)) = res { + if let Err(err) = client_events.send(cli_id, res).await { + tracing::error!("channel closed: {err}"); + break; + } + } } - break; } + } +} - let op_storage_cp = op_storage.clone(); - GlobalExecutor::spawn(async move { - match *request { - ClientRequest::ContractOp(ops) => match ops { - ContractRequest::Put { - state, +#[inline] +async fn process_open_request(request: OpenRequest<'static>, op_storage: Arc) { + // this will indirectly start actions on the local contract executor + let op_storage_cp = op_storage.clone(); + let fut = async move { + let client_id = request.client_id; + match *request.request { + ClientRequest::ContractOp(ops) => match ops { + ContractRequest::Put { + state, + contract, + related_contracts, + } => { + // Initialize a put op. + tracing::debug!( + "Received put from user event @ {}", + &op_storage_cp.ring.peer_key + ); + let op = put::start_op( contract, - related_contracts, - } => { - // Initialize a put op. - tracing::debug!( - "Received put from user event @ {}", - &op_storage_cp.ring.peer_key - ); - let op = put::start_op( - contract, - state, - op_storage_cp.ring.max_hops_to_live, - &op_storage_cp.ring.peer_key, - ); - if let Err(err) = put::request_put(&op_storage_cp, op).await { - tracing::error!("{}", err); - } - todo!("use `related_contracts`: {related_contracts:?}") - } - ContractRequest::Update { - key: _key, - data: _delta, - } => { - todo!() + state, + op_storage_cp.ring.max_hops_to_live, + &op_storage_cp.ring.peer_key, + ); + if let Err(err) = put::request_put(&op_storage_cp, op, Some(client_id)).await { + tracing::error!("{}", err); } - ContractRequest::Get { - key, - fetch_contract: contract, - } => { - // Initialize a get op. - tracing::debug!( - "Received get from user event @ {}", - &op_storage_cp.ring.peer_key - ); - let op = get::start_op(key, contract, &op_storage_cp.ring.peer_key); - if let Err(err) = get::request_get(&op_storage_cp, op).await { - tracing::error!("{}", err); - } + todo!("use `related_contracts`: {related_contracts:?}") + } + ContractRequest::Update { + key: _key, + data: _delta, + } => { + todo!() + } + ContractRequest::Get { + key, + fetch_contract: contract, + } => { + // Initialize a get op. + tracing::debug!( + "Received get from user event @ {}", + &op_storage_cp.ring.peer_key + ); + let op = get::start_op(key, contract, &op_storage_cp.ring.peer_key); + if let Err(err) = get::request_get(&op_storage_cp, op, Some(client_id)).await { + tracing::error!("{}", err); } - ContractRequest::Subscribe { key, .. } => { - // Initialize a subscribe op. - loop { - // FIXME: this will block the event loop until the subscribe op succeeds - // instead the op should be deferred for later execution - let op = subscribe::start_op(key.clone(), &op_storage_cp.ring.peer_key); - match subscribe::request_subscribe(&op_storage_cp, op).await { - Err(OpError::ContractError(ContractError::ContractNotFound( - key, - ))) => { - tracing::warn!("Trying to subscribe to a contract not present: {}, requesting it first", key); - let get_op = get::start_op( - key.clone(), - true, - &op_storage_cp.ring.peer_key, - ); - if let Err(err) = get::request_get(&op_storage_cp, get_op).await - { - tracing::error!("Failed getting the contract `{}` while previously trying to subscribe; bailing: {}", key, err); - tokio::time::sleep(Duration::from_secs(5)).await; - } - } - Err(err) => { - tracing::error!("{}", err); - break; + } + ContractRequest::Subscribe { key, .. } => { + // Initialize a subscribe op. + loop { + // FIXME: this will block the event loop until the subscribe op succeeds + // instead the op should be deferred for later execution + let op = subscribe::start_op(key.clone(), &op_storage_cp.ring.peer_key); + match subscribe::request_subscribe(&op_storage_cp, op, Some(client_id)) + .await + { + Err(OpError::ContractError(ContractError::ContractNotFound(key))) => { + tracing::warn!("Trying to subscribe to a contract not present: {}, requesting it first", key); + let get_op = + get::start_op(key.clone(), true, &op_storage_cp.ring.peer_key); + if let Err(err) = + get::request_get(&op_storage_cp, get_op, Some(client_id)).await + { + tracing::error!("Failed getting the contract `{}` while previously trying to subscribe; bailing: {}", key, err); + tokio::time::sleep(Duration::from_secs(5)).await; } - Ok(()) => break, } + Err(err) => { + tracing::error!("{}", err); + break; + } + Ok(()) => break, } - todo!() } - _ => { - tracing::error!("op not supported"); - } - }, - ClientRequest::DelegateOp(_op) => todo!("FIXME: delegate op"), - ClientRequest::Disconnect { .. } => unreachable!(), + todo!() + } _ => { tracing::error!("op not supported"); } + }, + ClientRequest::DelegateOp(_op) => todo!("FIXME: delegate op"), + ClientRequest::Disconnect { .. } => unreachable!(), + _ => { + tracing::error!("op not supported"); } - }); - } + } + }; + + GlobalExecutor::spawn(fut); } macro_rules! log_handling_msg { @@ -416,9 +441,24 @@ macro_rules! log_handling_msg { } #[inline(always)] -fn report_result(op_result: Result<(), OpError>) { - if let Err(err) = op_result { - tracing::debug!("Finished tx w/ error: {}", err) +async fn report_result( + op_result: Result, OpError>, + executor_callback: Option>, + client_req_handler_callback: Option<(ClientId, ClientResponsesSender)>, +) { + match op_result { + Ok(Some(res)) => { + if let Some((client_id, cb)) = client_req_handler_callback { + let _ = cb.send((client_id, res.to_host_result(client_id))); + } + if let Some(mut cb) = executor_callback { + cb.response(res).await; + } + } + Ok(None) => {} + Err(err) => { + tracing::debug!("Finished tx w/ error: {}", err) + } } } @@ -426,10 +466,14 @@ async fn process_message( msg: Result, op_storage: Arc, mut conn_manager: CB, - event_listener: Option>, + event_listener: Option>, + executor_callback: Option>, + client_req_handler_callback: Option, + client_id: Option, ) where CB: ConnectionBridge, { + let cli_req = client_id.zip(client_req_handler_callback); match msg { Ok(msg) => { if let Some(mut listener) = event_listener { @@ -442,23 +486,32 @@ async fn process_message( &op_storage, &mut conn_manager, op, + client_id, ) .await; - report_result(op_result); + report_result(op_result, executor_callback, cli_req).await; } Message::Put(op) => { log_handling_msg!("put", *op.id(), op_storage); - let op_result = - handle_op_request::(&op_storage, &mut conn_manager, op) - .await; - report_result(op_result); + let op_result = handle_op_request::( + &op_storage, + &mut conn_manager, + op, + client_id, + ) + .await; + report_result(op_result, executor_callback, cli_req).await; } Message::Get(op) => { log_handling_msg!("get", op.id(), op_storage); - let op_result = - handle_op_request::(&op_storage, &mut conn_manager, op) - .await; - report_result(op_result); + let op_result = handle_op_request::( + &op_storage, + &mut conn_manager, + op, + client_id, + ) + .await; + report_result(op_result, executor_callback, cli_req).await; } Message::Subscribe(op) => { log_handling_msg!("subscribe", op.id(), op_storage); @@ -466,15 +519,16 @@ async fn process_message( &op_storage, &mut conn_manager, op, + client_id, ) .await; - report_result(op_result); + report_result(op_result, executor_callback, cli_req).await; } _ => {} } } Err(err) => { - report_result(Err(err.into())); + report_result(Err(err.into()), executor_callback, cli_req).await; } } } diff --git a/crates/core/src/node/conn_manager.rs b/crates/core/src/node/conn_manager.rs index c589eadae..e1c6cf73d 100644 --- a/crates/core/src/node/conn_manager.rs +++ b/crates/core/src/node/conn_manager.rs @@ -1,10 +1,17 @@ //! Types and definitions to handle all socket communication for the peer nodes. +use std::ops::{Deref, DerefMut}; + +use either::Either; use libp2p::swarm::StreamUpgradeError; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use super::PeerKey; -use crate::message::Message; +use crate::{ + client_events::ClientId, + message::{Message, NodeEvent}, +}; #[cfg(test)] pub(crate) mod in_memory; @@ -76,3 +83,41 @@ impl Clone for ConnectionError { } } } + +pub(super) struct EventLoopNotifications(Receiver), NodeEvent>>); + +impl EventLoopNotifications { + pub fn channel() -> (EventLoopNotifications, EventLoopNotificationsSender) { + let (notification_tx, notification_rx) = mpsc::channel(100); + ( + EventLoopNotifications(notification_rx), + EventLoopNotificationsSender(notification_tx), + ) + } +} + +impl Deref for EventLoopNotifications { + type Target = Receiver), NodeEvent>>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for EventLoopNotifications { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub(super) struct EventLoopNotificationsSender( + Sender), NodeEvent>>, +); + +impl Deref for EventLoopNotificationsSender { + type Target = Sender), NodeEvent>>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/crates/core/src/node/conn_manager/p2p_protoc.rs b/crates/core/src/node/conn_manager/p2p_protoc.rs index f17b4c65b..1f2573591 100644 --- a/crates/core/src/node/conn_manager/p2p_protoc.rs +++ b/crates/core/src/node/conn_manager/p2p_protoc.rs @@ -32,13 +32,15 @@ use libp2p::{ }, InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, Swarm, }; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use unsigned_varint::codec::UviBytes; -use super::{ConnectionBridge, ConnectionError}; +use super::{ConnectionBridge, ConnectionError, EventLoopNotifications}; use crate::{ + client_events::ClientId, config::{self, GlobalExecutor}, - message::{Message, NodeEvent, TransactionType}, + contract::{ClientResponsesSender, ExecutorToEventLoopChannel, NetworkEventListenerHalve}, + message::{Message, NodeEvent, Transaction, TransactionType}, node::{ handle_cancelled_op, join_ring_request, process_message, InitPeerNode, NodeBuilder, OpManager, PeerKey, @@ -199,7 +201,7 @@ impl P2pConnManager { swarm.add_external_address(remote_addr); } - let (tx_bridge_cmd, rx_bridge_cmd) = channel(100); + let (tx_bridge_cmd, rx_bridge_cmd) = mpsc::channel(100); let bridge = P2pBridge::new(tx_bridge_cmd); let gateways = config.get_gateways()?; @@ -222,15 +224,20 @@ impl P2pConnManager { pub async fn run_event_listener( mut self, op_manager: Arc, - mut notification_channel: Receiver>, + mut notification_channel: EventLoopNotifications, + mut executor_channel: ExecutorToEventLoopChannel, + cli_response_sender: ClientResponsesSender, ) -> Result<(), anyhow::Error> { use ConnMngrActions::*; + let mut pending_from_executor = HashSet::new(); + let mut tx_to_client: HashMap = HashMap::new(); + loop { - let net_msg = self.swarm.select_next_some().map(|event| match event { + let network_msg = self.swarm.select_next_some().map(|event| match event { SwarmEvent::Behaviour(NetEvent::Freenet(msg)) => { tracing::debug!("Message inbound: {:?}", msg); - Ok(Left(*msg)) + Ok(Left((*msg, None))) } SwarmEvent::ConnectionClosed { peer_id, .. } => { Ok(Right(ConnMngrActions::ConnectionClosed { @@ -299,9 +306,9 @@ impl P2pConnManager { } }); - let notification_msg = notification_channel.recv().map(|m| match m { + let notification_msg = notification_channel.0.recv().map(|m| match m { None => Ok(Right(ClosedChannel)), - Some(Left(msg)) => Ok(Left(msg)), + Some(Left((msg, cli_id))) => Ok(Left((msg, cli_id))), Some(Right(action)) => Ok(Right(NodeAction(action))), }); @@ -315,13 +322,24 @@ impl P2pConnManager { }); let msg: Result<_, ConnectionError> = tokio::select! { - msg = net_msg => { msg } + msg = network_msg => { msg } msg = notification_msg => { msg } msg = bridge_msg => { msg } + (event_id, contract_handler_event) = op_manager.recv_from_handler() => { + if let Some(client_id) = event_id.client_id() { + let transaction = contract_handler_event.into_network_op(&op_manager).await; + tx_to_client.insert(transaction, client_id); + } + continue; + } + id = executor_channel.transaction_from_executor() => { + pending_from_executor.insert(id); + continue; + } }; match msg { - Ok(Left(msg)) => { + Ok(Left((msg, client_id))) => { let cb = self.bridge.clone(); match msg { Message::Canceled(tx) => { @@ -356,11 +374,24 @@ impl P2pConnManager { continue; } msg => { + let executor_callback = pending_from_executor + .remove(msg.id()) + .then(|| executor_channel.clone()); + let pending_client_req = tx_to_client.get(msg.id()).copied(); + let client_req_handler_callback = if pending_client_req.is_some() { + debug_assert!(client_id.is_none()); + Some(cli_response_sender.clone()) + } else { + None + }; GlobalExecutor::spawn(process_message( Ok(msg), op_manager.clone(), cb, None, + executor_callback, + client_req_handler_callback, + client_id, )); } } @@ -415,7 +446,15 @@ impl P2pConnManager { } Err(err) => { let cb = self.bridge.clone(); - GlobalExecutor::spawn(process_message(Err(err), op_manager.clone(), cb, None)); + GlobalExecutor::spawn(process_message( + Err(err), + op_manager.clone(), + cb, + None, + None, + None, + None, + )); } Ok(Right(NoAction)) | Ok(Right(NodeAction(NodeEvent::ConfirmedInbound))) => {} } diff --git a/crates/core/src/node/event_listener.rs b/crates/core/src/node/event_log.rs similarity index 96% rename from crates/core/src/node/event_listener.rs rename to crates/core/src/node/event_log.rs index 944f1770f..7828c1139 100644 --- a/crates/core/src/node/event_listener.rs +++ b/crates/core/src/node/event_log.rs @@ -21,9 +21,9 @@ struct ListenerLogId(usize); /// /// This type then can emit it's own information to adjacent systems /// or is a no-op. -pub(crate) trait EventListener { +pub(crate) trait EventLogListener { fn event_received(&mut self, ev: EventLog); - fn trait_clone(&self) -> Box; + fn trait_clone(&self) -> Box; } #[allow(dead_code)] // fixme: remove this @@ -112,13 +112,13 @@ struct MessageLog { #[derive(Clone)] pub(super) struct EventRegister {} -impl EventListener for EventRegister { +impl EventLogListener for EventRegister { fn event_received(&mut self, _log: EventLog) { // let (_msg_log, _log_id) = create_log(log); // TODO: save log } - fn trait_clone(&self) -> Box { + fn trait_clone(&self) -> Box { Box::new(self.clone()) } } @@ -321,7 +321,7 @@ mod test_utils { } } - impl super::EventListener for TestEventListener { + impl super::EventLogListener for TestEventListener { fn event_received(&mut self, log: EventLog) { let tx = log.tx; let mut logs = self.logs.write(); @@ -331,7 +331,7 @@ mod test_utils { self.tx_log.entry(*tx).or_default().push(log_id); } - fn trait_clone(&self) -> Box { + fn trait_clone(&self) -> Box { Box::new(self.clone()) } } diff --git a/crates/core/src/node/in_memory_impl.rs b/crates/core/src/node/in_memory_impl.rs index aef809307..84b07142a 100644 --- a/crates/core/src/node/in_memory_impl.rs +++ b/crates/core/src/node/in_memory_impl.rs @@ -2,17 +2,22 @@ use std::{collections::HashMap, sync::Arc}; use either::Either; use freenet_stdlib::prelude::*; -use tokio::sync::mpsc::{self, Receiver}; use super::{ - client_event_handling, conn_manager::in_memory::MemoryConnManager, - event_listener::EventListener, handle_cancelled_op, join_ring_request, op_state::OpManager, + client_event_handling, + conn_manager::{in_memory::MemoryConnManager, EventLoopNotifications}, + event_log::EventLogListener, + handle_cancelled_op, join_ring_request, + op_state::OpManager, process_message, PeerKey, }; use crate::{ client_events::ClientEventsProxy, config::GlobalExecutor, - contract::{self, ContractError, ContractHandler, ContractHandlerEvent}, + contract::{ + self, executor_channel, ClientResponsesSender, ContractError, ContractHandler, + ContractHandlerEvent, ExecutorToEventLoopChannel, NetworkEventListenerHalve, + }, message::{Message, NodeEvent, TransactionType}, node::NodeBuilder, operations::OpError, @@ -24,17 +29,18 @@ pub(super) struct NodeInMemory { pub peer_key: PeerKey, pub op_storage: Arc, gateways: Vec, - notification_channel: Receiver>, + notification_channel: EventLoopNotifications, conn_manager: MemoryConnManager, - event_listener: Option>, + event_listener: Option>, is_gateway: bool, + _executor_listener: ExecutorToEventLoopChannel, } impl NodeInMemory { /// Buils an in-memory node. Does nothing upon construction, pub async fn build( builder: NodeBuilder<1>, - event_listener: Option>, + event_listener: Option>, ch_builder: CH::Builder, ) -> Result where @@ -46,10 +52,11 @@ impl NodeInMemory { let is_gateway = builder.local_ip.zip(builder.local_port).is_some(); let ring = Ring::new(&builder, &gateways)?; - let (notification_tx, notification_channel) = mpsc::channel(100); + let (notification_channel, notification_tx) = EventLoopNotifications::channel(); let (ops_ch_channel, ch_channel) = contract::contract_handler_channel(); let op_storage = Arc::new(OpManager::new(ring, notification_tx, ops_ch_channel)); - let contract_handler = CH::build(ch_channel, ch_builder) + let (_executor_listener, executor_sender) = executor_channel(op_storage.clone()); + let contract_handler = CH::build(ch_channel, executor_sender, ch_builder) .await .map_err(|e| anyhow::anyhow!(e))?; @@ -63,6 +70,7 @@ impl NodeInMemory { notification_channel, event_listener, is_gateway, + _executor_listener, }) } @@ -84,8 +92,13 @@ impl NodeInMemory { anyhow::bail!("requires at least one gateway"); } } - GlobalExecutor::spawn(client_event_handling(self.op_storage.clone(), user_events)); - self.run_event_listener().await + let (client_responses, cli_response_sender) = contract::ClientResponses::channel(); + GlobalExecutor::spawn(client_event_handling( + self.op_storage.clone(), + user_events, + client_responses, + )); + self.run_event_listener(cli_response_sender).await } pub async fn append_contracts<'a>( @@ -96,13 +109,16 @@ impl NodeInMemory { for (contract, state) in contracts { let key = contract.key(); self.op_storage - .notify_contract_handler(ContractHandlerEvent::Cache(contract.clone())) + .notify_contract_handler(ContractHandlerEvent::Cache(contract.clone()), None) .await?; self.op_storage - .notify_contract_handler(ContractHandlerEvent::PutQuery { - key: key.clone(), - state, - }) + .notify_contract_handler( + ContractHandlerEvent::PutQuery { + key: key.clone(), + state, + }, + None, + ) .await?; tracing::debug!( "Appended contract {} to peer {}", @@ -129,12 +145,15 @@ impl NodeInMemory { } /// Starts listening to incoming events. Will attempt to join the ring if any gateways have been provided. - async fn run_event_listener(&mut self) -> Result<(), anyhow::Error> { + async fn run_event_listener( + &mut self, + _client_responses: ClientResponsesSender, // fixme: use this + ) -> Result<(), anyhow::Error> { loop { let msg = tokio::select! { msg = self.conn_manager.recv() => { msg.map(Either::Left) } msg = self.notification_channel.recv() => if let Some(msg) = msg { - Ok(msg) + Ok(msg.map_left(|(msg, _cli_id)| msg)) } else { anyhow::bail!("notification channel shutdown, fatal error"); } @@ -198,6 +217,9 @@ impl NodeInMemory { op_storage, conn_manager, event_listener, + None, + None, + None, )); } } diff --git a/crates/core/src/node/op_state.rs b/crates/core/src/node/op_state.rs index 273ec2589..c3a5658f1 100644 --- a/crates/core/src/node/op_state.rs +++ b/crates/core/src/node/op_state.rs @@ -3,21 +3,21 @@ use std::{collections::BTreeMap, time::Instant}; use dashmap::DashMap; use either::Either; use parking_lot::RwLock; -use tokio::sync::{ - mpsc::{error::SendError, Sender}, - Mutex, -}; +use tokio::sync::{mpsc::error::SendError, Mutex}; use crate::{ - contract::{CHSenderHalve, ContractError, ContractHandlerChannel, ContractHandlerEvent}, - message::{Message, NodeEvent, Transaction, TransactionType}, + contract::{ + ContractError, ContractHandlerEvent, ContractHandlerToEventLoopChannel, NetEventListener, + }, + dev_tool::ClientId, + message::{Message, Transaction, TransactionType}, operations::{ get::GetOp, join_ring::JoinRingOp, put::PutOp, subscribe::SubscribeOp, OpEnum, OpError, }, ring::Ring, }; -use super::PeerKey; +use super::{conn_manager::EventLoopNotificationsSender, PeerKey}; /// Thread safe and friendly data structure to maintain state of the different operations /// and enable their execution. @@ -26,8 +26,9 @@ pub(crate) struct OpManager { put: DashMap, get: DashMap, subscribe: DashMap, - notification_channel: Sender>, - contract_handler: Mutex>, + to_event_listener: EventLoopNotificationsSender, + // todo: remove the need for a mutex here + ch_outbound: Mutex>, // FIXME: think of an optimal strategy to check for timeouts and clean up garbage _ops_ttl: RwLock>>, pub ring: Ring, @@ -42,10 +43,10 @@ macro_rules! check_id_op { } impl OpManager { - pub fn new( + pub(super) fn new( ring: Ring, - notification_channel: Sender>, - contract_handler: ContractHandlerChannel, + notification_channel: EventLoopNotificationsSender, + contract_handler: ContractHandlerToEventLoopChannel, ) -> Self { Self { join_ring: DashMap::default(), @@ -53,8 +54,8 @@ impl OpManager { get: DashMap::default(), subscribe: DashMap::default(), ring, - notification_channel, - contract_handler: Mutex::new(contract_handler), + to_event_listener: notification_channel, + ch_outbound: Mutex::new(contract_handler), _ops_ttl: RwLock::new(BTreeMap::new()), } } @@ -69,35 +70,41 @@ impl OpManager { &self, msg: Message, op: OpEnum, - ) -> Result<(), SendError> { + client_id: Option, + ) -> Result<(), SendError<(Message, Option)>> { // push back the state to the stack self.push(*msg.id(), op).expect("infallible"); - self.notification_channel - .send(Either::Left(msg)) + self.to_event_listener + .send(Either::Left((msg, client_id))) .await .map_err(|err| SendError(err.0.unwrap_left())) } - /// Send an internal message to this node event loop. - pub async fn notify_internal_op(&self, msg: NodeEvent) -> Result<(), SendError> { - self.notification_channel - .send(Either::Right(msg)) - .await - .map_err(|err| SendError(err.0.unwrap_right())) - } + // /// Send an internal message to this node event loop. + // pub async fn notify_internal_op(&self, msg: NodeEvent) -> Result<(), SendError> { + // self.to_event_listener + // .send(Either::Right(msg)) + // .await + // .map_err(|err| SendError(err.0.unwrap_right())) + // } /// Send an event to the contract handler and await a response event from it if successful. pub async fn notify_contract_handler( &self, msg: ContractHandlerEvent, + client_id: Option, ) -> Result { - self.contract_handler + self.ch_outbound .lock() .await - .send_to_handler(msg) + .send_to_handler(msg, client_id) .await } + pub async fn recv_from_handler(&self) -> (crate::contract::EventId, ContractHandlerEvent) { + todo!() + } + pub fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> { match op { OpEnum::JoinRing(tx) => { @@ -134,6 +141,7 @@ impl OpManager { .remove(id) .map(|(_k, v)| v) .map(OpEnum::Subscribe), + TransactionType::Update => todo!(), TransactionType::Canceled => unreachable!(), } } diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index 0f726c738..9c2b93a35 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use either::Either; use libp2p::{ core::{ muxing, @@ -11,16 +10,19 @@ use libp2p::{ identity::Keypair, noise, tcp, yamux, PeerId, Transport, }; -use tokio::sync::mpsc::{self, Receiver}; use super::{ - client_event_handling, conn_manager::p2p_protoc::P2pConnManager, join_ring_request, PeerKey, + client_event_handling, + conn_manager::{p2p_protoc::P2pConnManager, EventLoopNotifications}, + join_ring_request, PeerKey, }; use crate::{ client_events::combinator::ClientEventsCombinator, config::{self, GlobalExecutor}, - contract::{self, ContractHandler}, - message::{Message, NodeEvent}, + contract::{ + self, ClientResponsesSender, ContractHandler, ExecutorToEventLoopChannel, + NetworkEventListenerHalve, + }, node::NodeBuilder, ring::Ring, util::IterExt, @@ -31,10 +33,12 @@ use super::OpManager; pub(super) struct NodeP2P { pub(crate) peer_key: PeerKey, pub(crate) op_storage: Arc, - notification_channel: Receiver>, + notification_channel: EventLoopNotifications, pub(super) conn_manager: P2pConnManager, // event_listener: Option>, is_gateway: bool, + executor_listener: ExecutorToEventLoopChannel, + cli_response_sender: ClientResponsesSender, } impl NodeP2P { @@ -60,8 +64,14 @@ impl NodeP2P { } // start the p2p event loop + // todo: pass `cli_response_sender` self.conn_manager - .run_event_listener(self.op_storage.clone(), self.notification_channel) + .run_event_listener( + self.op_storage.clone(), + self.notification_channel, + self.executor_listener, + self.cli_response_sender, + ) .await } @@ -81,16 +91,22 @@ impl NodeP2P { }; let ring = Ring::new(&builder, &gateways)?; - let (notification_tx, notification_channel) = mpsc::channel(100); - let (ops_ch_channel, ch_channel) = contract::contract_handler_channel(); - let op_storage = Arc::new(OpManager::new(ring, notification_tx, ops_ch_channel)); - let contract_handler = CH::build(ch_channel, ch_builder) + let (notification_channel, notification_tx) = EventLoopNotifications::channel(); + let (ch_outbound, ch_inbound) = contract::contract_handler_channel(); + let (client_responses, cli_response_sender) = contract::ClientResponses::channel(); + let op_storage = Arc::new(OpManager::new(ring, notification_tx, ch_outbound)); + let (executor_listener, executor_sender) = contract::executor_channel(op_storage.clone()); + let contract_handler = CH::build(ch_inbound, executor_sender, ch_builder) .await .map_err(|e| anyhow::anyhow!(e))?; GlobalExecutor::spawn(contract::contract_handling(contract_handler)); let clients = ClientEventsCombinator::new(builder.clients); - GlobalExecutor::spawn(client_event_handling(op_storage.clone(), clients)); + GlobalExecutor::spawn(client_event_handling( + op_storage.clone(), + clients, + client_responses, + )); Ok(NodeP2P { peer_key, @@ -98,6 +114,8 @@ impl NodeP2P { notification_channel, op_storage, is_gateway: builder.location.is_some(), + executor_listener, + cli_response_sender, }) } diff --git a/crates/core/src/node/tests.rs b/crates/core/src/node/tests.rs index d36676cba..e4eba6473 100644 --- a/crates/core/src/node/tests.rs +++ b/crates/core/src/node/tests.rs @@ -17,7 +17,7 @@ use crate::{ client_events::test::MemoryEventsGen, config::GlobalExecutor, contract::MemoryContractHandler, - node::{event_listener::TestEventListener, InitPeerNode, NodeBuilder, NodeInMemory}, + node::{event_log::TestEventListener, InitPeerNode, NodeBuilder, NodeInMemory}, ring::{Distance, Location, PeerKeyLocation}, }; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 32957a6d7..9c9576331 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,14 +1,12 @@ use tokio::sync::mpsc::error::SendError; use self::op_trait::Operation; -use crate::operations::get::GetOp; -use crate::operations::put::PutOp; -use crate::operations::subscribe::SubscribeOp; use crate::{ + client_events::{ClientId, HostResult}, contract::ContractError, message::{InnerMessage, Message, Transaction, TransactionType}, node::{ConnectionBridge, ConnectionError, OpManager, PeerKey}, - operations::join_ring::JoinRingOp, + operations::{get::GetOp, join_ring::JoinRingOp, put::PutOp, subscribe::SubscribeOp}, ring::RingError, }; @@ -35,7 +33,8 @@ pub(crate) async fn handle_op_request( op_storage: &OpManager, conn_manager: &mut CB, msg: Op::Message, -) -> Result<(), OpError> + client_id: Option, +) -> Result, OpError> where Op: Operation, CB: ConnectionBridge, @@ -45,7 +44,8 @@ where let result = { let OpInitialization { sender: s, op } = Op::load_or_init(op_storage, &msg)?; sender = s; - op.process_message(conn_manager, op_storage, msg).await + op.process_message(conn_manager, op_storage, msg, client_id) + .await }; handle_op_result( op_storage, @@ -61,7 +61,7 @@ async fn handle_op_result( conn_manager: &mut CB, result: Result, sender: Option, -) -> Result<(), OpError> +) -> Result, OpError> where CB: ConnectionBridge, { @@ -69,7 +69,7 @@ where match result { Err((OpError::StatePushed, _)) => { // do nothing and continue, the operation will just continue later on - return Ok(()); + return Ok(None); } Err((err, tx_id)) => { if let Some(sender) = sender { @@ -88,6 +88,14 @@ where } op_storage.push(id, updated_state)?; } + + Ok(OperationResult { + return_msg: None, + state: Some(final_state), + }) if final_state.is_final() => { + // operation finished_completely with result + return Ok(Some(final_state)); + } Ok(OperationResult { return_msg: None, state: Some(updated_state), @@ -112,7 +120,7 @@ where // operation finished_completely } } - Ok(()) + Ok(None) } pub(crate) enum OpEnum { @@ -132,6 +140,20 @@ impl OpEnum { Subscribe(op) => *>::id(op), } } + + fn is_final(&self) -> bool { + match self { + OpEnum::JoinRing(op) if op.finished() => true, + OpEnum::Put(op) if op.finished() => true, + OpEnum::Get(op) if op.finished() => true, + OpEnum::Subscribe(op) if op.finished() => true, + _ => false, + } + } + + pub fn to_host_result(&self, _client_id: ClientId) -> HostResult { + todo!() + } } #[derive(Debug, thiserror::Error)] @@ -148,7 +170,7 @@ pub(crate) enum OpError { #[error("cannot perform a state transition from the current state with the provided input (tx: {0})")] InvalidStateTransition(Transaction), #[error("failed notifying back to the node message loop, channel closed")] - NotificationError(#[from] Box>), + NotificationError(#[from] Box)>>), #[error("unspected transaction type, trying to get a {0:?} from a {1:?}")] IncorrectTxType(TransactionType, TransactionType), #[error("op not present: {0}")] @@ -163,8 +185,8 @@ pub(crate) enum OpError { StatePushed, } -impl From> for OpError { - fn from(err: SendError) -> OpError { +impl From)>> for OpError { + fn from(err: SendError<(Message, Option)>) -> OpError { OpError::NotificationError(Box::new(err)) } } diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 9c4288eea..8f5c3fc0c 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -5,6 +5,7 @@ use std::time::Duration; use freenet_stdlib::prelude::*; use crate::{ + client_events::ClientId, config::PEER_TIMEOUT, contract::{ContractError, ContractHandlerEvent, StoreResponse}, message::{InnerMessage, Message, Transaction, TxType}, @@ -28,20 +29,29 @@ const MAX_GET_RETRY_HOPS: usize = 1; pub(crate) struct GetOp { id: Transaction, state: Option, + result: Option, _ttl: Duration, } +impl GetOp { + pub(super) fn finished(&self) -> bool { + self.result.is_some() + } +} #[allow(dead_code)] pub(crate) struct GetResult { pub state: WrappedState, - pub contract: ContractContainer, + pub contract: Option, } impl TryFrom for GetResult { type Error = OpError; - fn try_from(_value: GetOp) -> Result { - todo!() + fn try_from(value: GetOp) -> Result { + match value.result { + Some(r) => Ok(r), + _ => todo!(), + } } } @@ -73,6 +83,7 @@ where op: Self { state: Some(GetState::ReceivedRequest), id: tx, + result: None, _ttl: PEER_TIMEOUT, }, sender, @@ -92,10 +103,12 @@ where conn_manager: &'a mut CB, op_storage: &'a OpManager, input: Self::Message, + client_id: Option, ) -> Pin> + Send + 'a>> { Box::pin(async move { let return_msg; let new_state; + let mut result = None; match input { GetMsg::RequestGet { @@ -156,6 +169,7 @@ where sender: op_storage.ring.own_location(), target: sender, // return to requester }), + None, self._ttl, ); } @@ -185,10 +199,13 @@ where response: value, key: returned_key, } = op_storage - .notify_contract_handler(ContractHandlerEvent::GetQuery { - key: key.clone(), - fetch_contract, - }) + .notify_contract_handler( + ContractHandlerEvent::GetQuery { + key: key.clone(), + fetch_contract, + }, + client_id, + ) .await? { match check_contract_found( @@ -308,13 +325,13 @@ where }; } GetMsg::ReturnGet { + id, key, value: StoreResponse { state: Some(value), contract, }, - id, sender, target, } => { @@ -331,12 +348,13 @@ where if let Some(contract) = &contract { // store contract first op_storage - .notify_contract_handler(ContractHandlerEvent::Cache( - contract.clone(), - )) + .notify_contract_handler( + ContractHandlerEvent::Cache(contract.clone()), + client_id, + ) .await?; let key = contract.key(); - tracing::debug!("Contract `{}` successfully put", key); + tracing::debug!("Contract `{}` successfully cached", key); } else { // no contract, consider this like an error ignoring the incoming update value tracing::warn!( @@ -347,6 +365,7 @@ where let op = GetOp { id, state: self.state, + result: None, _ttl: self._ttl, }; @@ -363,6 +382,7 @@ where target, }), OpEnum::Get(op), + None, ) .await?; return Err(OpError::StatePushed); @@ -370,10 +390,13 @@ where } op_storage - .notify_contract_handler(ContractHandlerEvent::PutQuery { - key: key.clone(), - state: value.clone(), - }) + .notify_contract_handler( + ContractHandlerEvent::PutQuery { + key: key.clone(), + state: value.clone(), + }, + client_id, + ) .await?; match self.state { @@ -384,10 +407,18 @@ where ); new_state = None; return_msg = None; + result = Some(GetResult { + state: value.clone(), + contract, + }); } else { tracing::debug!("Get response received for contract {}", key); new_state = None; return_msg = None; + result = Some(GetResult { + state: value.clone(), + contract, + }); } } Some(GetState::ReceivedRequest) => { @@ -410,7 +441,7 @@ where _ => return Err(OpError::UnexpectedOpState), } - build_op_result(self.id, new_state, return_msg, self._ttl) + build_op_result(self.id, new_state, return_msg, result, self._ttl) }) } } @@ -419,11 +450,13 @@ fn build_op_result( id: Transaction, state: Option, msg: Option, + result: Option, ttl: Duration, ) -> Result { let output_op = Some(GetOp { id, state, + result, _ttl: ttl, }); Ok(OperationResult { @@ -493,6 +526,7 @@ pub(crate) fn start_op(key: ContractKey, fetch_contract: bool, id: &PeerKey) -> GetOp { id, state, + result: None, _ttl: PEER_TIMEOUT, } } @@ -516,7 +550,11 @@ enum GetState { } /// Request to get the current value from a contract. -pub(crate) async fn request_get(op_storage: &OpManager, get_op: GetOp) -> Result<(), OpError> { +pub(crate) async fn request_get( + op_storage: &OpManager, + get_op: GetOp, + client_id: Option, +) -> Result<(), OpError> { let (target, id) = if let Some(GetState::PrepareRequest { key, id, .. }) = get_op.state.clone() { // the initial request must provide: @@ -554,20 +592,21 @@ pub(crate) async fn request_get(op_storage: &OpManager, get_op: GetOp) -> Result }); let msg = Some(GetMsg::RequestGet { + id, key, target, - id, fetch_contract, }); let op = GetOp { id, state: new_state, + result: None, _ttl: get_op._ttl, }; op_storage - .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Get(op)) + .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Get(op), client_id) .await?; } _ => return Err(OpError::InvalidStateTransition(get_op.id)), @@ -578,12 +617,12 @@ pub(crate) async fn request_get(op_storage: &OpManager, get_op: GetOp) -> Result mod messages { use std::fmt::Display; + use serde::{Deserialize, Serialize}; + use crate::{contract::StoreResponse, message::InnerMessage}; use super::*; - use serde::{Deserialize, Serialize}; - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) enum GetMsg { /// Internal node call to route to a peer close to the contract. diff --git a/crates/core/src/operations/join_ring.rs b/crates/core/src/operations/join_ring.rs index 050cbcbd3..ca20df520 100644 --- a/crates/core/src/operations/join_ring.rs +++ b/crates/core/src/operations/join_ring.rs @@ -6,6 +6,7 @@ use super::{OpError, OperationResult}; use crate::operations::op_trait::Operation; use crate::operations::OpInitialization; use crate::{ + client_events::ClientId, config::PEER_TIMEOUT, message::{InnerMessage, Message, Transaction}, node::{ConnectionBridge, ConnectionError, OpManager, PeerKey}, @@ -32,6 +33,10 @@ impl JoinRingOp { pub fn has_backoff(&self) -> bool { self.backoff.is_some() } + + pub(super) fn finished(&self) -> bool { + todo!() + } } pub(crate) struct JoinRingResult {} @@ -89,6 +94,7 @@ impl Operation for JoinRingOp { conn_manager: &'a mut CB, op_storage: &'a OpManager, input: Self::Message, + _client_id: Option, ) -> Pin> + Send + 'a>> { Box::pin(async move { let mut return_msg = None; diff --git a/crates/core/src/operations/op_trait.rs b/crates/core/src/operations/op_trait.rs index 683d9df43..d5a4d63f5 100644 --- a/crates/core/src/operations/op_trait.rs +++ b/crates/core/src/operations/op_trait.rs @@ -5,6 +5,7 @@ use std::pin::Pin; use futures::Future; use crate::{ + client_events::ClientId, message::{InnerMessage, Transaction}, node::OpManager, operations::{OpError, OpInitialization, OperationResult}, @@ -33,5 +34,6 @@ where conn_manager: &'a mut CB, op_storage: &'a OpManager, input: Self::Message, + client_id: Option, ) -> Pin> + Send + 'a>>; } diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 10f8b95fe..9c6a61a72 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -12,6 +12,7 @@ use freenet_stdlib::prelude::*; use super::{OpEnum, OpError, OperationResult}; use crate::{ + client_events::ClientId, config::PEER_TIMEOUT, contract::ContractHandlerEvent, message::{InnerMessage, Message, Transaction, TxType}, @@ -25,7 +26,15 @@ pub(crate) struct PutOp { state: Option, /// time left until time out, when this reaches zero it will be removed from the state _ttl: Duration, + done: bool, } + +impl PutOp { + pub(super) fn finished(&self) -> bool { + self.done + } +} + pub(crate) struct PutResult {} impl TryFrom for PutResult { @@ -61,6 +70,7 @@ impl Operation for PutOp { Ok(OpInitialization { op: Self { state: Some(PutState::ReceivedRequest), + done: false, id: tx, _ttl: PEER_TIMEOUT, }, @@ -80,10 +90,12 @@ impl Operation for PutOp { conn_manager: &'a mut CB, op_storage: &'a OpManager, input: Self::Message, + client_id: Option, ) -> Pin> + Send + 'a>> { Box::pin(async move { let return_msg; let new_state; + let mut done = false; match input { PutMsg::RequestPut { @@ -140,7 +152,7 @@ impl Operation for PutOp { .within_caching_distance(&Location::from(&key)) { tracing::debug!("Contract `{}` not cached @ peer {}", key, target.peer); - match try_to_cache_contract(op_storage, &contract, &key).await { + match try_to_cache_contract(op_storage, &contract, &key, client_id).await { Ok(_) => {} Err(err) => return Err(err), } @@ -156,7 +168,7 @@ impl Operation for PutOp { // after the contract has been cached, push the update query tracing::debug!("Attempting contract value update"); - let new_value = put_contract(op_storage, key.clone(), value).await?; + let new_value = put_contract(op_storage, key.clone(), value, client_id).await?; tracing::debug!("Contract successfully updated"); // if the change was successful, communicate this back to the requestor and broadcast the change conn_manager @@ -197,7 +209,7 @@ impl Operation for PutOp { ); match try_to_broadcast( - id, + (id, client_id), op_storage, self.state, broadcast_to, @@ -224,7 +236,8 @@ impl Operation for PutOp { let target = op_storage.ring.own_location(); tracing::debug!("Attempting contract value update"); - let new_value = put_contract(op_storage, key.clone(), new_value).await?; + let new_value = + put_contract(op_storage, key.clone(), new_value, client_id).await?; tracing::debug!("Contract successfully updated"); let broadcast_to = op_storage @@ -247,7 +260,7 @@ impl Operation for PutOp { ); match try_to_broadcast( - id, + (id, client_id), op_storage, self.state, broadcast_to, @@ -326,6 +339,7 @@ impl Operation for PutOp { tracing::debug!("Successfully updated value for {}", contract,); new_state = None; return_msg = None; + done = true; } _ => return Err(OpError::InvalidStateTransition(self.id)), }; @@ -355,7 +369,7 @@ impl Operation for PutOp { .ring .within_caching_distance(&Location::from(&key)); if !cached_contract && within_caching_dist { - match try_to_cache_contract(op_storage, &contract, &key).await { + match try_to_cache_contract(op_storage, &contract, &key, client_id).await { Ok(_) => {} Err(err) => return Err(err), } @@ -367,7 +381,7 @@ impl Operation for PutOp { }); } // after the contract has been cached, push the update query - let new_value = put_contract(op_storage, key, new_value).await?; + let new_value = put_contract(op_storage, key, new_value, client_id).await?; //update skip list skip_list.push(peer_loc.peer); @@ -391,7 +405,7 @@ impl Operation for PutOp { _ => return Err(OpError::UnexpectedOpState), } - build_op_result(self.id, new_state, return_msg, self._ttl) + build_op_result(self.id, new_state, return_msg, self._ttl, done) }) } } @@ -401,10 +415,12 @@ fn build_op_result( state: Option, msg: Option, ttl: Duration, + done: bool, ) -> Result { let output_op = Some(PutOp { id, state, + done, _ttl: ttl, }); Ok(OperationResult { @@ -417,10 +433,11 @@ async fn try_to_cache_contract<'a>( op_storage: &'a OpManager, contract: &ContractContainer, key: &ContractKey, + client_id: Option, ) -> Result<(), OpError> { // this node does not have the contract, so instead store the contract and execute the put op. let res = op_storage - .notify_contract_handler(ContractHandlerEvent::Cache(contract.clone())) + .notify_contract_handler(ContractHandlerEvent::Cache(contract.clone()), client_id) .await?; if let ContractHandlerEvent::CacheResult(Ok(_)) = res { op_storage.ring.contract_cached(key); @@ -435,7 +452,7 @@ async fn try_to_cache_contract<'a>( } async fn try_to_broadcast( - id: Transaction, + (id, client_id): (Transaction, Option), op_storage: &OpManager, state: Option, broadcast_to: Vec, @@ -471,10 +488,15 @@ async fn try_to_broadcast( let op = PutOp { id, state: new_state, + done: false, _ttl: ttl, }; op_storage - .notify_op_change(Message::from(return_msg.unwrap()), OpEnum::Put(op)) + .notify_op_change( + Message::from(return_msg.unwrap()), + OpEnum::Put(op), + client_id, + ) .await?; return Err(OpError::StatePushed); } @@ -509,6 +531,7 @@ pub(crate) fn start_op( PutOp { id, state, + done: false, _ttl: PEER_TIMEOUT, } } @@ -529,7 +552,11 @@ enum PutState { } /// Request to insert/update a value into a contract. -pub(crate) async fn request_put(op_storage: &OpManager, put_op: PutOp) -> Result<(), OpError> { +pub(crate) async fn request_put( + op_storage: &OpManager, + put_op: PutOp, + client_id: Option, +) -> Result<(), OpError> { let key = if let Some(PutState::PrepareRequest { contract, .. }) = put_op.state.clone() { contract.key() } else { @@ -570,11 +597,12 @@ pub(crate) async fn request_put(op_storage: &OpManager, put_op: PutOp) -> Result let op = PutOp { state: new_state, id, + done: false, _ttl: put_op._ttl, }; op_storage - .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Put(op)) + .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Put(op), client_id) .await?; } _ => return Err(OpError::InvalidStateTransition(put_op.id)), @@ -587,10 +615,11 @@ async fn put_contract( op_storage: &OpManager, key: ContractKey, state: WrappedState, + client_id: Option, ) -> Result { // after the contract has been cached, push the update query match op_storage - .notify_contract_handler(ContractHandlerEvent::PutQuery { key, state }) + .notify_contract_handler(ContractHandlerEvent::PutQuery { key, state }, client_id) .await { Ok(ContractHandlerEvent::PutResponse { diff --git a/crates/core/src/operations/state_machine.rs b/crates/core/src/operations/state_machine.rs deleted file mode 100644 index 8473d8dc5..000000000 --- a/crates/core/src/operations/state_machine.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! Inspired by `rust-fsm`, bought in tree for modifying and tailoring it to -//! this application needs. - -use crate::message::Transaction; - -use super::OpError; - -pub(crate) trait StateMachineImpl { - /// The input alphabet. - type Input; - /// The set of possible states. - type State; - /// The output alphabet. - type Output; - - /// The transition fuction that outputs a new state based on the current - /// state and the provided input. Outputs `None` when there is no transition - /// for a given combination of the input and the state. - fn state_transition_from_input( - _state: Self::State, - _input: Self::Input, - ) -> Option { - None - } - - fn state_transition(_state: &mut Self::State, _input: &mut Self::Input) -> Option { - None - } - - /// The output function that outputs some value from the output alphabet - /// based on the current state and the given input. Outputs `None` when - /// there is no output for a given combination of the input and the state. - fn output_from_input(_state: Self::State, _input: Self::Input) -> Option { - None - } - - fn output_from_input_as_ref( - _state: &Self::State, - _input: &Self::Input, - ) -> Option { - None - } -} - -/// A convenience wrapper around the `StateMachine` trait that encapsulates the -/// state and transition and output function calls. -pub(crate) struct StateMachine { - state: Option, - pub id: Transaction, -} - -impl StateMachine -where - T: StateMachineImpl, -{ - /// Create a new instance of this wrapper which encapsulates the given - /// state. - pub fn from_state(state: T::State, id: Transaction) -> Self { - Self { - state: Some(state), - id, - } - } - - /// Consumes the provided input, gives an output and performs a state - /// transition. If a state transition with the current state and the - /// provided input is not allowed, returns an error. - /// - /// The consumed input is moved to the state, while the output production takes it by reference. - pub fn consume_to_state( - &mut self, - input: T::Input, - ) -> Result, OpError> { - let popped_state = self - .state - .take() - .ok_or(OpError::InvalidStateTransition(self.id))?; - let output = T::output_from_input_as_ref(&popped_state, &input); - if let Some(new_state) = T::state_transition_from_input(popped_state, input) { - self.state = Some(new_state); - Ok(output) - } else { - Err(OpError::InvalidStateTransition(self.id)) - } - } - - /// Semantically similar to [`Self::consume_to_state()`] with the exception that - /// the consumed input is moved to the output, while the state change takes it by reference. - pub fn consume_to_output( - &mut self, - mut input: T::Input, - ) -> Result, OpError> { - let mut popped_state = self - .state - .take() - .ok_or(OpError::InvalidStateTransition(self.id))?; - if let Some(new_state) = T::state_transition(&mut popped_state, &mut input) { - let output = T::output_from_input(popped_state, input); - self.state = Some(new_state); - Ok(output) - } else { - Err(OpError::InvalidStateTransition(self.id)) - } - } - - /// Returns the current state. - pub fn state(&mut self) -> &mut T::State { - self.state.as_mut().expect("infallible") - } -} diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 80c85ac46..726eaad65 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -5,13 +5,13 @@ use std::time::Duration; use freenet_stdlib::prelude::*; use serde::{Deserialize, Serialize}; -use crate::operations::op_trait::Operation; -use crate::operations::OpInitialization; use crate::{ + client_events::ClientId, config::PEER_TIMEOUT, contract::ContractError, message::{Message, Transaction, TxType}, node::{ConnectionBridge, OpManager, PeerKey}, + operations::{op_trait::Operation, OpInitialization}, ring::{PeerKeyLocation, RingError}, }; @@ -27,6 +27,12 @@ pub(crate) struct SubscribeOp { _ttl: Duration, } +impl SubscribeOp { + pub fn finished(&self) -> bool { + todo!() + } +} + pub(crate) enum SubscribeResult {} impl TryFrom for SubscribeResult { @@ -84,6 +90,7 @@ impl Operation for SubscribeOp { conn_manager: &'a mut CB, op_storage: &'a OpManager, input: Self::Message, + client_id: Option, ) -> Pin> + Send + 'a>> { Box::pin(async move { let return_msg; @@ -247,6 +254,8 @@ impl Operation for SubscribeOp { sender.peer ); op_storage.ring.add_subscription(key); + let _ = client_id; + // todo: should inform back to the network event loop? match self.state { Some(SubscribeState::AwaitingResponse { .. }) => { @@ -312,6 +321,7 @@ enum SubscribeState { pub(crate) async fn request_subscribe( op_storage: &OpManager, sub_op: SubscribeOp, + client_id: Option, ) -> Result<(), OpError> { let (target, _id) = if let Some(SubscribeState::PrepareRequest { id, key }) = sub_op.state.clone() { @@ -344,7 +354,11 @@ pub(crate) async fn request_subscribe( _ttl: sub_op._ttl, }; op_storage - .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Subscribe(op)) + .notify_op_change( + msg.map(Message::from).unwrap(), + OpEnum::Subscribe(op), + client_id, + ) .await?; } _ => return Err(OpError::InvalidStateTransition(sub_op.id)), diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 7ba93714c..49e5efe66 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1,7 +1,7 @@ // TODO: complete update logic in the network pub(crate) use self::messages::UpdateMsg; -use crate::node::ConnectionBridge; +use crate::{client_events::ClientId, node::ConnectionBridge}; use super::{op_trait::Operation, OpError}; @@ -37,6 +37,7 @@ impl Operation for UpdateOp { _conn_manager: &'a mut CB, _op_storage: &'a crate::node::OpManager, _input: Self::Message, + _client_id: Option, ) -> std::pin::Pin< Box> + Send + 'a>, > { @@ -45,8 +46,11 @@ impl Operation for UpdateOp { } mod messages { + use serde::{Deserialize, Serialize}; + use crate::message::{InnerMessage, Transaction}; + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) enum UpdateMsg {} impl InnerMessage for UpdateMsg { diff --git a/crates/core/src/server/mod.rs b/crates/core/src/server/mod.rs index 48a1cb426..c46883ca9 100644 --- a/crates/core/src/server/mod.rs +++ b/crates/core/src/server/mod.rs @@ -105,7 +105,7 @@ pub mod local_node { } }; let OpenRequest { - id, + client_id: id, request, notification_channel, token, diff --git a/stdlib b/stdlib index 26c7acf86..2ee157f90 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 26c7acf8635b3ce8cef2568c159379b2ed800e73 +Subproject commit 2ee157f90449e85c0e43b552bb8653f8dd694e6b