Skip to content

Commit

Permalink
Retransmit conn checks
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed Jul 26, 2024
1 parent 852c73a commit 9038905
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 26 deletions.
8 changes: 8 additions & 0 deletions lib/ex_ice/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,14 @@ defmodule ExICE.ICEAgent do
{:noreply, %{state | ice_agent: ice_agent}}
end

@impl true
def handle_info({:conn_check_rtx_timeout, conn_check_t_id}, state) do
ice_agent =
ExICE.Priv.ICEAgent.handle_conn_check_rtx_timeout(state.ice_agent, conn_check_t_id)

{:noreply, %{state | ice_agent: ice_agent}}
end

@impl true
def handle_info(:eoc_timeout, state) do
ice_agent = ExICE.Priv.ICEAgent.handle_eoc_timeout(state.ice_agent)
Expand Down
141 changes: 115 additions & 26 deletions lib/ex_ice/priv/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule ExICE.Priv.ICEAgent do

# Transaction timeout in ms.
# See appendix B.1.
@hto 500
@hto 2000

# Pair timeout in ms.
# If we don't receive any data in this time,
Expand All @@ -38,6 +38,9 @@ defmodule ExICE.Priv.ICEAgent do
# we will set it on our own.
@eoc_timeout 10_000

# Connectivity check retransmission timeout in ms.
@conn_check_rtx_timeout 500

@conn_check_handler %{
controlling: ConnCheckHandler.Controlling,
controlled: ConnCheckHandler.Controlled
Expand Down Expand Up @@ -73,6 +76,7 @@ defmodule ExICE.Priv.ICEAgent do
gathering_transactions: %{},
checklist: %{},
conn_checks: %{},
conn_checks_rtx: [],
keepalives: %{},
gathering_state: :new,
eoc: false,
Expand Down Expand Up @@ -462,31 +466,16 @@ defmodule ExICE.Priv.ICEAgent do
if ice_agent.state in [:completed, :failed] do
update_ta_timer(ice_agent)
else
{transaction_executed, ice_agent} =
case Checklist.get_next_pair(ice_agent.checklist) do
%CandidatePair{} = pair ->
Logger.debug("Sending conn check on pair: #{inspect(pair.id)}")
pair = %CandidatePair{pair | last_seen: now()}
ice_agent = send_conn_check(ice_agent, pair)
{true, ice_agent}

ice_agent =
case find_next_transaction(ice_agent) do
nil ->
# credo:disable-for-lines:3 Credo.Check.Refactor.Nesting
case get_next_gathering_transaction(ice_agent) do
{_t_id, transaction} ->
case execute_gathering_transaction(ice_agent, transaction) do
{:ok, ice_agent} -> {true, ice_agent}
{:error, ice_agent} -> {false, ice_agent}
end

nil ->
{false, ice_agent}
end
end
Logger.debug("No transaction to execute. Did Ta timer fired without the need?")
ice_agent

unless transaction_executed do
Logger.debug("Couldn't find transaction to execute. Did Ta timer fired without the need?")
end
tr ->
Logger.debug("Tr: #{inspect(tr)}")
execute_transaction(ice_agent, tr)
end

# schedule next check and call update_ta_timer
# if the next check is not needed, update_ta_timer will
Expand All @@ -497,6 +486,97 @@ defmodule ExICE.Priv.ICEAgent do
end
end

defp find_next_transaction(ice_agent) do
find_next_transaction(ice_agent, :pair)
end

defp find_next_transaction(ice_agent, :pair) do
case Checklist.get_next_pair(ice_agent.checklist) do
nil -> find_next_transaction(ice_agent, :gather)
pair -> pair
end
end

defp find_next_transaction(ice_agent, :gather) do
case get_next_gathering_transaction(ice_agent) do
nil -> find_next_transaction(ice_agent, :rtx)
{_id, gather_tr} -> gather_tr
end
end

defp find_next_transaction(ice_agent, :rtx) do
List.first(ice_agent.conn_checks_rtx)
end

defp execute_transaction(ice_agent, %CandidatePair{} = pair) do
Logger.debug("Sending conn check on pair: #{inspect(pair.id)}")
pair = %CandidatePair{pair | last_seen: now()}
send_conn_check(ice_agent, pair)
end

defp execute_transaction(ice_agent, gather_tr) when is_map(gather_tr) do
{_, ice_agent} = execute_gathering_transaction(ice_agent, gather_tr)
ice_agent
end

defp execute_transaction(ice_agent, conn_check_t_id) do
conn_checks_rtx = List.delete_at(ice_agent.conn_checks_rtx, 0)
ice_agent = %{ice_agent | conn_checks_rtx: conn_checks_rtx}

case Map.get(ice_agent.conn_checks, conn_check_t_id) do
nil ->
Logger.debug("""
Tried to retransmit conn check but it is no longer in-progress. Ignoring.
Conn check transaction id: #{conn_check_t_id}\
""")

ice_agent

conn_check ->
Logger.debug("Retransmitting conn check: #{conn_check_t_id}")
pair = Map.fetch!(ice_agent.checklist, conn_check.pair_id)
local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id)
remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id)
dst = {remote_cand.address, remote_cand.port}

