Skip to content

Commit

Permalink
Merge pull request #43 from ikatson/desktop-configuration
Browse files Browse the repository at this point in the history
Desktop configuration
  • Loading branch information
ikatson authored Dec 7, 2023
2 parents dd355b0 + 25ca003 commit e480ca7
Show file tree
Hide file tree
Showing 32 changed files with 1,074 additions and 230 deletions.
9 changes: 6 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/dht/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit-dht"
version = "4.1.0"
version = "5.0.0-beta.1"
edition = "2021"
description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0"
Expand Down Expand Up @@ -32,10 +32,10 @@ futures = "0.3"
rand = "0.8"
indexmap = "2"
dashmap = {version = "5.5.3", features = ["serde"]}

clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.3.0"}
chrono = {version = "0.4.31", features = ["serde"]}
tokio-util = "0.7.10"

[dev-dependencies]
tracing-subscriber = "0.3"
1 change: 1 addition & 0 deletions crates/dht/examples/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let dht = DhtBuilder::new().await.context("error initializing DHT")?;

let mut stream = dht.get_peers(info_hash, None)?;

let stats_printer = async {
Expand Down
26 changes: 21 additions & 5 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};

use leaky_bucket::RateLimiter;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn};
use librqbit_core::{
id20::Id20,
peer_id::generate_peer_id,
spawn_utils::{spawn, spawn_with_cancel},
};
use parking_lot::RwLock;

use serde::Serialize;
Expand All @@ -35,6 +39,7 @@ use tokio::{
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};

use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, error_span, info, trace, warn, Instrument};

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -535,6 +540,8 @@ pub struct DhtState {
// This is to send raw messages
worker_sender: UnboundedSender<WorkerSendRequest>,

cancellation_token: CancellationToken,

pub(crate) peer_store: PeerStore,
}

Expand All @@ -545,6 +552,7 @@ impl DhtState {
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
peer_store: PeerStore,
cancellation_token: CancellationToken,
) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self {
Expand All @@ -556,6 +564,7 @@ impl DhtState {
listen_addr,
rate_limiter: make_rate_limiter(),
peer_store,
cancellation_token,
}
}

Expand Down Expand Up @@ -1124,13 +1133,18 @@ pub struct DhtConfig {
pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>,
pub peer_store: Option<PeerStore>,
pub cancellation_token: Option<CancellationToken>,
}

impl DhtState {
pub async fn new() -> anyhow::Result<Arc<Self>> {
Self::with_config(DhtConfig::default()).await
}
pub async fn with_config(config: DhtConfig) -> anyhow::Result<Arc<Self>> {
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}

pub async fn with_config(mut config: DhtConfig) -> anyhow::Result<Arc<Self>> {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
Expand All @@ -1151,21 +1165,23 @@ impl DhtState {
.bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());

let token = config.cancellation_token.take().unwrap_or_default();

let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
));

spawn(error_span!("dht"), {
spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await?;
Ok(())
worker.start(in_rx, &bootstrap_addrs).await
}
});
Ok(state)
Expand Down
79 changes: 48 additions & 31 deletions crates/dht/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...

use librqbit_core::directories::get_configuration_directory;
use librqbit_core::spawn_utils::spawn;
use librqbit_core::spawn_utils::spawn_with_cancel;
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::{BufReader, BufWriter};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio_util::sync::CancellationToken;

use anyhow::Context;
use tracing::{debug, error, error_span, info, trace, warn};
Expand Down Expand Up @@ -68,18 +69,27 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result<
}

