Major refactoring #35

Merged
merged 40 commits into from
Nov 20, 2023
Changes from 1 commit
8e50829
1/n [exponential backoff peers]: refactor "tx" to be once per connect…
ikatson Nov 17, 2023
55e692d
2/n Wrap PeerState into peer
ikatson Nov 17, 2023
6ebf212
Peers now reconnect!
ikatson Nov 17, 2023
2203ffe
Fixed bugs
ikatson Nov 18, 2023
db12bba
Not even sure what I'm doing
ikatson Nov 18, 2023
48a1482
Changed log to tracing
ikatson Nov 19, 2023
d39479a
Small refactorings
ikatson Nov 19, 2023
a745257
Split up a couple methods
ikatson Nov 19, 2023
38c9902
Change peer states to dashmap
ikatson Nov 19, 2023
adf3eef
Removed a couple deadlocks
ikatson Nov 19, 2023
19c3fd7
Uninline some methods for easier backtrace debugging
ikatson Nov 19, 2023
2b84202
Debugged one more deadlock (or it was the same one)
ikatson Nov 19, 2023
1a55936
Nothing
ikatson Nov 19, 2023
ff71ade
timed existence for lock time debugging
ikatson Nov 19, 2023
2ad2881
timed existence for lock time debugging
ikatson Nov 19, 2023
3f0c4b7
Fix the NotNeeded warning
ikatson Nov 19, 2023
124c605
more counters
ikatson Nov 19, 2023
17d4082
Timing on_received_message
ikatson Nov 19, 2023
4b3da0b
Trying to see why it hangs for a bit sometimes
ikatson Nov 19, 2023
0c6781b
Saving
ikatson Nov 19, 2023
b891cd4
Remove a couple unused methods
ikatson Nov 19, 2023
b40d33b
Fix a bug with wrong number of queued peers
ikatson Nov 19, 2023
98dbecf
Fix a logging bug
ikatson Nov 19, 2023
0170b19
Remove inline-nevers
ikatson Nov 19, 2023
adb98a2
Add more docs
ikatson Nov 19, 2023
2ebbc0a
Add examples
ikatson Nov 19, 2023
d6cef09
Remove an instance of double-locking in the same scope
ikatson Nov 19, 2023
0c89ee9
Add parameter with_peers to stats_snapshot while its slow
ikatson Nov 19, 2023
22ea146
Starting to implement aggregate peer stats
ikatson Nov 19, 2023
aa99872
WTF is going on with counters
ikatson Nov 20, 2023
88c2f9e
Finally fixed a stupid tracing bug with counters
ikatson Nov 20, 2023
1238593
Remove old slow peer stats computation
ikatson Nov 20, 2023
4a331d9
Make default peer connect timeout less = 2s
ikatson Nov 20, 2023
a9794de
New bug showing up when torrent is downloading super fast
ikatson Nov 20, 2023
1de690a
nothing
ikatson Nov 20, 2023
34ee9d9
Remove Option<BF> to just BF
ikatson Nov 20, 2023
2695a8e
Preventively fixed other race conditions
ikatson Nov 20, 2023
e2f909c
Add some documentation
ikatson Nov 20, 2023
b751b98
Update versions to 3.0.0 as theres a lot of changes internally
ikatson Nov 20, 2023
1b68b0e
Remove unnecessary debugging files
ikatson Nov 20, 2023
Preventively fixed other race conditions
ikatson committed Nov 20, 2023

Verified: this commit was created on GitHub.com and signed with GitHub's verified signature.
commit 2695a8ec268b0e8f1b005e674884a3585f9ac779
17 changes: 4 additions & 13 deletions crates/librqbit/src/chunk_tracker.rs
@@ -8,8 +8,9 @@ pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
// It's set to 1 if we need a piece, but the moment we start requesting a peer,
// it's set to 0.

// Better to rename into piece_queue or smth, and maybe use some other form of a queue.
//
// Initially this is the opposite of "have", until we start making requests.
// An in-flight request is not in "needed", and not in "have".
needed_pieces: BF,

// This has a bit set per each chunk (block) that we have written to the output file.
@@ -21,6 +22,7 @@ pub struct ChunkTracker {

lengths: Lengths,

// What pieces to download first.
priority_piece_ids: Vec<usize>,
}

@@ -168,17 +170,6 @@ impl ChunkTracker {
piece.index, chunk_info, chunk_range,
);

// TODO: remove me, it's for debugging
// {
// use std::io::Write;
// let mut f = std::fs::OpenOptions::new()
// .write(true)
// .create(true)
// .open("/tmp/chunks")
// .unwrap();
// write!(f, "{:?}", &self.have).unwrap();
// }

if chunk_range.all() {
return Some(ChunkMarkingResult::Completed);
}
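The `ChunkTracker` comment above describes `needed_pieces` as the inverse of "have" that doubles as a request queue: a bit is 1 while a piece is needed, and cleared the moment a request goes out, so an in-flight piece is in neither set. A minimal std-only sketch of that invariant; `PieceQueue` and `Vec<bool>` are illustrative stand-ins, not the crate's actual `BF` bitfield type:

```rust
// Illustrative sketch of the "needed is the inverse of have" invariant.
struct PieceQueue {
    // true = we still need this piece; false = we have it, or a request
    // for it is already in flight.
    needed: Vec<bool>,
}

impl PieceQueue {
    // Initially "needed" is the bitwise NOT of "have".
    fn new(have: &[bool]) -> Self {
        PieceQueue { needed: have.iter().map(|&h| !h).collect() }
    }

    // Reserving a piece clears its bit, so an in-flight request is
    // neither "needed" nor "have".
    fn reserve(&mut self, idx: usize) -> bool {
        if self.needed.get(idx).copied() == Some(true) {
            self.needed[idx] = false;
            true
        } else {
            false
        }
    }
}
```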
2 changes: 1 addition & 1 deletion crates/librqbit/src/peer_state.rs
@@ -224,7 +224,7 @@ pub struct LivePeerState {
pub i_am_choked: bool,
pub peer_interested: bool,

// This is used to limit the number of requests we send to a peer at a time.
// This is used to limit the number of chunk requests we send to a peer at a time.
pub requests_sem: Arc<Semaphore>,

// This is used to unpause processes after we were choked.
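The reworded `requests_sem` comment above caps outstanding chunk requests per peer: each request takes a permit, and permits come back as responses arrive. The real field is presumably an async `Arc<Semaphore>`; this blocking std-only stand-in shows the same permit discipline, with all names hypothetical:

```rust
use std::sync::{Arc, Condvar, Mutex};

// Illustrative stand-in for the per-peer request semaphore: acquire()
// before sending a chunk request, release() when the chunk arrives, so
// at most `max_inflight` requests are outstanding at once.
struct RequestLimiter {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl RequestLimiter {
    fn new(max_inflight: usize) -> Arc<Self> {
        Arc::new(RequestLimiter { permits: Mutex::new(max_inflight), cv: Condvar::new() })
    }

    // Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }

    // Return a permit, waking one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}
```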
103 changes: 66 additions & 37 deletions crates/librqbit/src/torrent_state.rs
@@ -170,9 +170,8 @@ impl PeerStates {
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
let bitfield = BF::from_vec(bitfield);
live.previously_requested_pieces = BF::with_capacity(bitfield.len());
live.bitfield = bitfield;
live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]);
live.bitfield = BF::from_vec(bitfield);
})
}
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
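For context on the `update_bitfield_from_vec` change above: a bitvec-style `with_capacity` allocates storage but leaves the length at 0, so `get()` on any index returns `None`, which presumably is what broke `previously_requested_pieces` lookups; building from a zeroed `Vec` yields real, addressable bits. A std-only sketch of the distinction, using `Vec<bool>` as a stand-in for `BF`:

```rust
// with_capacity vs. zero-filled construction: only the latter gives
// addressable elements. Returns what get(3) sees in each case.
fn demo() -> (Option<bool>, Option<bool>) {
    // Stand-in for BF::with_capacity(n): space reserved, but length 0.
    let empty: Vec<bool> = Vec::with_capacity(8);
    // Stand-in for BF::from_vec(vec![0; n]): n real zero bits.
    let zeroed: Vec<bool> = vec![false; 8];
    (empty.get(3).copied(), zeroed.get(3).copied())
}
```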
@@ -205,14 +204,12 @@ impl PeerStates {
}

pub struct TorrentStateLocked {
// What chunks we have and need.
pub chunks: ChunkTracker,
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
}

impl TorrentStateLocked {
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
self.inflight_pieces.remove(&piece)
}
// At any given moment, we expect a piece from only one peer.
// inflight_pieces stores this ownership information.
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
}
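The new `TorrentStateLocked` comment states the core invariant: at any moment, a piece is in flight from at most one peer, and stealing transfers that ownership. A simplified sketch of the ownership map, with illustrative types (`u32` for the piece index, `u64` standing in for `PeerHandle`):

```rust
use std::collections::HashMap;
use std::time::Instant;

// Each in-flight piece has exactly one owning peer and a start time
// (used elsewhere to measure full piece download time).
struct InflightPiece { peer: u64, started: Instant }

struct Inflight { pieces: HashMap<u32, InflightPiece> }

impl Inflight {
    // Reserve a piece for a peer; fails if someone else already owns it.
    fn reserve(&mut self, piece: u32, peer: u64) -> bool {
        if self.pieces.contains_key(&piece) {
            return false;
        }
        self.pieces.insert(piece, InflightPiece { peer, started: Instant::now() });
        true
    }

    // Stealing reassigns the owner; the previous peer's later chunk
    // completions are then rejected by the ownership check.
    fn steal(&mut self, piece: u32, new_peer: u64) {
        if let Some(p) = self.pieces.get_mut(&piece) {
            p.peer = new_peer;
        }
    }

    fn owner(&self, piece: u32) -> Option<u64> {
        self.pieces.get(&piece).map(|p| p.peer)
    }
}
```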

#[derive(Default, Debug)]
@@ -623,15 +620,22 @@ impl TorrentState {
None
}

// TODO: need to throttle this or make it smarter as it may loop and steal pieces forever from each other.
// NOTE: this doesn't actually "steal" it, but only returns an id we might steal.
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom;

self.peers
.with_live(handle, |live| {
let g = self.lock_read("try_steal_piece");
g.inflight_pieces
.keys()
.filter(|p| {
live.previously_requested_pieces
.get(p.get() as usize)
.map(|r| *r)
== Some(false)
})
.filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p))
.choose(&mut rng)
.copied()
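The filter chain in `try_steal_piece` above can be sketched in isolation: a piece is stealable only if this peer has never requested it before and has no chunk request in flight for it. The real code then picks one candidate at random via `rand`'s `IteratorRandom::choose`; this std-only sketch just collects the eligible pieces, and all names are hypothetical:

```rust
// Candidate filter from try_steal_piece, simplified.
fn stealable_pieces(
    inflight: &[u32],              // pieces currently owned by some peer
    previously_requested: &[bool], // indexed by piece, for this peer
    my_inflight_requests: &[u32],  // pieces this peer is already fetching
) -> Vec<u32> {
    inflight
        .iter()
        // Never re-steal a piece we already tried: its bit must exist
        // and be false (an out-of-range index disqualifies it too).
        .filter(|p| previously_requested.get(**p as usize).copied() == Some(false))
        // Skip pieces we already have chunk requests out for.
        .filter(|p| !my_inflight_requests.contains(*p))
        .copied()
        .collect()
}
```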
@@ -1102,43 +1106,44 @@ impl PeerHandler {
},
};

let (tx, sem) = match self
.state
.peers
.with_live(handle, |l| (l.tx.clone(), l.requests_sem.clone()))
{
Some((tx, sem)) => (tx, sem),
None => return Ok(()),
};
let (tx, sem) =
match self
.state
.peers
.with_live_mut(handle, "peer_setup_for_piece_request", |l| {
l.previously_requested_pieces.set(next.get() as usize, true);
(l.tx.clone(), l.requests_sem.clone())
}) {
Some(res) => res,
None => return Ok(()),
};

for chunk in self.state.lengths.iter_chunk_infos(next) {
if self
.state
.lock_read("is_chunk_downloaded")
.chunks
.is_chunk_downloaded(&chunk)
{
continue;
}
let request = Request {
index: next.get(),
begin: chunk.offset,
length: chunk.size,
};

match self
.state
.peers
.with_live_mut(handle, "inflight_requests.insert", |l| {
l.inflight_requests.insert(InflightRequest::from(&chunk))
.with_live_mut(handle, "add chunk request", |live| {
live.inflight_requests.insert(InflightRequest::from(&chunk))
}) {
Some(true) => {}
Some(false) => {
warn!("probably a bug, we already requested {:?}", chunk);
// This request was already in-flight for this peer for this chunk.
// This might happen in theory, but not very likely.
//
// Example:
// someone stole a piece from us and then died; the piece became "needed" again,
// and we reserved it, all before the piece request was processed by us.
warn!("we already requested {:?} previously", chunk);
continue;
}
// peer died
None => return Ok(()),
}

let request = Request {
index: next.get(),
begin: chunk.offset,
length: chunk.size,
};

loop {
@@ -1241,12 +1246,33 @@ impl PeerHandler {
let full_piece_download_time = {
let mut g = self.state.lock_write("mark_chunk_downloaded");

match g.inflight_pieces.get(&chunk_info.piece_index) {
Some(InflightPiece { peer, .. }) if *peer == handle => {}
Some(InflightPiece { peer, .. }) => {
debug!(
"in-flight piece {} was stolen by {}, ignoring",
chunk_info.piece_index, peer
);
return Ok(());
}
None => {
debug!(
"in-flight piece {} not found. it was probably completed by someone else",
chunk_info.piece_index
);
return Ok(());
}
};

match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done, will write and checksum", piece.index,);
// This will prevent others from stealing it.
g.remove_inflight_piece(chunk_info.piece_index)
.map(|t| t.started.elapsed())
{
let piece = chunk_info.piece_index;
g.inflight_pieces.remove(&piece)
}
.map(|t| t.started.elapsed())
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
// TODO: we might need to send cancellations here.
@@ -1263,6 +1289,9 @@ impl PeerHandler {
}
};

// By the time we reach here, no other peer can complete this piece. All others, even if they steal pieces,
// would have fallen off above in one of the defensive checks.

self.spawner
.spawn_block_in_place(move || {
let index = piece.index;
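The defensive checks added to `mark_chunk_downloaded` above reduce to a three-way match on piece ownership: count the chunk only if this peer still owns the in-flight piece, and silently drop completions for pieces that were stolen or already finished. A sketch with illustrative types (`u32` piece index, `u64` standing in for `PeerHandle`):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum ChunkOutcome {
    Counted,        // we still own the piece; record the chunk
    StolenByOther,  // another peer owns it now; ignore
    AlreadyDone,    // no longer in flight; someone completed it
}

// Ownership check on chunk completion, mirroring the match in the diff.
fn on_chunk_downloaded(
    inflight: &HashMap<u32, u64>, // piece -> owning peer
    piece: u32,
    me: u64,
) -> ChunkOutcome {
    match inflight.get(&piece) {
        Some(owner) if *owner == me => ChunkOutcome::Counted,
        Some(_) => ChunkOutcome::StolenByOther,
        None => ChunkOutcome::AlreadyDone,
    }
}
```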