diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index d4d9767..7f53313 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -362,6 +362,12 @@ defmodule ExICE.ICEAgent do {:noreply, %{state | ice_agent: ice_agent}} end + @impl true + def handle_info({:tr_rtx_timeout, tr_id}, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_tr_rtx_timeout(state.ice_agent, tr_id) + {:noreply, %{state | ice_agent: ice_agent}} + end + @impl true def handle_info(:eoc_timeout, state) do ice_agent = ExICE.Priv.ICEAgent.handle_eoc_timeout(state.ice_agent) diff --git a/lib/ex_ice/priv/gatherer.ex b/lib/ex_ice/priv/gatherer.ex index 3a0023c..a4ecfa4 100644 --- a/lib/ex_ice/priv/gatherer.ex +++ b/lib/ex_ice/priv/gatherer.ex @@ -117,6 +117,8 @@ defmodule ExICE.Priv.Gatherer do Socket: #{inspect(sock_ip)} STUN server: #{inspect(stun_server)} """) + + {:error, :non_matching_addr_family} end {:error, reason} -> diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 432208e..3c24762 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -26,7 +26,7 @@ defmodule ExICE.Priv.ICEAgent do # Transaction timeout in ms. # See appendix B.1. - @hto 500 + @hto 2_000 # Pair timeout in ms. # If we don't receive any data in this time, @@ -38,6 +38,9 @@ defmodule ExICE.Priv.ICEAgent do # we will set it on our own. @eoc_timeout 10_000 + # Connectivity check retransmission timeout in ms. + @tr_rtx_timeout 500 + @conn_check_handler %{ controlling: ConnCheckHandler.Controlling, controlled: ConnCheckHandler.Controlled @@ -73,6 +76,7 @@ defmodule ExICE.Priv.ICEAgent do gathering_transactions: %{}, checklist: %{}, conn_checks: %{}, + tr_rtx: [], keepalives: %{}, gathering_state: :new, eoc: false, @@ -462,31 +466,15 @@ defmodule ExICE.Priv.ICEAgent do if ice_agent.state in [:completed, :failed] do update_ta_timer(ice_agent) else - {transaction_executed, ice_agent} = - case Checklist.get_next_pair(ice_agent.checklist) do - %CandidatePair{} = pair -> - Logger.debug("Sending conn check on pair: #{inspect(pair.id)}") - pair = %CandidatePair{pair | last_seen: now()} - ice_agent = send_conn_check(ice_agent, pair) - {true, ice_agent} - + ice_agent = + case find_next_transaction(ice_agent) do nil -> - # credo:disable-for-lines:3 Credo.Check.Refactor.Nesting - case get_next_gathering_transaction(ice_agent) do - {_t_id, transaction} -> - case execute_gathering_transaction(ice_agent, transaction) do - {:ok, ice_agent} -> {true, ice_agent} - {:error, ice_agent} -> {false, ice_agent} - end - - nil -> - {false, ice_agent} - end - end + Logger.debug("No transaction to execute. Did Ta timer fired without the need?") + ice_agent - unless transaction_executed do - Logger.debug("Couldn't find transaction to execute. Did Ta timer fired without the need?") - end + {type, tr} -> + execute_transaction(ice_agent, type, tr) + end # schedule next check and call update_ta_timer # if the next check is not needed, update_ta_timer will @@ -497,6 +485,131 @@ defmodule ExICE.Priv.ICEAgent do end end + defp find_next_transaction(ice_agent) do + find_next_transaction(ice_agent, :conn_check) + end + + defp find_next_transaction(ice_agent, :conn_check) do + case Checklist.get_next_pair(ice_agent.checklist) do + nil -> find_next_transaction(ice_agent, :gathering) + pair -> {:conn_check, pair} + end + end + + defp find_next_transaction(ice_agent, :gathering) do + case get_next_gathering_transaction(ice_agent) do + nil -> find_next_transaction(ice_agent, :rtx) + {_id, gather_tr} -> {:gathering, gather_tr} + end + end + + defp find_next_transaction(ice_agent, :rtx) do + case List.first(ice_agent.tr_rtx) do + nil -> nil + tr_id -> {:rtx, tr_id} + end + end + + defp execute_transaction(ice_agent, :conn_check, pair) do + Logger.debug("Sending conn check on pair: #{inspect(pair.id)}") + pair = %CandidatePair{pair | last_seen: now()} + send_conn_check(ice_agent, pair) + end + + defp execute_transaction(ice_agent, :gathering, tr) do + {_, ice_agent} = execute_gathering_transaction(ice_agent, tr) + ice_agent + end + + defp execute_transaction(ice_agent, :rtx, t_id) + when is_map_key(ice_agent.gathering_transactions, t_id) do + Logger.debug("Retransmitting srflx gathering transaction: #{t_id}") + + tr_rtx = List.delete(ice_agent.tr_rtx, t_id) + ice_agent = %{ice_agent | tr_rtx: tr_rtx} + tr = Map.fetch!(ice_agent.gathering_transactions, t_id) + + # gather_srflx_candidate will create exactly the same message + case Gatherer.gather_srflx_candidate(ice_agent.gatherer, t_id, tr.socket, tr.stun_server.url) do + :ok -> + Process.send_after(self(), {:tr_rtx_timeout, t_id}, @tr_rtx_timeout) + ice_agent + + {:error, reason} -> + Logger.debug(""" + Failed to retransmit srflx gathering transaction, reason: #{inspect(reason)}. + Transaction id: #{t_id}. + Scheduling next rtx.\ + """) + + Process.send_after(self(), {:tr_rtx_timeout, t_id}, @tr_rtx_timeout) + ice_agent + end + end + + defp execute_transaction(ice_agent, :rtx, t_id) when is_map_key(ice_agent.conn_checks, t_id) do + Logger.debug("Retransmitting conn check: #{t_id}") + + tr_rtx = List.delete(ice_agent.tr_rtx, t_id) + ice_agent = %{ice_agent | tr_rtx: tr_rtx} + conn_check = Map.fetch!(ice_agent.conn_checks, t_id) + + pair = Map.fetch!(ice_agent.checklist, conn_check.pair_id) + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + dst = {remote_cand.address, remote_cand.port} + + case do_send(ice_agent, local_cand, dst, conn_check.raw_req) do + {:ok, ice_agent} -> + Process.send_after(self(), {:tr_rtx_timeout, t_id}, @tr_rtx_timeout) + ice_agent + + {:error, ice_agent} -> + ice_agent + end + end + + defp execute_transaction(ice_agent, :rtx, t_id) do + Logger.debug(""" + Tried to retransmit transaction but it is no longer in-progress. Ignoring. + Transaction id: #{t_id}\ + """) + + tr_rtx = List.delete(ice_agent.tr_rtx, t_id) + %{ice_agent | tr_rtx: tr_rtx} + end + + @spec handle_tr_rtx_timeout(t(), integer()) :: t() + def handle_tr_rtx_timeout(ice_agent, t_id) when is_map_key(ice_agent.conn_checks, t_id) do + # Mark transaction id as ready to be retransmitted. + # We will do this in handle_ta_timeout as it has to be paced. + Logger.debug(""" + Scheduling conn check for retransmission. + Conn check transaction id: #{t_id}\ + """) + + %{ice_agent | tr_rtx: ice_agent.tr_rtx ++ [t_id]} + end + + def handle_tr_rtx_timeout(ice_agent, t_id) + when is_map_key(ice_agent.gathering_transactions, t_id) do + Logger.debug(""" + Scheduling srflx gathering transaction for retransmission. + Transaction id: #{t_id}\ + """) + + %{ice_agent | tr_rtx: ice_agent.tr_rtx ++ [t_id]} + end + + def handle_tr_rtx_timeout(ice_agent, transaction_id) do + Logger.debug(""" + Transaction timeout timer fired but there is no such transaction in progress. Ignoring. + Transaction id: #{transaction_id}\ + """) + + ice_agent + end + @spec handle_eoc_timeout(t()) :: t() def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do Logger.debug("EOC timer fired but we are in the failed state. Ignoring.") @@ -698,15 +811,12 @@ defmodule ExICE.Priv.ICEAgent do :ok = ice_agent.transport_module.send(tr.socket, dst, data) put_in(ice_agent.gathering_transactions[tr_id], tr) - {:error, _reason, client} -> - tr = %{tr | client: client, state: :failed} - - put_in(ice_agent.gathering_transactions[tr_id], tr) - |> update_gathering_state() + {:error, _reason, _client} -> + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[tr_id]) + update_gathering_state(ice_agent) end - # tr_id_tr might be nil or might be present with state == :complete - {_, cand} -> + {nil, cand} -> case ExTURN.Client.handle_message(cand.client, msg) do {:ok, client} -> cand = %{cand | client: client} @@ -840,10 +950,7 @@ defmodule ExICE.Priv.ICEAgent do {:error, reason} -> Logger.debug("Couldn't send binding request, reason: #{reason}") - gathering_transactions = - put_in(ice_agent.gathering_transactions, [tr.t_id, :state], :failed) - - ice_agent = %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[tr.t_id]) ice_agent = update_gathering_state(ice_agent) {:error, ice_agent} @@ -871,8 +978,10 @@ defmodule ExICE.Priv.ICEAgent do {:error, reason} -> Logger.debug("Couldn't send allocate request, reason: #{reason}") - ice_agent = put_in(ice_agent.gathering_transactions[tr.t_id][:state], :failed) + + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[tr.t_id]) ice_agent = update_gathering_state(ice_agent) + {:error, ice_agent} end end @@ -932,14 +1041,13 @@ defmodule ExICE.Priv.ICEAgent do put_in(ice_agent.gathering_transactions[tr_id], tr) {:allocation_created, {alloc_ip, alloc_port}, client} -> - tr = %{tr | client: client, state: :complete} + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[tr_id]) resolved_turn_servers = [ {client.turn_ip, client.turn_port} | ice_agent.resolved_turn_servers ] ice_agent = %{ice_agent | resolved_turn_servers: resolved_turn_servers} - ice_agent = put_in(ice_agent.gathering_transactions[tr_id], tr) relay_cand = Candidate.Relay.new( @@ -949,7 +1057,7 @@ defmodule ExICE.Priv.ICEAgent do base_port: alloc_port, transport_module: ice_agent.transport_module, socket: tr.socket, - client: tr.client + client: client ) Logger.debug("New relay candidate: #{inspect(relay_cand)}") @@ -966,10 +1074,10 @@ defmodule ExICE.Priv.ICEAgent do :ok = ice_agent.transport_module.send(tr.socket, turn_addr, data) put_in(ice_agent.gathering_transactions[tr_id], tr) - {:error, _reason, client} -> + {:error, _reason, _client} -> Logger.debug("Failed to create TURN allocation.") - tr = %{tr | client: client, state: :failed} - put_in(ice_agent.gathering_transactions[tr_id], tr) + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[tr_id]) + ice_agent end end @@ -1470,51 +1578,45 @@ defmodule ExICE.Priv.ICEAgent do ice_agent, %Message{type: %Type{class: :success_response}} = msg ) do - tr = Map.fetch!(ice_agent.gathering_transactions, msg.transaction_id) + {tr, ice_agent} = pop_in(ice_agent.gathering_transactions[msg.transaction_id]) {:ok, %XORMappedAddress{address: xor_addr, port: xor_port}} = Message.get_attribute(msg, XORMappedAddress) - ice_agent = - case find_local_cand(Map.values(ice_agent.local_cands), xor_addr, xor_port) do - nil -> - {:ok, {base_addr, base_port}} = ice_agent.transport_module.sockname(tr.socket) - - c = - Candidate.Srflx.new( - address: xor_addr, - port: xor_port, - base_address: base_addr, - base_port: base_port, - transport_module: ice_agent.transport_module, - socket: tr.socket - ) - - Logger.debug("New srflx candidate: #{inspect(c)}") - notify(ice_agent.on_new_candidate, {:new_candidate, Candidate.Srflx.marshal(c)}) - # don't pair reflexive candidate, it should be pruned anyway - see sec. 6.1.2.4 - put_in(ice_agent.local_cands[c.base.id], c) - - cand -> - Logger.debug(""" - Not adding srflx candidate as we already have a candidate with the same address. - Candidate: #{inspect(cand)} - """) + case find_local_cand(Map.values(ice_agent.local_cands), xor_addr, xor_port) do + nil -> + {:ok, {base_addr, base_port}} = ice_agent.transport_module.sockname(tr.socket) + + cand = + Candidate.Srflx.new( + address: xor_addr, + port: xor_port, + base_address: base_addr, + base_port: base_port, + transport_module: ice_agent.transport_module, + socket: tr.socket + ) - ice_agent - end + Logger.debug("New srflx candidate: #{inspect(cand)}") + notify(ice_agent.on_new_candidate, {:new_candidate, Candidate.Srflx.marshal(cand)}) + # don't pair reflexive candidate, it should be pruned anyway - see sec. 6.1.2.4 + put_in(ice_agent.local_cands[cand.base.id], cand) - gathering_transactions = - Map.update!(ice_agent.gathering_transactions, tr.t_id, fn tr -> %{tr | state: :complete} end) + cand -> + Logger.debug(""" + Not adding srflx candidate as we already have a candidate with the same address. + Candidate: #{inspect(cand)} + """) - %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} + ice_agent + end end defp handle_stun_gathering_transaction_response( ice_agent, %Message{type: %Type{class: :error_response}} = msg ) do - t = Map.fetch!(ice_agent.gathering_transactions, msg.transaction_id) + {_, ice_agent} = pop_in(ice_agent.gathering_transactions[msg.transaction_id]) error_code = case Message.get_attribute(msg, ErrorCode) do @@ -1526,10 +1628,7 @@ defmodule ExICE.Priv.ICEAgent do "Gathering transaction failed, t_id: #{msg.transaction_id}, reason: #{inspect(error_code)}" ) - gathering_transactions = - Map.update!(ice_agent.gathering_transactions, t.t_id, fn t -> %{t | state: :failed} end) - - %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} + ice_agent end # Adds valid pair according to sec 7.2.5.3.2 @@ -2349,15 +2448,20 @@ defmodule ExICE.Priv.ICEAgent do |> Message.with_integrity(ice_agent.remote_pwd) |> Message.with_fingerprint() + raw_req = Message.encode(req) + dst = {remote_cand.address, remote_cand.port} - case do_send(ice_agent, local_cand, dst, Message.encode(req)) do + case do_send(ice_agent, local_cand, dst, raw_req) do {:ok, ice_agent} -> + Process.send_after(self(), {:tr_rtx_timeout, req.transaction_id}, @tr_rtx_timeout) + pair = %CandidatePair{pair | state: :in_progress} conn_check = %{ pair_id: pair.id, - send_time: now() + send_time: now(), + raw_req: raw_req } conn_checks = Map.put(ice_agent.conn_checks, req.transaction_id, conn_check) diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index debe652..0b69cf7 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -907,6 +907,173 @@ defmodule ExICE.Priv.ICEAgentTest do end end + describe "connectivity check rtx" do + setup do + remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) + + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + if_discovery_module: IfDiscovery.Mock, + transport_module: Transport.Mock + ) + |> ICEAgent.set_remote_credentials("someufrag", "somepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(remote_cand) + + %{ice_agent: ice_agent, remote_cand: remote_cand} + end + + test "retransmits cc when there is no response", %{ + ice_agent: ice_agent, + remote_cand: remote_cand + } do + [socket] = ice_agent.sockets + + # trigger binding request + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + raw_req = Transport.Mock.recv(socket) + assert raw_req != nil + {:ok, req} = ExSTUN.Message.decode(raw_req) + + # trigger rtx timeout + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + rtx_raw_req = Transport.Mock.recv(socket) + + # assert this is exactly the same message + assert raw_req == rtx_raw_req + + # provide a response and ensure no more retransmissions are sent + raw_resp = + binding_response( + req.transaction_id, + ice_agent.transport_module, + socket, + ice_agent.remote_pwd + ) + + ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, raw_resp) + + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + _ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil == Transport.Mock.recv(socket) + end + + test "stop retransmissions when pair times out", %{ice_agent: ice_agent} do + [socket] = ice_agent.sockets + + # trigger binding request + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + raw_req = Transport.Mock.recv(socket) + assert raw_req != nil + {:ok, req} = ExSTUN.Message.decode(raw_req) + + # trigger rtx timeout + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil != Transport.Mock.recv(socket) + + # mock cc send time so we can timeout it + [cc] = Map.values(ice_agent.conn_checks) + cc = %{cc | send_time: cc.send_time - 2000} + ice_agent = put_in(ice_agent.conn_checks[req.transaction_id], cc) + + # timeout cc + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert %{} == ice_agent.conn_checks + + # trigger rtx timeout and assert there is no retransmission + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil == Transport.Mock.recv(socket) + assert [] == ice_agent.tr_rtx + end + end + + describe "srflx gathering tr rtx" do + setup do + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + if_discovery_module: IfDiscovery.Mock, + transport_module: Transport.Mock, + ice_servers: [%{urls: ["stun:192.168.0.10:8445"]}] + ) + + %{ice_agent: ice_agent, stun_addr: %{ip: {192, 168, 0, 10}, port: 8445}} + end + + test "retransmits tr when there is no response", %{ice_agent: ice_agent, stun_addr: stun_addr} do + ice_agent = ICEAgent.gather_candidates(ice_agent) + [socket] = ice_agent.sockets + + # trigger binding request + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + raw_req = Transport.Mock.recv(socket) + assert raw_req != nil + {:ok, req} = ExSTUN.Message.decode(raw_req) + + # trigger rtx timeout + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + rtx_raw_req = Transport.Mock.recv(socket) + + # assert this is exactly the same message + assert raw_req == rtx_raw_req + + # provide a response and ensure no more retransmissions are sent + {:ok, {sock_ip, sock_port}} = ice_agent.transport_module.sockname(socket) + + raw_resp = + Message.new(req.transaction_id, %Type{class: :success_response, method: :binding}, [ + %XORMappedAddress{address: sock_ip, port: sock_port} + ]) + |> Message.with_fingerprint() + |> Message.encode() + + ice_agent = ICEAgent.handle_udp(ice_agent, socket, stun_addr.ip, stun_addr.port, raw_resp) + + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + _ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil == Transport.Mock.recv(socket) + end + + test "stop retransmissions when tr times out", %{ice_agent: ice_agent} do + ice_agent = ICEAgent.gather_candidates(ice_agent) + [socket] = ice_agent.sockets + + # trigger binding request + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + raw_req = Transport.Mock.recv(socket) + assert raw_req != nil + {:ok, req} = ExSTUN.Message.decode(raw_req) + + # trigger rtx timeout + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil != Transport.Mock.recv(socket) + + # mock tr send time so we can timeout it + [tr] = Map.values(ice_agent.gathering_transactions) + tr = %{tr | send_time: tr.send_time - 2000} + ice_agent = put_in(ice_agent.gathering_transactions[req.transaction_id], tr) + + # timeout tr + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert %{} == ice_agent.gathering_transactions + + # trigger rtx timeout and assert there is no retransmission + ice_agent = ICEAgent.handle_tr_rtx_timeout(ice_agent, req.transaction_id) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert nil == Transport.Mock.recv(socket) + assert [] == ice_agent.tr_rtx + end + end + test "pair timeout" do # 1. make ice agent connected # 2. mock the time a pair has received something from the peer @@ -1157,8 +1324,7 @@ defmodule ExICE.Priv.ICEAgentTest do assert srflx_cand.base.address == srflx_ip assert srflx_cand.base.port == srflx_port - # assert gathering transaction succeeded - assert ice_agent.gathering_transactions[req.transaction_id].state == :complete + assert ice_agent.gathering_transactions == %{} end test "error response", %{ice_agent: ice_agent} do @@ -1184,8 +1350,7 @@ defmodule ExICE.Priv.ICEAgentTest do |> Map.values() |> Enum.find(&(&1.base.type == :srflx)) - # assert gathering transaction failed - assert ice_agent.gathering_transactions[req.transaction_id].state == :failed + assert ice_agent.gathering_transactions == %{} end end @@ -1255,10 +1420,7 @@ defmodule ExICE.Priv.ICEAgentTest do assert relay_cand.base.address == @turn_relay_ip assert relay_cand.base.port == @turn_relay_port - - # assert gathering transaction succeeded - turn_tr_id = {socket, {@turn_ip, @turn_port}} - assert ice_agent.gathering_transactions[turn_tr_id].state == :complete + assert ice_agent.gathering_transactions == %{} end test "error response", %{ice_agent: ice_agent} do @@ -1291,9 +1453,7 @@ defmodule ExICE.Priv.ICEAgentTest do |> Map.values() |> Enum.find(&(&1.base.type == :relay)) - # assert gathering transaction failed - turn_tr_id = {socket, {@turn_ip, @turn_port}} - assert ice_agent.gathering_transactions[turn_tr_id].state == :failed + assert ice_agent.gathering_transactions == %{} end test "invalid response", %{ice_agent: ice_agent} do @@ -1356,7 +1516,7 @@ defmodule ExICE.Priv.ICEAgentTest do ) # assert gathering transaction failed - assert ice_agent.gathering_transactions[turn_tr_id].state == :failed + assert ice_agent.gathering_transactions == %{} end end