diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 146645a..860ac7e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,10 +1,9 @@ # Contributing -I'm eager to hear all your feedback and suggestions! -Just open a [GitHub issue](https://github.com/manforowicz/gday/issues) -and include as many details as you can. -For example, try running with `--verbosity debug` or `--verbosity trace` -and paste the log into your issue. +Open a [GitHub issue](https://github.com/manforowicz/gday/issues) +to report issues and suggest features. +Try running with `--verbosity debug` or `--verbosity trace` +and pasting the log into your issue. ## Contributing code diff --git a/Cargo.toml b/Cargo.toml index bad44e4..0bcbca2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,6 @@ version = "0.2.1" [profile.dist] inherits = "release" lto = "thin" + +[profile.release] +debug = true \ No newline at end of file diff --git a/README.md b/README.md index 55fbb64..6b90130 100644 --- a/README.md +++ b/README.md @@ -4,53 +4,62 @@ Command line tool to securely send files (without a relay or port forwarding).
-peer_1: gday send image.jpg folder
-<Asks for confirmation>
-Tell your mate to run "gday get 1.1C30.C71E.A".
-Transfer complete.
+peer_1: gday send file.mp4 folder
+Tell your mate to run "gday get 1.n5xn8.wvqsf".
 
-peer_2: gday get 1.1C30.C71E.A
-<Asks for confirmation>
+peer_2: gday get 1.n5xn8.wvqsf
 Transfer complete.
 
