Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce incremental compile times a bit + small refactorings #86

Merged
merged 10 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ members = [
"crates/librqbit_core",
"crates/peer_binary_protocol",
"crates/dht",
"crates/upnp"
"crates/upnp",
"crates/tracker_comms",
]

[profile.dev]
Expand All @@ -22,4 +23,4 @@ debug = true

[profile.release-github]
inherits = "release"
debug = false
debug = false
90 changes: 49 additions & 41 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use anyhow::{bail, Context};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use bencode::ByteString;
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use futures::{
future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt,
};

use leaky_bucket::RateLimiter;
use librqbit_core::{
Expand Down Expand Up @@ -232,6 +234,7 @@ impl Drop for RequestPeersStream {
impl Stream for RequestPeersStream {
type Item = SocketAddr;

#[inline(never)]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
Expand Down Expand Up @@ -1144,49 +1147,54 @@ impl DhtState {
&self.cancellation_token
}

pub async fn with_config(mut config: DhtConfig) -> anyhow::Result<Arc<Self>> {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
}?;

let listen_addr = socket
.local_addr()
.context("cannot determine UDP listen addr")?;
info!("DHT listening on {:?}", listen_addr);

let peer_id = config.peer_id.unwrap_or_else(generate_peer_id);
info!("starting up DHT with peer id {:?}", peer_id);
let bootstrap_addrs = config
.bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());

let token = config.cancellation_token.take().unwrap_or_default();

let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
));
#[inline(never)]
pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
async move {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
}?;

let listen_addr = socket
.local_addr()
.context("cannot determine UDP listen addr")?;
info!("DHT listening on {:?}", listen_addr);

let peer_id = config.peer_id.unwrap_or_else(generate_peer_id);
info!("starting up DHT with peer id {:?}", peer_id);
let bootstrap_addrs = config
.bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());

let token = config.cancellation_token.take().unwrap_or_default();

let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
));

spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await
}
});
Ok(state)
spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await
}
});
Ok(state)
}
.boxed()
}

#[inline(never)]
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,
Expand Down
170 changes: 90 additions & 80 deletions crates/dht/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...

use futures::future::BoxFuture;
use futures::FutureExt;
use librqbit_core::directories::get_configuration_directory;
use librqbit_core::spawn_utils::spawn_with_cancel;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -75,94 +77,102 @@ impl PersistentDht {
Ok(path)
}

pub async fn create(
#[inline(never)]
pub fn create(
config: Option<PersistentDhtConfig>,
cancellation_token: Option<CancellationToken>,
) -> anyhow::Result<Dht> {
let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename,
None => Self::default_persistence_filename()?,
};

info!(
filename=?config_filename,
"will store DHT routing table periodically",
);

if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?;
}
) -> BoxFuture<'static, anyhow::Result<Dht>> {
async move {
let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename,
None => Self::default_persistence_filename()?,
};

info!(
filename=?config_filename,
"will store DHT routing table periodically",
);

if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?;
}

let de = match OpenOptions::new().read(true).open(&config_filename) {
Ok(dht_json) => {
let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(reader) {
Ok(r) => {
info!(filename=?config_filename, "loaded DHT routing table from");
Some(r)
}
Err(e) => {
warn!(
filename=?config_filename,
"cannot deserialize routing table: {:#}",
e
);
None
let de = match OpenOptions::new().read(true).open(&config_filename) {
Ok(dht_json) => {
let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(
reader,
) {
Ok(r) => {
info!(filename=?config_filename, "loaded DHT routing table from");
Some(r)
}
Err(e) => {
warn!(
filename=?config_filename,
"cannot deserialize routing table: {:#}",
e
);
None
}
}
}
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => None,
_ => return Err(e).with_context(|| format!("error reading {config_filename:?}")),
},
};
let (listen_addr, routing_table, peer_store) = de
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());

let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
cancellation_token,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};

loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;

match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => trace!(filename=?config_filename, "dumped DHT"),
Err(e) => {
error!(filename=?config_filename, "error dumping DHT: {:#}", e)
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => None,
_ => {
return Err(e).with_context(|| format!("error reading {config_filename:?}"))
}
},
};
let (listen_addr, routing_table, peer_store) = de
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());

let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
cancellation_token,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};

loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;

match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => trace!(filename=?config_filename, "dumped DHT"),
Err(e) => {
error!(filename=?config_filename, "error dumping DHT: {:#}", e)
}
}
}
}
}
},
);
},
);

Ok(dht)
Ok(dht)
}
.boxed()
}
}
2 changes: 2 additions & 0 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rust-tls = ["reqwest/rustls-tls"]

[dependencies]
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
tracker_comms = {path = "../tracker_comms", default-features=false, package="librqbit-tracker-comms", version="1.0.0"}
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.5.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
Expand Down Expand Up @@ -68,6 +69,7 @@ serde_with = "3.4.0"
tokio-util = "0.7.10"
bytes = "1.5.0"
rlimit = "0.10.1"
async-stream = "0.3.5"

[dev-dependencies]
futures = {version = "0.3"}
Expand Down
Loading
Loading