Skip to content

Commit

Permalink
Fix only files not working properly (#59)
Browse files Browse the repository at this point in the history
* 1/n fixing only files - tracking stats better

* 2/n proper tracking of stats when only certain files selected
  • Loading branch information
ikatson authored Dec 14, 2023
1 parent 50fc7f2 commit 325855b
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 46 deletions.
24 changes: 23 additions & 1 deletion crates/librqbit/src/chunk_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct ChunkTracker {

// What pieces to download first.
priority_piece_ids: Vec<usize>,

total_selected_bytes: u64,
}

// TODO: this should be redone from "have" pieces, not from "needed" pieces.
Expand Down Expand Up @@ -58,7 +60,12 @@ pub enum ChunkMarkingResult {
}

impl ChunkTracker {
pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self {
pub fn new(
needed_pieces: BF,
have_pieces: BF,
lengths: Lengths,
total_selected_bytes: u64,
) -> Self {
// TODO: ideally this needs to be a list based on needed files, e.g.
// last needed piece for each file. But let's keep simple for now.

Expand All @@ -80,9 +87,14 @@ impl ChunkTracker {
lengths,
have: have_pieces,
priority_piece_ids,
total_selected_bytes,
}
}

pub fn get_total_selected_bytes(&self) -> u64 {
self.total_selected_bytes
}

pub fn get_lengths(&self) -> &Lengths {
&self.lengths
}
Expand All @@ -104,6 +116,16 @@ impl ChunkTracker {
.sum()
}

pub fn calc_needed_bytes(&self) -> u64 {
self.needed_pieces
.iter_ones()
.filter_map(|piece_id| {
let piece_id = self.lengths.validate_piece_index(piece_id as u32)?;
Some(self.lengths.piece_length(piece_id) as u64)
})
.sum()
}

pub fn iter_needed_pieces(&self) -> impl Iterator<Item = usize> + '_ {
self.priority_piece_ids
.iter()
Expand Down
20 changes: 18 additions & 2 deletions crates/librqbit/src/file_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ use tracing::{debug, trace, warn};
use crate::type_aliases::{PeerHandle, BF};

pub(crate) struct InitialCheckResults {
// The pieces that we need to download.
pub needed_pieces: BF,
// The pieces we have downloaded.
pub have_pieces: BF,
// How many bytes we have. This can be MORE than "total_selected_bytes",
// if we downloaded some pieces, and later the "only_files" was changed.
pub have_bytes: u64,
// How many bytes we need to download.
pub needed_bytes: u64,
// How many bytes are in selected pieces.
// If all selected, this must be equal to total torrent length.
pub total_selected_bytes: u64,
}

pub fn update_hash_from_file<Sha1: ISha1>(
Expand Down Expand Up @@ -77,6 +85,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {

let mut have_bytes = 0u64;
let mut needed_bytes = 0u64;
let mut total_selected_bytes = 0u64;

#[derive(Debug)]
struct CurrentFile<'a> {
Expand Down Expand Up @@ -135,6 +144,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
let mut to_read_in_file =
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;

// Keep changing the current file to next until we find a file that has greater than 0 length.
while to_read_in_file == 0 {
current_file = file_iterator
.next()
Expand All @@ -157,7 +167,8 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {

let mut fd = current_file.fd.lock();

fd.seek(SeekFrom::Start(pos)).unwrap();
fd.seek(SeekFrom::Start(pos))
.context("bug? error seeking")?;
if let Err(err) = update_hash_from_file(
&mut fd,
&mut computed_hash,
Expand All @@ -173,6 +184,10 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
}
}

if at_least_one_file_required {
total_selected_bytes += piece_info.len as u64;
}

if at_least_one_file_required && some_files_broken {
trace!(
"piece {} had errors, marking as needed",
Expand All @@ -187,7 +202,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
if self
.torrent
.compare_hash(piece_info.piece_index.get(), computed_hash.finish())
.unwrap()
.context("bug: either torrent info broken or we have a bug - piece index invalid")?
{
trace!(
"piece {} is fine, not marking as needed",
Expand Down Expand Up @@ -215,6 +230,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
have_pieces,
have_bytes,
needed_bytes,
total_selected_bytes,
})
}

Expand Down
7 changes: 5 additions & 2 deletions crates/librqbit/src/torrent_state/initializing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ impl TorrentStateInitializing {
})?;

info!(
"Initial check results: have {}, needed {}",
"Initial check results: have {}, needed {}, total selected {}",
SF::new(initial_check_results.have_bytes),
SF::new(initial_check_results.needed_bytes)
SF::new(initial_check_results.needed_bytes),
SF::new(initial_check_results.total_selected_bytes)
);

self.meta.spawner.spawn_block_in_place(|| {
Expand Down Expand Up @@ -126,6 +127,7 @@ impl TorrentStateInitializing {
initial_check_results.needed_pieces,
initial_check_results.have_pieces,
self.meta.lengths,
initial_check_results.total_selected_bytes,
);

let paused = TorrentStatePaused {
Expand All @@ -134,6 +136,7 @@ impl TorrentStateInitializing {
filenames,
chunk_tracker,
have_bytes: initial_check_results.have_bytes,
needed_bytes: initial_check_results.needed_bytes,
};
Ok(paused)
}
Expand Down
16 changes: 10 additions & 6 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub struct TorrentStateLive {
filenames: Vec<PathBuf>,

initially_needed_bytes: u64,
total_selected_bytes: u64,

stats: AtomicStats,
lengths: Lengths,
Expand Down Expand Up @@ -203,7 +204,8 @@ impl TorrentStateLive {
let up_speed_estimator = SpeedEstimator::new(5);

let have_bytes = paused.have_bytes;
let needed_bytes = paused.info.lengths.total_length() - have_bytes;
let needed_bytes = paused.needed_bytes;
let total_selected_bytes = paused.chunk_tracker.get_total_selected_bytes();
let lengths = *paused.chunk_tracker.get_lengths();

let state = Arc::new(TorrentStateLive {
Expand All @@ -222,6 +224,7 @@ impl TorrentStateLive {
},
initially_needed_bytes: needed_bytes,
lengths,
total_selected_bytes,
peer_semaphore: Arc::new(Semaphore::new(128)),
peer_queue_tx,
finished_notify: Notify::new(),
Expand Down Expand Up @@ -599,6 +602,10 @@ impl TorrentStateLive {
});
}

pub fn get_total_selected_bytes(&self) -> u64 {
self.total_selected_bytes
}

pub fn get_uploaded_bytes(&self) -> u64 {
self.stats.uploaded_bytes.load(Ordering::Relaxed)
}
Expand Down Expand Up @@ -690,16 +697,11 @@ impl TorrentStateLive {
pub fn stats_snapshot(&self) -> StatsSnapshot {
use Ordering::*;
let downloaded_bytes = self.stats.downloaded_and_checked_bytes.load(Relaxed);
let remaining = self.initially_needed_bytes - downloaded_bytes;
StatsSnapshot {
have_bytes: self.stats.have_bytes.load(Relaxed),
downloaded_and_checked_bytes: downloaded_bytes,
downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed),
total_bytes: self.lengths.total_length(),
initially_needed_bytes: self.initially_needed_bytes,
remaining_bytes: remaining,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
peer_stats: self.peers.stats(),
}
Expand Down Expand Up @@ -750,6 +752,7 @@ impl TorrentStateLive {
chunk_tracker.mark_piece_broken(piece_id);
}
let have_bytes = chunk_tracker.calc_have_bytes();
let needed_bytes = chunk_tracker.calc_needed_bytes();

// g.chunks;
Ok(TorrentStatePaused {
Expand All @@ -758,6 +761,7 @@ impl TorrentStateLive {
filenames,
chunk_tracker,
have_bytes,
needed_bytes,
})
}

Expand Down
8 changes: 3 additions & 5 deletions crates/librqbit/src/torrent_state/live/stats/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats;

#[derive(Debug, Serialize, Default)]
pub struct StatsSnapshot {
pub have_bytes: u64,
pub downloaded_and_checked_bytes: u64,
pub downloaded_and_checked_pieces: u64,

pub fetched_bytes: u64,
pub uploaded_bytes: u64,
pub initially_needed_bytes: u64,
pub remaining_bytes: u64,
pub total_bytes: u64,

pub downloaded_and_checked_pieces: u64,
pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats,
}
Expand Down
14 changes: 10 additions & 4 deletions crates/librqbit/src/torrent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,20 @@ impl ManagedTorrent {
}
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.progress_bytes = p.have_bytes;
resp.finished = p.have_bytes == resp.total_bytes;
resp.total_bytes = p.chunk_tracker.get_total_selected_bytes();
resp.progress_bytes = resp.total_bytes - p.needed_bytes;
resp.finished = resp.progress_bytes == resp.total_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";
let live_stats = LiveStats::from(l.as_ref());
resp.progress_bytes = live_stats.snapshot.have_bytes;
resp.finished = resp.progress_bytes == resp.total_bytes;
let total = l.get_total_selected_bytes();
let remaining = l.get_left_to_download_bytes();
let progress = total - remaining;

resp.progress_bytes = progress;
resp.total_bytes = total;
resp.finished = remaining == 0;
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/torrent_state/paused.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct TorrentStatePaused {
pub(crate) filenames: Vec<PathBuf>,
pub(crate) chunk_tracker: ChunkTracker,
pub(crate) have_bytes: u64,
pub(crate) needed_bytes: u64,
}

// impl TorrentStatePaused {
Expand Down
47 changes: 21 additions & 26 deletions crates/rqbit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use librqbit::{
http_api::{HttpApi, HttpApiOptions},
http_api_client, librqbit_spawn,
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions},
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, ManagedTorrentState,
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse,
PeerConnectionOptions, Session, SessionOptions,
};
use size_format::SizeFormatterBinary as SF;
Expand Down Expand Up @@ -271,29 +271,23 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
loop {
session.with_torrents(|torrents| {
for (idx, torrent) in torrents {
let live = torrent.with_state(|s| {
match s {
ManagedTorrentState::Initializing(i) => {
let total = torrent.get_total_bytes();
let progress = i.get_checked_bytes();
let pct = (progress as f64 / total as f64) * 100f64;
info!("[{}] initializing {:.2}%", idx, pct)
},
ManagedTorrentState::Live(h) => return Some(h.clone()),
_ => {},
};
None
});
let handle = match live {
Some(live) => live,
None => continue
let stats = torrent.stats();
if stats.state == "initializing" {
let total = stats.total_bytes;
let progress = stats.progress_bytes;
let pct = (progress as f64 / total as f64) * 100f64;
info!("[{}] initializing {:.2}%", idx, pct);
continue;
}
let (live, live_stats) = match (torrent.live(), stats.live.as_ref()) {
(Some(live), Some(live_stats)) => (live, live_stats),
_ => continue
};
let stats = handle.stats_snapshot();
let down_speed = handle.down_speed_estimator();
let up_speed = handle.up_speed_estimator();
let down_speed = live.down_speed_estimator();
let up_speed = live.up_speed_estimator();
let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes;
let downloaded_pct = if stats.remaining_bytes == 0 {
let progress = stats.progress_bytes;
let downloaded_pct = if stats.finished {
100f64
} else {
(progress as f64 / total as f64) * 100f64
Expand All @@ -303,6 +297,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
Some(d) => format!(", ETA: {:?}", d),
None => String::new()
};
let peer_stats = &live_stats.snapshot.peer_stats;
info!(
"[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}}}",
idx,
Expand All @@ -311,11 +306,11 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
SF::new(total),
down_speed.mbps(),
up_speed.mbps(),
SF::new(stats.uploaded_bytes),
SF::new(live_stats.snapshot.uploaded_bytes),
eta,
stats.peer_stats.live + stats.peer_stats.connecting,
stats.peer_stats.queued,
stats.peer_stats.dead,
peer_stats.live + peer_stats.connecting,
peer_stats.queued,
peer_stats.dead,
);
}
});
Expand Down

0 comments on commit 325855b

Please sign in to comment.