From 265a7aaaad8c1625c76c0fea5941108828144503 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Mon, 25 Mar 2024 11:00:05 +0900 Subject: [PATCH] feat: remove gossip code BREAKING CHANGE: remove gossip as royalties will be collected via DAG --- .github/workflows/merge.yml | 113 ----- .github/workflows/nightly.yml | 115 ----- .github/workflows/nightly_wan.yml | 138 +----- sn_cli/README.md | 1 - sn_cli/src/main.rs | 9 - sn_cli/src/subcommands.rs | 4 - sn_cli/src/subcommands/gossipsub.rs | 62 --- sn_cli/src/subcommands/wallet/wo_wallet.rs | 125 +---- sn_client/src/api.rs | 91 +--- sn_client/src/event.rs | 9 - sn_client/src/test_utils.rs | 2 +- sn_faucet/src/main.rs | 2 +- sn_faucet/src/token_distribution.rs | 3 +- sn_networking/Cargo.toml | 4 +- sn_networking/src/cmd.rs | 61 +-- sn_networking/src/driver.rs | 52 -- sn_networking/src/error.rs | 11 - sn_networking/src/event.rs | 48 +- sn_networking/src/lib.rs | 20 - sn_networking/src/metrics.rs | 6 - sn_node/Cargo.toml | 33 +- sn_node/examples/register_inspect.rs | 2 +- sn_node/examples/registers.rs | 2 +- sn_node/src/bin/safenode/rpc_service.rs | 91 +--- sn_node/src/event.rs | 10 +- sn_node/src/lib.rs | 39 +- sn_node/src/node.rs | 100 +--- sn_node/src/put_validation.rs | 28 -- sn_node/tests/common/client.rs | 26 +- sn_node/tests/data_with_churn.rs | 5 +- sn_node/tests/double_spend.rs | 8 +- sn_node/tests/msgs_over_gossipsub.rs | 169 ------- sn_node/tests/nodes_rewards.rs | 444 ------------------ sn_node/tests/sequential_transfers.rs | 5 +- sn_node/tests/storage_payments.rs | 23 +- sn_node/tests/verify_data_location.rs | 5 +- sn_node_manager/src/lib.rs | 3 - sn_node_manager/src/local.rs | 3 - sn_node_rpc_client/README.md | 3 - sn_node_rpc_client/src/main.rs | 68 +-- .../src/safenode_proto/req_resp_types.proto | 31 +- sn_protocol/src/safenode_proto/safenode.proto | 12 - sn_service_management/README.md | 5 +- sn_service_management/src/error.rs | 6 - sn_service_management/src/rpc.rs | 49 +- 45 files changed, 105 insertions(+), 1941 deletions(-) delete mode 100644 sn_cli/src/subcommands/gossipsub.rs delete mode 100644 sn_node/tests/msgs_over_gossipsub.rs delete mode 100644 sn_node/tests/nodes_rewards.rs diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 8093ae56ce..69ee8f93ab 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -518,56 +518,6 @@ jobs: log_file_prefix: safe_test_logs_e2e platform: ${{ matrix.os }} - gossipsub: - if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: Gossipsub E2E tests - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - steps: - - uses: actions/checkout@v4 - - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 - - - name: Build binaries - run: cargo build --release --bin safenode --bin faucet - timeout-minutes: 30 - - - name: Build gossipsub testing executable - run: cargo test --release -p sn_node --features=local-discovery --test msgs_over_gossipsub --no-run - env: - # only set the target dir for windows to bypass the linker issue. - # happens if we build the node manager via testnet action - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 30 - - - name: Start a local network - uses: maidsafe/sn-local-testnet-action@main - with: - action: start - interval: 2000 - node-path: target/release/safenode - faucet-path: target/release/faucet - platform: ${{ matrix.os }} - build: true - - - name: Gossipsub - nodes to subscribe to topics, and publish messages - run: cargo test --release -p sn_node --features local-discovery --test msgs_over_gossipsub -- --nocapture - env: - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 20 - - - name: Stop the local network and upload logs - if: always() - uses: maidsafe/sn-local-testnet-action@main - with: - action: stop - log_file_prefix: safe_test_logs_gossipsub_e2e - platform: ${{ matrix.os }} - spend_test: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" name: spend tests against network @@ -637,69 +587,6 @@ jobs: log_file_prefix: safe_test_logs_spend platform: ${{ matrix.os }} - royalty_reward_test: - if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: royalty_reward tests against network - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - steps: - - uses: actions/checkout@v4 - - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - - uses: Swatinem/rust-cache@v2 - - - name: Build binaries - run: cargo build --release --features=local-discovery,royalties-by-gossip --bin safenode --bin faucet - timeout-minutes: 30 - - - name: Build testing executable - run: cargo test --release -p sn_node --features=local-discovery,royalties-by-gossip --test nodes_rewards --no-run - env: - # only set the target dir for windows to bypass the linker issue. - # happens if we build the node manager via testnet action - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 30 - - - name: Start a local network - uses: maidsafe/sn-local-testnet-action@main - with: - action: start - interval: 2000 - node-path: target/release/safenode - faucet-path: target/release/faucet - platform: ${{ matrix.os }} - build: true - - - name: Check SAFE_PEERS was set - shell: bash - run: | - if [[ -z "$SAFE_PEERS" ]]; then - echo "The SAFE_PEERS variable has not been set" - exit 1 - else - echo "SAFE_PEERS has been set to $SAFE_PEERS" - fi - - # This should be first to avoid slow reward acceptance etc - - name: execute the nodes rewards tests - run: cargo test --release -p sn_node --features=local-discovery,royalties-by-gossip --test nodes_rewards -- --nocapture --test-threads=1 - env: - SN_LOG: "all" - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 25 - - - name: Stop the local network and upload logs - if: always() - uses: maidsafe/sn-local-testnet-action@main - with: - action: stop - log_file_prefix: safe_test_logs_royalty_reward - platform: ${{ matrix.os }} - token_distribution_test: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" name: token distribution test diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index e773fe623a..2e6a4976c6 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -179,56 +179,6 @@ jobs: SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" SLACK_TITLE: "Nightly Unit Test Run Failed" - gossipsub: - if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: Gossipsub E2E tests - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - steps: - - uses: actions/checkout@v4 - - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 - - - name: Build binaries - run: cargo build --release --bin safenode --bin faucet - timeout-minutes: 30 - - - name: Build gossipsub testing executable - run: cargo test --release -p sn_node --features=local-discovery --test msgs_over_gossipsub --no-run - env: - # only set the target dir for windows to bypass the linker issue. - # happens if we build the node manager via testnet action - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 30 - - - name: Start a local network - uses: maidsafe/sn-local-testnet-action@main - with: - action: start - interval: 2000 - node-path: target/release/safenode - faucet-path: target/release/faucet - platform: ${{ matrix.os }} - build: true - - - name: Gossipsub - nodes to subscribe to topics, and publish messages - run: cargo test --release -p sn_node --features local-discovery --test msgs_over_gossipsub -- --nocapture - env: - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 20 - - - name: Stop the local network and upload logs - if: always() - uses: maidsafe/sn-local-testnet-action@main - with: - action: stop - log_file_prefix: safe_test_logs_gossipsub_e2e - platform: ${{ matrix.os }} - spend_test: name: spend tests against network runs-on: ${{ matrix.os }} @@ -300,71 +250,6 @@ jobs: SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" SLACK_TITLE: "Nightly Spend Test Run Failed" - royalty_reward_test: - name: royalty_reward tests against network - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - steps: - - uses: actions/checkout@v4 - - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - - uses: Swatinem/rust-cache@v2 - continue-on-error: true - - - name: Build binaries - run: cargo build --release --features=local-discovery,royalties-by-gossip --bin safenode --bin faucet - timeout-minutes: 30 - - - name: Build testing executable - run: cargo test --release -p sn_node --features=local-discovery,royalties-by-gossip --test nodes_rewards --no-run - env: - # only set the target dir for windows to bypass the linker issue. - # happens if we build the node manager via testnet action - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 30 - - - name: Start a local network - uses: maidsafe/sn-local-testnet-action@main - with: - action: start - interval: 2000 - node-path: target/release/safenode - faucet-path: target/release/faucet - platform: ${{ matrix.os }} - build: true - - # This should be first to avoid slow reward acceptance etc - - name: execute the nodes rewards tests - run: cargo test --release -p sn_node --features=local-discovery,royalties-by-gossip --test nodes_rewards -- --nocapture --test-threads=1 - env: - SN_LOG: "all" - CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} - timeout-minutes: 25 - - - name: Small wait to allow reward receipt - run: sleep 30 - timeout-minutes: 1 - - - name: Stop the local network and upload logs - if: always() - uses: maidsafe/sn-local-testnet-action@main - with: - action: stop - log_file_prefix: safe_test_logs_royalty_reward - platform: ${{ matrix.os }} - - - name: post notification to slack on failure - if: ${{ failure() }} - uses: bryannice/gitactions-slack-notification@2.0.0 - env: - SLACK_INCOMING_WEBHOOK: ${{ secrets.SLACK_GH_ACTIONS_WEBHOOK_URL }} - SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" - SLACK_TITLE: "Nightly Royalty Reward Test Run Failed" - token_distribution_test: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" name: token distribution test diff --git a/.github/workflows/nightly_wan.yml b/.github/workflows/nightly_wan.yml index b4fa1c0f28..6114eb2fb3 100644 --- a/.github/workflows/nightly_wan.yml +++ b/.github/workflows/nightly_wan.yml @@ -2,7 +2,7 @@ name: Nightly -- Full WAN Network Tests on: schedule: - - cron: '0 0 * * *' + - cron: "0 0 * * *" workflow_dispatch: env: @@ -24,7 +24,7 @@ jobs: - uses: Swatinem/rust-cache@v2 continue-on-error: true - + - name: Build safe run: cargo build --release --bin safe timeout-minutes: 30 @@ -154,105 +154,6 @@ jobs: # SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" # SLACK_TITLE: "Nightly E2E Test Run Failed" - gossipsub: - if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: Gossipsub E2E tests - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest] - steps: - - uses: actions/checkout@v4 - - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 - - - name: Build gossipsub testing executable - run: cargo test --release -p sn_node --test msgs_over_gossipsub --no-run - timeout-minutes: 30 - - - name: Start a WAN network - uses: maidsafe/sn-testnet-action@main - with: - action: create - re-attempts: 3 - rust-log: debug - ansible-vault-password: ${{ secrets.SN_TESTNET_ANSIBLE_VAULT_PASSWORD }} - aws-access-key-id: ${{ secrets.SN_TESTNET_AWS_ACCESS_KEY_ID }} - aws-access-key-secret: ${{ secrets.SN_TESTNET_AWS_SECRET_ACCESS_KEY }} - aws-region: eu-west-2 - do-token: ${{ secrets.SN_TESTNET_DO_PAT }} - ssh-secret-key: ${{ secrets.SN_TESTNET_SSH_KEY }} - security-group-id: sg-0d47df5b3f0d01e2a - subnet-id: subnet-018f2ab26755df7f9 - node-count: 20 - vm-count: 1 - provider: digital-ocean - testnet-name: NightlyGossipSubE2E - custom-node-bin-org-name: maidsafe - custom-node-bin-branch-name: main - - - name: Check env variables - shell: bash - run: | - echo "Peer is $SAFE_PEERS" - echo "Deployment inventory is $SN_INVENTORY" - - - name: Gossipsub - nodes to subscribe to topics, and publish messages - run: cargo test --release -p sn_node --test msgs_over_gossipsub -- --nocapture - timeout-minutes: 20 - - - name: Fetch network logs - if: always() - uses: maidsafe/sn-testnet-action@main - with: - action: logs - re-attempts: 3 - rust-log: debug - ansible-vault-password: ${{ secrets.SN_TESTNET_ANSIBLE_VAULT_PASSWORD }} - aws-access-key-id: ${{ secrets.SN_TESTNET_AWS_ACCESS_KEY_ID }} - aws-access-key-secret: ${{ secrets.SN_TESTNET_AWS_SECRET_ACCESS_KEY }} - aws-region: eu-west-2 - do-token: ${{ secrets.SN_TESTNET_DO_PAT }} - ssh-secret-key: ${{ secrets.SN_TESTNET_SSH_KEY }} - node-count: 20 - vm-count: 1 - provider: digital-ocean - testnet-name: NightlyGossipSubE2E - custom-node-bin-org-name: maidsafe - custom-node-bin-branch-name: main - - - name: Upload local logs - if: always() - uses: actions/upload-artifact@v4 - with: - name: local_logs_NightlyGossipSubE2E - path: | - ~/.local/share/safe/node/*/logs/*.log* - ~/.local/share/safe/*/*/*.log* - ~/.local/share/safe/client/logs/*/*.log* - - - name: Stop the WAN network - if: always() - uses: maidsafe/sn-testnet-action@main - with: - action: destroy - re-attempts: 3 - rust-log: debug - ansible-vault-password: ${{ secrets.SN_TESTNET_ANSIBLE_VAULT_PASSWORD }} - aws-access-key-id: ${{ secrets.SN_TESTNET_AWS_ACCESS_KEY_ID }} - aws-access-key-secret: ${{ secrets.SN_TESTNET_AWS_SECRET_ACCESS_KEY }} - aws-region: eu-west-2 - do-token: ${{ secrets.SN_TESTNET_DO_PAT }} - ssh-secret-key: ${{ secrets.SN_TESTNET_SSH_KEY }} - node-count: 20 - vm-count: 1 - provider: digital-ocean - testnet-name: NightlyGossipSubE2E - custom-node-bin-org-name: maidsafe - custom-node-bin-branch-name: main - spend_test: name: Spend tests against network runs-on: ${{ matrix.os }} @@ -306,7 +207,7 @@ jobs: timeout-minutes: 45 - name: execute the storage payment tests - run: cargo test --release -p sn_node --test storage_payments -- --nocapture --test-threads=1 + run: cargo test --release -p sn_node --test storage_payments -- --nocapture --test-threads=1 env: SN_LOG: "all" timeout-minutes: 45 @@ -314,7 +215,7 @@ jobs: - name: Small wait to allow reward receipt run: sleep 30 timeout-minutes: 1 - + - name: Fetch network logs if: always() uses: maidsafe/sn-testnet-action@main @@ -400,7 +301,7 @@ jobs: - uses: Swatinem/rust-cache@v2 continue-on-error: true - - name: Build churn tests + - name: Build churn tests run: cargo test --release -p sn_node --test data_with_churn --no-run timeout-minutes: 30 @@ -492,9 +393,9 @@ jobs: testnet-name: NightlyChurnTest custom-node-bin-org-name: maidsafe custom-node-bin-branch-name: main - - # TODO: re-enable the below scripts once we have proper way to restart nodes. - # Currently on remote network (not local), the nodes do not handle restart RPC cmd well. They reuse the same + + # TODO: re-enable the below scripts once we have proper way to restart nodes. + # Currently on remote network (not local), the nodes do not handle restart RPC cmd well. They reuse the same # log location and the logs are over written. Hence the scripts might give false outputs. # - name: Verify restart of nodes using rg @@ -550,9 +451,9 @@ jobs: echo "We are logging an extremely large data" exit 1 fi - # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log + # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log #faucet dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/faucet/logs/faucet.log - if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 15000 { print; exit 1 }' + if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 15000 { print; exit 1 }' then echo "We are logging an extremely large data" exit 1 @@ -563,9 +464,9 @@ jobs: then echo "Sanity check pass for local safe path" fi - # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log + # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log #faucet dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/faucet/logs/faucet.log - if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 1000 { print; exit 1 }' + if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 1000 { print; exit 1 }' then echo "Sanity check pass for wan logs path" fi @@ -578,7 +479,6 @@ jobs: # SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" # SLACK_TITLE: "Nightly Churn Test Run Failed" - verify_data_location_routing_table: name: Verify data location and Routing Table runs-on: ${{ matrix.os }} @@ -648,7 +548,7 @@ jobs: timeout-minutes: 90 - name: Verify the routing tables of the nodes - run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture + run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture timeout-minutes: 5 - name: Fetch network logs @@ -701,8 +601,8 @@ jobs: custom-node-bin-org-name: maidsafe custom-node-bin-branch-name: main - # TODO: re-enable the below scripts once we have proper way to restart nodes. - # Currently on remote network (not local), the nodes do not handle restart RPC cmd well. They reuse the same + # TODO: re-enable the below scripts once we have proper way to restart nodes. + # Currently on remote network (not local), the nodes do not handle restart RPC cmd well. They reuse the same # log location and the logs are over written. Hence the scripts might give false outputs. # - name: Verify restart of nodes using rg @@ -742,9 +642,9 @@ jobs: echo "We are logging an extremely large data" exit 1 fi - # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log + # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log #faucet dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/faucet/logs/faucet.log - if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 15000 { print; exit 1 }' + if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 15000 { print; exit 1 }' then echo "We are logging an extremely large data" exit 1 @@ -755,9 +655,9 @@ jobs: then echo "Sanity check pass for local safe path" fi - # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log + # node dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/safenode1/safenode.log #faucet dir structure: ~/sn-testnet-deploy/logs/NightlyChurnTest/NightlyChurnTest-genesis/faucet/logs/faucet.log - if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 1000 { print; exit 1 }' + if ! rg '^' "${{ matrix.wan_logs_path }}"/*/*/*/ | awk 'length($0) > 1000 { print; exit 1 }' then echo echo "Sanity check pass for wan logs path" fi diff --git a/sn_cli/README.md b/sn_cli/README.md index 9f031e13c2..f1a2f29edf 100644 --- a/sn_cli/README.md +++ b/sn_cli/README.md @@ -7,4 +7,3 @@ The `safe` binary includes the following subcommands: - `wallet`: Commands for wallet management. This includes creating wallets, checking balances, and making transactions. - `files`: Commands for file management. This includes uploading, downloading, and deleting files. - `register`: Commands for register management. This includes creating, reading, and writing to registers. -- `gossipsub`: Commands for gossipsub management. This includes subscribing to topics and publishing messages. diff --git a/sn_cli/src/main.rs b/sn_cli/src/main.rs index c134428a68..9e6fb73970 100644 --- a/sn_cli/src/main.rs +++ b/sn_cli/src/main.rs @@ -17,7 +17,6 @@ use crate::{ subcommands::{ files::files_cmds, folders::folders_cmds, - gossipsub::gossipsub_cmds, register::register_cmds, wallet::{ hot_wallet::{wallet_cmds, wallet_cmds_without_client, WalletCmds}, @@ -115,12 +114,6 @@ async fn main() -> Result<()> { Some(bootstrap_peers) }; - // use gossipsub only for the wallet cmd that requires it. - let joins_gossipsub = matches!( - opt.cmd, - SubCmd::WatchOnlyWallet(WatchOnlyWalletCmds::ReceiveOnline { .. }) - ); - // get the broadcaster as we want to have our own progress bar. let broadcaster = ClientEventsBroadcaster::default(); let progress_bar_handler = spawn_connection_progress_bar(broadcaster.subscribe()); @@ -128,7 +121,6 @@ async fn main() -> Result<()> { let result = Client::new( secret_key, bootstrap_peers, - joins_gossipsub, opt.connection_timeout, Some(broadcaster), ) @@ -158,7 +150,6 @@ async fn main() -> Result<()> { SubCmd::Register(cmds) => { register_cmds(cmds, &client, &client_data_dir_path, should_verify_store).await? } - SubCmd::Gossipsub(cmds) => gossipsub_cmds(cmds, &client).await?, }; Ok(()) diff --git a/sn_cli/src/subcommands.rs b/sn_cli/src/subcommands.rs index 5442f65acf..1980469f4b 100644 --- a/sn_cli/src/subcommands.rs +++ b/sn_cli/src/subcommands.rs @@ -9,7 +9,6 @@ mod acc_packet; pub(crate) mod files; pub(crate) mod folders; -pub(crate) mod gossipsub; pub(crate) mod register; pub(crate) mod wallet; @@ -35,7 +34,4 @@ pub(super) enum SubCmd { #[clap(name = "register", subcommand)] /// Commands for register management Register(register::RegisterCmds), - #[clap(name = "gossipsub", subcommand)] - /// Commands for gossipsub management - Gossipsub(gossipsub::GossipsubCmds), } diff --git a/sn_cli/src/subcommands/gossipsub.rs b/sn_cli/src/subcommands/gossipsub.rs deleted file mode 100644 index 8e93f03121..0000000000 --- a/sn_cli/src/subcommands/gossipsub.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -use clap::Subcommand; -use color_eyre::Result; -use sn_client::{Client, ClientEvent}; - -#[derive(Subcommand, custom_debug::Debug)] -pub enum GossipsubCmds { - /// Subscribe to a topic and listen for messages published on it - Subscribe { - /// The name of the topic. - #[clap(name = "topic")] - topic: String, - }, - /// Unsubscribe from a topic - Unsubscribe { - /// The name of the topic. - #[clap(name = "topic")] - topic: String, - }, - /// Publish a message on a given topic - Publish { - /// The name of the topic. - #[clap(name = "topic")] - topic: String, - /// The message to publish. - #[clap(name = "msg")] - #[debug(skip)] - msg: String, - }, -} - -pub(crate) async fn gossipsub_cmds(cmds: GossipsubCmds, client: &Client) -> Result<()> { - match cmds { - GossipsubCmds::Subscribe { topic } => { - client.subscribe_to_topic(topic.clone()); - println!("Subscribed to topic '{topic}'. Listening for messages published on it..."); - let mut events_channel = client.events_channel(); - while let Ok(event) = events_channel.recv().await { - if let ClientEvent::GossipsubMsg { msg, .. } = event { - let msg = String::from_utf8(msg.to_vec())?; - println!("New message published: {msg}"); - } - } - } - GossipsubCmds::Unsubscribe { topic } => { - client.unsubscribe_from_topic(topic.clone()); - println!("Unsubscribed from topic '{topic}'."); - } - GossipsubCmds::Publish { topic, msg } => { - client.publish_on_topic(topic.clone(), msg.into()); - println!("Message published on topic '{topic}'."); - } - } - Ok(()) -} diff --git a/sn_cli/src/subcommands/wallet/wo_wallet.rs b/sn_cli/src/subcommands/wallet/wo_wallet.rs index 6cb7fac510..7a955956d1 100644 --- a/sn_cli/src/subcommands/wallet/wo_wallet.rs +++ b/sn_cli/src/subcommands/wallet/wo_wallet.rs @@ -8,7 +8,7 @@ use super::{helpers::verify_spend_at, watch_only_wallet_from_pk, WalletApiHelper}; -use bls::{PublicKey, PK_SIZE}; +use bls::PublicKey; use clap::Parser; use color_eyre::{ eyre::{bail, eyre}, @@ -16,20 +16,17 @@ use color_eyre::{ }; use dialoguer::Confirm; use sn_client::transfers::{ - CashNoteRedemption, DerivationIndex, MainPubkey, NanoTokens, OfflineTransfer, SignedSpend, - UniquePubkey, WalletError, WatchOnlyWallet, + DerivationIndex, MainPubkey, NanoTokens, OfflineTransfer, SignedSpend, UniquePubkey, + WatchOnlyWallet, }; -use sn_client::{Client, ClientEvent}; +use sn_client::Client; use std::{ collections::{BTreeMap, BTreeSet}, - path::{Path, PathBuf}, + path::Path, str::FromStr, }; use walkdir::WalkDir; -const DEFAULT_RECEIVE_ONLINE_WALLET_DIR: &str = "receive_online"; -const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION"; - // Please do not remove the blank lines in these doc comments. // They are used for inserting line breaks when the help menu is rendered in the UI. #[derive(Parser, Debug)] @@ -94,20 +91,6 @@ pub enum WatchOnlyWalletCmds { #[clap(long, name = "force", default_value = "false")] force: bool, }, - /// Listen for transfer notifications from the network over gossipsub protocol. - /// - /// Transfers will be deposited to the watch-only wallet. - /// - /// Only cash notes owned by the provided public key will be accepted, verified to be valid - /// against the network, and deposited onto a locally stored watch-only wallet. - ReceiveOnline { - /// Hex-encoded main public key - #[clap(name = "public key")] - pk: String, - /// Optional path where to store the wallet - #[clap(name = "path")] - path: Option, - }, /// Verify a spend on the Network. Verify { /// The Network address or hex encoded UniquePubkey of the Spend to verify @@ -192,17 +175,13 @@ pub(crate) async fn wo_wallet_cmds_without_client( pub(crate) async fn wo_wallet_cmds( cmds: WatchOnlyWalletCmds, client: &Client, - root_dir: &Path, + _root_dir: &Path, verify_store: bool, ) -> Result<()> { match cmds { WatchOnlyWalletCmds::Broadcast { signed_tx, force } => { broadcast_signed_spends(signed_tx, client, verify_store, force).await } - WatchOnlyWalletCmds::ReceiveOnline { pk, path } => { - let wallet_dir = path.unwrap_or(root_dir.join(DEFAULT_RECEIVE_ONLINE_WALLET_DIR)); - listen_notifs_and_deposit(&wallet_dir, client, pk).await - } WatchOnlyWalletCmds::Verify { spend_address, genesis, @@ -355,95 +334,3 @@ async fn broadcast_signed_spends( Ok(()) } - -async fn listen_notifs_and_deposit(root_dir: &Path, client: &Client, pk_hex: String) -> Result<()> { - let mut wallet = match MainPubkey::from_hex(&pk_hex) { - Ok(main_pk) => watch_only_wallet_from_pk(main_pk, root_dir)?, - Err(err) => return Err(eyre!("Failed to parse hex-encoded public key: {err:?}")), - }; - - let main_pk = wallet.address(); - let pk = main_pk.public_key(); - - client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); - let mut events_receiver = client.events_channel(); - - println!("Current balance in local wallet: {}", wallet.balance()); - println!("Listening to transfers notifications for {pk:?}... (press Ctrl+C to exit)"); - println!(); - - while let Ok(event) = events_receiver.recv().await { - let cash_notes = match event { - ClientEvent::GossipsubMsg { topic, msg } => { - // we assume it's a notification of a transfer as that's the only topic we've subscribed to - match try_decode_transfer_notif(&msg) { - Err(err) => { - println!("GossipsubMsg received on topic '{topic}' couldn't be decoded as transfer notif: {err:?}"); - continue; - } - Ok((key, _)) if key != pk => continue, - Ok((key, cashnote_redemptions)) => { - println!("New transfer notification received for {key:?}, containing {} CashNoteRedemption/s.", cashnote_redemptions.len()); - match client - .verify_cash_notes_redemptions(main_pk, &cashnote_redemptions) - .await - { - Err(err) => { - println!("At least one of the CashNoteRedemptions received is invalid, dropping them: {err:?}"); - continue; - } - Ok(cash_notes) => cash_notes, - } - } - } - } - _other_event => continue, - }; - - cash_notes.iter().for_each(|cn| { - let value = match cn.value() { - Ok(value) => value.to_string(), - Err(err) => { - println!("Failed to obtain cash note value: {err}"); - "unknown".to_string() - } - }; - println!( - "CashNote received with {:?}, value: {value}", - cn.unique_pubkey(), - ); - }); - - match wallet.deposit_and_store_to_disk(&cash_notes) { - Ok(()) => {} - Err(err @ WalletError::Io(_)) => { - println!("ERROR: Failed to deposit the received cash notes: {err}"); - println!(); - println!("WARNING: we'll try to reload/recreate the local wallet now, but if it was corrupted there could have been lost funds."); - println!(); - wallet.reload_from_disk_or_recreate()?; - wallet.deposit_and_store_to_disk(&cash_notes)?; - } - Err(other_err) => return Err(other_err.into()), - } - - println!( - "New balance after depositing received CashNote/s: {}", - wallet.balance() - ); - println!(); - } - - Ok(()) -} - -fn try_decode_transfer_notif(msg: &[u8]) -> Result<(PublicKey, Vec)> { - let mut key_bytes = [0u8; PK_SIZE]; - key_bytes.copy_from_slice( - msg.get(0..PK_SIZE) - .ok_or_else(|| eyre!("msg doesn't have enough bytes"))?, - ); - let key = PublicKey::from_bytes(key_bytes)?; - let cashnote_redemptions: Vec = rmp_serde::from_slice(&msg[PK_SIZE..])?; - Ok((key, cashnote_redemptions)) -} diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 2a1e2292a6..334531653d 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -60,7 +60,7 @@ const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(30); impl Client { /// A quick client that only takes some peers to connect to pub async fn quick_start(peers: Option>) -> Result { - Self::new(SecretKey::random(), peers, false, None, None).await + Self::new(SecretKey::random(), peers, None, None).await } /// Instantiate a new client. @@ -71,7 +71,6 @@ impl Client { /// # Arguments /// * 'signer' - [SecretKey] /// * 'peers' - [Option]<[Vec]<[Multiaddr]>> - /// * 'enable_gossip' - Boolean: Signifies whether the client should attempt to join the gossip layer in the network. i.e to monitor network royalties. /// * 'connection_timeout' - [Option]<[Duration]> : Specification for client connection timeout set via Optional /// * 'client_event_broadcaster' - [Option]<[ClientEventsBroadcaster]> /// @@ -88,7 +87,6 @@ impl Client { pub async fn new( signer: SecretKey, peers: Option>, - enable_gossip: bool, connection_timeout: Option, client_event_broadcaster: Option, ) -> Result { @@ -107,12 +105,11 @@ impl Client { let root_dir = std::env::temp_dir(); trace!("Starting Kad swarm in client mode..{root_dir:?}."); - let mut network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), local, root_dir); - - if enable_gossip { - network_builder.enable_gossip(); - } + #[cfg(not(feature = "open-metrics"))] + let network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), local, root_dir); + #[cfg(feature = "open-metrics")] + let mut network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), local, root_dir); #[cfg(feature = "open-metrics")] network_builder.metrics_registry(Registry::default()); @@ -255,11 +252,6 @@ impl Client { debug!("{peers_added}/{CLOSE_GROUP_SIZE} initial peers found.",); } } - NetworkEvent::GossipsubMsgReceived { topic, msg } - | NetworkEvent::GossipsubMsgPublished { topic, msg } => { - self.events_broadcaster - .broadcast(ClientEvent::GossipsubMsg { topic, msg }); - } _other => {} } @@ -282,9 +274,7 @@ impl Client { /// // Using client.events_channel() to publish messages /// let mut events_channel = client.events_channel(); /// while let Ok(event) = events_channel.recv().await { - /// if let ClientEvent::GossipsubMsg { msg, .. } = event { - /// let msg = String::from_utf8(msg.to_vec()).unwrap(); - /// println!("New message published: {msg}"); + /// // Handle the event /// } /// } /// # Ok(()) @@ -912,75 +902,6 @@ impl Client { } } - /// Subscribe to given gossipsub topic - /// - /// # Arguments - /// * 'topic_id' - [String] - /// - /// # Example - /// ```no_run - /// use sn_client::{Client, Error}; - /// use bls::SecretKey; - /// # #[tokio::main] - /// # async fn main() -> Result<(),Error>{ - /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; - /// // Subscribing to the gossipsub topic "Royalty Transfer Notification" - /// client.subscribe_to_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION")); - /// # Ok(()) - /// # } - /// ``` - pub fn subscribe_to_topic(&self, topic_id: String) { - info!("Subscribing to topic id: {topic_id}"); - self.network.subscribe_to_topic(topic_id); - self.network.start_handle_gossip(); - } - - /// Unsubscribe from given gossipsub topic - /// - /// # Arguments - /// * 'topic_id' - [String] - /// - /// # Example - /// ```no_run - /// use sn_client::{Client, Error}; - /// use bls::SecretKey; - /// # #[tokio::main] - /// # async fn main() -> Result<(),Error>{ - /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; - /// // Unsubscribing to the gossipsub topic "Royalty Transfer Notification" - /// client.unsubscribe_from_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION")); - /// # Ok(()) - /// # } - /// ``` - pub fn unsubscribe_from_topic(&self, topic_id: String) { - info!("Unsubscribing from topic id: {topic_id}"); - self.network.unsubscribe_from_topic(topic_id); - } - - /// Publish message on given topic - /// - /// # Arguments - /// * 'topic_id' - [String] - /// * 'msg' - [Bytes] - /// - /// # Example - /// ```no_run - /// use sn_client::{Client, Error}; - /// use bls::SecretKey; - /// # #[tokio::main] - /// # async fn main() -> Result<(),Error>{ - /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; - /// let msg = String::from("Transfer Successful."); - /// // Note the use of .into() to set the argument as bytes - /// client.publish_on_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION"), msg.into()); - /// # Ok(()) - /// # } - /// ``` - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { - info!("Publishing msg on topic id: {topic_id}"); - self.network.publish_on_topic(topic_id, msg); - } - /// This function is used to receive a Vector of CashNoteRedemptions and turn them back into spendable CashNotes. /// For this we need a network connection. /// Verify CashNoteRedemptions and rebuild spendable currency from them. diff --git a/sn_client/src/event.rs b/sn_client/src/event.rs index 10b2f44aad..0f53c6a79f 100644 --- a/sn_client/src/event.rs +++ b/sn_client/src/event.rs @@ -6,7 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use bytes::Bytes; use serde::Serialize; use tokio::sync::broadcast::{self, error::RecvError}; @@ -48,14 +47,6 @@ pub enum ClientEvent { /// No network activity has been received for a given duration /// we should error out InactiveClient(tokio::time::Duration), - /// Gossipsub message received on a topic the client has subscribed to - GossipsubMsg { - /// Topic the message was published on - topic: String, - /// The raw bytes of the received message - #[debug(skip)] - msg: Bytes, - }, } /// Receiver Channel where users of the public API can listen to events broadcasted by the client. diff --git a/sn_client/src/test_utils.rs b/sn_client/src/test_utils.rs index 624f7d3a15..bff4b42636 100644 --- a/sn_client/src/test_utils.rs +++ b/sn_client/src/test_utils.rs @@ -49,7 +49,7 @@ pub async fn get_new_client(owner_sk: SecretKey) -> Result { }; println!("Client bootstrap with peer {bootstrap_peers:?}"); - let client = Client::new(owner_sk, bootstrap_peers, false, None, None).await?; + let client = Client::new(owner_sk, bootstrap_peers, None, None).await?; Ok(client) } diff --git a/sn_faucet/src/main.rs b/sn_faucet/src/main.rs index d1eaca9d81..70f005c380 100644 --- a/sn_faucet/src/main.rs +++ b/sn_faucet/src/main.rs @@ -60,7 +60,7 @@ async fn main() -> Result<()> { let secret_key = bls::SecretKey::random(); let broadcaster = ClientEventsBroadcaster::default(); let handle = spawn_connection_progress_bar(broadcaster.subscribe()); - let result = Client::new(secret_key, bootstrap_peers, false, None, Some(broadcaster)).await; + let result = Client::new(secret_key, bootstrap_peers, None, Some(broadcaster)).await; // await on the progress bar to complete before handling the client result. If client errors out, we would // want to make the progress bar clean up gracefully. diff --git a/sn_faucet/src/token_distribution.rs b/sn_faucet/src/token_distribution.rs index db143696af..ef672f4e8f 100644 --- a/sn_faucet/src/token_distribution.rs +++ b/sn_faucet/src/token_distribution.rs @@ -550,8 +550,7 @@ mod tests { .cashnote_redemptions(&MainSecretKey::new(wallet_sk.clone())) .is_ok()); - let receiver_client = - Client::new(bls::SecretKey::random(), None, false, None, None).await?; + let receiver_client = Client::new(bls::SecretKey::random(), None, None, None).await?; let tmp_path = TempDir::new()?.path().to_owned(); let receiver_wallet = HotWallet::load_from_path(&tmp_path, Some(MainSecretKey::new(wallet_sk)))?; diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index c76115428d..1ef9822a44 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -42,14 +42,13 @@ libp2p = { version = "0.53", features = [ "noise", "tcp", "yamux", - "gossipsub", "websocket", ] } prometheus-client = { version = "0.22", optional = true } rand = { version = "~0.8.5", features = ["small_rng"] } rayon = "1.8.0" rmp-serde = "1.1.1" -serde = { version = "1.0.133", features = [ "derive", "rc" ]} +serde = { version = "1.0.133", features = ["derive", "rc"] } sn_protocol = { path = "../sn_protocol", version = "0.15.5" } sn_transfers = { path = "../sn_transfers", version = "0.16.5" } sn_registers = { path = "../sn_registers", version = "0.3.9" } @@ -98,7 +97,6 @@ libp2p = { version = "0.53", features = [ "identify", "noise", "yamux", - "gossipsub", "websocket-websys", "wasm-bindgen", ] } diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 9537c85c07..47d031740e 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -12,7 +12,6 @@ use crate::{ multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE, REPLICATE_RANGE, }; -use bytes::Bytes; use libp2p::{ kad::{store::RecordStore, Quorum, Record, RecordKey}, swarm::dial_opts::DialOpts, @@ -150,18 +149,6 @@ pub enum SwarmCmd { }, /// Triggers interval repliation TriggerIntervalReplication, - /// Subscribe to a given Gossipsub topic - GossipsubSubscribe(String), - /// Unsubscribe from a given Gossipsub topic - GossipsubUnsubscribe(String), - /// Publish a message through Gossipsub protocol - GossipsubPublish { - /// Topic to publish on - topic_id: String, - /// Raw bytes of the message to publish - msg: Bytes, - }, - GossipHandler, /// Notify whether peer is in trouble SendNodeStatus { peer_id: PeerId, @@ -230,19 +217,6 @@ impl Debug for SwarmCmd { SwarmCmd::TriggerIntervalReplication => { write!(f, "SwarmCmd::TriggerIntervalReplication") } - SwarmCmd::GossipsubSubscribe(topic) => { - write!(f, "SwarmCmd::GossipsubSubscribe({topic:?})") - } - SwarmCmd::GossipsubUnsubscribe(topic) => { - write!(f, "SwarmCmd::GossipsubUnsubscribe({topic:?})") - } - SwarmCmd::GossipsubPublish { topic_id, msg } => { - write!( - f, - "SwarmCmd::GossipsubPublish {{ topic_id: {topic_id:?}, msg len: {:?} }}", - msg.len() - ) - } SwarmCmd::DialWithOpts { opts, .. } => { write!(f, "SwarmCmd::DialWithOpts {{ opts: {opts:?} }}") } @@ -293,9 +267,6 @@ impl Debug for SwarmCmd { SwarmCmd::SendRequest { req, peer, .. } => { write!(f, "SwarmCmd::SendRequest req: {req:?}, peer: {peer:?}") } - SwarmCmd::GossipHandler => { - write!(f, "SwarmCmd::GossipHandler") - } SwarmCmd::SendNodeStatus { peer_id, is_bad } => { write!( f, @@ -320,7 +291,7 @@ pub struct SwarmLocalState { impl SwarmDriver { pub(crate) fn handle_cmd(&mut self, cmd: SwarmCmd) -> Result<(), NetworkError> { let start = Instant::now(); - let mut cmd_string = ""; + let mut cmd_string; match cmd { SwarmCmd::TriggerIntervalReplication => { cmd_string = "TriggerIntervalReplication"; @@ -706,37 +677,7 @@ impl SwarmDriver { .send(current_state) .map_err(|_| NetworkError::InternalMsgChannelDropped)?; } - SwarmCmd::GossipsubSubscribe(topic_id) => { - let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); - if let Some(gossip) = self.swarm.behaviour_mut().gossipsub.as_mut() { - gossip.subscribe(&topic_id)?; - } - } - SwarmCmd::GossipsubUnsubscribe(topic_id) => { - let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); - if let Some(gossip) = self.swarm.behaviour_mut().gossipsub.as_mut() { - gossip.unsubscribe(&topic_id)?; - } - } - SwarmCmd::GossipsubPublish { topic_id, msg } => { - cmd_string = "GossipsubPublish"; - // If we publish a Gossipsub message, we might not receive the same message on our side. - // Hence push an event to notify that we've published a message - if self.is_gossip_handler { - self.send_event(NetworkEvent::GossipsubMsgPublished { - topic: topic_id.clone(), - msg: msg.clone(), - }); - } - let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); - if let Some(gossip) = self.swarm.behaviour_mut().gossipsub.as_mut() { - gossip.publish(topic_id, msg)?; - } - } - SwarmCmd::GossipHandler => { - self.is_gossip_handler = true; - } SwarmCmd::SendNodeStatus { peer_id, is_bad } => { cmd_string = "SendNodeStatus"; let _ = self.bad_nodes_ongoing_verifications.remove(&peer_id); diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 0963146367..cd84887ad0 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -37,7 +37,6 @@ use libp2p::{ multiaddr::Protocol, request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport}, swarm::{ - behaviour::toggle::Toggle, dial_opts::{DialOpts, PeerCondition}, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, Swarm, }, @@ -57,7 +56,6 @@ use std::{ num::NonZeroUsize, path::PathBuf, }; -use tiny_keccak::{Hasher, Sha3}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tracing::warn; @@ -185,7 +183,6 @@ pub(super) struct NodeBehaviour { #[cfg(feature = "local-discovery")] pub(super) mdns: mdns::tokio::Behaviour, pub(super) identify: libp2p::identify::Behaviour, - pub(super) gossipsub: Toggle, } #[derive(Debug)] @@ -194,7 +191,6 @@ pub struct NetworkBuilder { local: bool, root_dir: PathBuf, listen_addr: Option, - enable_gossip: bool, request_timeout: Option, concurrency_limit: Option, #[cfg(feature = "open-metrics")] @@ -210,7 +206,6 @@ impl NetworkBuilder { local, root_dir, listen_addr: None, - enable_gossip: false, request_timeout: None, concurrency_limit: None, #[cfg(feature = "open-metrics")] @@ -224,11 +219,6 @@ impl NetworkBuilder { self.listen_addr = Some(listen_addr); } - /// Enable gossip for the network - pub fn enable_gossip(&mut self) { - self.enable_gossip = true; - } - pub fn request_timeout(&mut self, request_timeout: Duration) { self.request_timeout = Some(request_timeout); } @@ -466,42 +456,6 @@ impl NetworkBuilder { let main_transport = transport::build_transport(&self.keypair); - let gossipsub = if self.enable_gossip { - // Gossipsub behaviour - let gossipsub_config = libp2p::gossipsub::ConfigBuilder::default() - // we don't currently require source peer id and/or signing - .validation_mode(libp2p::gossipsub::ValidationMode::Permissive) - // we use the hash of the msg content as the msg id to deduplicate them - .message_id_fn(|msg| { - let mut sha3 = Sha3::v256(); - let mut msg_id = [0; 32]; - sha3.update(&msg.data); - sha3.finalize(&mut msg_id); - msg_id.into() - }) - // set the heartbeat interval to be higher than default 1sec - .heartbeat_interval(Duration::from_secs(5)) - // default is 3sec, increase to 10sec to avoid false alert - .iwant_followup_time(Duration::from_secs(10)) - // default is 10sec, increase to 60sec to reduce the risk of looping - .published_message_ids_cache_time(Duration::from_secs(60)) - .build() - .map_err(|err| NetworkError::GossipsubConfigError(err.to_string()))?; - - // Set the message authenticity - let message_authenticity = libp2p::gossipsub::MessageAuthenticity::Anonymous; - - // build a gossipsub network behaviour - let gossipsub: libp2p::gossipsub::Behaviour = - libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config) - .expect("Failed to instantiate Gossipsub behaviour."); - Some(gossipsub) - } else { - None - }; - - let gossipsub = Toggle::from(gossipsub); - let transport = if !self.local { debug!("Preventing non-global dials"); // Wrap upper in a transport that prevents dialing local addresses. @@ -516,7 +470,6 @@ impl NetworkBuilder { identify, #[cfg(feature = "local-discovery")] mdns, - gossipsub, }; #[cfg(not(target_arch = "wasm32"))] @@ -550,7 +503,6 @@ impl NetworkBuilder { // We use 255 here which allows covering a network larger than 64k without any rotating. // This is based on the libp2p kad::kBuckets peers distribution. dialed_peers: CircularVec::new(255), - is_gossip_handler: false, network_discovery: NetworkDiscovery::new(&peer_id), bootstrap_peers: Default::default(), live_connected_peers: Default::default(), @@ -598,10 +550,6 @@ pub struct SwarmDriver { pub(crate) pending_get_record: PendingGetRecord, /// A list of the most recent peers we have dialed ourselves. pub(crate) dialed_peers: CircularVec, - // For normal nodes, though they subscribe to the gossip topic - // (to ensure no miss-up by carrying out libp2p low level gossip forwarding), - // they are not supposed to process the gossip msg that received from libp2p. - pub(crate) is_gossip_handler: bool, // A list of random `PeerId` candidates that falls into kbuckets, // This is to ensure a more accurate network discovery. pub(crate) network_discovery: NetworkDiscovery, diff --git a/sn_networking/src/error.rs b/sn_networking/src/error.rs index c9a8fb1288..978bcb5f1a 100644 --- a/sn_networking/src/error.rs +++ b/sn_networking/src/error.rs @@ -7,7 +7,6 @@ // permissions and limitations relating to use of the SAFE Network Software. use libp2p::{ - gossipsub::{PublishError, SubscriptionError}, kad::{self, QueryId, Record}, request_response::{OutboundFailure, OutboundRequestId}, swarm::DialError, @@ -152,16 +151,6 @@ pub enum NetworkError { source: std::io::Error, }, - // ---------- GossipSub Errors - #[error("Could ont build the gossipsub config: {0}")] - GossipsubConfigError(String), - - #[error("Gossipsub publish Error: {0}")] - GossipsubPublishError(#[from] PublishError), - - #[error("Gossipsub subscribe Error: {0}")] - GossipsubSubscriptionError(#[from] SubscriptionError), - // ---------- Internal Network Errors #[error("Could not get enough peers ({required}) to satisfy the request, found {found}")] NotEnoughPeers { found: usize, required: usize }, diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index 7689b3db28..947f9dbe91 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -12,7 +12,6 @@ use crate::{ multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, CLOSE_GROUP_SIZE, REPLICATE_RANGE, }; -use bytes::Bytes; use core::fmt; use custom_debug::Debug as CustomDebug; use itertools::Itertools; @@ -55,7 +54,6 @@ pub(super) enum NodeEvent { #[cfg(feature = "local-discovery")] Mdns(Box), Identify(Box), - Gossipsub(Box), } impl From> for NodeEvent { @@ -83,12 +81,6 @@ impl From for NodeEvent { } } -impl From for NodeEvent { - fn from(event: libp2p::gossipsub::Event) -> Self { - NodeEvent::Gossipsub(Box::new(event)) - } -} - #[derive(CustomDebug)] /// Channel to send the `Response` through. pub enum MsgResponder { @@ -123,20 +115,6 @@ pub enum NetworkEvent { NewListenAddr(Multiaddr), /// Report unverified record UnverifiedRecord(Record), - /// Gossipsub message received - GossipsubMsgReceived { - /// Topic the message was published on - topic: String, - /// The raw bytes of the received message - msg: Bytes, - }, - /// The Gossipsub message that we published - GossipsubMsgPublished { - /// Topic the message was published on - topic: String, - /// The raw bytes of the sent message - msg: Bytes, - }, /// Terminate Node on HDD write erros TerminateNode, /// List of peer nodes that failed to fetch replication copy from. @@ -177,12 +155,6 @@ impl Debug for NetworkEvent { let pretty_key = PrettyPrintRecordKey::from(&record.key); write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})") } - NetworkEvent::GossipsubMsgReceived { topic, .. } => { - write!(f, "NetworkEvent::GossipsubMsgReceived({topic})") - } - NetworkEvent::GossipsubMsgPublished { topic, .. } => { - write!(f, "NetworkEvent::GossipsubMsgPublished({topic})") - } NetworkEvent::TerminateNode => { write!(f, "NetworkEvent::TerminateNode") } @@ -362,25 +334,7 @@ impl SwarmDriver { } } } - SwarmEvent::Behaviour(NodeEvent::Gossipsub(event)) => { - event_string = "gossip"; - - if self.is_gossip_handler { - match *event { - libp2p::gossipsub::Event::Message { - message, - message_id, - .. - } => { - info!("Gossipsub message received, id: {message_id:?}"); - let topic = message.topic.into_string(); - let msg = Bytes::from(message.data); - self.send_event(NetworkEvent::GossipsubMsgReceived { topic, msg }); - } - other => trace!("Gossipsub Event has been ignored: {other:?}"), - } - } - } + SwarmEvent::NewListenAddr { address, .. } => { event_string = "new listen addr"; diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index d6a3b7a1eb..e7b81d29ac 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -43,7 +43,6 @@ pub use self::{ use self::{cmd::SwarmCmd, error::Result}; use backoff::{Error as BackoffError, ExponentialBackoff}; -use bytes::Bytes; use futures::future::select_all; use libp2p::{ identity::Keypair, @@ -399,21 +398,6 @@ impl Network { get_fees_from_store_cost_responses(all_costs) } - /// Subscribe to given gossipsub topic - pub fn subscribe_to_topic(&self, topic_id: String) { - self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id)); - } - - /// Unsubscribe from given gossipsub topic - pub fn unsubscribe_from_topic(&self, topic_id: String) { - self.send_swarm_cmd(SwarmCmd::GossipsubUnsubscribe(topic_id)); - } - - /// Publish a msg on a given topic - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { - self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg }); - } - /// Get a record from the network /// This differs from non-wasm32 builds as no retries are applied #[cfg(target_arch = "wasm32")] @@ -735,10 +719,6 @@ impl Network { Ok(state) } - pub fn start_handle_gossip(&self) { - self.send_swarm_cmd(SwarmCmd::GossipHandler) - } - pub fn trigger_interval_replication(&self) { self.send_swarm_cmd(SwarmCmd::TriggerIntervalReplication) } diff --git a/sn_networking/src/metrics.rs b/sn_networking/src/metrics.rs index 7f6cb96644..0c20050a89 100644 --- a/sn_networking/src/metrics.rs +++ b/sn_networking/src/metrics.rs @@ -96,12 +96,6 @@ impl NetworkMetrics { } } -impl Recorder for NetworkMetrics { - fn record(&self, event: &libp2p::gossipsub::Event) { - self.libp2p_metrics.record(event) - } -} - impl Recorder for NetworkMetrics { fn record(&self, event: &libp2p::kad::Event) { self.libp2p_metrics.record(event) diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 8883b943e2..c68ad6a91e 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -14,14 +14,13 @@ name = "safenode" path = "src/bin/safenode/main.rs" [features] -default=["metrics"] -local-discovery=["sn_networking/local-discovery"] +default = ["metrics"] +local-discovery = ["sn_networking/local-discovery"] otlp = ["sn_logging/otlp"] metrics = ["sn_logging/process-metrics"] network-contacts = ["sn_peers_acquisition/network-contacts"] open-metrics = ["sn_networking/open-metrics", "prometheus-client"] -encrypt-records=["sn_networking/encrypt-records"] -royalties-by-gossip = [] +encrypt-records = ["sn_networking/encrypt-records"] [dependencies] assert_fs = "1.0.0" @@ -39,7 +38,7 @@ futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.11.0" lazy_static = "~1.4.0" -libp2p = { version="0.53", features = ["tokio", "dns", "kad", "macros"] } +libp2p = { version = "0.53", features = ["tokio", "dns", "kad", "macros"] } prometheus-client = { version = "0.22", optional = true } # watch out updating this, protoc compiler needs to be installed on all build systems # arm builds + musl are very problematic @@ -49,9 +48,9 @@ rand = { version = "~0.8.5", features = ["small_rng"] } rmp-serde = "1.1.1" rayon = "1.8.0" self_encryption = "~0.29.0" -serde = { version = "1.0.133", features = [ "derive", "rc" ]} -sn_build_info = { path="../sn_build_info", version = "0.1.5" } -sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.2.8" } +serde = { version = "1.0.133", features = ["derive", "rc"] } +sn_build_info = { path = "../sn_build_info", version = "0.1.5" } +sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.2.8" } sn_client = { path = "../sn_client", version = "0.104.31" } sn_logging = { path = "../sn_logging", version = "0.2.24" } sn_networking = { path = "../sn_networking", version = "0.13.35" } @@ -60,7 +59,15 @@ sn_registers = { path = "../sn_registers", version = "0.3.10" } sn_transfers = { path = "../sn_transfers", version = "0.16.5" } sn_service_management = { path = "../sn_service_management", version = "0.1.2" } thiserror = "1.0.23" -tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time", "signal"] } +tokio = { version = "1.32.0", features = [ + "io-util", + "macros", + "parking_lot", + "rt", + "sync", + "time", + "signal", +] } tokio-stream = { version = "~0.1.12" } tracing = { version = "~0.1.26" } tracing-appender = "~0.2.0" @@ -74,9 +81,13 @@ color-eyre = "0.6.2" [dev-dependencies] assert_matches = "1.5.0" -reqwest = { version="0.11.18", default-features=false, features = ["rustls"] } +reqwest = { version = "0.11.18", default-features = false, features = [ + "rustls", +] } serde_json = "1.0" -sn_protocol = { path = "../sn_protocol", version = "0.15.5", features = ["rpc"]} +sn_protocol = { path = "../sn_protocol", version = "0.15.5", features = [ + "rpc", +] } tempfile = "3.6.0" # Do not specify the version field. Release process expects even the local dev deps to be published. # Removing the version field is a workaround. diff --git a/sn_node/examples/register_inspect.rs b/sn_node/examples/register_inspect.rs index 8a68120760..b404510625 100644 --- a/sn_node/examples/register_inspect.rs +++ b/sn_node/examples/register_inspect.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { let signer = SecretKey::random(); println!("Starting SAFE client..."); - let client = Client::new(signer, None, false, None, None).await?; + let client = Client::new(signer, None, None, None).await?; println!("SAFE client signer public key: {:?}", client.signer_pk()); // The address of the register to be displayed diff --git a/sn_node/examples/registers.rs b/sn_node/examples/registers.rs index 5e9d8e2423..cb7b988ae1 100644 --- a/sn_node/examples/registers.rs +++ b/sn_node/examples/registers.rs @@ -52,7 +52,7 @@ async fn main() -> Result<()> { let signer = SecretKey::random(); println!("Starting SAFE client..."); - let client = Client::new(signer, None, false, None, None).await?; + let client = Client::new(signer, None, None, None).await?; println!("SAFE client signer public key: {:?}", client.signer_pk()); // We'll retrieve (or create if not found) a Register, and write on it diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs index 6884ff580b..6943221741 100644 --- a/sn_node/src/bin/safenode/rpc_service.rs +++ b/sn_node/src/bin/safenode/rpc_service.rs @@ -6,8 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use bls::{PublicKey, PK_SIZE}; -use bytes::Bytes; use eyre::{ErrReport, Result}; use sn_logging::ReloadHandle; use sn_node::RunningNode; @@ -15,13 +13,10 @@ use sn_protocol::node_rpc::NodeCtrl; use sn_protocol::safenode_proto::{ k_buckets_response, safe_node_server::{SafeNode, SafeNodeServer}, - GossipsubPublishRequest, GossipsubPublishResponse, GossipsubSubscribeRequest, - GossipsubSubscribeResponse, GossipsubUnsubscribeRequest, GossipsubUnsubscribeResponse, KBucketsRequest, KBucketsResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent, NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest, RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse, - TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateLogLevelRequest, - UpdateLogLevelResponse, UpdateRequest, UpdateResponse, + UpdateLogLevelRequest, UpdateLogLevelResponse, UpdateRequest, UpdateResponse, }; use std::{ collections::HashMap, @@ -147,37 +142,6 @@ impl SafeNode for SafeNodeRpcService { Ok(Response::new(ReceiverStream::new(client_rx))) } - async fn transfer_notifs_filter( - &self, - request: Request, - ) -> Result, Status> { - debug!( - "RPC request received at {}: {:?}", - self.addr, - request.get_ref() - ); - - let mut pk_bytes = [0u8; PK_SIZE]; - pk_bytes.copy_from_slice(&request.get_ref().pk); - let pk = match PublicKey::from_bytes(pk_bytes) { - Ok(pk) => pk, - Err(err) => { - return Err(Status::new( - Code::Internal, - format!("Failed to decode provided pk: {err}"), - )) - } - }; - - match self.running_node.transfer_notifs_filter(Some(pk)) { - Ok(()) => Ok(Response::new(TransferNotifsFilterResponse {})), - Err(err) => Err(Status::new( - Code::Internal, - format!("Failed to set transfer notifs filter with {pk:?}: {err}"), - )), - } - } - async fn record_addresses( &self, request: Request, @@ -226,59 +190,6 @@ impl SafeNode for SafeNodeRpcService { Ok(Response::new(KBucketsResponse { kbuckets })) } - async fn subscribe_to_topic( - &self, - request: Request, - ) -> Result, Status> { - debug!( - "RPC request received at {}: {:?}", - self.addr, - request.get_ref() - ); - - let topic = &request.get_ref().topic; - - // Assuming the rpc subscription request also force the node to handle the gossip. - // So far, this is only used during test to allow counting the gossip msgs received by node. - self.running_node.start_handle_gossip(); - self.running_node.subscribe_to_topic(topic.clone()); - Ok(Response::new(GossipsubSubscribeResponse {})) - } - - async fn unsubscribe_from_topic( - &self, - request: Request, - ) -> Result, Status> { - debug!( - "RPC request received at {}: {:?}", - self.addr, - request.get_ref() - ); - - let topic = &request.get_ref().topic; - - self.running_node.unsubscribe_from_topic(topic.clone()); - Ok(Response::new(GossipsubUnsubscribeResponse {})) - } - - async fn publish_on_topic( - &self, - request: Request, - ) -> Result, Status> { - debug!( - "RPC request received at {}: {:?}", - self.addr, - request.get_ref() - ); - - let topic = &request.get_ref().topic; - // Convert the message from Vec to Bytes - let msg = Bytes::from(request.get_ref().msg.clone()); - - self.running_node.publish_on_topic(topic.clone(), msg); - Ok(Response::new(GossipsubPublishResponse {})) - } - async fn stop(&self, request: Request) -> Result, Status> { debug!( "RPC request received at {}: {:?}", diff --git a/sn_node/src/event.rs b/sn_node/src/event.rs index fcbb2f2b77..91d9651ac2 100644 --- a/sn_node/src/event.rs +++ b/sn_node/src/event.rs @@ -8,7 +8,7 @@ use crate::error::{Error, Result}; use bls::PublicKey; -use bytes::Bytes; + use serde::{Deserialize, Serialize}; use sn_protocol::storage::{ChunkAddress, RegisterAddress}; use sn_transfers::{CashNoteRedemption, UniquePubkey}; @@ -67,14 +67,6 @@ pub enum NodeEvent { SpendStored(UniquePubkey), /// One of the sub event channel closed and unrecoverable. ChannelClosed, - /// Gossipsub message received - GossipsubMsg { - /// Topic the message was published on - topic: String, - /// The raw bytes of the received message - #[debug(skip)] - msg: Bytes, - }, /// Transfer notification message received for a public key TransferNotif { /// Public key the transfer notification is about diff --git a/sn_node/src/lib.rs b/sn_node/src/lib.rs index 1e28dfe57e..0b88febe99 100644 --- a/sn_node/src/lib.rs +++ b/sn_node/src/lib.rs @@ -40,14 +40,11 @@ mod replication; pub use self::{ event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver}, log_markers::Marker, - node::{ - NodeBuilder, NodeCmd, PERIODIC_REPLICATION_INTERVAL_MAX_S, ROYALTY_TRANSFER_NOTIF_TOPIC, - }, + node::{NodeBuilder, NodeCmd, PERIODIC_REPLICATION_INTERVAL_MAX_S}, }; use crate::error::{Error, Result}; -use bls::PublicKey; -use bytes::Bytes; + use libp2p::PeerId; use sn_networking::{Network, SwarmLocalState}; use sn_protocol::{get_port_from_multiaddr, NetworkAddress}; @@ -64,6 +61,7 @@ use tokio::sync::broadcast; pub struct RunningNode { network: Network, node_events_channel: NodeEventsChannel, + #[allow(dead_code)] node_cmds: broadcast::Sender, } @@ -132,35 +130,4 @@ impl RunningNode { let kbuckets = self.network.get_kbuckets().await?; Ok(kbuckets) } - - /// Subscribe to given gossipsub topic - pub fn subscribe_to_topic(&self, topic_id: String) { - self.network.subscribe_to_topic(topic_id); - } - - /// Starts handling gossipsub topics - pub fn start_handle_gossip(&self) { - self.network.start_handle_gossip(); - } - - /// Unsubscribe from given gossipsub topic - pub fn unsubscribe_from_topic(&self, topic_id: String) { - self.network.unsubscribe_from_topic(topic_id); - } - - /// Publish a message on a given gossipsub topic - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { - self.network.publish_on_topic(topic_id, msg); - } - - /// Set a PublicKey to start decoding and accepting Transfer notifications received over gossipsub. - /// All Transfer notifications are dropped/discarded if no public key is set. - /// All Transfer notifications received for a key which don't match the set public key is also discarded. - pub fn transfer_notifs_filter(&self, filter: Option) -> Result<()> { - let _ = self - .node_cmds - .send(NodeCmd::TransferNotifsFilter(filter)) - .map_err(|err| Error::NodeCmdFailed(err.to_string()))?; - Ok(()) - } } diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index bd17fc2bcc..4b4a6d7060 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -14,7 +14,7 @@ use super::{ #[cfg(feature = "open-metrics")] use crate::metrics::NodeMetrics; use crate::RunningNode; -use bls::{PublicKey, PK_SIZE}; + use bytes::Bytes; use libp2p::{identity::Keypair, Multiaddr, PeerId}; #[cfg(feature = "open-metrics")] @@ -28,7 +28,7 @@ use sn_protocol::{ messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response}, NetworkAddress, PrettyPrintRecordKey, }; -use sn_transfers::{CashNoteRedemption, HotWallet, MainPubkey, MainSecretKey, NanoTokens}; +use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens}; use std::{ net::SocketAddr, path::PathBuf, @@ -43,16 +43,6 @@ use tokio::{ task::{spawn, JoinHandle}, }; -/// Expected topic name where notifications of royalty transfers are sent on. -/// The notification msg is expected to contain the serialised public key, followed by the -/// serialised transfer info encrypted against the referenced public key. -pub const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION"; - -/// Defines the percentage (ie 1/FORWARDER_CHOOSING_FACTOR th of all nodes) of nodes -/// which will act as royalty_transfer_notify forwarder. -#[cfg(feature = "royalties-by-gossip")] -const FORWARDER_CHOOSING_FACTOR: usize = 10; - /// Interval to trigger replication of all records to all peers. /// This is the max time it should take. Minimum interval at any ndoe will be half this pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 45; @@ -130,7 +120,6 @@ impl NodeBuilder { let mut network_builder = NetworkBuilder::new(self.keypair, self.local, self.root_dir); - network_builder.enable_gossip(); network_builder.listen_addr(self.addr); #[cfg(feature = "open-metrics")] network_builder.metrics_registry(metrics_registry); @@ -147,7 +136,6 @@ impl NodeBuilder { node_cmds: node_cmds.clone(), initial_peers: Arc::new(self.initial_peers), reward_address: Arc::new(reward_address), - transfer_notifs_filter: None, #[cfg(feature = "open-metrics")] node_metrics, }; @@ -160,31 +148,13 @@ impl NodeBuilder { // Run the node node.run(swarm_driver, network_event_receiver); - // Feature guard ROYALTY_TRANSFER_NOTIF_TOPIC forwarder subscription - #[cfg(feature = "royalties-by-gossip")] - { - // Having a portion of nodes (1/50) subscribe to the ROYALTY_TRANSFER_NOTIF_TOPIC - // Such nodes become `forwarder` to ensure the actual beneficary won't miss. - let index: usize = StdRng::from_entropy().gen_range(0..FORWARDER_CHOOSING_FACTOR); - if index == FORWARDER_CHOOSING_FACTOR / 2 { - info!("Picked as a forwarding node to subscribe to the {ROYALTY_TRANSFER_NOTIF_TOPIC} topic"); - // Forwarder only needs to forward topic msgs on libp2p level, - // i.e. no need to handle topic msgs, hence not a `listener`. - running_node.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); - info!("Node has been subscribed to gossipsub topic '{ROYALTY_TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."); - } - } - Ok(running_node) } } /// Commands that can be sent by the user to the Node instance, e.g. to mutate some settings. -#[derive(Clone)] -pub enum NodeCmd { - /// Set a PublicKey to start decoding and accepting Transfer notifications received over gossipsub. - TransferNotifsFilter(Option), -} +#[derive(Clone, Debug)] +pub enum NodeCmd {} /// `Node` represents a single node in the distributed network. It handles /// network events, processes incoming requests, interacts with the data @@ -198,18 +168,13 @@ pub(crate) struct Node { // Peers that are dialed at startup of node. initial_peers: Arc>, reward_address: Arc, - transfer_notifs_filter: Option, #[cfg(feature = "open-metrics")] pub(crate) node_metrics: NodeMetrics, } impl Node { /// Runs the provided `SwarmDriver` and spawns a task to process for `NetworkEvents` - fn run( - mut self, - swarm_driver: SwarmDriver, - mut network_event_receiver: Receiver, - ) { + fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver) { let mut rng = StdRng::from_entropy(); let peers_connected = Arc::new(AtomicUsize::new(0)); @@ -293,9 +258,8 @@ impl Node { } node_cmd = cmds_receiver.recv() => { match node_cmd { - Ok(NodeCmd::TransferNotifsFilter(filter)) => { - self.transfer_notifs_filter = filter; - self.network.start_handle_gossip(); + Ok(cmd) => { + info!("{cmd:?} received... unhandled") } Err(err) => error!("When trying to read from the NodeCmds channel/receiver: {err:?}") } @@ -409,39 +373,7 @@ impl Node { } }); } - NetworkEvent::GossipsubMsgReceived { topic, msg } - | NetworkEvent::GossipsubMsgPublished { topic, msg } => { - event_header = "GossipsubMsg"; - trace!("Received a gossip msg for the topic of {topic}"); - let events_channel = self.events_channel.clone(); - if events_channel.receiver_count() == 0 { - trace!( - "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}", - start.elapsed() - ); - return; - } - if topic == ROYALTY_TRANSFER_NOTIF_TOPIC { - // this is expected to be a notification of a transfer which we treat specially, - // and we try to decode it only if it's referring to a PK the user is interested in - if let Some(filter_pk) = self.transfer_notifs_filter { - let _handle = spawn(async move { - match try_decode_transfer_notif(&msg, filter_pk) { - Ok(Some(notif_event)) => events_channel.broadcast(notif_event), - Ok(None) => { /* transfer notif filered out */ } - Err(err) => { - warn!("GossipsubMsg matching the transfer notif. topic name, couldn't be decoded as such: {err:?}"); - events_channel - .broadcast(NodeEvent::GossipsubMsg { topic, msg }); - } - } - }); - } - } else { - events_channel.broadcast(NodeEvent::GossipsubMsg { topic, msg }); - } - } NetworkEvent::TerminateNode => { event_header = "TerminateNode"; error!("Received termination from swarm_driver due to too many HDD write errors."); @@ -675,21 +607,3 @@ impl Node { } } } - -fn try_decode_transfer_notif(msg: &[u8], filter: PublicKey) -> eyre::Result> { - let mut key_bytes = [0u8; PK_SIZE]; - key_bytes.copy_from_slice( - msg.get(0..PK_SIZE) - .ok_or_else(|| eyre::eyre!("msg doesn't have enough bytes"))?, - ); - let key = PublicKey::from_bytes(key_bytes)?; - if key == filter { - let cashnote_redemptions: Vec = rmp_serde::from_slice(&msg[PK_SIZE..])?; - Ok(Some(NodeEvent::TransferNotif { - key, - cashnote_redemptions, - })) - } else { - Ok(None) - } -} diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index 6bf33de5ad..e66c433b55 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -6,14 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -#[cfg(feature = "royalties-by-gossip")] -use crate::node::ROYALTY_TRANSFER_NOTIF_TOPIC; use crate::{node::Node, Error, Marker, Result}; -#[cfg(feature = "royalties-by-gossip")] -use bytes::{BufMut, BytesMut}; use libp2p::kad::{Record, RecordKey}; -#[cfg(feature = "royalties-by-gossip")] -use serde::Serialize; use sn_networking::{get_raw_signed_spends_from_record, GetRecordError, NetworkError}; use sn_protocol::{ messages::CmdOk, @@ -518,28 +512,6 @@ impl Node { return Err(Error::NoNetworkRoyaltiesPayment(pretty_key.into_owned())); } - // Feature guard network_royalty payment publish - #[cfg(feature = "royalties-by-gossip")] - { - // publish a notification over gossipsub topic ROYALTY_TRANSFER_NOTIF_TOPIC - // for the network royalties payment. - let royalties_pk = *NETWORK_ROYALTIES_PK; - trace!("Publishing a royalties transfer notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}"); - let royalties_pk_bytes = royalties_pk.to_bytes(); - - let mut msg = BytesMut::with_capacity(royalties_pk_bytes.len()); - msg.extend_from_slice(&royalties_pk_bytes); - let mut msg = msg.writer(); - let mut serialiser = rmp_serde::Serializer::new(&mut msg); - match royalties_cash_notes_r.serialize(&mut serialiser) { - Ok(()) => { - let msg = msg.into_inner().freeze(); - self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg); - } - Err(err) => warn!("Failed to serialise network royalties payment data to publish a notification over gossipsub for record {pretty_key}: {err:?}"), - } - } - // check if the quote is valid let storecost = payment.quote.cost; self.verify_quote_for_storecost(payment.quote, address)?; diff --git a/sn_node/tests/common/client.rs b/sn_node/tests/common/client.rs index 64744f3fed..9ee29d6cb3 100644 --- a/sn_node/tests/common/client.rs +++ b/sn_node/tests/common/client.rs @@ -108,21 +108,11 @@ pub fn get_all_rpc_addresses(skip_genesis_for_droplet: bool) -> Result Client { - match DeploymentInventory::load() { - Ok(inventory) => Droplet::get_gossip_client(&inventory).await, - Err(_) => NonDroplet::get_gossip_client().await, - } -} - /// Adds funds to the provided to_wallet_dir /// If SN_INVENTORY flag is passed, the amount is retrieved from the faucet url /// else obtain it from the provided faucet HotWallet /// -/// We obtain 100 SNT from the network per call. Use `get_gossip_client_and_wallet` during the initial setup which would +/// We obtain 100 SNT from the network per call. Use `get_client_and_wallet` during the initial setup which would /// obtain 10*100 SNT pub async fn add_funds_to_wallet(client: &Client, to_wallet_dir: &Path) -> Result { match DeploymentInventory::load() { @@ -139,17 +129,17 @@ pub async fn add_funds_to_wallet(client: &Client, to_wallet_dir: &Path) -> Resul /// /// We get a maximum of 10*100 SNT from the network. This is hardcoded as the Droplet tests have the fetch the /// coins from the faucet and each request is limited to 100 SNT. -pub async fn get_gossip_client_and_funded_wallet(root_dir: &Path) -> Result<(Client, HotWallet)> { +pub async fn get_client_and_funded_wallet(root_dir: &Path) -> Result<(Client, HotWallet)> { match DeploymentInventory::load() { Ok(inventory) => { - let client = Droplet::get_gossip_client(&inventory).await; + let client = Droplet::get_client(&inventory).await; let local_wallet = Droplet::get_funded_wallet(&client, root_dir, inventory.faucet_address, true) .await?; Ok((client, local_wallet)) } Err(_) => { - let client = NonDroplet::get_gossip_client().await; + let client = NonDroplet::get_client().await; let local_wallet = NonDroplet::get_funded_wallet(&client, root_dir, true).await?; Ok((client, local_wallet)) @@ -160,7 +150,7 @@ pub async fn get_gossip_client_and_funded_wallet(root_dir: &Path) -> Result<(Cli pub struct NonDroplet; impl NonDroplet { /// Get a new Client for testing - pub async fn get_gossip_client() -> Client { + pub async fn get_client() -> Client { let secret_key = bls::SecretKey::random(); let bootstrap_peers = if !cfg!(feature = "local-discovery") { @@ -177,7 +167,7 @@ impl NonDroplet { println!("Client bootstrap with peer {bootstrap_peers:?}"); info!("Client bootstrap with peer {bootstrap_peers:?}"); - Client::new(secret_key, bootstrap_peers, true, None, None) + Client::new(secret_key, bootstrap_peers, None, None) .await .expect("Client shall be successfully created.") } @@ -280,7 +270,7 @@ impl NonDroplet { pub struct Droplet; impl Droplet { /// Create a new client and bootstrap from the provided safe_peers - pub async fn get_gossip_client(inventory: &DeploymentInventory) -> Client { + pub async fn get_client(inventory: &DeploymentInventory) -> Client { let secret_key = bls::SecretKey::random(); let mut bootstrap_peers = Vec::new(); @@ -300,7 +290,7 @@ impl Droplet { println!("Client bootstrap with peer {bootstrap_peers:?}"); info!("Client bootstrap with peer {bootstrap_peers:?}"); - Client::new(secret_key, Some(bootstrap_peers), true, None, None) + Client::new(secret_key, Some(bootstrap_peers), None, None) .await .expect("Client shall be successfully created.") } diff --git a/sn_node/tests/data_with_churn.rs b/sn_node/tests/data_with_churn.rs index 969d3af88a..b9b445b8d1 100644 --- a/sn_node/tests/data_with_churn.rs +++ b/sn_node/tests/data_with_churn.rs @@ -8,7 +8,7 @@ mod common; -use crate::common::client::{add_funds_to_wallet, get_gossip_client_and_funded_wallet}; +use crate::common::client::{add_funds_to_wallet, get_client_and_funded_wallet}; use assert_fs::TempDir; use common::{ client::{get_node_count, get_wallet}, @@ -124,8 +124,7 @@ async fn data_availability_during_churn() -> Result<()> { info!("Creating a client and paying wallet..."); let paying_wallet_dir = TempDir::new()?; - let (client, _paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, _paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; // Waiting for the paying_wallet funded. sleep(std::time::Duration::from_secs(10)).await; diff --git a/sn_node/tests/double_spend.rs b/sn_node/tests/double_spend.rs index c5242017f8..d861e2b5d8 100644 --- a/sn_node/tests/double_spend.rs +++ b/sn_node/tests/double_spend.rs @@ -10,7 +10,7 @@ mod common; use assert_fs::TempDir; use assert_matches::assert_matches; -use common::client::{get_gossip_client_and_funded_wallet, get_wallet}; +use common::client::{get_client_and_funded_wallet, get_wallet}; use eyre::Result; use sn_logging::LogBuilder; use sn_transfers::{ @@ -27,8 +27,7 @@ async fn cash_note_transfer_double_spend_fail() -> Result<()> { // create 1 wallet add money from faucet let first_wallet_dir = TempDir::new()?; - let (client, mut first_wallet) = - get_gossip_client_and_funded_wallet(first_wallet_dir.path()).await?; + let (client, mut first_wallet) = get_client_and_funded_wallet(first_wallet_dir.path()).await?; let first_wallet_balance = first_wallet.balance().as_nano(); // create wallet 2 and 3 to receive money from 1 @@ -91,8 +90,7 @@ async fn genesis_double_spend_fail() -> Result<()> { // create a client and an unused wallet to make sure some money already exists in the system let first_wallet_dir = TempDir::new()?; - let (client, mut first_wallet) = - get_gossip_client_and_funded_wallet(first_wallet_dir.path()).await?; + let (client, mut first_wallet) = get_client_and_funded_wallet(first_wallet_dir.path()).await?; let first_wallet_addr = first_wallet.address(); // create a new genesis wallet with the intention to spend genesis again diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs deleted file mode 100644 index fb4cb6d494..0000000000 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -mod common; - -use crate::common::{ - client::{get_all_rpc_addresses, get_node_count}, - get_safenode_rpc_client, -}; -use eyre::Result; -use rand::seq::SliceRandom; -use sn_logging::LogBuilder; -use sn_node::NodeEvent; -use sn_protocol::safenode_proto::{ - GossipsubPublishRequest, GossipsubSubscribeRequest, GossipsubUnsubscribeRequest, - NodeEventsRequest, -}; -use std::{net::SocketAddr, time::Duration}; -use tokio::time::timeout; -use tokio_stream::StreamExt; -use tonic::Request; -use tracing::{error, info}; - -const TEST_CYCLES: u8 = 20; - -#[tokio::test] -async fn msgs_over_gossipsub() -> Result<()> { - let _guard = LogBuilder::init_single_threaded_tokio_test("msgs_over_gossipsub"); - - let node_count = get_node_count(); - let nodes_subscribed = node_count / 2; // 12 out of 25 nodes will be subscribers - - let node_rpc_addresses = get_all_rpc_addresses(false)? - .into_iter() - .enumerate() - .collect::>(); - - for c in 0..TEST_CYCLES { - let topic = format!("TestTopic-{}", rand::random::()); - println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); - println!("============================================================"); - info!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); - info!("============================================================"); - - // get a random subset of `nodes_subscribed`` out of `node_count` nodes to subscribe to the topic - let mut rng = rand::thread_rng(); - let random_subs_nodes: Vec<_> = node_rpc_addresses - .choose_multiple(&mut rng, nodes_subscribed) - .cloned() - .collect(); - - let mut subs_handles = vec![]; - for (node_index, rpc_addr) in random_subs_nodes.clone() { - // request current node to subscribe to the topic - println!("Node #{node_index} ({rpc_addr}) subscribing to {topic} ..."); - info!("Node #{node_index} ({rpc_addr}) subscribing to {topic} ..."); - node_subscribe_to_topic(rpc_addr, topic.clone()).await?; - - let handle = tokio::spawn(async move { - let mut rpc_client = get_safenode_rpc_client(rpc_addr).await?; - let response = rpc_client - .node_events(Request::new(NodeEventsRequest {})) - .await?; - - let mut count: usize = 0; - - let _ = timeout(Duration::from_secs(40), async { - let mut stream = response.into_inner(); - while let Some(Ok(e)) = stream.next().await { - match NodeEvent::from_bytes(&e.event) { - Ok(NodeEvent::GossipsubMsg { topic, msg }) => { - println!( - "Msg received on node #{node_index} '{topic}': {}", - String::from_utf8(msg.to_vec()).unwrap() - ); - info!( - "Msg received on node #{node_index} '{topic}': {}", - String::from_utf8(msg.to_vec()).unwrap() - ); - count += 1; - } - Ok(_) => { /* ignored */ } - Err(_) => { - error!("Error while parsing received NodeEvent"); - println!("Error while parsing received NodeEvent"); - } - } - } - }) - .await; - - Ok::(count) - }); - - subs_handles.push((node_index, rpc_addr, handle)); - } - - tokio::time::sleep(Duration::from_secs(3)).await; - - // have all other nodes to publish each a different msg to that same topic - let mut other_nodes = node_rpc_addresses.clone(); - other_nodes - .retain(|(node_index, _)| random_subs_nodes.iter().all(|(n, _)| n != node_index)); - other_nodes_to_publish_on_topic(other_nodes, topic.clone()).await?; - - for (node_index, addr, handle) in subs_handles.into_iter() { - let count = handle.await??; - println!("Messages received by node {node_index}: {count}"); - info!("Messages received by node {node_index}: {count}"); - assert_eq!( - count, - node_count - nodes_subscribed, - "Not enough messages received by node at index {node_index}" - ); - node_unsubscribe_from_topic(addr, topic.clone()).await?; - } - } - - Ok(()) -} - -async fn node_subscribe_to_topic(addr: SocketAddr, topic: String) -> Result<()> { - let mut rpc_client = get_safenode_rpc_client(addr).await?; - - // subscribe to given topic - let _response = rpc_client - .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { topic })) - .await?; - - Ok(()) -} - -async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result<()> { - let mut rpc_client = get_safenode_rpc_client(addr).await?; - - // unsubscribe from given topic - let _response = rpc_client - .unsubscribe_from_topic(Request::new(GossipsubUnsubscribeRequest { topic })) - .await?; - - Ok(()) -} - -async fn other_nodes_to_publish_on_topic( - nodes: Vec<(usize, SocketAddr)>, - topic: String, -) -> Result<()> { - for (node_index, addr) in nodes { - let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); - - let mut rpc_client = get_safenode_rpc_client(addr).await?; - println!("Node {node_index} to publish on {topic} message: {msg}"); - info!("Node {node_index} to publish on {topic} message: {msg}"); - - let _response = rpc_client - .publish_on_topic(Request::new(GossipsubPublishRequest { - topic: topic.clone(), - msg: msg.into(), - })) - .await?; - } - - Ok(()) -} diff --git a/sn_node/tests/nodes_rewards.rs b/sn_node/tests/nodes_rewards.rs deleted file mode 100644 index f787f2faf8..0000000000 --- a/sn_node/tests/nodes_rewards.rs +++ /dev/null @@ -1,444 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -mod common; - -#[cfg(all(test, feature = "royalties-by-gossip"))] -mod tests { - use crate::common::{ - client::{get_all_rpc_addresses, get_gossip_client_and_funded_wallet}, - get_safenode_rpc_client, random_content, - }; - use assert_fs::TempDir; - use bls::{PublicKey, SecretKey, PK_SIZE}; - use eyre::{eyre, Result}; - use sn_client::{Client, ClientEvent, FilesUpload, WalletClient}; - use sn_logging::LogBuilder; - use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC}; - use sn_protocol::safenode_proto::{ - GossipsubSubscribeRequest, NodeEventsRequest, NodeInfoRequest, TransferNotifsFilterRequest, - }; - use sn_registers::Permissions; - use sn_transfers::{ - CashNoteRedemption, HotWallet, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK, - }; - use std::net::SocketAddr; - use tokio::{ - task::JoinHandle, - time::{sleep, timeout, Duration}, - }; - use tokio_stream::StreamExt; - use tonic::Request; - use tracing::{debug, error, info}; - use xor_name::XorName; - - #[tokio::test] - async fn nodes_rewards_for_storing_chunks() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("nodes_rewards"); - - let paying_wallet_dir = TempDir::new()?; - let chunks_dir = TempDir::new()?; - - let (client, _paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; - - let (files_api, _content_bytes, _content_addr, chunks) = - random_content(&client, paying_wallet_dir.to_path_buf(), chunks_dir.path())?; - - let chunks_len = chunks.len(); - let prev_rewards_balance = current_rewards_balance().await?; - info!("With {prev_rewards_balance:?} current balance, paying for {} random addresses... {chunks:?}", chunks.len()); - - let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); - files_upload.upload_chunks(chunks).await?; - let storage_cost = files_upload.get_upload_storage_cost(); - - info!("Paid {storage_cost:?} total rewards for the chunks"); - - verify_rewards(prev_rewards_balance, storage_cost, chunks_len).await?; - - Ok(()) - } - - #[tokio::test] - async fn nodes_rewards_for_storing_registers() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("nodes_rewards"); - - let paying_wallet_dir = TempDir::new()?; - - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; - let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); - - let rb = current_rewards_balance().await?; - - let mut rng = rand::thread_rng(); - let register_addr = XorName::random(&mut rng); - - info!("Paying for random Register address {register_addr:?} with current balance {rb:?}"); - - let prev_rewards_balance = current_rewards_balance().await?; - - let (_register, storage_cost, _royalties_fees) = client - .create_and_pay_for_register( - register_addr, - &mut wallet_client, - false, - Permissions::default(), - ) - .await?; - info!("Cost is {storage_cost:?}: {prev_rewards_balance:?}"); - - verify_rewards(prev_rewards_balance, storage_cost, 1).await?; - - Ok(()) - } - - #[tokio::test] - async fn nodes_rewards_for_chunks_notifs_over_gossipsub() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("nodes_rewards"); - - let paying_wallet_dir = TempDir::new()?; - let chunks_dir = TempDir::new()?; - - let (client, _paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; - - let (files_api, _content_bytes, _content_addr, chunks) = - random_content(&client, paying_wallet_dir.to_path_buf(), chunks_dir.path())?; - - let num_of_chunks = chunks.len(); - let handle = spawn_royalties_payment_client_listener(client.clone(), num_of_chunks).await?; - - let num_of_chunks = chunks.len(); - - info!("Paying for {num_of_chunks} random addresses..."); - let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); - files_upload.upload_chunks(chunks).await?; - let storage_cost = files_upload.get_upload_storage_cost(); - let royalties_fees = files_upload.get_upload_royalty_fees(); - - info!("Random chunks stored, paid {storage_cost}/{royalties_fees}"); - - let (count, amount) = handle.await??; - - info!("Number of notifications received: {count}"); - info!("Amount notified for royalties fees: {amount}"); - assert_eq!( - amount, royalties_fees, - "Unexpected amount of royalties fees received" - ); - assert!( - count >= num_of_chunks, - "Unexpected number of royalties fees notifications received" - ); - - Ok(()) - } - - #[tokio::test] - async fn nodes_rewards_for_register_notifs_over_gossipsub() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("nodes_rewards"); - - let paying_wallet_dir = TempDir::new()?; - - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; - let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); - - let mut rng = rand::thread_rng(); - let register_addr = XorName::random(&mut rng); - - let handle = spawn_royalties_payment_client_listener(client.clone(), 1).await?; - - info!("Paying for random Register address {register_addr:?} ..."); - let (_, storage_cost, royalties_fees) = client - .create_and_pay_for_register( - register_addr, - &mut wallet_client, - false, - Permissions::default(), - ) - .await?; - info!("Random Register created, paid {storage_cost}/{royalties_fees}"); - - let (count, amount) = handle.await??; - info!("Number of notifications received: {count}"); - info!("Amount notified for royalties fees: {amount}"); - assert_eq!( - amount, royalties_fees, - "Unexpected amount of royalties fees received" - ); - assert_eq!( - count, 1, - "Unexpected number of royalties fees notifications received" - ); - - Ok(()) - } - - #[tokio::test] - async fn nodes_rewards_transfer_notifs_filter() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("nodes_rewards"); - - let paying_wallet_dir = TempDir::new()?; - let chunks_dir = TempDir::new()?; - - let (client, _paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; - - let (files_api, _content_bytes, _content_addr, chunks) = - random_content(&client, paying_wallet_dir.to_path_buf(), chunks_dir.path())?; - let node_rpc_addresses = get_all_rpc_addresses(false)?; - - // this node shall receive the notifications since we set the correct royalties pk as filter - let royalties_pk = NETWORK_ROYALTIES_PK.public_key(); - let handle_1 = spawn_royalties_payment_listener( - node_rpc_addresses[0], - royalties_pk, - true, - chunks.len(), - false, - ) - .await; - // this other node shall *not* receive any notification since we set the wrong pk as filter - let random_pk = SecretKey::random().public_key(); - let handle_2 = - spawn_royalties_payment_listener(node_rpc_addresses[1], random_pk, true, 0, false) - .await; - // this other node shall *not* receive any notification either since we don't set any pk as filter - let handle_3 = - spawn_royalties_payment_listener(node_rpc_addresses[2], royalties_pk, false, 0, true) - .await; - - let num_of_chunks = chunks.len(); - info!("Paying for {num_of_chunks} chunks"); - let mut files_upload = FilesUpload::new(files_api.clone()).set_show_holders(true); - files_upload.upload_chunks(chunks).await?; - let storage_cost = files_upload.get_upload_storage_cost(); - let royalties_fees = files_upload.get_upload_royalty_fees(); - - info!("Random chunks stored, paid {storage_cost}/{royalties_fees}"); - - let count_1 = handle_1.await??; - info!("Number of notifications received by node #1: {count_1}"); - let count_2 = handle_2.await??; - info!("Number of notifications received by node #2: {count_2}"); - let count_3 = handle_3.await??; - info!("Number of notifications received by node #3: {count_3}"); - - assert!( - count_1 >= num_of_chunks, - "expected: {num_of_chunks:?}, received {count_1:?}... Not enough notifications received" - ); - assert_eq!(count_2, 0, "Notifications were not expected"); - assert_eq!(count_3, 0, "Notifications were not expected"); - - Ok(()) - } - - async fn verify_rewards( - prev_rewards_balance: NanoTokens, - rewards_paid: NanoTokens, - put_record_count: usize, - ) -> Result<()> { - let expected_rewards_balance = prev_rewards_balance - .checked_add(rewards_paid) - .ok_or_else(|| eyre!("Failed to sum up rewards balance"))?; - - let mut iteration = 0; - let mut cur_rewards_history = Vec::new(); - - // An initial sleep to avoid access to allow for reward receipts to be processed - sleep(Duration::from_secs(20)).await; - - while iteration < put_record_count { - iteration += 1; - info!("Current iteration {iteration}"); - let new_rewards_balance = current_rewards_balance().await?; - if expected_rewards_balance == new_rewards_balance { - return Ok(()); - } - cur_rewards_history.push(new_rewards_balance); - sleep(Duration::from_secs(10)).await; - } - - Err(eyre!("Network doesn't get expected reward {expected_rewards_balance:?} after {iteration} iterations, history is {cur_rewards_history:?}")) - } - - // Helper to collect all the node wallet's balance. - async fn current_rewards_balance() -> Result { - let mut total_rewards = NanoTokens::zero(); - - for rpc_addr in get_all_rpc_addresses(false)? { - let mut rpc_client = get_safenode_rpc_client(rpc_addr).await?; - let response = rpc_client - .node_info(Request::new(NodeInfoRequest {})) - .await?; - let balance = NanoTokens::from(response.get_ref().wallet_balance); - total_rewards = total_rewards - .checked_add(balance) - .ok_or_else(|| eyre!("Failed to sum up rewards balance"))?; - } - - info!("Current total balance is {total_rewards:?}"); - - Ok(total_rewards) - } - - async fn spawn_royalties_payment_listener( - rpc_addr: SocketAddr, - royalties_pk: PublicKey, - set_filter: bool, - expected_royalties: usize, - need_extra_wait: bool, - ) -> JoinHandle> { - let handle = tokio::spawn(async move { - let mut rpc_client = get_safenode_rpc_client(rpc_addr).await?; - - if set_filter { - let _ = rpc_client - .transfer_notifs_filter(Request::new(TransferNotifsFilterRequest { - pk: royalties_pk.to_bytes().to_vec(), - })) - .await?; - } - - let _ = rpc_client - .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { - topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), - })) - .await?; - - let response = rpc_client - .node_events(Request::new(NodeEventsRequest {})) - .await?; - - let mut count = 0; - let mut stream = response.into_inner(); - - // if expected royalties is 0 or 1 we'll wait for 20s as a minimum, - // otherwise we'll wait for 10s per expected royalty - let secs = std::cmp::max(120, expected_royalties as u64 * 15); - - let duration = Duration::from_secs(secs); - info!("Awaiting transfers notifs for {duration:?}..."); - if timeout(duration, async { - while let Some(Ok(e)) = stream.next().await { - match NodeEvent::from_bytes(&e.event) { - Ok(NodeEvent::TransferNotif { key, .. }) => { - info!("Transfer notif received for key {key:?}"); - if key == royalties_pk { - count += 1; - info!("Received {count} royalty notifs so far"); - } - } - Ok(_) => { /* ignored */ } - Err(_) => { - error!("Error while parsing received NodeEvent"); - } - } - } - }) - .await - .is_err() - { - debug!("Timeout after {duration:?}."); - } - - Ok(count) - }); - - // small wait to ensure that the gossipsub subscription is in place - if need_extra_wait { - sleep(Duration::from_secs(20)).await; - } - - handle - } - - async fn spawn_royalties_payment_client_listener( - client: Client, - expected_royalties: usize, - ) -> Result>> { - let temp_dir = assert_fs::TempDir::new()?; - let sk = SecretKey::from_hex(sn_transfers::GENESIS_CASHNOTE_SK)?; - let mut wallet = HotWallet::load_from_path(&temp_dir, Some(MainSecretKey::new(sk)))?; - let royalties_pk = NETWORK_ROYALTIES_PK.public_key(); - client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); - - let mut events_receiver = client.events_channel(); - - let handle = tokio::spawn(async move { - let mut count = 0; - - // if expected royalties is 0 or 1 we'll wait for 300s as a minimum, - // otherwise we'll wait for 60s per expected royalty - let secs = std::cmp::max(300, expected_royalties as u64 * 60); - let duration = Duration::from_secs(secs); - info!("Awaiting transfers notifs for {duration:?}..."); - if timeout(duration, async { - while let Ok(event) = events_receiver.recv().await { - let cashnote_redemptions = match event { - ClientEvent::GossipsubMsg { topic, msg } => { - // we assume it's a notification of a transfer as that's the only topic we've subscribed to - match try_decode_transfer_notif(&msg) { - Ok((key, cashnote_redemptions)) => { - info!("Transfer notif received for key {key:?}"); - if key != royalties_pk { - continue; - } - count += 1; - cashnote_redemptions - } - Err(err) => { - error!("GossipsubMsg received on topic '{topic}' couldn't be decoded as transfer notif: {err:?}"); - continue; - }, - } - }, - _ => continue - }; - - match client - .verify_cash_notes_redemptions(wallet.address(), &cashnote_redemptions) - .await - { - Ok(cash_notes) => if let Err(err) = wallet.deposit(&cash_notes) { - error!("Failed to deposit cash notes: {err}"); - } - Err(err) => error!("At least one of the CashNoteRedemptions received is invalid, dropping them: {err:?}") - } - } - }) - .await - .is_err() - { - debug!("Timeout after {duration:?}."); - } - - Ok((count, wallet.balance())) - }); - - // small wait to ensure that the gossipsub subscription is in place - sleep(Duration::from_secs(20)).await; - - Ok(handle) - } - - fn try_decode_transfer_notif(msg: &[u8]) -> eyre::Result<(PublicKey, Vec)> { - let mut key_bytes = [0u8; PK_SIZE]; - key_bytes.copy_from_slice( - msg.get(0..PK_SIZE) - .ok_or_else(|| eyre::eyre!("msg doesn't have enough bytes"))?, - ); - let key = PublicKey::from_bytes(key_bytes)?; - let cashnote_redemptions: Vec = rmp_serde::from_slice(&msg[PK_SIZE..])?; - Ok((key, cashnote_redemptions)) - } -} diff --git a/sn_node/tests/sequential_transfers.rs b/sn_node/tests/sequential_transfers.rs index fb0c9037b3..2b17624db1 100644 --- a/sn_node/tests/sequential_transfers.rs +++ b/sn_node/tests/sequential_transfers.rs @@ -9,7 +9,7 @@ mod common; use assert_fs::TempDir; -use common::client::{get_gossip_client_and_funded_wallet, get_wallet}; +use common::client::{get_client_and_funded_wallet, get_wallet}; use eyre::Result; use sn_client::send; use sn_logging::LogBuilder; @@ -22,8 +22,7 @@ async fn cash_note_transfer_multiple_sequential_succeed() -> Result<()> { let first_wallet_dir = TempDir::new()?; - let (client, first_wallet) = - get_gossip_client_and_funded_wallet(first_wallet_dir.path()).await?; + let (client, first_wallet) = get_client_and_funded_wallet(first_wallet_dir.path()).await?; let first_wallet_balance = first_wallet.balance().as_nano(); let second_wallet_balance = NanoTokens::from(first_wallet_balance / 2); diff --git a/sn_node/tests/storage_payments.rs b/sn_node/tests/storage_payments.rs index f87e531adc..c9a06f2157 100644 --- a/sn_node/tests/storage_payments.rs +++ b/sn_node/tests/storage_payments.rs @@ -8,7 +8,7 @@ mod common; -use crate::common::{client::get_gossip_client_and_funded_wallet, random_content}; +use crate::common::{client::get_client_and_funded_wallet, random_content}; use assert_fs::TempDir; use eyre::{eyre, Result}; use libp2p::PeerId; @@ -34,8 +34,7 @@ async fn storage_payment_succeeds() -> Result<()> { let paying_wallet_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let balance_before = paying_wallet.balance(); let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); @@ -74,8 +73,7 @@ async fn storage_payment_fails_with_insufficient_money() -> Result<()> { let paying_wallet_dir: TempDir = TempDir::new()?; let chunks_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let (files_api, content_bytes, _random_content_addrs, chunks) = random_content(&client, paying_wallet_dir.to_path_buf(), chunks_dir.path())?; @@ -112,8 +110,7 @@ async fn storage_payment_proofs_cached_in_wallet() -> Result<()> { let paying_wallet_dir: TempDir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let wallet_original_balance = paying_wallet.balance().as_nano(); let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); @@ -184,8 +181,7 @@ async fn storage_payment_chunk_upload_succeeds() -> Result<()> { let paying_wallet_dir = TempDir::new()?; let chunks_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); let (files_api, _content_bytes, file_addr, chunks) = @@ -217,8 +213,7 @@ async fn storage_payment_chunk_upload_fails_if_no_tokens_sent() -> Result<()> { let paying_wallet_dir = TempDir::new()?; let chunks_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); let (files_api, content_bytes, content_addr, chunks) = @@ -267,8 +262,7 @@ async fn storage_payment_register_creation_succeeds() -> Result<()> { let paying_wallet_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); let mut rng = rand::thread_rng(); @@ -308,8 +302,7 @@ async fn storage_payment_register_creation_and_mutation_fails() -> Result<()> { let paying_wallet_dir = TempDir::new()?; - let (client, paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); let mut rng = rand::thread_rng(); diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs index 8db388f10d..08238fe037 100644 --- a/sn_node/tests/verify_data_location.rs +++ b/sn_node/tests/verify_data_location.rs @@ -10,7 +10,7 @@ mod common; use crate::common::{ - client::{get_all_rpc_addresses, get_gossip_client_and_funded_wallet}, + client::{get_all_rpc_addresses, get_client_and_funded_wallet}, get_all_peer_ids, get_safenode_rpc_client, NodeRestart, }; use assert_fs::TempDir; @@ -108,8 +108,7 @@ async fn verify_data_location() -> Result<()> { let paying_wallet_dir = TempDir::new()?; - let (client, _paying_wallet) = - get_gossip_client_and_funded_wallet(paying_wallet_dir.path()).await?; + let (client, _paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; store_chunks(client.clone(), chunk_count, paying_wallet_dir.to_path_buf()).await?; store_registers(client, register_count, paying_wallet_dir.to_path_buf()).await?; diff --git a/sn_node_manager/src/lib.rs b/sn_node_manager/src/lib.rs index 8128affb12..d0b2b7c56b 100644 --- a/sn_node_manager/src/lib.rs +++ b/sn_node_manager/src/lib.rs @@ -431,9 +431,6 @@ mod tests { async fn node_info(&self) -> ServiceControlResult; async fn network_info(&self) -> ServiceControlResult; async fn record_addresses(&self) -> ServiceControlResult>; - async fn gossipsub_subscribe(&self, topic: &str) -> ServiceControlResult<()>; - async fn gossipsub_unsubscribe(&self, topic: &str) -> ServiceControlResult<()>; - async fn gossipsub_publish(&self, topic: &str, message: &str) -> ServiceControlResult<()>; async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> ServiceControlResult<()>; async fn node_stop(&self, delay_millis: u64) -> ServiceControlResult<()>; async fn node_update(&self, delay_millis: u64) -> ServiceControlResult<()>; diff --git a/sn_node_manager/src/local.rs b/sn_node_manager/src/local.rs index 5c79e02b77..c9151d5ece 100644 --- a/sn_node_manager/src/local.rs +++ b/sn_node_manager/src/local.rs @@ -401,9 +401,6 @@ mod tests { async fn node_info(&self) -> RpcResult; async fn network_info(&self) -> RpcResult; async fn record_addresses(&self) -> RpcResult>; - async fn gossipsub_subscribe(&self, topic: &str) -> RpcResult<()>; - async fn gossipsub_unsubscribe(&self, topic: &str) -> RpcResult<()>; - async fn gossipsub_publish(&self, topic: &str, message: &str) -> RpcResult<()>; async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>; async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>; async fn node_update(&self, delay_millis: u64) -> RpcResult<()>; diff --git a/sn_node_rpc_client/README.md b/sn_node_rpc_client/README.md index ea6ab307b6..5cedf7c74d 100644 --- a/sn_node_rpc_client/README.md +++ b/sn_node_rpc_client/README.md @@ -12,9 +12,6 @@ Run `cargo run -- ` to connect to a node. Provide the address of - `netinfo`: Retrieve information about the node's connections to the network - `events`: Start listening for node events - `transfers`: Start listening for transfers events -- `subscribe`: Subscribe to a given Gossipsub topic -- `unsubscribe`: Unsubscribe from a given Gossipsub topic -- `publish`: Publish a msg on a given Gossipsub topic - `restart`: Restart the node after the specified delay - `stop`: Stop the node after the specified delay - `update`: Update to latest `safenode` released version, and restart it diff --git a/sn_node_rpc_client/src/main.rs b/sn_node_rpc_client/src/main.rs index ff16fb153f..fa835f43a9 100644 --- a/sn_node_rpc_client/src/main.rs +++ b/sn_node_rpc_client/src/main.rs @@ -14,12 +14,9 @@ use color_eyre::eyre::{eyre, Result}; use libp2p::Multiaddr; use sn_client::Client; use sn_logging::LogBuilder; -use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC}; +use sn_node::NodeEvent; use sn_peers_acquisition::{get_peers_from_args, PeersArgs}; -use sn_protocol::safenode_proto::{ - safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest, - TransferNotifsFilterRequest, -}; +use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, NodeEventsRequest}; use sn_protocol::storage::SpendAddress; use sn_service_management::rpc::{RpcActions, RpcClient}; use sn_transfers::{MainPubkey, WatchOnlyWallet}; @@ -67,26 +64,6 @@ enum Cmd { #[command(flatten)] peers: PeersArgs, }, - /// Subscribe to a given Gossipsub topic - #[clap(name = "subscribe")] - Subscribe { - /// Name of the topic - topic: String, - }, - /// Unsubscribe from a given Gossipsub topic - #[clap(name = "unsubscribe")] - Unsubscribe { - /// Name of the topic - topic: String, - }, - /// Publish a msg on a given Gossipsub topic - #[clap(name = "publish")] - Publish { - /// Name of the topic - topic: String, - /// Message to publish - msg: String, - }, /// Restart the node after the specified delay #[clap(name = "restart")] Restart { @@ -154,9 +131,6 @@ async fn main() -> Result<()> { transfers_events(addr, sk, log_cash_notes, bootstrap_peers).await } - Cmd::Subscribe { topic } => gossipsub_subscribe(addr, topic).await, - Cmd::Unsubscribe { topic } => gossipsub_unsubscribe(addr, topic).await, - Cmd::Publish { topic, msg } => gossipsub_publish(addr, topic, msg).await, Cmd::Restart { delay_millis, retain_peer_id, @@ -235,8 +209,7 @@ pub async fn transfers_events( ) -> Result<()> { let (client, mut wallet) = match MainPubkey::from_hex(&sk) { Ok(main_pubkey) => { - let client = - Client::new(SecretKey::random(), bootstrap_peers, true, None, None).await?; + let client = Client::new(SecretKey::random(), bootstrap_peers, None, None).await?; let wallet_dir = TempDir::new()?; let wallet = WatchOnlyWallet::load_from(&wallet_dir, main_pubkey)?; (client, wallet) @@ -247,17 +220,6 @@ pub async fn transfers_events( let mut node_client = SafeNodeClient::connect(endpoint).await?; let main_pk = wallet.address(); let pk = main_pk.public_key(); - let _ = node_client - .transfer_notifs_filter(Request::new(TransferNotifsFilterRequest { - pk: pk.to_bytes().to_vec(), - })) - .await?; - - let _ = node_client - .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { - topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), - })) - .await?; let response = node_client .node_events(Request::new(NodeEventsRequest {})) @@ -349,30 +311,6 @@ pub async fn record_addresses(addr: SocketAddr) -> Result<()> { Ok(()) } -pub async fn gossipsub_subscribe(addr: SocketAddr, topic: String) -> Result<()> { - let endpoint = format!("https://{addr}"); - let client = RpcClient::new(&endpoint); - client.gossipsub_subscribe(&topic).await?; - println!("Node successfully received the request to subscribe to topic '{topic}'"); - Ok(()) -} - -pub async fn gossipsub_unsubscribe(addr: SocketAddr, topic: String) -> Result<()> { - let endpoint = format!("https://{addr}"); - let client = RpcClient::new(&endpoint); - client.gossipsub_unsubscribe(&topic).await?; - println!("Node successfully received the request to unsubscribe from topic '{topic}'"); - Ok(()) -} - -pub async fn gossipsub_publish(addr: SocketAddr, topic: String, msg: String) -> Result<()> { - let endpoint = format!("https://{addr}"); - let client = RpcClient::new(&endpoint); - client.gossipsub_publish(&topic, &msg).await?; - println!("Node successfully received the request to publish on topic '{topic}'"); - Ok(()) -} - pub async fn node_restart(addr: SocketAddr, delay_millis: u64, retain_peer_id: bool) -> Result<()> { let endpoint = format!("https://{addr}"); let client = RpcClient::new(&endpoint); diff --git a/sn_protocol/src/safenode_proto/req_resp_types.proto b/sn_protocol/src/safenode_proto/req_resp_types.proto index e58d78cde3..f0333e610a 100644 --- a/sn_protocol/src/safenode_proto/req_resp_types.proto +++ b/sn_protocol/src/safenode_proto/req_resp_types.proto @@ -57,35 +57,6 @@ message KBucketsResponse { map kbuckets = 1; } -// Subsribe to a gossipsub topic -message GossipsubSubscribeRequest { - string topic = 1; -} - -message GossipsubSubscribeResponse {} - -// Unsubsribe from a gossipsub topic -message GossipsubUnsubscribeRequest { - string topic = 1; -} - -message GossipsubUnsubscribeResponse {} - -// Publish a msg on a gossipsub topic -message GossipsubPublishRequest { - string topic = 1; - bytes msg = 2; -} - -message GossipsubPublishResponse {} - -// Set a PublicKey to start decoding and accepting Transfer notifications received over gossipsub. -message TransferNotifsFilterRequest { - bytes pk = 1; -} - -message TransferNotifsFilterResponse {} - // Stop the safenode app message StopRequest { uint64 delay_millis = 1; @@ -113,4 +84,4 @@ message UpdateLogLevelRequest { string log_level = 1; } -message UpdateLogLevelResponse{} \ No newline at end of file +message UpdateLogLevelResponse{} diff --git a/sn_protocol/src/safenode_proto/safenode.proto b/sn_protocol/src/safenode_proto/safenode.proto index 483ed66da5..d6f647610d 100644 --- a/sn_protocol/src/safenode_proto/safenode.proto +++ b/sn_protocol/src/safenode_proto/safenode.proto @@ -31,24 +31,12 @@ service SafeNode { // Returns a stream of events as triggered by this node rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent); - // Set a PublicKey to start decoding and accepting Transfer notifications received over gossipsub. - rpc TransferNotifsFilter (TransferNotifsFilterRequest) returns (TransferNotifsFilterResponse); - // Returns the Addresses of all the Records stored by this node rpc RecordAddresses (RecordAddressesRequest) returns (RecordAddressesResponse); // Returns the entire Kbucket of this node rpc KBuckets (KBucketsRequest) returns (KBucketsResponse); - // Subscribe to a Gossipsub topic - rpc SubscribeToTopic (GossipsubSubscribeRequest) returns (GossipsubSubscribeResponse); - - // Unsubscribe from a Gossipsub topic - rpc UnsubscribeFromTopic (GossipsubUnsubscribeRequest) returns (GossipsubUnsubscribeResponse); - - // Publish a msg on a Gossipsub topic - rpc PublishOnTopic (GossipsubPublishRequest) returns (GossipsubPublishResponse); - // Stop the execution of this node rpc Stop (StopRequest) returns (StopResponse); diff --git a/sn_service_management/README.md b/sn_service_management/README.md index ee5fa237b3..b5827d037a 100644 --- a/sn_service_management/README.md +++ b/sn_service_management/README.md @@ -5,13 +5,11 @@ Provides utilities for dealing with services, which are mainly used by the node ## RPC Actions The `RpcActions` trait defines the protocol that is currently available for interacting with `safenode`: + ``` node_info: Returns information about the node, such as its peer ID and version. network_info: Retrieves network-related information, such as the peers currently connected to the node. record_addresses: Provides a list of the node's record addresses. -gossipsub_subscribe: Subscribes to a specific topic on the gossipsub network. -gossipsub_unsubscribe: Unsubscribes from a given topic on the gossipsub network. -gossipsub_publish: Publishes a message to a specified topic on the gossipsub network. restart_node: Requests the node to restart. stop_node: Requests the node to stop its operations. update_node: Updates the node with provided parameters. @@ -20,6 +18,7 @@ update_node: Updates the node with provided parameters. Users of the crate can program against the trait rather than the `RpcClient` implementation. This can facilitate behaviour-based unit testing, like so: + ``` use mockall::mock; use mockall::predicate::*; diff --git a/sn_service_management/src/error.rs b/sn_service_management/src/error.rs index aa0442740e..26e1639b94 100644 --- a/sn_service_management/src/error.rs +++ b/sn_service_management/src/error.rs @@ -31,12 +31,6 @@ pub enum Error { PeerIdParseError(#[from] libp2p_identity::ParseError), #[error("Could not connect to RPC endpoint '{0}'")] RpcConnectionError(String), - #[error("Could not publish on topic {0} through RPC: {1}")] - RpcGossipPublishError(String, String), - #[error("Could not subscribe to gossip topic {0} through RPC: {1}")] - RpcGossipSubscribeError(String, String), - #[error("Could not unsubscribe from gossip topic {0} through RPC: {1}")] - RpcGossipUnsubscribeError(String, String), #[error("Could not obtain node info through RPC: {0}")] RpcNodeInfoError(String), #[error("Could not obtain network info through RPC: {0}")] diff --git a/sn_service_management/src/rpc.rs b/sn_service_management/src/rpc.rs index 75b6dfd6ac..f69a237356 100644 --- a/sn_service_management/src/rpc.rs +++ b/sn_service_management/src/rpc.rs @@ -10,8 +10,7 @@ use crate::error::{Error, Result}; use async_trait::async_trait; use libp2p::{kad::RecordKey, Multiaddr, PeerId}; use sn_protocol::safenode_proto::{ - safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest, - GossipsubUnsubscribeRequest, NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, + safe_node_client::SafeNodeClient, NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, RestartRequest, StopRequest, UpdateLogLevelRequest, UpdateRequest, }; use std::{net::SocketAddr, path::PathBuf, str::FromStr}; @@ -45,9 +44,6 @@ pub trait RpcActions: Sync { async fn node_info(&self) -> Result; async fn network_info(&self) -> Result; async fn record_addresses(&self) -> Result>; - async fn gossipsub_subscribe(&self, topic: &str) -> Result<()>; - async fn gossipsub_unsubscribe(&self, topic: &str) -> Result<()>; - async fn gossipsub_publish(&self, topic: &str, message: &str) -> Result<()>; async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()>; async fn node_stop(&self, delay_millis: u64) -> Result<()>; async fn node_update(&self, delay_millis: u64) -> Result<()>; @@ -165,49 +161,6 @@ impl RpcActions for RpcClient { Ok(record_addresses) } - async fn gossipsub_subscribe(&self, topic: &str) -> Result<()> { - let mut client = self.connect_with_retry().await?; - let _response = client - .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { - topic: topic.to_string(), - })) - .await - .map_err(|e| { - error!("Could not subscribe to gossip topic {topic:?} through RPC: {e:?}"); - Error::RpcGossipSubscribeError(topic.to_string(), e.to_string()) - })?; - Ok(()) - } - - async fn gossipsub_unsubscribe(&self, topic: &str) -> Result<()> { - let mut client = self.connect_with_retry().await?; - let _response = client - .unsubscribe_from_topic(Request::new(GossipsubUnsubscribeRequest { - topic: topic.to_string(), - })) - .await - .map_err(|e| { - error!("Could not unsubscribe from gossip topic {topic:?} through RPC: {e:?}"); - Error::RpcGossipUnsubscribeError(topic.to_string(), e.to_string()) - })?; - Ok(()) - } - - async fn gossipsub_publish(&self, topic: &str, msg: &str) -> Result<()> { - let mut client = self.connect_with_retry().await?; - let _response = client - .publish_on_topic(Request::new(GossipsubPublishRequest { - topic: topic.to_string(), - msg: msg.into(), - })) - .await - .map_err(|e| { - error!("Could not publish on topic {topic:?} through RPC: {e:?}"); - Error::RpcGossipPublishError(topic.to_string(), e.to_string()) - })?; - Ok(()) - } - async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()> { let mut client = self.connect_with_retry().await?; let _response = client