impl PersistentDht {
pub async fn create(config: Option<PersistentDhtConfig>) -> anyhow::Result<Dht> {
pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
let dirs = get_configuration_directory("dht")?;
let path = dirs.cache_dir().join("dht.json");
Ok(path)
}

pub async fn create(
config: Option<PersistentDhtConfig>,
cancellation_token: Option<CancellationToken>,
) -> anyhow::Result<Dht> {
let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename,
None => {
let dirs = get_configuration_directory("dht")?;
let path = dirs.cache_dir().join("dht.json");
info!("will store DHT routing table to {:?} periodically", &path);
path
}
None => Self::default_persistence_filename()?,
};

info!(
"will store DHT routing table to {:?} periodically",
&config_filename
);

if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?;
Expand Down Expand Up @@ -117,34 +127,41 @@ impl PersistentDht {
routing_table,
listen_addr,
peer_store,
cancellation_token,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;

spawn(error_span!("dht_persistence"), {
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};

loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;

match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => debug!("dumped DHT to {:?}", &config_filename),
Err(e) => error!("error dumping DHT to {:?}: {:#}", &config_filename, e),
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};

loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;

match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => debug!("dumped DHT to {:?}", &config_filename),
Err(e) => {
error!("error dumping DHT to {:?}: {:#}", &config_filename, e)
}
}
}
}
}
});
},
);

Ok(dht)
}
}
7 changes: 4 additions & 3 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit"
version = "5.0.0-beta.0"
version = "5.0.0-beta.1"
authors = ["Igor Katson <[email protected]>"]
edition = "2021"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
Expand Down Expand Up @@ -28,7 +28,7 @@ librqbit-core = {path = "../librqbit_core", version = "3.3.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.1.0"}
dht = {path = "../dht", package="librqbit-dht", version="5.0.0-beta.1"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"}

tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
Expand Down Expand Up @@ -64,8 +64,9 @@ backoff = "0.4.0"
dashmap = "5.5.3"
base64 = "0.21.5"
serde_with = "3.4.0"
tokio-util = "0.7.10"

[dev-dependencies]
futures = {version = "0.3"}
tracing-subscriber = "0.3"
tokio-test = "0.4"
tokio-test = "0.4"
5 changes: 5 additions & 0 deletions crates/librqbit/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub type Result<T> = std::result::Result<T, ApiError>;

/// Library API for use in different web frameworks.
/// Contains all methods you might want to expose with (de)serializable inputs/outputs.
#[derive(Clone)]
pub struct Api {
session: Arc<Session>,
rust_log_reload_tx: Option<UnboundedSender<String>>,
Expand All @@ -39,6 +40,10 @@ impl Api {
}
}

pub fn session(&self) -> &Arc<Session> {
&self.session
}

pub fn mgr_handle(&self, idx: TorrentId) -> Result<ManagedTorrentHandle> {
self.session
.get(idx)
Expand Down
11 changes: 11 additions & 0 deletions crates/librqbit/src/api_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ impl ApiError {
}
}

pub const fn new_from_text(status: StatusCode, text: &'static str) -> Self {
Self {
status: Some(status),
kind: ApiErrorKind::Text(text),
plaintext: false,
}
}

#[allow(dead_code)]
pub fn not_implemented(msg: &str) -> Self {
Self {
Expand Down Expand Up @@ -69,6 +77,7 @@ impl ApiError {
enum ApiErrorKind {
TorrentNotFound(usize),
DhtDisabled,
Text(&'static str),
Other(anyhow::Error),
}

Expand All @@ -91,6 +100,7 @@ impl Serialize for ApiError {
ApiErrorKind::TorrentNotFound(_) => "torrent_not_found",
ApiErrorKind::DhtDisabled => "dht_disabled",
ApiErrorKind::Other(_) => "internal_error",
ApiErrorKind::Text(_) => "internal_error",
},
human_readable: format!("{self}"),
status: self.status().as_u16(),
Expand Down Expand Up @@ -130,6 +140,7 @@ impl std::fmt::Display for ApiError {
ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"),
ApiErrorKind::Other(err) => write!(f, "{err:?}"),
ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"),
ApiErrorKind::Text(t) => write!(f, "{t}"),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/dht_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ mod tests {

let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap();

let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl HttpApi {
"GET /web/": "Web UI",
},
"server": "rqbit",
"version": env!("CARGO_PKG_VERSION"),
}))
}

Expand Down
Loading

0 comments on commit e480ca7

Please sign in to comment.