From 3ebba0b3cd13f0e32964667662f2274a8aacd276 Mon Sep 17 00:00:00 2001 From: yngrtc Date: Sat, 16 Mar 2024 10:35:01 -0700 Subject: [PATCH] fix turn_client_udp.rs --- rtc-turn/Cargo.toml | 4 +- rtc-turn/examples/turn_client_udp.rs | 280 ++++++++++++++++++--------- rtc-turn/src/client/mod.rs | 14 +- rtc-turn/src/client/relay.rs | 10 +- 4 files changed, 209 insertions(+), 99 deletions(-) diff --git a/rtc-turn/Cargo.toml b/rtc-turn/Cargo.toml index 19c18c7..00e10ee 100644 --- a/rtc-turn/Cargo.toml +++ b/rtc-turn/Cargo.toml @@ -24,8 +24,10 @@ thiserror = "1.0.57" env_logger = "0.11.3" chrono = "0.4.35" hex = "0.4.3" -clap = "4.5.2" +clap = { version = "4.5.3", features = ["derive"] } criterion = "0.5.1" +crossbeam-channel = "0.5" +ctrlc = "3.4" [features] metrics = [] diff --git a/rtc-turn/examples/turn_client_udp.rs b/rtc-turn/examples/turn_client_udp.rs index 6023259..4dd22f7 100644 --- a/rtc-turn/examples/turn_client_udp.rs +++ b/rtc-turn/examples/turn_client_udp.rs @@ -1,17 +1,32 @@ +use bytes::BytesMut; use clap::Parser; +use log::trace; use rtc_turn::client::*; -use shared::error::Result; +use shared::error::{Error, Result}; +use shared::{Protocol, Transmit, TransportContext}; +use std::io::{ErrorKind, Write}; +use std::net::UdpSocket; +use std::str::FromStr; +use std::time::{Duration, Instant}; -// RUST_LOG=trace cargo run --color=always --package turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping +// RUST_LOG=trace cargo run --color=always --package rtc-turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping #[derive(Parser)] -#[command(name = "ICE Ping Pong")] +#[command(name = "TURN Client UDP")] #[command(author = "Rusty Rain ")] #[command(version = "0.1.0")] -#[command(about = "An example of ICE", long_about = None)] +#[command(about = "An example of TURN Client UDP", long_about = None)] struct Cli { - #[arg(short, long)] - controlling: bool, + #[arg(long, default_value_t = format!("127.0.0.1"))] + host: String, + #[arg(long, default_value_t = 3478)] + port: u16, + #[arg(long)] + user: String, + #[arg(long, default_value_t = format!("webrtc.rs"))] + realm: String, + #[arg(long)] + ping: bool, #[arg(short, long)] debug: bool, @@ -20,128 +35,207 @@ struct Cli { } fn main() -> Result<()> { - env_logger::init(); - - let mut app = App::new("TURN Client UDP") - .version("0.1.0") - .author("Rain Liu ") - .about("An example of TURN Client UDP") - .setting(AppSettings::DeriveDisplayOrder) - .setting(AppSettings::SubcommandsNegateReqs) - .arg( - Arg::with_name("FULLHELP") - .help("Prints more detailed help information") - .long("fullhelp"), - ) - .arg( - Arg::with_name("host") - .required_unless("FULLHELP") - .takes_value(true) - .long("host") - .help("TURN Server name."), - ) - .arg( - Arg::with_name("user") - .required_unless("FULLHELP") - .takes_value(true) - .long("user") - .help("A pair of username and password (e.g. \"user=pass\")"), - ) - .arg( - Arg::with_name("realm") - .default_value("webrtc.rs") - .takes_value(true) - .long("realm") - .help("Realm (defaults to \"webrtc.rs\")"), - ) - .arg( - Arg::with_name("port") - .takes_value(true) - .default_value("3478") - .long("port") - .help("Listening port."), - ) - .arg( - Arg::with_name("ping") - .long("ping") - .takes_value(false) - .help("Run ping test"), - ); - - let matches = app.clone().get_matches(); - - if matches.is_present("FULLHELP") { - app.print_long_help().unwrap(); - std::process::exit(0); + let cli = Cli::parse(); + if cli.debug { + let log_level = log::LevelFilter::from_str(&cli.log_level).unwrap(); + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(None, log_level) + .init(); } - let host = matches.value_of("host").unwrap(); - let port = matches.value_of("port").unwrap(); - let user = matches.value_of("user").unwrap(); + let host = cli.host; + let port = cli.port; + let user = cli.user; let cred: Vec<&str> = user.splitn(2, '=').collect(); - let ping = matches.is_present("ping"); - let realm = matches.value_of("realm").unwrap(); + let _ping = cli.ping; + let realm = cli.realm; // TURN client won't create a local listening socket by itself. - let conn = UdpSocket::bind("0.0.0.0:0").await?; + let socket = UdpSocket::bind("0.0.0.0:0")?; + let pinger = UdpSocket::bind("0.0.0.0:0")?; + + let local_addr = socket.local_addr()?; + let peer_addr = pinger.local_addr()?; let turn_server_addr = format!("{host}:{port}"); let cfg = ClientConfig { stun_serv_addr: turn_server_addr.clone(), turn_serv_addr: turn_server_addr, + local_addr, + protocol: Protocol::UDP, username: cred[0].to_string(), password: cred[1].to_string(), realm: realm.to_string(), software: String::new(), rto_in_ms: 0, - conn: Arc::new(conn), - vnet: None, }; - let client = Client::new(cfg).await?; + let mut client = Client::new(cfg)?; + + // Allocate a relay socket on the TURN server. + let allocate_tid = client.allocate()?; + let mut relayed_addr = None; + let mut create_permission_tid = None; + // Send BindingRequest to learn our external IP + //let binding_tid = client.send_binding_request()?; + + let (stop_tx, stop_rx) = crossbeam_channel::bounded::<()>(1); + println!("Press Ctrl-C to stop"); + std::thread::spawn(move || { + let mut stop_tx = Some(stop_tx); + ctrlc::set_handler(move || { + if let Some(stop_tx) = stop_tx.take() { + let _ = stop_tx.send(()); + } + }) + .expect("Error setting Ctrl-C handler"); + }); + + let mut buf = vec![0u8; 2048]; + loop { + match stop_rx.try_recv() { + Ok(_) => break, + Err(err) => { + if err.is_disconnected() { + break; + } + } + }; + + while let Some(transmit) = client.poll_transmit() { + socket.send_to(&transmit.message, transmit.transport.peer_addr)?; + trace!( + "socket.sent {} to {}", + transmit.message.len(), + transmit.transport.peer_addr + ); + } + + while let Some(event) = client.poll_event() { + match event { + Event::TransactionTimeout(_) => return Err(Error::ErrTimeout), + Event::BindingResponse(_, reflexive_addr) => { + println!("reflexive address {}", reflexive_addr); + } + Event::BindingError(_, err) => return Err(err), + Event::AllocateResponse(tid, addr) => { + println!("relayed address {}", addr); + if relayed_addr.is_none() { + assert_eq!(tid, allocate_tid); + relayed_addr = Some(addr); + if let Some(id) = client.relay(addr)?.create_permission(peer_addr)? { + create_permission_tid = Some(id); + } else { + assert!(false, "create_permission failed"); + } + } else { + assert!(false, "relayed address is not none"); + } + } + Event::AllocateError(_, err) => return Err(err), + Event::CreatePermissionResponse(tid, peer_addr) => { + println!("CreatePermission for peer addr {} is granted", peer_addr); + if let Some(id) = create_permission_tid { + assert_eq!(tid, id); + } else { + assert!(false, "create_permission_tid is none"); + } + } + Event::CreatePermissionError(_, err) => return Err(err), + Event::DataIndicationOrChannelData(_, _, _) => {} + } + } - // Start listening on the conn provided. - client.listen().await?; + let mut eto = Instant::now() + Duration::from_millis(100); + if let Some(to) = client.poll_timout() { + if to < eto { + eto = to; + } + } - // Allocate a relay socket on the TURN server. On success, it - // will return a net.PacketConn which represents the remote - // socket. - let relay_conn = client.allocate().await?; + let delay_from_now = eto + .checked_duration_since(Instant::now()) + .unwrap_or(Duration::from_secs(0)); + if delay_from_now.is_zero() { + client.handle_timeout(Instant::now()); + continue; + } - // The relayConn's local address is actually the transport - // address assigned on the TURN server. - println!("relayed-address={}", relay_conn.local_addr()?); + socket + .set_read_timeout(Some(delay_from_now)) + .expect("setting socket read timeout"); + + if let Some(transmit) = read_socket_input(&socket, &mut buf) { + trace!( + "read_socket_input {} from {}", + transmit.message.len(), + transmit.transport.peer_addr + ); + client.handle_transmit(transmit)?; + } - // If you provided `-ping`, perform a ping test agaist the - // relayConn we have just allocated. - if ping { - do_ping_test(&client, relay_conn).await?; + // Drive time forward in all clients. + client.handle_timeout(Instant::now()); } - client.close().await?; + client.close(); Ok(()) } -async fn do_ping_test( - client: &Client, - relay_conn: impl Conn + std::marker::Send + std::marker::Sync + 'static, -) -> Result<(), Error> { - // Send BindingRequest to learn our external IP - let mapped_addr = client.send_binding_request().await?; +fn read_socket_input(socket: &UdpSocket, buf: &mut [u8]) -> Option> { + match socket.recv_from(buf) { + Ok((n, peer_addr)) => { + return Some(Transmit { + now: Instant::now(), + transport: TransportContext { + local_addr: socket.local_addr().unwrap(), + peer_addr, + protocol: Protocol::UDP, + ecn: None, + }, + message: BytesMut::from(&buf[..n]), + }); + } + Err(e) => match e.kind() { + // Expected error for set_read_timeout(). One for windows, one for the rest. + ErrorKind::WouldBlock | ErrorKind::TimedOut => None, + _ => panic!("UdpSocket read failed: {e:?}"), + }, + } +} + +/* +fn do_ping_test( + client: &mut Client, + relayed_addr: RelayedAddr, + //reflexive_addr: ReflexiveAddr, +) -> Result<()> { // Set up pinger socket (pingerConn) //println!("bind..."); - let pinger_conn_tx = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); // Punch a UDP hole for the relay_conn by sending a data to the mapped_addr. // This will trigger a TURN client to generate a permission request to the // TURN server. After this, packets from the IP address will be accepted by // the TURN server. //println!("relay_conn send hello to mapped_addr {}", mapped_addr); - relay_conn.send_to("Hello".as_bytes(), mapped_addr).await?; - let relay_addr = relay_conn.local_addr()?; + /*client + .relay(relayed_addr) + .send_to("Hello".as_bytes(), reflexive_addr)?; + */ let pinger_conn_rx = Arc::clone(&pinger_conn_tx); @@ -195,7 +289,7 @@ async fn do_ping_test( for _ in 0..2 { let msg = "12345678910".to_owned(); //format!("{:?}", tokio::time::Instant::now()); println!("sending msg={} with size={}", msg, msg.as_bytes().len()); - pinger_conn_tx.send_to(msg.as_bytes(), relay_addr).await?; + pinger_conn_tx.send_to(msg.as_bytes(), relayed_addr).await?; // For simplicity, this example does not wait for the pong (reply). // Instead, sleep 1 second. @@ -203,4 +297,4 @@ async fn do_ping_test( } Ok(()) -} +}*/ diff --git a/rtc-turn/src/client/mod.rs b/rtc-turn/src/client/mod.rs index bc576a5..53c1eeb 100644 --- a/rtc-turn/src/client/mod.rs +++ b/rtc-turn/src/client/mod.rs @@ -44,6 +44,7 @@ pub type RelayedAddr = SocketAddr; pub type ReflexiveAddr = SocketAddr; pub type PeerAddr = SocketAddr; +#[derive(Debug)] pub enum Event { TransactionTimeout(TransactionId), @@ -53,7 +54,7 @@ pub enum Event { AllocateResponse(TransactionId, RelayedAddr), AllocateError(TransactionId, Error), - CreatePermissionResponse(TransactionId), + CreatePermissionResponse(TransactionId, PeerAddr), CreatePermissionError(TransactionId, Error), DataIndicationOrChannelData(Option, PeerAddr, BytesMut), @@ -507,11 +508,12 @@ impl Client { ])?; debug!("client.Allocate call PerformTransaction 1"); - let tid = self.perform_transaction( + let mut tid = self.perform_transaction( &msg, self.turn_server_addr()?, TransactionType::AllocateAttempt, ); + tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1); Ok(tid) } @@ -547,9 +549,15 @@ impl Client { ); let mut msg = Message::new(); + + // make it same as allocate() return value so that client can retrieve it + // from Event::AllocateResponse + let mut tid = response.transaction_id; + tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1); + // Trying to authorize. msg.build(&[ - Box::new(TransactionId::new()), + Box::new(tid), Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)), Box::new(RequestedTransport { protocol: if self.protocol == Protocol::UDP { diff --git a/rtc-turn/src/client/relay.rs b/rtc-turn/src/client/relay.rs index 33a2b39..eed3468 100644 --- a/rtc-turn/src/client/relay.rs +++ b/rtc-turn/src/client/relay.rs @@ -20,6 +20,9 @@ use crate::client::{Client, Event, RelayedAddr}; use shared::error::{Error, Result}; const PERM_REFRESH_INTERVAL: Duration = Duration::from_secs(120); +// https://datatracker.ietf.org/doc/html/rfc8656#name-permissions-2 +// The Permission Lifetime MUST be 300 seconds (= 5 minutes). +const PERM_LIFETIME: Duration = Duration::from_secs(300); const MAX_RETRY_ATTEMPTS: u16 = 3; // RelayState is a set of params use by Relay @@ -214,7 +217,7 @@ impl<'a> Relay<'a> { && Instant::now() .checked_duration_since(bind_at) .unwrap_or_else(|| Duration::from_secs(0)) - > Duration::from_secs(5 * 60) + > PERM_LIFETIME { if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) { b.set_state(BindingState::Refresh); @@ -304,7 +307,10 @@ impl<'a> Relay<'a> { perm.set_state(PermState::Permitted); self.client .events - .push_back(Event::CreatePermissionResponse(res.transaction_id)); + .push_back(Event::CreatePermissionResponse( + res.transaction_id, + peer_addr, + )); } }