Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create encrypted UDP transport with streaming and firewall hole-punching #949

Merged
merged 272 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from 138 commits
Commits
Show all changes
272 commits
Select commit Hold shift + click to select a range
6e53e32
wip
sanity Jan 10, 2024
61c857c
wip
sanity Jan 10, 2024
323a1b8
more work on UDP Connection
sanity Jan 10, 2024
e1c3b7f
commit before removing UdpConnection
sanity Jan 12, 2024
7e09cc2
wip
sanity Jan 12, 2024
db70f79
write spec for Freenet Transport Protocol
sanity Jan 12, 2024
12b9ac0
write spec for Freenet Transport Protocol
sanity Jan 12, 2024
d52f985
spec improvements
sanity Jan 12, 2024
d9ae667
use RSA for transport keypair
sanity Jan 12, 2024
98726c9
reformat, edits
sanity Jan 13, 2024
7d21d44
reformat, edits
sanity Jan 13, 2024
868a0ec
reformat, edits
sanity Jan 13, 2024
655e679
reformat, edits
sanity Jan 13, 2024
40c5798
reformat, edits
sanity Jan 13, 2024
19a8c18
reformat, edits
sanity Jan 13, 2024
4164ae9
work on udp_transport
sanity Jan 13, 2024
73cb4af
Merge branch 'main' into 186770521_port_tahrir_transport_code
sanity Jan 14, 2024
1c2d929
Cleaning up a bit, minor changes
iduartgomez Jan 14, 2024
3ec34a3
Encpasualte public API
iduartgomez Jan 14, 2024
a68f7a7
Change back to connection_info, makes more sense for now
iduartgomez Jan 14, 2024
7227b99
Docs
iduartgomez Jan 14, 2024
8feca05
Docs
iduartgomez Jan 14, 2024
1e19e95
typo
sanity Jan 14, 2024
b8a421f
typo
sanity Jan 14, 2024
c9732db
minor
sanity Jan 14, 2024
8029e9e
wip: connection establishing
iduartgomez Jan 14, 2024
6f40839
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Jan 14, 2024
b50edd6
add bandwidth tracker
sanity Jan 14, 2024
8e81a32
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
sanity Jan 14, 2024
2d5f8ac
update bwlimit
sanity Jan 14, 2024
83b5dbd
specify bandwidth limit in can_send_packet parameter rather than stor…
sanity Jan 14, 2024
6f8688a
clean up
sanity Jan 14, 2024
2752d4d
bandwidth_limit moved from struct to can_send_packet() param
sanity Jan 15, 2024
3be609b
speed up tests significantly with a mock timesource
sanity Jan 15, 2024
5813c42
clean up visibility
sanity Jan 15, 2024
f33969c
cleanup
sanity Jan 16, 2024
bdf97c2
adjust packet size
sanity Jan 16, 2024
2401a6b
Add nat traversal + clean up
iduartgomez Jan 16, 2024
3b7c65c
Don't send protocol on separate step
iduartgomez Jan 16, 2024
bbc0f26
Set connection channel
iduartgomez Jan 16, 2024
08072a6
Remove peerid ref + fix peer internal channels
iduartgomez Jan 17, 2024
55bef37
rewrite and hopefully fix transport doc
sanity Jan 17, 2024
d5249dc
minor copy
sanity Jan 17, 2024
9d7f964
minor copy
sanity Jan 17, 2024
7682a72
improve bw tracker unit tests
sanity Jan 18, 2024
2eb2b4c
Merge branch 'main' into 186770521_port_tahrir_transport_code
sanity Jan 20, 2024
0399c7e
wip
iduartgomez Jan 21, 2024
7c2ab69
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Jan 21, 2024
6409e00
wip
iduartgomez Jan 21, 2024
932fa46
add note re: nonce
sanity Jan 21, 2024
9b07da9
improve comment
sanity Jan 21, 2024
350e068
generate nonce randomly for every packet using efficient RNG
sanity Jan 21, 2024
139fe14
fix, still need to unit test
sanity Jan 21, 2024
756a76d
Revert "fix, still need to unit test"
sanity Jan 21, 2024
e632c1b
Revert "generate nonce randomly for every packet using efficient RNG"
sanity Jan 21, 2024
d07018b
nonce is now prepended, unit test passes
sanity Jan 21, 2024
95321bc
comment
sanity Jan 21, 2024
80a39ca
encrypt data in-place
sanity Jan 21, 2024
02fb8aa
verify max size data can be encrypted
sanity Jan 21, 2024
451623d
break PacketData into own module, improve tests
sanity Jan 21, 2024
b3a7d9e
relax test to avoid random failure
sanity Jan 21, 2024
0f304f0
test packet corruption detection
sanity Jan 21, 2024
677e757
indent
sanity Jan 22, 2024
17a92bc
Attempt to move some checks to const time
iduartgomez Jan 22, 2024
9703f72
Merge
iduartgomez Jan 22, 2024
cbfc8ff
wip
iduartgomez Jan 22, 2024
3a7c799
wip
iduartgomez Jan 22, 2024
a8d5922
wip
iduartgomez Jan 23, 2024
bb8b888
Merge branch 'main' into 186770521_port_tahrir_transport_code
sanity Jan 23, 2024
9fbbc79
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Jan 25, 2024
f0ec7ec
Implement exponential backoff when establishing UDP connection (#971)
sanity Jan 25, 2024
e293826
typo
sanity Jan 26, 2024
826891b
wip
iduartgomez Dec 20, 2023
4f85c74
wip
iduartgomez Jan 27, 2024
b4055ce
wip
iduartgomez Jan 27, 2024
d606064
wip
iduartgomez Jan 28, 2024
09f5a4d
make peer connection a future
iduartgomez Jan 28, 2024
42efb03
wip
iduartgomez Jan 28, 2024
d51f519
committing WIP with build failure due to const MAX_PACKET_SIZE assertion
sanity Jan 30, 2024
f8a50b2
fix assertion error
sanity Jan 30, 2024
50dcc07
message_ids should be u32s not u16s due to wraparound risk
sanity Jan 30, 2024
1c9c4e8
ReceiptTracker implemented and tested
sanity Jan 30, 2024
a788b65
rename
sanity Jan 30, 2024
94b7f23
send/receive packet trackers implemented and unit tested
sanity Jan 31, 2024
57f48e4
adjust doc
sanity Jan 31, 2024
575c233
improve docs
sanity Jan 31, 2024
11c61ca
improve docs
sanity Jan 31, 2024
b56733d
improve docs
sanity Jan 31, 2024
08baf74
improve docs
sanity Jan 31, 2024
dc17a0e
wip
sanity Jan 31, 2024
7829b42
use bandwidth tracker
iduartgomez Jan 31, 2024
9410c97
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Jan 31, 2024
fcf994d
Fix duplicate time source definitions
iduartgomez Jan 31, 2024
c0af162
fix various MockTimeSource issues
sanity Jan 31, 2024
4d696fd
address some clippy suggestions
sanity Jan 31, 2024
fc12096
improve constants
sanity Jan 31, 2024
52318a4
SystemTime means something else, rename
sanity Feb 1, 2024
2966c4a
cache time every 20ms to avoid the system call for every packet
sanity Feb 1, 2024
5526afb
naming, comments
sanity Feb 1, 2024
f054e16
naming, comments
sanity Feb 1, 2024
26dc47a
Mutex -> RwLock and force use of CachingSystemTimeSrc::new()
sanity Feb 1, 2024
e43ee3b
wip
iduartgomez Feb 1, 2024
68c6d11
wip
iduartgomez Feb 1, 2024
86ca09f
Optimize system time to be lock free
iduartgomez Feb 1, 2024
c880c68
comment code for those of us less familiar with atomic operations
sanity Feb 1, 2024
d4a9cf6
Fix issue in resend logic of SentPacketTracker
sanity Feb 1, 2024
593b7f2
remove unnecessary mut
sanity Feb 1, 2024
915e22d
add test_many_timesources() test to check for deadlocks
sanity Feb 1, 2024
c592c19
add test for creation of many trackers, trying to isolate deadlock
sanity Feb 1, 2024
844a9fa
Improve resend logic in SentPacketTracker
sanity Feb 2, 2024
5737bf1
add integration test for sent/received packet tracker
sanity Feb 2, 2024
734d2bf
Merge branch 'main' into 186770521_port_tahrir_transport_code
iduartgomez Feb 2, 2024
eaf3005
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Feb 2, 2024
98a2f16
wip: finish nat traversal
iduartgomez Feb 2, 2024
ecaf02c
wip: send pub key on connect
iduartgomez Feb 3, 2024
617d80d
send pub key
iduartgomez Feb 3, 2024
42b3413
wip: integrate packet trackers
iduartgomez Feb 3, 2024
369469c
Check for duplicate intro packets
iduartgomez Feb 4, 2024
e288323
Notify of inbound connections
iduartgomez Feb 4, 2024
5912117
Make UDP listener more testable
iduartgomez Feb 4, 2024
b97ddfd
Add receiver stream impl
iduartgomez Feb 4, 2024
71d72fd
wip: sender stream
iduartgomez Feb 5, 2024
8e213a8
wip
iduartgomez Feb 7, 2024
362ca89
impl sender stream
iduartgomez Feb 7, 2024
251af04
Make future of sender stream
iduartgomez Feb 7, 2024
cc14f51
wip
iduartgomez Feb 7, 2024
b5fea97
Separate listening from sending
iduartgomez Feb 8, 2024
f5dde41
wip
sanity Feb 9, 2024
efd51a4
Merge branch '186770521_port_tahrir_transport_code' into receiver-str…
sanity Feb 9, 2024
c6a507d
wip
iduartgomez Feb 9, 2024
59a2649
simple test passes
sanity Feb 9, 2024
c393aac
Merge branch 'receiver-stream-tests' of github.com:freenet/freenet-co…
sanity Feb 9, 2024
fd9ff52
receiverstream tests now pass
sanity Feb 9, 2024
a2155fe
reorg tests
sanity Feb 9, 2024
82d213f
more ReceiverStream into own module
sanity Feb 10, 2024
a8876d3
move SenderStream into own module
sanity Feb 10, 2024
f038d5b
verify internals in tests
sanity Feb 10, 2024
a51d620
wip
iduartgomez Feb 10, 2024
fd158b6
refactor outbound connection to have multiple connections
iduartgomez Feb 10, 2024
2571f13
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
iduartgomez Feb 10, 2024
d7c19e7
merge
iduartgomez Feb 10, 2024
1f2b8e0
Add message_id to LongMessageFragment
sanity Feb 10, 2024
89f0e02
skeleton of ConnectionHandler sender/receiver functions
sanity Feb 11, 2024
167d1e3
refactor: senderstream does not need to be a future
iduartgomez Feb 11, 2024
d0ca222
fix typo
sanity Feb 11, 2024
9655dde
switched back to using RwLock, tests now work again
sanity Feb 11, 2024
3a55c01
Revert "switched back to using RwLock, tests now work again"
sanity Feb 11, 2024
a5c1880
fix tests, add comments warning of problem with restarting tokio
sanity Feb 11, 2024
9f46e1b
move time_source into own submodule
sanity Feb 11, 2024
4274ab7
CachingSystemTimeSrc should use threads, not tokio
sanity Feb 12, 2024
026fb6a
update cached time every 10ms rather than every 20ms
sanity Feb 12, 2024
f1437a6
Don't use caching time source for now in case it causes hard-to-debug…
sanity Feb 12, 2024
a131451
tidy imports
sanity Feb 13, 2024
01dec52
Move resend checking out of stream
iduartgomez Feb 14, 2024
6f4a6fe
Big fixed and simpification re. receipt checking
iduartgomez Feb 14, 2024
5f506b8
Isolate send_long_message
iduartgomez Feb 14, 2024
71fa5cd
Centralize outbound traffic
iduartgomez Feb 17, 2024
f8191db
wip
iduartgomez Feb 17, 2024
472fa79
wip
iduartgomez Feb 17, 2024
46a8f59
Finish outbound stream multiplexing
iduartgomez Feb 17, 2024
80e6b73
Multiplex inbound streams
iduartgomez Feb 17, 2024
ebcf1db
Properly report receipts and refactor outbound packet sending
iduartgomez Feb 17, 2024
1387e4a
Minor fragment no counting fix
iduartgomez Feb 18, 2024
9895767
Fixes for NAT traversal
iduartgomez Feb 18, 2024
e9c59a0
Move inbound decoding outside of connection loop
iduartgomez Feb 19, 2024
ade7519
start on inbound_stream / outbound_stream integration test, not worki…
sanity Feb 23, 2024
e8bb3fe
Rename stream_id to be less confussing
iduartgomez Feb 23, 2024
377efe7
more work on unit test, still not working - rename function for clarity
sanity Feb 24, 2024
1c45fed
another function rename
sanity Feb 24, 2024
285ece2
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
sanity Feb 24, 2024
5c3c842
Fix minor error with skipping a message id on outbound stream
iduartgomez Feb 24, 2024
4d3967b
Rename a few functions for clarity
sanity Feb 24, 2024
e44e077
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
sanity Feb 24, 2024
e82005c
fix build
sanity Feb 24, 2024
9be3c31
rename parameter for readability
sanity Feb 24, 2024
a667286
rename parameter for readability
sanity Feb 24, 2024
2c8a8f7
rename parameter for readability
sanity Feb 24, 2024
b2afad0
use MessageId type alias for clarity
sanity Feb 24, 2024
ce5251e
Add gateway implementation
iduartgomez Feb 24, 2024
35a39cd
rename for clarity
sanity Feb 24, 2024
d482e9e
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
sanity Feb 24, 2024
5aa1b6e
FixCompile error
iduartgomez Feb 24, 2024
703cc33
more MessageIds
sanity Feb 25, 2024
afa721e
Merge branch '186770521_port_tahrir_transport_code' of github.com:fre…
sanity Feb 25, 2024
3e26a5a
Improve gateway codepath a bit
iduartgomez Feb 25, 2024
9282e18
rename stream_id to long_message_id because term 'stream' has another…
sanity Feb 25, 2024
676df13
type stream id and don't mix it up with message_id
iduartgomez Feb 25, 2024
fa11dac
Change MessageId for PacketId since that's what they really are
iduartgomez Feb 25, 2024
b4844ef
More naming consistency changes
iduartgomez Feb 25, 2024
acdf394
stream_id -> long_message_id
sanity Feb 26, 2024
1c2c6c2
change remaining references from 'stream' to 'long message'
sanity Feb 26, 2024
fc3c7df
Terminology change/standardization:
sanity Feb 27, 2024
b9ba8eb
Suggestions for how to do the unit testing
iduartgomez Feb 29, 2024
6b05c05
test_send_stream_success() implemented but not yet working
sanity Mar 1, 2024
c9e24fd
Remove sent tracker from outbound stream
iduartgomez Mar 2, 2024
bd95712
Test both streams coupled
iduartgomez Mar 2, 2024
1a00796
Set p2p simulation test
iduartgomez Mar 2, 2024
a23d3dc
Add a function to get your own addr after a connection to a gateway
iduartgomez Mar 3, 2024
d023234
initial work, doesn't compile
sanity Mar 7, 2024
5da6877
Isolate weird bincode serialization issue
iduartgomez Mar 9, 2024
27b3140
Fix issue with serde repr
iduartgomez Mar 9, 2024
bb75b02
Test all symmetric messages encryption
iduartgomez Mar 9, 2024
b4aefc6
Fix issue with passing the correct message size
iduartgomez Mar 9, 2024
1cc7739
Fix issue with outbound stream
iduartgomez Mar 10, 2024
b1e749c
Fix issue with outbound fragment idx
iduartgomez Mar 10, 2024
7b6506b
progress but still errors
sanity Mar 10, 2024
c123cf0
wip
sanity Mar 10, 2024
9fd46dd
wip
sanity Mar 10, 2024
f991bd2
wip
sanity Mar 10, 2024
09ba0a9
Simplify and compile errors
iduartgomez Mar 10, 2024
ebccefd
Merge branch '186770521_port_tahrir_transport_code' into typed-packet…
iduartgomez Mar 10, 2024
fc841df
Cleanup packetdata stuff a bit more
iduartgomez Mar 10, 2024
24b242b
Fixes to nat traversal, happy path test passes
iduartgomez Mar 10, 2024
bfd6182
simplify is_intro_packet without sacraficing efficiency
sanity Mar 10, 2024
39d82b6
merge
sanity Mar 10, 2024
758d0b7
remove dead code warning
sanity Mar 11, 2024
86fb5b5
wip
sanity Mar 13, 2024
a54beb3
unit test sending a short message, passes
sanity Mar 14, 2024
3fdd7fc
simulate_send_streamed_message() implemented but panicing, committing…
sanity Mar 14, 2024
8a77000
Fix unwrap issue
iduartgomez Mar 16, 2024
d0cffc1
Fix issue with incorrectly recreating a new stream
iduartgomez Mar 17, 2024
cfbef70
Attempt at finding deadlock
iduartgomez Mar 17, 2024
5c1663f
temp fix for strange deadlock when cap 1 for socket and inbound stream
iduartgomez Mar 19, 2024
ad05a30
Always process inbound messages
iduartgomez Mar 19, 2024
350c5be
Except when it was already processed
iduartgomez Mar 19, 2024
0288176
Fix issue with inbound stream buf being full
iduartgomez Mar 23, 2024
be0f617
Fix issue with also outbound streams not advancing
iduartgomez Mar 23, 2024
0a36977
Make inbound stream only use 1 cap buf again
iduartgomez Mar 23, 2024
81c4468
Generalize conn handler tests
iduartgomez Mar 24, 2024
1a4707e
wip: more than 2 conns fail to establish
iduartgomez Mar 24, 2024
dd4eaa8
ignore rustc debug output
sanity Mar 24, 2024
491ad95
Merge branch 'main' into 186770521_port_tahrir_transport_code
sanity Mar 24, 2024
8ac2bc0
Allow parallel connections
iduartgomez Mar 24, 2024
d8cd206
Further fixes to allow parallel connection ops
iduartgomez Mar 24, 2024
9df001e
Test more parallel connections
iduartgomez Mar 24, 2024
18a1cbd
wip: test packet dropping
iduartgomez Mar 31, 2024
7692ba5
Fix failing connection establishment due to packet loss
iduartgomez Mar 31, 2024
5386653
Don't report packet send in case of error at rate limiter
iduartgomez Mar 31, 2024
639bb2b
Fix race condition when completing connection task
iduartgomez Apr 7, 2024
0db7b03
Remove unexpected deadlock due to test impl
iduartgomez Apr 7, 2024
d96d3f2
Give enough time drop packet tests to send receipts and resends
iduartgomez Apr 7, 2024
4834cf6
Add gateway unit test (#1033)
al8n Apr 8, 2024
67c976e
Drop packets (#1034)
al8n Apr 13, 2024
bec4f42
Kill connection if not keep alive message has been received in a while
iduartgomez Apr 13, 2024
f0105d4
Fix test cases, in all cases the connection should happen
iduartgomez Apr 13, 2024
c8ad00a
wip: handle packet dropping gracefully while rstablishing connection
iduartgomez Apr 13, 2024
3262ef1
Fix packet drop issues and simplify connection
iduartgomez Apr 13, 2024
39109ef
Fine tune tests a bit
iduartgomez Apr 13, 2024
2a8cff9
Update deps
iduartgomez Apr 13, 2024
a1003ef
Merge remote-tracking branch 'origin/main' into 186770521_port_tahrir…
iduartgomez Apr 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ path = "src/bin/freenet.rs"

[dependencies]
anyhow = "1"
arc-swap = "1.6"
asynchronous-codec = "0.7"
aes-gcm = "0.10.3"
async-trait = "0.1"
axum = { default-features = false, features = ["http1", "matched-path", "query", "tower-log", "ws", "json"], workspace = true }
bincode = "1"
bincode = "1.3.3"
blake3 = { workspace = true }
bs58 = "0.5"
byteorder = "1"
Expand Down Expand Up @@ -54,6 +56,7 @@ serde_with = { workspace = true }
sqlx = { features = ["runtime-tokio-rustls", "sqlite"], optional = true, version = "0.7" }
stretto = { features = ["async", "sync"], version = "0.8" }
tar = { version = "0.4.38" }
time = "0.3.30"
thiserror = "1"
tokio = { features = ["fs", "macros", "rt-multi-thread", "sync", "process"], version = "1" }
tokio-tungstenite = "0.21"
Expand All @@ -63,7 +66,7 @@ unsigned-varint = { version = "0.8", features = ["codec", "asynchronous_codec"]
wasmer = { features = ["sys"], workspace = true }
xz2 = { version = "0.1" }
reqwest = { version = "0.11.23", features = ["json"] }
# enum-iterator = "1.4.1"
rsa = { version = "0.9.6", features = ["serde"] }

# Tracing deps
opentelemetry = "0.21.0"
Expand All @@ -74,7 +77,6 @@ tracing-subscriber = { optional = true, version = "0.3.16" }

# internal deps
freenet-stdlib = { features = ["net"], workspace = true }
time = "0.3.30"

[dev-dependencies]
arbitrary = { features = ["derive"], version = "1" }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod server;
mod topology;
/// Tracing and loging infrastructure. Includes our custom event log register. Tracing collectors, etc.
mod tracing;
/// Code for communicating with other peers over UDP, handles hole-punching, error handling, etc.
mod transport;
pub mod util;
/// WASM code execution runtime, tailored for the contract and delegate APIs.
mod wasm_runtime;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ mod tests {
// Simulate a bunch of random requests clustered around 0.35
for _ in 0..NUM_REQUESTS {
let requested_location = random_location(&random_location(&this_peer_location));
// FIXME: Is PeerKeyLocation unimportant for this test?
// todo: Is PeerKeyLocation unimportant for this test?
topology_manager.record_request(
PeerKeyLocation::random(),
requested_location,
Expand Down
82 changes: 82 additions & 0 deletions crates/core/src/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#![allow(dead_code)] // TODO: Remove before integration
//! Freenet Transport protocol implementation.
//!
//! Please see `docs/architecture/transport.md` for more information.

mod bw;
mod connection_handler;
mod crypto;
mod packet_data;
mod peer_connection;
mod received_packet_tracker;
mod sent_packet_tracker;
mod symmetric_message;

type MessagePayload = Vec<u8>;
type MessageId = u32;
iduartgomez marked this conversation as resolved.
Show resolved Hide resolved

use self::packet_data::PacketData;
use std::time::Duration;

/// We can wait up to 100ms to confirm a message was received, this allows us to batch
/// receipts together and send them in a single message.
const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);

struct BytesPerSecond(f64);

impl BytesPerSecond {
pub fn new(bytes_per_second: f64) -> Self {
assert!(bytes_per_second >= 0.0);
Self(bytes_per_second)
}

pub fn as_f64(&self) -> f64 {
self.0
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::transport::received_packet_tracker::ReportResult;
use crate::transport::sent_packet_tracker::{ResendAction, MESSAGE_CONFIRMATION_TIMEOUT};

#[test]
fn test_packet_send_receive_acknowledge_flow() {
let mut sent_tracker = sent_packet_tracker::tests::mock_sent_packet_tracker();
let mut received_tracker = received_packet_tracker::tests::mock_received_packet_tracker();

// Simulate sending packets
for id in 1..=5 {
sent_tracker.report_sent_packet(id, vec![id as u8]);
}

// Simulate receiving some packets
for id in [1, 3, 5] {
assert_eq!(
received_tracker.report_received_packet(id),
ReportResult::Ok
);
}

// Get receipts and simulate acknowledging them
let receipts = received_tracker.get_receipts();
assert_eq!(receipts, vec![1, 3, 5]);
sent_tracker.report_received_receipts(&receipts);

// Check resend action for lost packets
sent_tracker
.time_source
.advance_time(MESSAGE_CONFIRMATION_TIMEOUT);
for id in [2, 4] {
match sent_tracker.get_resend() {
ResendAction::Resend(message_id, packet) => {
assert_eq!(message_id, id);
// Simulate resending packet
sent_tracker.report_sent_packet(id, packet);
}
_ => panic!("Expected resend action for packet {}", id),
}
}
}
}
156 changes: 156 additions & 0 deletions crates/core/src/transport/bw.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use crate::util::{CachingSystemTimeSrc, TimeSource};
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Keeps track of the bandwidth used in the last window_size. Recommend a `window_size` of
/// 10 seconds.
pub(super) struct PacketBWTracker<T: TimeSource> {
packets: VecDeque<(usize, Instant)>,
window_size: Duration,
current_bandwidth: usize,
time_source: T,
}

impl PacketBWTracker<CachingSystemTimeSrc> {
pub(super) fn new(window_size: Duration) -> Self {
PacketBWTracker {
packets: VecDeque::new(),
window_size,
current_bandwidth: 0,
time_source: CachingSystemTimeSrc::new(),
}
}
}

impl<T: TimeSource> PacketBWTracker<T> {
/// Report that a packet was sent
pub(super) fn add_packet(&mut self, packet_size: usize) {
let now = self.time_source.now();
self.packets.push_back((packet_size, now));
self.current_bandwidth += packet_size;
self.cleanup();
}

/// Removes packets that are older than the window size.
fn cleanup(&mut self) {
let now = self.time_source.now();
while self
.packets
.front()
.map_or(false, |&(_, time)| now - time > self.window_size)
{
let expired = self.packets.pop_front();
if let Some((size, _)) = expired {
self.current_bandwidth -= size;
}
}
}

/// Returns none if the packet can be sent immediately without `bandwidth_limit` being
/// exceeded within the `window_size`. Otherwise returns Some(wait_time) where wait_time is the
/// amount of time that should be waited before sending the packet.
///
/// `bandwidth_limit` should be set to 50% higher than the target upstream bandwidth the
/// [topology manager](crate::topology::TopologyManager) is aiming for, as it serves
/// as a hard limit which we'd prefer not to hit.
pub(super) fn can_send_packet(
&mut self,
bandwidth_limit: usize,
packet_size: usize,
) -> Option<Duration> {
self.cleanup();

if self.current_bandwidth + packet_size <= bandwidth_limit {
return None;
}

let mut temp_bandwidth = self.current_bandwidth;
let mut wait_time = None;

for &(size, time) in self.packets.iter() {
temp_bandwidth -= size;
if temp_bandwidth + packet_size <= bandwidth_limit {
wait_time = Some(self.window_size - (self.time_source.now() - time));
break;
}
}

wait_time
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::util::MockTimeSource;

fn mock_tracker(window_size: Duration) -> PacketBWTracker<MockTimeSource> {
PacketBWTracker {
packets: VecDeque::new(),
window_size,
current_bandwidth: 0,
time_source: MockTimeSource::new(Instant::now()),
}
}

fn verify_bandwidth_match<T: TimeSource>(tracker: &PacketBWTracker<T>) {
let mut total_bandwidth = 0;
for &(size, _) in tracker.packets.iter() {
total_bandwidth += size;
}
assert_eq!(total_bandwidth, tracker.current_bandwidth);
}

#[test]
fn test_adding_packets() {
let mut tracker = PacketBWTracker::new(Duration::from_secs(1));
verify_bandwidth_match(&tracker);
tracker.add_packet(1500);
verify_bandwidth_match(&tracker);
assert_eq!(tracker.packets.len(), 1);
}

#[test]
fn test_bandwidth_calculation() {
let mut tracker = PacketBWTracker::new(Duration::from_secs(1));
tracker.add_packet(1500);
tracker.add_packet(2500);
verify_bandwidth_match(&tracker);
assert_eq!(
tracker.packets.iter().map(|&(size, _)| size).sum::<usize>(),
4000
);
}

#[test]
fn test_packet_expiry() {
let mut tracker = mock_tracker(Duration::from_millis(200));
tracker.add_packet(1500);
verify_bandwidth_match(&tracker);
tracker.time_source.advance_time(Duration::from_millis(300));
tracker.cleanup();
verify_bandwidth_match(&tracker);
assert!(tracker.packets.is_empty());
}

#[test]
fn test_wait_time_calculation() {
let mut tracker = mock_tracker(Duration::from_secs(1));
tracker.add_packet(5000);
verify_bandwidth_match(&tracker);
tracker.time_source.advance_time(Duration::from_millis(500));
tracker.add_packet(4000);
verify_bandwidth_match(&tracker);
match tracker.can_send_packet(10000, 2000) {
None => panic!("Should require waiting"),
Some(wait_time) => assert_eq!(wait_time, Duration::from_millis(500)),
}
}

#[test]
fn test_immediate_send() {
let mut tracker = PacketBWTracker::new(Duration::from_millis(10));
tracker.add_packet(3000);
assert_eq!(tracker.can_send_packet(10000, 2000), None);
}
}
Loading
Loading