Skip to content

Commit

Permalink
fix channel_bind
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 16, 2024
1 parent 97487f1 commit 8402c18
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 107 deletions.
12 changes: 11 additions & 1 deletion rtc-turn/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,17 @@ impl Client {
relay.handle_refresh_allocation_response(msg)?;
}
}
METHOD_CHANNEL_BIND => {}
METHOD_CHANNEL_BIND => {
if let TransactionType::ChannelBindRequest(relayed_addr, bind_addr) =
tr.transaction_type
{
let mut relay = Relay {
relayed_addr,
client: self,
};
relay.handle_channel_bind_response(msg, bind_addr)?;
}
}
_ => {}
}
}
Expand Down
185 changes: 83 additions & 102 deletions rtc-turn/src/client/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::permission::*;
use super::transaction::*;
use crate::proto;

use crate::client::binding::BindingState;
use crate::client::{Client, Event, RelayedAddr};
use shared::error::{Error, Result};

Expand Down Expand Up @@ -73,13 +74,6 @@ pub struct Relay<'a> {
}

impl<'a> Relay<'a> {
/// This func-block would block, per destination IP (, or perm), until
/// the perm state becomes "requested". Purpose of this is to guarantee
/// the order of packets (within the same perm).
/// Note that CreatePermission transaction may not be complete before
/// all the data transmission. This is done assuming that the request
/// will be mostly likely successful and we can tolerate some loss of
/// UDP packet (or reorder), inorder to minimize the latency in most cases.
pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<()> {
if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
if !relay.perm_map.contains(&peer_addr) {
Expand Down Expand Up @@ -141,32 +135,42 @@ impl<'a> Relay<'a> {
}
}

pub fn send_to(&mut self, _p: &[u8], peer_addr: SocketAddr) -> Result<()> {
pub fn send_to(&mut self, p: &[u8], peer_addr: SocketAddr) -> Result<()> {
// check if we have a permission for the destination IP addr
if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
let result = if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
if let Some(perm) = relay.perm_map.get_mut(&peer_addr) {
if perm.state() != PermState::Permitted {
Err(Error::ErrNoPermission)
} else {
//TODO:
Ok(())
Ok((relay.integrity.clone(), relay.nonce.clone()))
}
} else {
Err(Error::ErrNoPermission)
}
} else {
Err(Error::ErrConnClosed)
}
};

let (integrity, nonce) = result?;

self.send(p, peer_addr, integrity, nonce)
}

/*TODO:
let number = {
fn send(
&mut self,
p: &[u8],
peer_addr: SocketAddr,
integrity: MessageIntegrity,
nonce: Nonce,
) -> Result<()> {
let channel_number = {
let (bind_st, bind_at, bind_number, bind_addr) = {
let b = if let Some(b) = self.client.binding_mgr.find_by_addr(&addr) {
let b = if let Some(b) = self.client.binding_mgr.find_by_addr(&peer_addr) {
b
} else {
self.client
.binding_mgr
.create(addr)
.create(peer_addr)
.ok_or_else(|| Error::Other("Addr not found".to_owned()))?
};
(b.state(), b.refreshed_at(), b.number, b.addr)
Expand All @@ -180,108 +184,50 @@ impl<'a> Relay<'a> {
// the binding transaction has been complete
// binding state may have been changed while waiting. check again.
if bind_st == BindingState::Idle {
let nonce = self.nonce.clone();
let integrity = self.integrity.clone();
{
if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_state(BindingState::Request);
}
if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_state(BindingState::Request);
}
tokio::spawn(async move {
let result = RelayConnInternal::bind(
rc_obs,
bind_addr,
bind_number,
nonce,
integrity,
)
.await;
{
if let Err(err) = result {
if Error::ErrUnexpectedResponse != err {
self.client.binding_mgr.delete_by_addr(&bind_addr);
} else if let Some(b) =
self.client.binding_mgr.get_by_addr(&bind_addr)
{
b.set_state(BindingState::Failed);
}
// keep going...
warn!("bind() failed: {}", err);
} else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr)
{
b.set_state(BindingState::Ready);
}
}
});
self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
}

// send data using SendIndication
let peer_addr = proto::peeraddr::PeerAddress {
ip: addr.ip(),
port: addr.port(),
};
let mut msg = Message::new();
msg.build(&[
Box::new(TransactionId::new()),
Box::new(MessageType::new(METHOD_SEND, CLASS_INDICATION)),
Box::new(proto::data::Data(p.to_vec())),
Box::new(peer_addr),
Box::new(proto::peeraddr::PeerAddress {
ip: peer_addr.ip(),
port: peer_addr.port(),
}),
Box::new(FINGERPRINT),
])?;

// indication has no transaction (fire-and-forget)
let turn_server_addr = self.client.turn_server_addr();
return Ok(self.client.write_to(&msg.raw, &turn_server_addr)?);
self.client
.write_to(&msg.raw, self.client.turn_server_addr());
return Ok(());
}

// binding is either ready
// binding is ready
// check if the binding needs a refresh
if bind_st == BindingState::Ready
&& Instant::now()
.checked_duration_since(bind_at)
.unwrap_or_else(|| Duration::from_secs(0))
> Duration::from_secs(5 * 60)
{
let nonce = self.nonce.clone();
let integrity = self.integrity.clone();
{
if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_state(BindingState::Refresh);
}
if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_state(BindingState::Refresh);
}
tokio::spawn(async move {
let result =
RelayConnInternal::bind(rc_obs, bind_addr, bind_number, nonce, integrity)
.await;
{
if let Err(err) = result {
if Error::ErrUnexpectedResponse != err {
self.client.binding_mgr.delete_by_addr(&bind_addr);
} else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr)
{
b.set_state(BindingState::Failed);
}
// keep going...
warn!("bind() for refresh failed: {}", err);
} else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_refreshed_at(Instant::now());
b.set_state(BindingState::Ready);
}
}
});
self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
}

bind_number
};

// send via ChannelData
self.send_channel_data(p, number)
*/
self.send_channel_data(p, channel_number)
}