case do_send(ice_agent, local_cand, dst, conn_check.raw_req) do
{:ok, ice_agent} ->
Process.send_after(
self(),
{:conn_check_rtx_timeout, conn_check_t_id},
@conn_check_rtx_timeout
)

ice_agent

{:error, ice_agent} ->
ice_agent
end
end
end

@spec handle_conn_check_rtx_timeout(t(), integer()) :: t()
def handle_conn_check_rtx_timeout(ice_agent, transaction_id)
when is_map_key(ice_agent.conn_checks, transaction_id) do
# Mark transaction id as ready to be retransmitted.
# We will do this in handle_ta_timeout as it has to be paced.
Logger.debug("""
Scheduling conn check for retransmission.
Conn check transaction id: #{transaction_id}\
""")

%{ice_agent | conn_checks_rtx: ice_agent.conn_checks_rtx ++ [transaction_id]}
end

def handle_conn_check_rtx_timeout(ice_agent, transaction_id) do
Logger.debug("""
Conn check rtx timeout fired but there is no such conn check in progress.
Conn check transaction id: #{transaction_id}\
""")

ice_agent
end

@spec handle_eoc_timeout(t()) :: t()
def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do
Logger.debug("EOC timer fired but we are in the failed state. Ignoring.")
Expand Down Expand Up @@ -2349,15 +2429,24 @@ defmodule ExICE.Priv.ICEAgent do
|> Message.with_integrity(ice_agent.remote_pwd)
|> Message.with_fingerprint()

raw_req = Message.encode(req)

dst = {remote_cand.address, remote_cand.port}

case do_send(ice_agent, local_cand, dst, Message.encode(req)) do
case do_send(ice_agent, local_cand, dst, raw_req) do
{:ok, ice_agent} ->
Process.send_after(
self(),
{:conn_check_rtx_timeout, req.transaction_id},
@conn_check_rtx_timeout
)

pair = %CandidatePair{pair | state: :in_progress}

conn_check = %{
pair_id: pair.id,
send_time: now()
send_time: now(),
raw_req: raw_req
}

conn_checks = Map.put(ice_agent.conn_checks, req.transaction_id, conn_check)
Expand Down
86 changes: 86 additions & 0 deletions test/priv/ice_agent_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,92 @@ defmodule ExICE.Priv.ICEAgentTest do
end
end

describe "connectivity check rtx" do
setup do
remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445)

ice_agent =
ICEAgent.new(
controlling_process: self(),
role: :controlling,
if_discovery_module: IfDiscovery.Mock,
transport_module: Transport.Mock
)
|> ICEAgent.set_remote_credentials("someufrag", "somepwd")
|> ICEAgent.gather_candidates()
|> ICEAgent.add_remote_candidate(remote_cand)

%{ice_agent: ice_agent, remote_cand: remote_cand}
end

test "retransmits cc when there is no response", %{
ice_agent: ice_agent,
remote_cand: remote_cand
} do
[socket] = ice_agent.sockets

# trigger binding request
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
raw_req = Transport.Mock.recv(socket)
assert raw_req != nil
{:ok, req} = ExSTUN.Message.decode(raw_req)

# trigger rtx timeout
ice_agent = ICEAgent.handle_conn_check_rtx_timeout(ice_agent, req.transaction_id)
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
rtx_raw_req = Transport.Mock.recv(socket)

# assert this is exactly the same message
assert raw_req == rtx_raw_req

# provide a response and ensure no more retransmissions are sent
raw_resp =
binding_response(
req.transaction_id,
ice_agent.transport_module,
socket,
ice_agent.remote_pwd
)

ice_agent =
ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, raw_resp)

ice_agent = ICEAgent.handle_conn_check_rtx_timeout(ice_agent, req.transaction_id)
_ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
assert nil == Transport.Mock.recv(socket)
end

test "stop retransmissions when pair times out", %{ice_agent: ice_agent} do
[socket] = ice_agent.sockets

# trigger binding request
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
raw_req = Transport.Mock.recv(socket)
assert raw_req != nil
{:ok, req} = ExSTUN.Message.decode(raw_req)

# trigger rtx timeout
ice_agent = ICEAgent.handle_conn_check_rtx_timeout(ice_agent, req.transaction_id)
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
assert nil != Transport.Mock.recv(socket)

# mock cc send time so we can timeout it
[cc] = Map.values(ice_agent.conn_checks)
cc = %{cc | send_time: cc.send_time - 2000}
ice_agent = put_in(ice_agent.conn_checks[req.transaction_id], cc)

# timeout cc
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
assert %{} == ice_agent.conn_checks

# trigger rtx timeout and assert there is no retransmission
ice_agent = ICEAgent.handle_conn_check_rtx_timeout(ice_agent, req.transaction_id)
ice_agent = ICEAgent.handle_ta_timeout(ice_agent)
assert nil == Transport.Mock.recv(socket)
assert [] == ice_agent.conn_checks_rtx
end
end

test "pair timeout" do
# 1. make ice agent connected
# 2. mock the time a pair has received something from the peer
Expand Down

0 comments on commit 9038905

Please sign in to comment.