-[![asciicast](https://asciinema.org/a/1jjPVyccHweqgwA5V3un4tCnU.svg)](https://asciinema.org/a/1jjPVyccHweqgwA5V3un4tCnU) +[![asciicast](https://asciinema.org/a/Z8OJJr8xHRAJh6fuqocNcm9Zu.svg)](https://asciinema.org/a/Z8OJJr8xHRAJh6fuqocNcm9Zu) ## Installation To run the executable directly: -1. Go to [releases](https://github.com/manforowicz/gday/releases) -and download the correct file for your platform. +1. Download an executable from [releases](https://github.com/manforowicz/gday/releases). 2. Extract it (on Linux: `tar xf `). 3. Run it: `./gday` To install with **cargo**: ``` -$ cargo install gday +cargo install gday ``` To install with **brew**: ``` -$ brew install manforowicz/tap/gday +brew install manforowicz/tap/gday ``` ## Features -- File transfer is always direct, without relay servers. -A server is only used to exchange socket addresses at the beginning. + - No limit on the size of files and folders sent. + +- Files are sent directly, without relay servers. +A server is only used to exchange socket addresses at the beginning. + +- Automatically resumes interrupted transfers. Just `gday send` the same files, and partial downloads will be detected and resumed. + - Doesn't require port forwarding. Instead, uses [TCP Hole Punching](https://bford.info/pub/net/p2pnat/) to traverse [NATs](https://en.wikipedia.org/wiki/Network_address_translation). -Note: this may not work on very restrictive NATs. -- Server connection encrypted with [TLS](https://docs.rs/rustls/) -and file transfer encrypted with [ChaCha20Poly1305](https://docs.rs/chacha20poly1305/). +This may not work on very restrictive NATs. If that happens, enable IPv6 or move to a different network. + +- If a contact exchange server is down, just uses a different one from the default list. Or specify your own with `--server`. 
+ +- Server connection encrypted with +[TLS](https://en.wikipedia.org/wiki/Transport_Layer_Security) +and file transfer end-to-end encrypted with +[ChaCha20Poly1305](https://en.wikipedia.org/wiki/ChaCha20-Poly1305). + - Automatically tries both IPv4 and IPv6. + - Immune to malicious servers impersonating your peer. -Uses [SPAKE2](https://docs.rs/spake2/) password authenticated key exchange -to derive an encryption key from a shared secret. +Uses [SPAKE2](https://datatracker.ietf.org/doc/rfc9382/) to derive an +encryption key from a shared secret. + - No `unsafe` Rust in this repository. @@ -66,7 +75,7 @@ Commands: Options: -s, --server Use a custom gday server with this domain name -p, --port Connect to a custom server port - -u, --unencrypted Use raw TCP without TLS + -u, --unencrypted Use TCP without TLS -v, --verbosity Verbosity. (trace, debug, info, warn, error) [default: warn] -h, --help Print help -V, --version Print version diff --git a/gday/Cargo.toml b/gday/Cargo.toml index 8416e9d..5fb8b7e 100644 --- a/gday/Cargo.toml +++ b/gday/Cargo.toml @@ -22,4 +22,4 @@ gday_hole_punch = { version = "^0.2.1", path = "../gday_hole_punch" } indicatif = "0.17.9" log = "0.4.22" owo-colors = "4.1.0" -tokio = { version = "1.41.1", features = ["io-std", "rt-multi-thread"] } +tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } diff --git a/gday/src/dialog.rs b/gday/src/dialog.rs index 49b4f7c..1615f4a 100644 --- a/gday/src/dialog.rs +++ b/gday/src/dialog.rs @@ -3,13 +3,15 @@ use gday_file_transfer::{FileOfferMsg, FileResponseMsg}; use indicatif::HumanBytes; use owo_colors::OwoColorize; -use std::{io::Write, path::Path}; -use tokio::io::{AsyncBufReadExt, BufReader}; +use std::{ + io::{BufRead, Write}, + path::Path, +}; /// Confirms that the user wants to send these `files``. /// /// If not, returns false. 
-pub async fn confirm_send(files: &FileOfferMsg) -> std::io::Result { +pub fn confirm_send(files: &FileOfferMsg) -> std::io::Result { // print all the file names and sizes println!("{}", "Files to send:".bold()); for file in &files.files { @@ -25,13 +27,12 @@ pub async fn confirm_send(files: &FileOfferMsg) -> std::io::Result { HumanBytes(total_size).bold() ); std::io::stdout().flush()?; - let input = get_lowercase_input().await?; + let input = get_lowercase_input()?; // act on user choice if "yes".starts_with(&input) { Ok(true) } else { - println!("Cancelled."); Ok(false) } } @@ -39,7 +40,7 @@ pub async fn confirm_send(files: &FileOfferMsg) -> std::io::Result { /// Asks the user which of the files in `offer` to accept. /// /// `save_dir` is the directory where the files will later be saved. -pub async fn ask_receive( +pub fn ask_receive( offer: &FileOfferMsg, save_dir: &Path, ) -> Result { @@ -51,7 +52,7 @@ pub async fn ask_receive( print!("{} ({})", file.short_path.display(), HumanBytes(file.len)); // an interrupted download exists - if let Some(local_len) = file.partial_download_exists(save_dir).await? { + if let Some(local_len) = file.partial_download_exists(save_dir)? { let remaining_len = file.len - local_len; print!( @@ -62,7 +63,7 @@ pub async fn ask_receive( ); // file was already downloaded - } else if file.already_exists(save_dir).await? { + } else if file.already_exists(save_dir)? { print!(" {}", "ALREADY EXISTS".green().bold()); } println!(); @@ -70,8 +71,9 @@ pub async fn ask_receive( println!(); - let new_files = FileResponseMsg::accept_only_new_and_interrupted(offer, save_dir).await?; + let new_files = FileResponseMsg::accept_only_new_and_interrupted(offer, save_dir)?; let all_files = FileResponseMsg::accept_all_files(offer); + let no_files = FileResponseMsg::reject_all_files(offer); // If there are no existing/interrupted files, // send or quit. @@ -79,16 +81,15 @@ pub async fn ask_receive( print!( "Download all {} files ({})? 
(y/n): ", all_files.get_num_fully_accepted(), - HumanBytes(offer.get_transfer_size(&new_files)?).bold() + HumanBytes(offer.get_transfer_size(&all_files)?).bold() ); std::io::stdout().flush()?; - let input = get_lowercase_input().await?; + let input = get_lowercase_input()?; if "yes".starts_with(&input) { return Ok(all_files); } else { - println!("Cancelled."); - std::process::exit(0); + return Ok(no_files); } } @@ -97,17 +98,33 @@ pub async fn ask_receive( all_files.response.len(), HumanBytes(offer.get_transfer_size(&all_files)?).bold() ); - println!( - "2. Download only the {} new files, and resume {} interrupted downloads ({}).", - new_files.get_num_fully_accepted(), - new_files.get_num_partially_accepted(), - HumanBytes(offer.get_transfer_size(&new_files)?).bold() - ); + + if new_files.get_num_partially_accepted() == 0 { + println!( + "2. Only download the {} new files ({}).", + new_files.get_num_fully_accepted(), + HumanBytes(offer.get_transfer_size(&new_files)?).bold() + ); + } else if new_files.get_num_fully_accepted() == 0 { + println!( + "2. Only resume the {} interrupted downloads ({}).", + new_files.get_num_partially_accepted(), + HumanBytes(offer.get_transfer_size(&new_files)?).bold() + ); + } else { + println!( + "2. Only download the {} new files, and resume {} interrupted downloads ({}).", + new_files.get_num_fully_accepted(), + new_files.get_num_partially_accepted(), + HumanBytes(offer.get_transfer_size(&new_files)?).bold() + ); + } + println!("3. Cancel."); print!("{} ", "Choose an option (1, 2, or 3):".bold()); std::io::stdout().flush()?; - match get_lowercase_input().await?.as_str() { + match get_lowercase_input()?.as_str() { // all files "1" => Ok(all_files), // new/interrupted files @@ -118,18 +135,14 @@ pub async fn ask_receive( } /// Reads a trimmed ascii-lowercase line of input from the user. -async fn get_lowercase_input() -> std::io::Result { - let Some(response) = BufReader::new(tokio::io::stdin()) - .lines() - .next_line() - .await? 
- else { +fn get_lowercase_input() -> std::io::Result { + let Some(response) = std::io::BufReader::new(std::io::stdin()).lines().next() else { return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "Couldn't read user input.", )); }; - let response = response.trim().to_ascii_lowercase(); + let response = response?.trim().to_ascii_lowercase(); Ok(response) } diff --git a/gday/src/main.rs b/gday/src/main.rs index 4dc73f7..06ab06e 100644 --- a/gday/src/main.rs +++ b/gday/src/main.rs @@ -56,8 +56,8 @@ enum Command { #[arg(short, long, conflicts_with = "length")] code: Option, - /// Length of the last 2 sections of the randomly-generated shareable code. - #[arg(short, long, default_value = "4", conflicts_with = "code")] + /// Length of room_code and shared_secret to generate. + #[arg(short, long, default_value = "5", conflicts_with = "code")] length: usize, /// Files and/or directories to send. @@ -124,8 +124,8 @@ async fn run(args: crate::Args) -> Result<(), Box> { length, } => { // If the user chose a custom server - let (mut server_connection, server_id) = if let Some(forced_server) = custom_server { - (forced_server, 0) + let (mut server_connection, server_id) = if let Some(custom_server) = custom_server { + (custom_server, 0) // If the user chose a custom code } else if let Some(code) = &code { @@ -158,12 +158,12 @@ async fn run(args: crate::Args) -> Result<(), Box> { }; // get metadata about the files to transfer - let local_files = gday_file_transfer::get_file_metas(&paths).await?; + let local_files = gday_file_transfer::get_file_metas(&paths)?; let offer_msg = FileOfferMsg::from(local_files.clone()); // confirm the user wants to send these files - if !dialog::confirm_send(&offer_msg).await? { - // Send aborted + if !dialog::confirm_send(&offer_msg)? { + println!("Cancelled."); return Ok(()); } @@ -204,7 +204,7 @@ async fn run(args: crate::Args) -> Result<(), Box> { println!("File offer sent to mate. 
Waiting on response."); - // receive file offer from peer + // receive response from peer let response: FileResponseMsg = read_from_async(&mut stream).await?; // Total number of files accepted @@ -226,12 +226,15 @@ async fn run(args: crate::Args) -> Result<(), Box> { if num_accepted != 0 { transfer::send_files(local_files, response, &mut stream).await?; } + + // Gracefully close the server connection + server_connection.shutdown().await?; } // receiving files crate::Command::Get { path, code } => { - let mut server_connection = if let Some(forced_server) = custom_server { - forced_server + let mut server_connection = if let Some(custom_server) = custom_server { + custom_server } else { server_connector::connect_to_server_id( DEFAULT_SERVERS, @@ -268,7 +271,7 @@ async fn run(args: crate::Args) -> Result<(), Box> { // receive file offer from peer let offer: FileOfferMsg = read_from_async(&mut stream).await?; - let response = ask_receive(&offer, &path).await?; + let response = ask_receive(&offer, &path)?; // respond to the file offer write_to_async(&response, &mut stream).await?; @@ -278,6 +281,9 @@ async fn run(args: crate::Args) -> Result<(), Box> { } else { transfer::receive_files(offer, response, &path, &mut stream).await?; } + + // Gracefully close the server connection + server_connection.shutdown().await?; } } diff --git a/gday/src/transfer.rs b/gday/src/transfer.rs index b1030ff..fdbc558 100644 --- a/gday/src/transfer.rs +++ b/gday/src/transfer.rs @@ -8,15 +8,16 @@ pub async fn send_files( response: FileResponseMsg, writer: &mut EncryptedStream, ) -> Result<(), Box> { - let progress_bar = create_progress_bar(); + let len = FileOfferMsg::from(offer.clone()).get_transfer_size(&response)?; + let progress_bar = create_progress_bar(len); let mut current_file = String::from("Starting..."); let update_progress = |report: &TransferReport| { progress_bar.set_position(report.processed_bytes); - progress_bar.set_length(report.total_bytes); if current_file.as_str() != 
report.current_file.to_string_lossy() { - current_file = report.current_file.to_string_lossy().to_string(); - progress_bar.set_message(format!("Receiving {}", current_file)); + current_file.clear(); + current_file.push_str(&report.current_file.to_string_lossy()); + progress_bar.set_message(format!("Sending {}", current_file)); } }; @@ -26,7 +27,7 @@ pub async fn send_files( Ok(()) } Err(err) => { - progress_bar.abandon_with_message("Transfer failed."); + progress_bar.abandon_with_message("Send failed."); Err(err.into()) } } @@ -42,14 +43,15 @@ pub async fn receive_files( save_dir: &std::path::Path, reader: &mut EncryptedStream, ) -> Result<(), Box> { - let progress_bar = create_progress_bar(); - let mut current_file = String::from("Starting..."); + let len = offer.get_transfer_size(&response)?; + let progress_bar = create_progress_bar(len); + let mut current_file = String::new(); let update_progress = |report: &TransferReport| { progress_bar.set_position(report.processed_bytes); - progress_bar.set_length(report.total_bytes); if current_file.as_str() != report.current_file.to_string_lossy() { - current_file = report.current_file.to_string_lossy().to_string(); + current_file.clear(); + current_file.push_str(&report.current_file.to_string_lossy()); progress_bar.set_message(format!("Receiving {}", current_file)); } }; @@ -64,20 +66,20 @@ pub async fn receive_files( Ok(()) } Err(err) => { - progress_bar.abandon_with_message("Transfer failed."); + progress_bar.abandon_with_message("Receive failed."); Err(err.into()) } } } /// Create a stylded [`ProgressBar`]. 
-fn create_progress_bar() -> ProgressBar { +fn create_progress_bar(len: u64) -> ProgressBar { let style = ProgressStyle::with_template( "{msg} [{wide_bar}] {bytes}/{total_bytes} | {bytes_per_sec} | eta: {eta}", ) .expect("Progress bar style string was invalid."); let draw = ProgressDrawTarget::stderr_with_hz(2); - ProgressBar::with_draw_target(None, draw) + ProgressBar::with_draw_target(Some(len), draw) .with_style(style) .with_message("starting...") } diff --git a/gday_contact_exchange_protocol/src/lib.rs b/gday_contact_exchange_protocol/src/lib.rs index e544153..4bd5f30 100644 --- a/gday_contact_exchange_protocol/src/lib.rs +++ b/gday_contact_exchange_protocol/src/lib.rs @@ -207,7 +207,7 @@ impl Display for ServerMsg { Self::PeerContact(c) => write!(f, "The server says your peer's contact is {c}."), Self::ErrorRoomTaken => write!( f, - "Can't create room with this code, because it's already taken." + "Can't create a room with this room code, because it's already taken." ), Self::ErrorPeerTimedOut => write!( f, diff --git a/gday_encryption/Cargo.toml b/gday_encryption/Cargo.toml index 7d84eee..a64de73 100644 --- a/gday_encryption/Cargo.toml +++ b/gday_encryption/Cargo.toml @@ -20,7 +20,7 @@ rand = "0.8.5" tokio = { version = "1.41.1", features = ["io-util"] } [dev-dependencies] -criterion = { version = "0.5.1", features = ["async_tokio", "tokio"] } +criterion = { version = "0.5.1", features = ["async_tokio"] } tokio = { version = "1.41.1", features = ["net", "rt", "macros"] } [[bench]] diff --git a/gday_encryption/src/helper_buf.rs b/gday_encryption/src/helper_buf.rs index ba06185..a304058 100644 --- a/gday_encryption/src/helper_buf.rs +++ b/gday_encryption/src/helper_buf.rs @@ -53,9 +53,11 @@ impl HelperBuf { /// Shifts the stored data to the beginning of the internal buffer. /// Maximizes `spare_capacity_len()` without changing anything else. 
 pub fn left_align(&mut self) { - self.inner.copy_within(self.l_cursor..self.r_cursor, 0); - self.r_cursor -= self.l_cursor; - self.l_cursor = 0; + if self.l_cursor != 0 { + self.inner.copy_within(self.l_cursor..self.r_cursor, 0); + self.r_cursor -= self.l_cursor; + self.l_cursor = 0; + } } /// Returns a mutable [`aead::Buffer`] view into the part of this diff --git a/gday_encryption/src/lib.rs b/gday_encryption/src/lib.rs index 5d3c1e6..d684507 100644 --- a/gday_encryption/src/lib.rs +++ b/gday_encryption/src/lib.rs @@ -99,7 +99,8 @@ pub struct EncryptedStream { /// Data to be sent. Encrypted only when [`Self::flushing`]. /// - Invariant: the first 2 bytes are always - /// reserved for the length header + /// reserved for the length + /// - Invariant: Data can only be appended when `flushing` is false. to_send: HelperBuf, /// Is the content of `to_send` encrypted and ready to write? @@ -113,7 +114,7 @@ impl EncryptedStream { /// - The `key` must be a cryptographically random secret. /// - The `nonce` shouldn't be reused, but doesn't need to be secret. /// - /// - See [`Self::encrypt_connection()`] if you'd like an auto-generated nonce. 
pub fn new(io_stream: T, key: &[u8; 32], nonce: &[u8; 7]) -> Self { let mut to_send = HelperBuf::with_capacity(u16::MAX as usize + 2); // add 2 bytes for length header to uphold invariant @@ -170,7 +171,7 @@ impl AsyncRead for EncryptedStream { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { // if we're out of decrypted data, read more - if self.as_mut().decrypted.is_empty() { + if self.decrypted.is_empty() { ready!(self.as_mut().inner_read(cx))?; } @@ -183,7 +184,7 @@ impl AsyncRead for EncryptedStream { } } -impl AsyncBufRead for EncryptedStream { +impl AsyncBufRead for EncryptedStream { fn consume(self: std::pin::Pin<&mut EncryptedStream>, amt: usize) { self.project().decrypted.consume(amt); } @@ -219,9 +220,10 @@ impl AsyncWrite for EncryptedStream { .extend_from_slice(&buf[0..bytes_taken]) .expect("unreachable"); - // if `to_send` is full, flush it + // if `to_send` is full, start the process + // of flushing it if me.to_send.spare_capacity().len() - TAG_SIZE == 0 { - ready!(self.flush_write_buf(cx))?; + let _ = self.flush_write_buf(cx)?; } Poll::Ready(Ok(bytes_taken)) } @@ -310,6 +312,7 @@ impl EncryptedStream { // If we're just starting a flush, // encrypt the data. if !*me.flushing { + *me.flushing = true; // encrypt in place let mut msg = me.to_send.split_off_aead_buf(2); me.encryptor diff --git a/gday_encryption/tests/test_integration.rs b/gday_encryption/tests/test_integration.rs index 0590160..1719bdf 100644 --- a/gday_encryption/tests/test_integration.rs +++ b/gday_encryption/tests/test_integration.rs @@ -38,6 +38,9 @@ async fn test_transfers() { stream_a.write_all(chunk).await.unwrap(); stream_a.flush().await.unwrap(); } + // Ensure calling shutdown multiple times works + stream_a.shutdown().await.unwrap(); + stream_a.shutdown().await.unwrap(); }); // Stream that will receive the test data sent to the loopback address. 
@@ -52,6 +55,10 @@ async fn test_transfers() { stream_b.read_exact(&mut received).await.unwrap(); assert_eq!(*chunk, received); } + + // EOF should return 0 + assert_eq!(stream_b.read(&mut [0, 0, 0]).await.unwrap(), 0); + assert_eq!(stream_b.read(&mut [0, 0, 0]).await.unwrap(), 0); } /// Test bufread @@ -88,6 +95,9 @@ async fn test_bufread() { stream_a.write_all(chunk).await.unwrap(); stream_a.flush().await.unwrap(); } + + stream_a.shutdown().await.unwrap(); + stream_a.shutdown().await.unwrap(); }); // Stream that will receive the test data sent to the loopback address. @@ -105,6 +115,10 @@ async fn test_bufread() { assert_ne!(bytes_read, 0); } assert_eq!(received, bytes); + + // EOF should return 0 + assert_eq!(stream_b.read(&mut [0, 0, 0]).await.unwrap(), 0); + assert_eq!(stream_b.read(&mut [0, 0, 0]).await.unwrap(), 0); } /// Confirm there are no infinite loops on EOF diff --git a/gday_file_transfer/Cargo.toml b/gday_file_transfer/Cargo.toml index d580524..f9152c8 100644 --- a/gday_file_transfer/Cargo.toml +++ b/gday_file_transfer/Cargo.toml @@ -14,14 +14,13 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -futures = "0.3.31" os_str_bytes = "7.0.0" pin-project = "1.1.7" -rand = "0.8.5" serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" thiserror = "2.0.3" -tokio = { version = "1.41.1", features = ["fs", "net", "io-util", "rt", "macros"] } +tokio = { version = "1.41.1", features = ["io-util"] } [dev-dependencies] tempfile = "3.14.0" +tokio = { version = "1.41.1", features = ["macros"] } diff --git a/gday_file_transfer/src/file_meta.rs b/gday_file_transfer/src/file_meta.rs index ea6fa6b..bb7fe58 100644 --- a/gday_file_transfer/src/file_meta.rs +++ b/gday_file_transfer/src/file_meta.rs @@ -1,5 +1,4 @@ use crate::Error; -use futures::{future::BoxFuture, FutureExt}; use os_str_bytes::OsStrBytesExt; use serde::{Deserialize, Serialize}; use std::{ @@ 
-48,9 +47,9 @@ impl FileMeta { /// /// If all of these (up to `" (99)"`) are occupied, /// returns [`Error::FilenameOccupied`]. - pub async fn get_unoccupied_save_path(&self, save_dir: &Path) -> Result { + pub fn get_unoccupied_save_path(&self, save_dir: &Path) -> Result { let mut path = self.get_save_path(save_dir); - let number = get_first_unoccupied_number(&path).await?; + let number = get_first_unoccupied_number(&path)?; if number != 0 { suffix_with_number(&mut path, number); @@ -68,12 +67,9 @@ impl FileMeta { /// will be one less than that of /// [`Self::get_unoccupied_save_path()`] (or no suffix /// if [`Self::get_unoccupied_save_path()`] has suffix of 1). - pub async fn get_last_occupied_save_path( - &self, - save_dir: &Path, - ) -> Result, Error> { + pub fn get_last_occupied_save_path(&self, save_dir: &Path) -> Result, Error> { let mut path = self.get_save_path(save_dir); - let number = get_first_unoccupied_number(&path).await?; + let number = get_first_unoccupied_number(&path)?; if number == 0 { Ok(None) @@ -88,8 +84,8 @@ impl FileMeta { /// Returns `true` iff a file is already saved at /// [`Self::get_last_occupied_save_path()`] /// with the same length as [`Self::len`]. - pub async fn already_exists(&self, save_dir: &Path) -> Result { - if let Some(occupied) = self.get_last_occupied_save_path(save_dir).await? { + pub fn already_exists(&self, save_dir: &Path) -> Result { + if let Some(occupied) = self.get_last_occupied_save_path(save_dir)? { if let Ok(metadata) = occupied.metadata() { if metadata.is_file() && metadata.len() == self.len { return Ok(true); @@ -119,13 +115,13 @@ impl FileMeta { /// already exists and has a length smaller than [`Self::len`]. /// If so, returns the length of the partially downloaded file. /// If it doesn't exist, returns None. 
- pub async fn partial_download_exists(&self, save_dir: &Path) -> Result, Error> { + pub fn partial_download_exists(&self, save_dir: &Path) -> Result, Error> { let local_path = self.get_partial_download_path(save_dir)?; // check if the file can be opened - if let Ok(file) = tokio::fs::File::open(local_path).await { + if let Ok(file) = std::fs::File::open(local_path) { // check if its length is less than the meta length - if let Ok(local_meta) = file.metadata().await { + if let Ok(local_meta) = file.metadata() { let local_len = local_meta.len(); if local_len < self.len { return Ok(Some(local_len)); @@ -151,7 +147,7 @@ impl From for FileMeta { /// Otherwise, returns the smallest number, starting at 1, that /// when suffixed to `path` (using [`suffix_with_number()`]), /// gives an unoccupied path. -async fn get_first_unoccupied_number(path: &Path) -> Result { +fn get_first_unoccupied_number(path: &Path) -> Result { // if the file doesn't exist if !path.exists() { return Ok(0); @@ -202,7 +198,7 @@ fn suffix_with_number(path: &mut PathBuf, number: u32) { /// /// Each file's [`FileMeta::short_path`] will contain the path to the file, /// starting at the provided level, ignoring parent directories. -pub async fn get_file_metas(paths: &[PathBuf]) -> Result, Error> { +pub fn get_file_metas(paths: &[PathBuf]) -> Result, Error> { // canonicalize the paths to remove symlinks let paths = paths .iter() @@ -240,7 +236,7 @@ pub async fn get_file_metas(paths: &[PathBuf]) -> Result, Err let top_path = path.parent().unwrap_or(Path::new("")); // add all files in this path to the files set - get_file_metas_helper(top_path, &path, &mut files).await?; + get_file_metas_helper(top_path, &path, &mut files)?; } // build a vec from the set, and return @@ -251,41 +247,38 @@ pub async fn get_file_metas(paths: &[PathBuf]) -> Result, Err /// `top_path` from all paths. `top_path` must be a prefix of `path`. /// - `path` is the file or directory where recursive traversal begins. 
/// - `files` is a [`Vec`] to which found files will be inserted. -fn get_file_metas_helper<'a>( - top_path: &'a Path, - path: &'a Path, - files: &'a mut Vec, -) -> BoxFuture<'a, std::io::Result<()>> { - async move { - if path.is_dir() { - // recursively traverse subdirectories - let mut entries = tokio::fs::read_dir(path).await?; - while let Some(entry) = entries.next_entry().await? { - get_file_metas_helper(top_path, &entry.path(), files).await?; - } - } else if path.is_file() { - // return an error if a file couldn't be opened. - std::fs::File::open(path)?; - - // get the shortened path - let short_path = path - .strip_prefix(top_path) - .expect("`top_path` was not a prefix of `path`.") - .to_path_buf(); - - // get the file's size - let len = path.metadata()?.len(); - - // insert this file metadata into set - let meta = FileMetaLocal { - local_path: path.to_path_buf(), - short_path, - len, - }; - files.push(meta); +fn get_file_metas_helper( + top_path: &Path, + path: &Path, + files: &mut Vec, +) -> std::io::Result<()> { + if path.is_dir() { + // recursively traverse subdirectories + let entries = std::fs::read_dir(path)?; + for entry in entries { + get_file_metas_helper(top_path, &entry?.path(), files)?; } - - Ok(()) + } else if path.is_file() { + // return an error if a file couldn't be opened. + std::fs::File::open(path)?; + + // get the shortened path + let short_path = path + .strip_prefix(top_path) + .expect("`top_path` was not a prefix of `path`.") + .to_path_buf(); + + // get the file's size + let len = path.metadata()?.len(); + + // insert this file metadata into set + let meta = FileMetaLocal { + local_path: path.to_path_buf(), + short_path, + len, + }; + files.push(meta); } - .boxed() + + Ok(()) } diff --git a/gday_file_transfer/src/lib.rs b/gday_file_transfer/src/lib.rs index ad2aa25..3eed3ec 100644 --- a/gday_file_transfer/src/lib.rs +++ b/gday_file_transfer/src/lib.rs @@ -23,11 +23,13 @@ //! 
# let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); //! # rt.block_on( async { //! -//! # let (mut stream1, mut stream2) = tokio::io::duplex(64); -//! # +//! # let (stream1, stream2) = tokio::io::duplex(64); +//! # let mut stream1 = tokio::io::BufReader::new(stream1); +//! # let mut stream2 = tokio::io::BufReader::new(stream2); +//! //! // Peer A offers files and folders they'd like to send //! let paths_to_send = ["folder/to/send/".into(), "a/file.txt".into()]; -//! let files_to_send = get_file_metas(&paths_to_send).await?; +//! let files_to_send = get_file_metas(&paths_to_send)?; //! let offer_msg = FileOfferMsg::from(files_to_send.clone()); //! write_to_async(offer_msg, &mut stream1).await?; //! @@ -36,7 +38,7 @@ //! let response_msg = FileResponseMsg::accept_only_new_and_interrupted( //! &offer_msg, //! Path::new("save/the/files/here/"), -//! ).await?; +//! )?; //! write_to_async(response_msg, &mut stream2).await?; //! //! // Peer A sends the accepted files diff --git a/gday_file_transfer/src/offer.rs b/gday_file_transfer/src/offer.rs index bb86605..4cd68e2 100644 --- a/gday_file_transfer/src/offer.rs +++ b/gday_file_transfer/src/offer.rs @@ -94,14 +94,14 @@ impl FileResponseMsg { /// by partially accepting files. /// /// Rejects all other files. - pub async fn accept_only_full_new_files( + pub fn accept_only_full_new_files( offer: &FileOfferMsg, save_dir: &Path, ) -> Result { let mut response = Vec::with_capacity(offer.files.len()); for file_meta in &offer.files { - if file_meta.already_exists(save_dir).await? { + if file_meta.already_exists(save_dir)? { // reject response.push(None); } else { @@ -119,16 +119,16 @@ impl FileResponseMsg { /// or have a different size. /// /// Rejects all other files. 
- pub async fn accept_only_new_and_interrupted( + pub fn accept_only_new_and_interrupted( offer: &FileOfferMsg, save_dir: &Path, ) -> Result { let mut response = Vec::with_capacity(offer.files.len()); for offered in &offer.files { - if let Some(existing_size) = offered.partial_download_exists(save_dir).await? { + if let Some(existing_size) = offered.partial_download_exists(save_dir)? { response.push(Some(existing_size)); - } else if offered.already_exists(save_dir).await? { + } else if offered.already_exists(save_dir)? { response.push(None); } else { response.push(Some(0)); diff --git a/gday_file_transfer/src/transfer.rs b/gday_file_transfer/src/transfer.rs index dbee2c4..78f537c 100644 --- a/gday_file_transfer/src/transfer.rs +++ b/gday_file_transfer/src/transfer.rs @@ -1,15 +1,13 @@ use tokio::io::{ - AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, }; use crate::{Error, FileMeta, FileMetaLocal, FileOfferMsg, FileResponseMsg}; -use std::io::SeekFrom; +use std::io::{ErrorKind, Seek, SeekFrom}; use std::path::Path; use std::pin::{pin, Pin}; use std::task::{ready, Context, Poll}; -const FILE_BUFFER_SIZE: usize = 1_000_000; - /// Holds the status of a file transfer #[derive(Debug, Clone)] pub struct TransferReport { @@ -51,28 +49,28 @@ pub async fn send_files( } // Wrap the writer to report progress over `progress_tx` - let mut writer = ProgressWrapper::new( - BufWriter::with_capacity(FILE_BUFFER_SIZE, writer), - total_bytes, - files.len() as u64, - progress_callback, - ); + let mut writer = + ProgressWrapper::new(writer, total_bytes, files.len() as u64, progress_callback); + + // 64 KiB copy buffer + let mut buf = vec![0; 0x10000]; // iterate over all the files for (offer, start) in files { // report the file path writer.progress.current_file.clone_from(&offer.short_path); - let mut file = tokio::fs::File::open(&offer.local_path).await?; + 
let mut file = std::fs::File::open(&offer.local_path)?; // confirm file length matches metadata length - if file.metadata().await?.len() != offer.len { + if file.metadata()?.len() != offer.len { return Err(Error::UnexpectedFileLen); } // copy the file into the writer - file.seek(SeekFrom::Start(start)).await?; - tokio::io::copy(&mut file, &mut writer).await?; + file.seek(SeekFrom::Start(start))?; + + file_to_net(&mut file, &mut writer, offer.len - start, &mut buf).await?; // report the number of processed files writer.progress.processed_files += 1; @@ -97,7 +95,7 @@ pub async fn receive_files( offer: &FileOfferMsg, response: &FileResponseMsg, save_path: &Path, - reader: impl AsyncRead, + reader: impl AsyncBufRead, progress_callback: impl FnMut(&TransferReport), ) -> Result<(), Error> { let reader = pin!(reader); @@ -118,12 +116,8 @@ pub async fn receive_files( } // Wrap the reader to report progress over `progress_tx` - let mut reader = ProgressWrapper::new( - tokio::io::BufReader::with_capacity(FILE_BUFFER_SIZE, reader), - total_bytes, - files.len() as u64, - progress_callback, - ); + let mut reader = + ProgressWrapper::new(reader, total_bytes, files.len() as u64, progress_callback); // iterate over all the files for (offer, start) in files { @@ -137,40 +131,95 @@ pub async fn receive_files( if start == 0 { // create a directory and TMP file if let Some(parent) = tmp_path.parent() { - tokio::fs::create_dir_all(parent).await?; + std::fs::create_dir_all(parent)?; } - let mut file = tokio::fs::File::create(&tmp_path).await?; - - // only take the length of the file from the reader - let mut reader = (&mut reader).take(offer.len); + let mut file = std::fs::File::create(&tmp_path)?; // copy from the reader into the file - tokio::io::copy(&mut reader, &mut file).await?; + net_to_file(&mut reader, &mut file, offer.len).await?; // resume interrupted download } else { // open the partially downloaded file in append mode - let mut file = tokio::fs::OpenOptions::new() - 
.append(true) - .open(&tmp_path) - .await?; - if file.metadata().await?.len() != start { + let mut file = std::fs::OpenOptions::new().append(true).open(&tmp_path)?; + if file.metadata()?.len() != start { return Err(Error::UnexpectedFileLen); } // only take the length of the remaining part of the file from the reader let mut reader = (&mut reader).take(offer.len - start); - // copy from the reader into the file - tokio::io::copy(&mut reader, &mut file).await?; + net_to_file(&mut reader, &mut file, offer.len - start).await?; } reader.progress.processed_files += 1; - tokio::fs::rename(tmp_path, offer.get_unoccupied_save_path(save_path).await?).await?; + std::fs::rename(tmp_path, offer.get_unoccupied_save_path(save_path)?)?; } Ok(()) } +/// We're using this instead of [`tokio::io::copy()`]. +/// +/// [`tokio::io::copy()`] spawns a task on a thread +/// during every file read/write. This occurs 1000s of times, +/// introducing unnecessary overhead. +/// +/// This function is similar, but uses standard blocking +/// reads from `src`. This is made on the assumption that each read +/// won't block everything for too long, so this +/// function should still be cancellable. +async fn file_to_net( + mut src: impl std::io::Read, + mut dst: impl tokio::io::AsyncWrite + Unpin, + mut amt: u64, + buf: &mut [u8], +) -> std::io::Result<()> { + while amt > 0 { + let to_read = std::cmp::min(amt, buf.len() as u64) as usize; + let bytes_read = src.read(&mut buf[0..to_read])?; + if bytes_read == 0 { + return Err(std::io::Error::new( + ErrorKind::UnexpectedEof, + "Peer interrupted transfer.", + )); + } + amt -= bytes_read as u64; + dst.write_all(&buf[0..to_read]).await?; + } + Ok(()) +} + +/// We're using this instead of [`tokio::io::copy_buf()`]. +/// +/// [`tokio::io::copy_buf()`] spawns a task on a thread +/// during every file read/write. This occurs 1000s of times, +/// introducing unnecessary overhead. 
+/// +/// This function is similar, but uses standard blocking +/// writes to `dst`. This is made on the assumption that each write +/// won't block everything for too long, so this +/// function should still be cancellable. +async fn net_to_file( + mut src: impl tokio::io::AsyncBufRead + Unpin, + mut dst: impl std::io::Write, + mut amt: u64, +) -> std::io::Result<()> { + while amt > 0 { + let buf = src.fill_buf().await?; + if buf.is_empty() { + return Err(std::io::Error::new( + ErrorKind::UnexpectedEof, + "Peer interrupted transfer.", + )); + } + let to_write = std::cmp::min(amt, buf.len() as u64) as usize; + let written = dst.write(&buf[0..to_write])?; + src.consume(written); + amt -= written as u64; + } + Ok(()) +} + /// Wraps an IO stream. Calls `progress_callback` on each /// read/write to report progress. #[pin_project::pin_project] diff --git a/gday_file_transfer/tests/test_file_meta.rs b/gday_file_transfer/tests/test_file_meta.rs index 5ef4d4a..6f42c9d 100644 --- a/gday_file_transfer/tests/test_file_meta.rs +++ b/gday_file_transfer/tests/test_file_meta.rs @@ -29,19 +29,18 @@ async fn test_file_meta_1() { assert_eq!(save_path, dir_path.join("fol der/file.tar.gz")); // unoccupied path should increment the appended number by one - let save_path = file_meta.get_unoccupied_save_path(dir_path).await.unwrap(); + let save_path = file_meta.get_unoccupied_save_path(dir_path).unwrap(); assert_eq!(save_path, dir_path.join("fol der/file (2).tar.gz")); // last occupied path let save_path = file_meta .get_last_occupied_save_path(dir_path) - .await .unwrap() .unwrap(); assert_eq!(save_path, dir_path.join("fol der/file (1).tar.gz")); // the file exists, but has the wrong size - let already_exists = file_meta.already_exists(dir_path).await.unwrap(); + let already_exists = file_meta.already_exists(dir_path).unwrap(); assert!(!already_exists); // the path should be suffixed with "part" and the length of the file @@ -49,7 +48,7 @@ async fn test_file_meta_1() { 
assert_eq!(save_path, dir_path.join("fol der/file.tar.gz.part5")); // a partial download does exist - let partial_exists = file_meta.partial_download_exists(dir_path).await.unwrap(); + let partial_exists = file_meta.partial_download_exists(dir_path).unwrap(); assert_eq!(partial_exists, Some(2)); } @@ -80,19 +79,18 @@ async fn test_file_meta_2() { assert_eq!(save_path, dir_path.join("fol der/file.tar.gz")); // unoccupied path should increment the appended number by one - let save_path = file_meta.get_unoccupied_save_path(dir_path).await.unwrap(); + let save_path = file_meta.get_unoccupied_save_path(dir_path).unwrap(); assert_eq!(save_path, dir_path.join("fol der/file (2).tar.gz")); // last occupied path let save_path = file_meta .get_last_occupied_save_path(dir_path) - .await .unwrap() .unwrap(); assert_eq!(save_path, dir_path.join("fol der/file (1).tar.gz")); // the file exists with the right size - let already_exists = file_meta.already_exists(dir_path).await.unwrap(); + let already_exists = file_meta.already_exists(dir_path).unwrap(); assert!(already_exists); // the path should be suffixed with "part" and the length of the file @@ -100,7 +98,7 @@ async fn test_file_meta_2() { assert_eq!(save_path, dir_path.join("fol der/file.tar.gz.part5")); // the partial download file has the wrong size suffix - let partial_exists = file_meta.partial_download_exists(dir_path).await.unwrap(); + let partial_exists = file_meta.partial_download_exists(dir_path).unwrap(); assert_eq!(partial_exists, None); } @@ -121,18 +119,15 @@ async fn test_file_meta_empty() { assert_eq!(save_path, dir_path.join("fol der/file.tar.gz")); // unoccupied path should increment the appended number by one - let save_path = file_meta.get_unoccupied_save_path(dir_path).await.unwrap(); + let save_path = file_meta.get_unoccupied_save_path(dir_path).unwrap(); assert_eq!(save_path, dir_path.join("fol der/file.tar.gz")); // last occupied path - let save_path = file_meta - .get_last_occupied_save_path(dir_path) 
- .await - .unwrap(); + let save_path = file_meta.get_last_occupied_save_path(dir_path).unwrap(); assert!(save_path.is_none()); // the file doesn't exist yet - let already_exists = file_meta.already_exists(dir_path).await.unwrap(); + let already_exists = file_meta.already_exists(dir_path).unwrap(); assert!(!already_exists); // the path should be suffixed with "part" and the length of the file @@ -140,6 +135,6 @@ async fn test_file_meta_empty() { assert_eq!(save_path, dir_path.join("fol der/file.tar.gz.part5")); // a partial download does not exist - let partial_exists = file_meta.partial_download_exists(dir_path).await.unwrap(); + let partial_exists = file_meta.partial_download_exists(dir_path).unwrap(); assert!(partial_exists.is_none()); } diff --git a/gday_file_transfer/tests/test_integration.rs b/gday_file_transfer/tests/test_integration.rs index a8ae507..574ecf1 100644 --- a/gday_file_transfer/tests/test_integration.rs +++ b/gday_file_transfer/tests/test_integration.rs @@ -62,26 +62,26 @@ async fn test_file_metas_errors() { // trying to get metadata about file that doesn't exist assert!(matches!( - get_file_metas(&[dir_path.join("dir/non-existent.txt")]).await, + get_file_metas(&[dir_path.join("dir/non-existent.txt")]), Err(gday_file_transfer::Error::IO(..)) )); // both paths end in the same name. // this would cause confusion with FileMetaLocal.short_path assert!(matches!( - get_file_metas(&[dir_path.join("file1"), dir_path.join("dir/file1")]).await, + get_file_metas(&[dir_path.join("file1"), dir_path.join("dir/file1")]), Err(gday_file_transfer::Error::PathsHaveSameName(..)) )); // one path is prefix of another. that's an error! assert!(matches!( - get_file_metas(&[dir_path.to_path_buf(), dir_path.join("dir")]).await, + get_file_metas(&[dir_path.to_path_buf(), dir_path.join("dir")]), Err(gday_file_transfer::Error::PathIsPrefix(..)) )); // one path is prefix of another. that's an error! 
assert!(matches!( - get_file_metas(&[dir_path.join("dir"), dir_path.to_path_buf()]).await, + get_file_metas(&[dir_path.join("dir"), dir_path.to_path_buf()]), Err(gday_file_transfer::Error::PathIsPrefix(..)) )); } @@ -93,9 +93,7 @@ async fn test_get_file_metas_1() { let test_dir = make_test_dir(); let dir_path = test_dir.path(); let dir_name = PathBuf::from(dir_path.file_name().unwrap()); - let mut result = gday_file_transfer::get_file_metas(&[dir_path.to_path_buf()]) - .await - .unwrap(); + let mut result = gday_file_transfer::get_file_metas(&[dir_path.to_path_buf()]).unwrap(); let mut expected = [ FileMetaLocal { @@ -166,7 +164,6 @@ async fn test_get_file_metas_2() { dir_path.join("dir/subdir2/file1"), dir_path.join("dir/subdir2/file2.txt"), ]) - .await .unwrap(); let mut expected = [ @@ -228,7 +225,7 @@ async fn file_transfer() { dir_a_path.join("file2.txt"), dir_a_path.join("dir/subdir1"), ]; - let file_metas = get_file_metas(&paths).await.unwrap(); + let file_metas = get_file_metas(&paths).unwrap(); let file_offer = FileOfferMsg::from(file_metas.clone()); // send offer, and read response @@ -266,9 +263,8 @@ async fn file_transfer() { // read the file offer message let file_offer: FileOfferMsg = read_from_async(&mut stream_b).await.unwrap(); - let response_msg = FileResponseMsg::accept_only_new_and_interrupted(&file_offer, dir_b.path()) - .await - .unwrap(); + let response_msg = + FileResponseMsg::accept_only_new_and_interrupted(&file_offer, dir_b.path()).unwrap(); assert_eq!(response_msg.get_num_not_rejected(), 3); assert_eq!(response_msg.get_num_partially_accepted(), 1); @@ -280,7 +276,7 @@ async fn file_transfer() { &file_offer, &response_msg, dir_b.path(), - &mut stream_b, + tokio::io::BufReader::new(stream_b), |_| {}, ) .await diff --git a/gday_file_transfer/tests/test_offer.rs b/gday_file_transfer/tests/test_offer.rs index 5f9cce5..004c364 100644 --- a/gday_file_transfer/tests/test_offer.rs +++ b/gday_file_transfer/tests/test_offer.rs @@ -93,9 +93,7 @@ 
async fn test_file_offer() { assert_eq!(reject_all.get_num_not_rejected(), 0); assert_eq!(offer.get_transfer_size(&reject_all).unwrap(), 0); - let only_new = FileResponseMsg::accept_only_full_new_files(&offer, dir_path) - .await - .unwrap(); + let only_new = FileResponseMsg::accept_only_full_new_files(&offer, dir_path).unwrap(); assert_eq!( only_new.response, vec![None, Some(0), Some(0), Some(0), None, Some(0)] @@ -106,9 +104,7 @@ async fn test_file_offer() { assert_eq!(offer.get_transfer_size(&only_new).unwrap(), 23); let only_new_and_interrupted = - FileResponseMsg::accept_only_new_and_interrupted(&offer, dir_path) - .await - .unwrap(); + FileResponseMsg::accept_only_new_and_interrupted(&offer, dir_path).unwrap(); assert_eq!( only_new_and_interrupted.response, vec![None, Some(0), Some(4), Some(0), Some(1), Some(0)] diff --git a/gday_hole_punch/Cargo.toml b/gday_hole_punch/Cargo.toml index f17e656..1bbe661 100644 --- a/gday_hole_punch/Cargo.toml +++ b/gday_hole_punch/Cargo.toml @@ -20,7 +20,7 @@ pin-project = "1.1.7" rand = "0.8.5" serde = "1.0.215" sha2 = "0.10.8" -socket2 = { version = "0.5.7", features = ["all"] } +socket2 = { version = "0.5.8" } spake2 = { version = "0.4.0", features = ["std"] } thiserror = "2.0.3" tokio = { version = "1.41.1", features = ["net", "rt", "time"] } diff --git a/gday_hole_punch/src/lib.rs b/gday_hole_punch/src/lib.rs index 1f5b79c..9879511 100644 --- a/gday_hole_punch/src/lib.rs +++ b/gday_hole_punch/src/lib.rs @@ -146,7 +146,7 @@ pub enum Error { ServerIDNotFound(u64), /// Couldn't connect to any of the contact exchange servers listed - #[error("Couldn't connect to any of the contact exchange servers listed.")] + #[error("Couldn't connect to any of the contact exchange servers in the list.")] CouldntConnectToServers, /// Invalid server DNS name for TLS @@ -158,24 +158,25 @@ pub enum Error { #[error( "Timed out while trying to connect to peer, likely due to an uncooperative \ NAT (network address translator). 
\
-        Try from a different network, enable IPv6, or switch to a tool that transfers \
-        files over a relay to evade NATs, such as magic-wormhole."
+        Enable IPv6 or try from a different network. \
+        Or use a tool such as magic-wormhole that transfers \
+        over a relay to evade NATs."
     )]
     HolePunchTimeout,
 
     /// Couldn't parse server ID of [`PeerCode`]
-    #[error("Couldn't parse server ID in your code: {0}. Check it for typos!")]
+    #[error("Couldn't parse the server ID in your code: {0}. Check it for typos!")]
     CouldntParseServerID(#[from] std::num::ParseIntError),
 
     /// The room_code or shared_secret of the peer code contained a period.
-    /// Periods aren't allowed because they're used as code delimeters.
+    /// Periods aren't allowed because they're used as delimiters.
     #[error(
         "The room_code or shared_secret of the peer code contained a period. \
-        Periods aren't allowed because they're used as code delimeters."
+        Periods aren't allowed because they're used as delimiters."
     )]
     PeerCodeContainedPeriod,
 
-    /// Couldn't parse [`PeerCode`]
+    /// Wrong number of segments in [`PeerCode`].
     #[error("Wrong number of segments in your code. Check it for typos!")]
     WrongNumberOfSegmentsPeerCode,
 }
diff --git a/gday_hole_punch/src/server_connector.rs b/gday_hole_punch/src/server_connector.rs
index ef0006f..10ddeb2 100644
--- a/gday_hole_punch/src/server_connector.rs
+++ b/gday_hole_punch/src/server_connector.rs
@@ -1,13 +1,14 @@
 //! Functions for connecting to a Gday server.
use crate::Error; use gday_contact_exchange_protocol::Contact; -use log::{debug, warn}; +use log::{debug, error, warn}; use rand::seq::SliceRandom; use socket2::SockRef; use std::fmt::Debug; use std::io::ErrorKind; use std::net::SocketAddr::{V4, V6}; use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::io::AsyncWriteExt; use tokio::net::{TcpStream, ToSocketAddrs}; pub use gday_contact_exchange_protocol::DEFAULT_PORT; @@ -209,6 +210,18 @@ impl ServerConnection { Ok(contact) } + + /// Calls shutdown on the underlying streams to gracefully + /// close the connection. + pub async fn shutdown(&mut self) -> std::io::Result<()> { + if let Some(stream) = &mut self.v4 { + stream.shutdown().await?; + } + if let Some(stream) = &mut self.v6 { + stream.shutdown().await?; + } + Ok(()) + } } /// In random order, sequentially try connecting to `servers`. @@ -289,6 +302,10 @@ pub async fn connect_to_random_domain_name( } }; } + error!( + "Couldn't connect to any of the {} contact exchange servers.", + domain_names.len() + ); Err(recent_error) } @@ -367,7 +384,7 @@ pub async fn connect_tcp( } else { Some(Err(std::io::Error::new( ErrorKind::TimedOut, - format!("Timed out while trying to connect to {addrs:?}."), + format!("Timed out while trying to connect to server {addrs:?}."), ))) } } else { @@ -381,7 +398,7 @@ pub async fn connect_tcp( } else { Some(Err(std::io::Error::new( ErrorKind::TimedOut, - format!("Timed out while trying to connect to {addrs:?}."), + format!("Timed out while trying to connect to server {addrs:?}."), ))) } } else { diff --git a/gday_server/Cargo.toml b/gday_server/Cargo.toml index 5609d08..7122f36 100644 --- a/gday_server/Cargo.toml +++ b/gday_server/Cargo.toml @@ -15,9 +15,9 @@ version.workspace = true [dependencies] clap = { version = "4.5.21", features = ["derive"] } -socket2 = { version = "0.5.7", features = ["all"] } +socket2 = { version = "0.5.8" } tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros", "net", "time", 
"sync"] } -tokio-rustls = { version = "0.26.0", features = ["ring", "logging", "tls12"], default-features = false } +tokio-rustls = { version = "0.26.0" } gday_contact_exchange_protocol = { version = "0.2.1", path = "../gday_contact_exchange_protocol" } thiserror = "2.0.3" log = "0.4.22" diff --git a/gday_server/README.md b/gday_server/README.md index 39293c1..bc85898 100644 --- a/gday_server/README.md +++ b/gday_server/README.md @@ -8,8 +8,7 @@ A server that runs the [gday_contact_exchange_protocol](https://docs.rs/gday_con To run the executable directly: -1. Go to [releases](https://github.com/manforowicz/gday/releases) -and download the correct file for your platform. +1. Download an executable from [releases](https://github.com/manforowicz/gday/releases). 2. Extract it (on Linux: `tar xf `). 3. Run it: `./gday_server` @@ -31,9 +30,9 @@ Options: -k, --key PEM file of private TLS server key -c, --certificate PEM file of signed TLS server certificate -u, --unencrypted Use unencrypted TCP instead of TLS - -a, --address
Custom socket address on which to listen. [default: `[::]:2311` for TLS, `[::]:2310` when --unencrypted] - -t, --timeout Number of seconds before a new room is deleted [default: 3600] - -r, --request-limit Max number of requests an IP address can send in a minute before they're rejected [default: 60] + -a, --addresses Socket addresses on which to listen [default: 0.0.0.0:2311 [::]:2311] + -t, --timeout Number of seconds before a new room is deleted [default: 600] + -r, --request-limit Max number of create room requests and requests with an invalid room code an IP address can send per minute before they're rejected [default: 10] -v, --verbosity Log verbosity. (trace, debug, info, warn, error) [default: info] -h, --help Print help -V, --version Print version @@ -41,19 +40,26 @@ Options: ## Deployment -One of the strengths of gday is its decentralized nature. Want to add your own server to the list of [default servers](https://docs.rs/gday_hole_punch/latest/gday_hole_punch/server_connector/constant.DEFAULT_SERVERS.html)? Here's how: 1. Get a [virtual private server](https://en.wikipedia.org/wiki/Virtual_private_server) (VPS) from a hosting service. It must have public IPv4 and IPv6 addresses and not be behind [NAT](https://en.wikipedia.org/wiki/Network_address_translation). -2. Buy/configure a domain name to point at your VPS. + +2. Buy and configure a domain name to point at your VPS. + 3. On the VPS, get a TLS certificate using [certbot](https://certbot.eff.org/) with your domain name. + 4. On the VPS, use a tool such as `wget` to download gday_server from the [releases page](https://github.com/manforowicz/gday/releases). + 5. On the VPS, run the `gday_server` with the correct TLS arguments. -6. On a local device, verify you can use `gday` with your server domain name passed as an argument. + +6. On a local device, verify you can use `gday` with your `--server` domain name passed as an argument. + 7. 
On the VPS, follow instructions in [gday_server.service](https://github.com/manforowicz/gday/blob/main/other/gday_server.service) to set up a [systemd service](https://www.freedesktop.org/software/systemd/man/latest/systemd.service.html). + 8. Verify `gday_server` auto-starts in the background, even when you reboot the server. + 9. Submit an [issue](https://github.com/manforowicz/gday/issues), asking for your server to be added to the [default server list](https://docs.rs/gday_hole_punch/latest/gday_hole_punch/server_connector/constant.DEFAULT_SERVERS.html). ## Related diff --git a/other/demo.sh b/other/demo.sh index a8d97f0..b712592 100755 --- a/other/demo.sh +++ b/other/demo.sh @@ -4,12 +4,18 @@ # Intended for recording gday demos. # Requires: asciinema, tmux # Use ctrl+b to switch between panes. -# Use ctrl+d to end the recording. +# Press ctrl+d multiple times to end the recording. # Create the demo folders if they don't exist yet. mkdir tmp cd tmp + mkdir peer_1 +mkdir peer_1/folder +echo "Hello everyone!" > peer_1/file.mp4 +echo "Testing" > peer_1/folder/img.jpg +echo "Hi there!" > peer_1/folder/word.docx + mkdir peer_2 # Start a new session detached @@ -28,5 +34,9 @@ tmux send-keys -t demo_session:0.1 'clear' C-m # Select the left pane tmux select-pane -t demo_session:0.0 +cd ../ + # Start recording asciinema rec -c "tmux attach -t demo_session" --overwrite demo.cast + +rm -r tmp diff --git a/other/gday_server.service b/other/gday_server.service index 8354163..bf714f1 100644 --- a/other/gday_server.service +++ b/other/gday_server.service @@ -1,6 +1,6 @@ # systemd service that runs a gday_server # -# Put your gday_server executable in ~/gday_server/gday_server. +# Put your gday_server executable in /root/gday_server. 
 #
 # Save this file as:
 # '/etc/systemd/system/gday_server.service'
@@ -21,7 +21,7 @@
 # 'sudo journalctl -u gday_server -f'
 #
 # View stderr and stdout log files in real time:
-# 'tail -f ~/gday_server/stdout.log'
+# 'tail -f /root/logs/stderr.log'
 
 
 [Unit]
@@ -35,20 +35,21 @@
 After=network.target
 
 [Service]
 # Command to execute (modify as needed)
-ExecStart=/home/ubuntu/gday_server/gday_server --key /etc/letsencrypt/live/gday.manforowicz.com/privkey.pem --certificate /etc/letsencrypt/live/gday.manforowicz.com/fullchain.pem
+ExecStart=/root/gday_server --key /etc/letsencrypt/live/gday.manforowicz.com/privkey.pem --certificate /etc/letsencrypt/live/gday.manforowicz.com/fullchain.pem
 
 # Auto-restart the service if it crashes
 Restart=always
 
 # How long to wait between restarts
+# (to avoid wasting resources if an infinite crash loop occurs)
 RestartSec=20
 
 # Run the service as root, so it can access the certificates
 User=root
 
 # Pipe stdout and stderr into custom log files (modify as needed)
-StandardOutput=append:/home/ubuntu/gday_server/stdout.log
-StandardError=append:/home/ubuntu/gday_server/stderr.log
+StandardOutput=append:/root/logs/stdout.log
+StandardError=append:/root/logs/stderr.log
 
 [Install]