// Close closes the connection.
Expand Down Expand Up @@ -440,8 +386,9 @@ impl<'a> Relay<'a> {
}
}

/*TODO: fn bind(
fn channel_bind(
&mut self,
relayed_addr: RelayedAddr,
bind_addr: SocketAddr,
bind_number: u16,
nonce: Nonce,
Expand Down Expand Up @@ -470,28 +417,62 @@ impl<'a> Relay<'a> {
};

debug!("UDPConn.bind call PerformTransaction 1");
let tr_res = self.client.perform_transaction(
let _ = self.client.perform_transaction(
&msg,
turn_server_addr,
TransactionType::ChannelBindRequest,
TransactionType::ChannelBindRequest(relayed_addr, bind_addr),
);

let res = tr_res.msg;
Ok(())
}

if res.typ != MessageType::new(METHOD_CHANNEL_BIND, CLASS_SUCCESS_RESPONSE) {
return Err(Error::ErrUnexpectedResponse);
}
pub(super) fn handle_channel_bind_response(
&mut self,
res: Message,
bind_addr: SocketAddr,
) -> Result<()> {
if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
let result = if res.typ.class == CLASS_ERROR_RESPONSE {
let mut code = ErrorCodeAttribute::default();
let result = code.get_from(&res);
if result.is_err() {
Err(Error::Other(format!("{}", res.typ)))
} else if code.code == CODE_STALE_NONCE {
relay.set_nonce_from_msg(&res);
Err(Error::ErrTryAgain)
} else {
Err(Error::Other(format!("{} (error {})", res.typ, code)))
}
} else if res.typ != MessageType::new(METHOD_CHANNEL_BIND, CLASS_SUCCESS_RESPONSE) {
Err(Error::ErrUnexpectedResponse)
} else {
Ok(())
};

debug!("channel binding successful: {} {}", bind_addr, bind_number);
if let Err(err) = result {
if Error::ErrUnexpectedResponse != err {
self.client.binding_mgr.delete_by_addr(&bind_addr);
} else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_state(BindingState::Failed);
}

// Success.
Ok(())
}*/
// keep going...
warn!("bind() failed: {}", err);
} else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
b.set_refreshed_at(Instant::now());
b.set_state(BindingState::Ready);
debug!("channel binding successful: {}", bind_addr);
}
Ok(())
} else {
Err(Error::ErrConnClosed)
}
}

fn send_channel_data(&mut self, data: &[u8], ch_num: u16) -> Result<()> {
fn send_channel_data(&mut self, data: &[u8], channel_number: u16) -> Result<()> {
let mut ch_data = proto::chandata::ChannelData {
data: data.to_vec(),
number: proto::channum::ChannelNumber(ch_num),
number: proto::channum::ChannelNumber(channel_number),
..Default::default()
};
ch_data.encode();
Expand Down
8 changes: 4 additions & 4 deletions rtc-turn/src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::{Duration, Instant};

use stun::message::*;

use crate::client::Event;
use crate::client::{Event, RelayedAddr};
use shared::{Protocol, Transmit, TransportContext};
use stun::textattrs::TextAttribute;

Expand All @@ -17,9 +17,9 @@ pub(crate) enum TransactionType {
BindingRequest,
AllocateAttempt,
AllocateRequest(TextAttribute),
CreatePermissionRequest(SocketAddr, Option<SocketAddr>),
RefreshRequest(SocketAddr),
ChannelBindRequest,
CreatePermissionRequest(RelayedAddr, Option<SocketAddr>),
RefreshRequest(RelayedAddr),
ChannelBindRequest(RelayedAddr, SocketAddr),
}

// TransactionConfig is a set of config params used by NewTransaction
Expand Down

0 comments on commit 8402c18

Please sign in to comment.