From befbe870a601d2d62dfbc32e24bfa6ba378b6e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 6 May 2024 17:55:59 +0200 Subject: [PATCH 1/2] Timeout pairs after not receiving data for long enough --- README.md | 2 +- lib/ex_ice/ice_agent.ex | 14 +- lib/ex_ice/priv/candidate_pair.ex | 11 +- .../priv/conn_check_handler/controlled.ex | 87 ++++---- .../priv/conn_check_handler/controlling.ex | 23 +- lib/ex_ice/priv/ice_agent.ex | 205 +++++++++++++++--- test/priv/ice_agent_test.exs | 131 +++++++---- 7 files changed, 342 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index 09ad08b..f892e85 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Implements: * role conflict resolution * supports host, prflx, srflx and relay candidates * transaction pacing -* keepalives on valid and selected pairs +* keepalives (both incoming and outgoing) on valid and selected pairs * mDNS client ## Limitations diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index ca54684..563c81f 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -341,7 +341,19 @@ defmodule ExICE.ICEAgent do @impl true def handle_info(:ta_timeout, state) do - ice_agent = ExICE.Priv.ICEAgent.handle_timeout(state.ice_agent) + ice_agent = ExICE.Priv.ICEAgent.handle_ta_timeout(state.ice_agent) + {:noreply, %{state | ice_agent: ice_agent}} + end + + @impl true + def handle_info(:eoc_timeout, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_eoc_timeout(state.ice_agent) + {:noreply, %{state | ice_agent: ice_agent}} + end + + @impl true + def handle_info(:pair_timeout, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_pair_timeout(state.ice_agent) {:noreply, %{state | ice_agent: ice_agent}} end diff --git a/lib/ex_ice/priv/candidate_pair.ex b/lib/ex_ice/priv/candidate_pair.ex index 65c04b8..e53193d 100644 --- a/lib/ex_ice/priv/candidate_pair.ex +++ b/lib/ex_ice/priv/candidate_pair.ex @@ -20,7 +20,8 @@ defmodule ExICE.Priv.CandidatePair do valid?: boolean, succeeded_pair_id: integer() | nil, discovered_pair_id: integer() | nil, - keepalive_timer: reference() | nil + keepalive_timer: reference() | nil, + last_seen: integer() } @enforce_keys [:id, :local_cand_id, :remote_cand_id, :priority] @@ -32,7 +33,10 @@ defmodule ExICE.Priv.CandidatePair do valid?: false, succeeded_pair_id: nil, discovered_pair_id: nil, - keepalive_timer: nil + keepalive_timer: nil, + # Time when this pair has received some data + # or sent conn check. + last_seen: nil ] @doc false @@ -47,7 +51,8 @@ defmodule ExICE.Priv.CandidatePair do remote_cand_id: remote_cand.id, priority: priority, state: state, - valid?: opts[:valid?] || false + valid?: opts[:valid?] || false, + last_seen: opts[:last_seen] } end diff --git a/lib/ex_ice/priv/conn_check_handler/controlled.ex b/lib/ex_ice/priv/conn_check_handler/controlled.ex index 5f140f5..b6b9424 100644 --- a/lib/ex_ice/priv/conn_check_handler/controlled.ex +++ b/lib/ex_ice/priv/conn_check_handler/controlled.ex @@ -15,20 +15,15 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlled do nil -> Logger.debug("Adding new candidate pair: #{inspect(pair)}") checklist = Map.put(ice_agent.checklist, pair.id, pair) - %ICEAgent{ice_agent | checklist: checklist} - - %CandidatePair{} = pair - when ice_agent.selected_pair != nil and - pair.discovered_pair_id == ice_agent.selected_pair.id -> - # to be honest this might also be a retransmission - Logger.debug("Keepalive on selected pair: #{pair.discovered_pair_id}") - ice_agent - - %CandidatePair{} -> - # keepalive/retransmission? - ice_agent + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, pair, msg) + + %CandidatePair{} = checklist_pair -> + checklist_pair = %CandidatePair{checklist_pair | last_seen: pair.last_seen} + checklist = Map.put(ice_agent.checklist, checklist_pair.id, checklist_pair) + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, checklist_pair, msg) end - |> ICEAgent.send_binding_success_response(pair, msg) end @impl true @@ -43,32 +38,42 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlled do pair = %CandidatePair{pair | nominate?: true} checklist = Map.put(ice_agent.checklist, pair.id, pair) - %ICEAgent{ice_agent | checklist: checklist} - %CandidatePair{} = pair - when ice_agent.selected_pair != nil and - pair.discovered_pair_id == ice_agent.selected_pair.id -> - Logger.debug("Keepalive on selected pair: #{pair.id}") - ice_agent - - %CandidatePair{} = pair -> - if pair.state == :succeeded do - Logger.debug("Nomination request on pair: #{pair.id}.") - update_nominated_flag(ice_agent, pair.discovered_pair_id, true) + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, pair, msg) + + %CandidatePair{} = checklist_pair -> + if checklist_pair.state == :succeeded do + discovered_pair = Map.fetch!(ice_agent.checklist, checklist_pair.discovered_pair_id) + discovered_pair = %CandidatePair{discovered_pair | last_seen: pair.last_seen} + ice_agent = put_in(ice_agent.checklist[discovered_pair.id], discovered_pair) + + if ice_agent.selected_pair_id == nil do + Logger.debug("Nomination request on pair: #{discovered_pair.id}.") + update_nominated_flag(ice_agent, discovered_pair.id, true) + else + ice_agent + end + |> ICEAgent.send_binding_success_response(discovered_pair, msg) else # TODO should we check if this pair is not in failed? Logger.debug(""" Nomination request on pair that hasn't been verified yet. We will nominate pair once conn check passes. - Pair: #{inspect(pair.id)} + Pair: #{inspect(checklist_pair.id)} """) - pair = %CandidatePair{pair | nominate?: true} - checklist = Map.put(ice_agent.checklist, pair.id, pair) - %ICEAgent{ice_agent | checklist: checklist} + checklist_pair = %CandidatePair{ + checklist_pair + | nominate?: true, + last_seen: pair.last_seen + } + + checklist = Map.put(ice_agent.checklist, checklist_pair.id, checklist_pair) + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, checklist_pair, msg) end end - |> ICEAgent.send_binding_success_response(pair, msg) end @impl true @@ -85,20 +90,26 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlled do ice_agent = %ICEAgent{ice_agent | checklist: checklist} cond do - ice_agent.selected_pair == nil -> + ice_agent.selected_pair_id == nil -> Logger.debug("Selecting pair: #{pair_id}") - %ICEAgent{ice_agent | selected_pair: pair} + %ICEAgent{ice_agent | selected_pair_id: pair.id} - ice_agent.selected_pair != nil and pair.priority >= ice_agent.selected_pair.priority -> - Logger.debug(""" - Selecting new pair with higher priority. \ - New pair: #{pair_id}, old pair: #{ice_agent.selected_pair.id}.\ - """) + ice_agent.selected_pair_id != nil and pair.id != ice_agent.selected_pair_id -> + selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id) - %ICEAgent{ice_agent | selected_pair: pair} + if pair.priority >= selected_pair.priority do + Logger.debug(""" + Selecting new pair with higher priority. \ + New pair: #{pair_id}, old pair: #{ice_agent.selected_pair_id}.\ + """) + + %ICEAgent{ice_agent | selected_pair_id: pair.id} + else + ice_agent + end true -> - Logger.debug("Not selecting a new pair as it has lower priority") + Logger.debug("Not selecting a new pair as it has lower priority or has the same id") ice_agent end end diff --git a/lib/ex_ice/priv/conn_check_handler/controlling.ex b/lib/ex_ice/priv/conn_check_handler/controlling.ex index 70b15d0..582cd65 100644 --- a/lib/ex_ice/priv/conn_check_handler/controlling.ex +++ b/lib/ex_ice/priv/conn_check_handler/controlling.ex @@ -26,20 +26,15 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlling do nil -> Logger.debug("Adding new candidate pair: #{inspect(pair)}") checklist = Map.put(ice_agent.checklist, pair.id, pair) - %ICEAgent{ice_agent | checklist: checklist} - - %CandidatePair{} = pair - when ice_agent.selected_pair != nil and - pair.discovered_pair_id == ice_agent.selected_pair.id -> - # to be honest this might also be a retransmission - Logger.debug("Keepalive on selected pair: #{pair.discovered_pair_id}") - ice_agent - - %CandidatePair{} -> - # keepalive/retransmission? - ice_agent + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, pair, msg) + + %CandidatePair{} = checklist_pair -> + checklist_pair = %CandidatePair{checklist_pair | last_seen: pair.last_seen} + checklist = Map.put(ice_agent.checklist, checklist_pair.id, checklist_pair) + ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ICEAgent.send_binding_success_response(ice_agent, checklist_pair, msg) end - |> ICEAgent.send_binding_success_response(pair, msg) end @impl true @@ -61,6 +56,6 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlling do Logger.warning("Nomination succeeded but checklist hasn't finished.") end - %ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair: pair} + %ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair_id: pair.id} end end diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 258dc07..40b1dab 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -20,13 +20,23 @@ defmodule ExICE.Priv.ICEAgent do alias ExSTUN.Message.Type alias ExSTUN.Message.Attribute.{ErrorCode, Username, XORMappedAddress} - # Ta timeout in ms + # Ta timeout in ms. @ta_timeout 50 - # transaction timeout in ms - # see appendix B.1 + # Transaction timeout in ms. + # See appendix B.1. @hto 500 + # Pair timeout in ms. + # If we don't receive any data in this time, + # a pair is marked as faield. + @pair_timeout 5_000 + + # End-of-candidates timeout in ms. + # If we don't receive end-of-candidates indication in this time, + # we will set it on our own. + @eoc_timeout 10_000 + @conn_check_handler %{ controlling: ConnCheckHandler.Controlling, controlled: ConnCheckHandler.Controlled @@ -50,9 +60,10 @@ defmodule ExICE.Priv.ICEAgent do :gatherer, :ice_transport_policy, :ta_timer, + :eoc_timer, :role, :tiebreaker, - :selected_pair, + :selected_pair_id, :local_ufrag, :local_pwd, :remote_ufrag, @@ -90,6 +101,8 @@ defmodule ExICE.Priv.ICEAgent do transport_module = opts[:transport_module] || Transport.UDP ip_filter = opts[:ip_filter] || fn _ -> true end + start_pair_timer() + %__MODULE__{ controlling_process: controlling_process, on_connection_state_change: opts[:on_connection_state_change] || controlling_process, @@ -181,6 +194,10 @@ defmodule ExICE.Priv.ICEAgent do pwd ) do Logger.debug("Setting remote credentials: #{inspect(ufrag)}:#{inspect(pwd)}") + # This is very loosely based on RFC 8863, sec. 4. + # We can start eoc timer after sending and receiving ICE credentials. + # In our case, we do this just after receiving remote credentials. + ice_agent = start_eoc_timer(ice_agent) %__MODULE__{ice_agent | remote_ufrag: ufrag, remote_pwd: pwd} end @@ -358,7 +375,9 @@ defmodule ExICE.Priv.ICEAgent do def send_data(%__MODULE__{state: state} = ice_agent, data) when state in [:connected, :completed] do %CandidatePair{} = - pair = ice_agent.selected_pair || Checklist.get_valid_pair(ice_agent.checklist) + pair = + Map.get(ice_agent.checklist, ice_agent.selected_pair_id) || + Checklist.get_valid_pair(ice_agent.checklist) local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) @@ -393,8 +412,8 @@ defmodule ExICE.Priv.ICEAgent do do_restart(ice_agent) end - @spec handle_timeout(t()) :: t() - def handle_timeout(%__MODULE__{state: state} = ice_agent) + @spec handle_ta_timeout(t()) :: t() + def handle_ta_timeout(%__MODULE__{state: state} = ice_agent) when state.state in [:completed, :failed] do Logger.warning(""" Ta timer fired in unexpected state: #{state}. @@ -407,7 +426,7 @@ defmodule ExICE.Priv.ICEAgent do |> update_ta_timer() end - def handle_timeout(ice_agent) do + def handle_ta_timeout(ice_agent) do ice_agent = ice_agent |> timeout_pending_transactions() @@ -422,6 +441,7 @@ defmodule ExICE.Priv.ICEAgent do case Checklist.get_next_pair(ice_agent.checklist) do %CandidatePair{} = pair -> Logger.debug("Sending conn check on pair: #{inspect(pair.id)}") + pair = %CandidatePair{pair | last_seen: now()} ice_agent = send_conn_check(ice_agent, pair) {true, ice_agent} @@ -452,17 +472,74 @@ defmodule ExICE.Priv.ICEAgent do end end + @spec handle_eoc_timeout(t()) :: t() + def handle_eoc_timeout(%{eoc: true} = ice_agent) do + Logger.debug("EOC timer fired but EOC flag is already set. Ignoring.") + %{ice_agent | eoc_timer: nil} + end + + def handle_eoc_timeout(ice_agent) do + Logger.debug("EOC timer fired. Setting EOC flag.") + ice_agent = %{ice_agent | eoc_timer: nil, eoc: true} + update_connection_state(ice_agent) + end + + @spec handle_pair_timeout(t()) :: t() + def handle_pair_timeout(ice_agent) do + start_pair_timer() + + # only take final pairs i.e. those that are actually used + pairs = + ice_agent.checklist + |> Map.values() + |> Stream.filter(fn pair -> pair.state == :succeeded end) + |> Enum.filter(fn pair -> pair.id == pair.discovered_pair_id end) + + timeout_pairs(ice_agent, pairs, now()) + |> update_connection_state() + end + + defp timeout_pairs(ice_agent, [], _now), do: ice_agent + + defp timeout_pairs(ice_agent, [%{last_seen: nil} | pairs], now) do + timeout_pairs(ice_agent, pairs, now) + end + + defp timeout_pairs(ice_agent, [pair | pairs], now) do + if now - pair.last_seen >= @pair_timeout do + Logger.debug(""" + Pair: #{pair.id} didn't receive any data in #{@pair_timeout}ms. \ + Marking as failed.\ + """) + + checklist = Checklist.timeout_pairs(ice_agent.checklist, [pair.id, pair.succeeded_pair_id]) + ice_agent = %{ice_agent | checklist: checklist} + + ice_agent = + if ice_agent.selected_pair_id == pair.id do + %{ice_agent | selected_pair_id: nil} + else + ice_agent + end + + timeout_pairs(ice_agent, pairs, now) + else + timeout_pairs(ice_agent, pairs, now) + end + end + @spec handle_keepalive(t(), integer()) :: t() - def handle_keepalive(%__MODULE__{selected_pair: s_pair} = ice_agent, id) - when not is_nil(s_pair) and s_pair.id == id do + def handle_keepalive(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, id) + when s_pair_id == id do # if pair was selected, send keepalives only on that pair + s_pair = Map.fetch!(ice_agent.checklist, id) pair = CandidatePair.schedule_keepalive(s_pair) ice_agent = %__MODULE__{ice_agent | checklist: Map.put(ice_agent.checklist, id, pair)} send_keepalive(ice_agent, ice_agent.checklist[id]) end - def handle_keepalive(%__MODULE__{selected_pair: s_pair} = ice_agent, _id) - when not is_nil(s_pair) do + def handle_keepalive(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, _id) + when not is_nil(s_pair_id) do # note: current implementation assumes that, if selected pair exists, none of the already existing # valid pairs will ever become selected (only new appearing valid pairs) # that's why there's no call to `CandidatePair.schedule_keepalive/1` @@ -509,7 +586,26 @@ defmodule ExICE.Priv.ICEAgent do handle_stun_message(ice_agent, socket, src_ip, src_port, packet) true -> - handle_data_message(ice_agent, packet) + local_cand = find_host_cand(Map.values(ice_agent.local_cands), socket) + remote_cand = find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) + + %CandidatePair{} = + pair = Checklist.find_pair(ice_agent.checklist, local_cand.base.id, remote_cand.id) + + case pair.state do + :succeeded -> + pair = Map.fetch!(ice_agent.checklist, pair.discovered_pair_id) + handle_data_message(ice_agent, pair, packet) + + :failed -> + Logger.error("Received data on failed pair! Ignoring. Pair id: #{pair.id}") + ice_agent + + _other -> + # We might receive data on a pair that we haven't checked + # on our side yet. + handle_data_message(ice_agent, pair, packet) + end end end @@ -656,8 +752,7 @@ defmodule ExICE.Priv.ICEAgent do case Gatherer.gather_srflx_candidate(ice_agent.gatherer, tr.t_id, tr.socket, stun_server.url) do :ok -> - now = System.monotonic_time(:millisecond) - tr = %{tr | state: :in_progress, send_time: now} + tr = %{tr | state: :in_progress, send_time: now()} gathering_transactions = Map.put(ice_agent.gathering_transactions, tr.t_id, tr) ice_agent = %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} {:ok, ice_agent} @@ -689,8 +784,7 @@ defmodule ExICE.Priv.ICEAgent do case ice_agent.transport_module.send(tr.socket, turn_addr, data) do :ok -> - now = System.monotonic_time(:millisecond) - tr = %{tr | state: :in_progress, send_time: now} + tr = %{tr | state: :in_progress, send_time: now()} ice_agent = put_in(ice_agent.gathering_transactions[tr.t_id], tr) ice_agent = update_gathering_state(ice_agent) {:ok, ice_agent} @@ -704,7 +798,7 @@ defmodule ExICE.Priv.ICEAgent do end defp timeout_pending_transactions(ice_agent) do - now = System.monotonic_time(:millisecond) + now = now() ice_agent = timeout_gathering_transactions(ice_agent, now) timeout_conn_checks(ice_agent, now) end @@ -819,7 +913,9 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end else - handle_data_message(ice_agent, packet) + remote_cand = find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) + pair = Checklist.find_pair(ice_agent.checklist, cand.base.id, remote_cand.id) + handle_data_message(ice_agent, pair, packet) end {:error, _reason, cand} -> @@ -846,7 +942,10 @@ defmodule ExICE.Priv.ICEAgent do end end - defp handle_data_message(ice_agent, packet) do + defp handle_data_message(ice_agent, pair, packet) do + pair = %{pair | last_seen: now()} + ice_agent = put_in(ice_agent.checklist[pair.id], pair) + notify(ice_agent.on_data, {:data, packet}) %{ @@ -945,7 +1044,8 @@ defmodule ExICE.Priv.ICEAgent do {remote_cand, ice_agent} = get_or_create_remote_cand(ice_agent, src_ip, src_port, prio_attr) - pair = CandidatePair.new(local_cand, remote_cand, ice_agent.role, :waiting) + pair = + CandidatePair.new(local_cand, remote_cand, ice_agent.role, :waiting, last_seen: now()) @conn_check_handler[ice_agent.role].handle_conn_check_request( ice_agent, @@ -1144,6 +1244,7 @@ defmodule ExICE.Priv.ICEAgent do add_valid_pair(ice_agent, valid_pair, conn_check_pair, checklist_pair) pair = CandidatePair.schedule_keepalive(ice_agent.checklist[pair_id]) + pair = %CandidatePair{pair | last_seen: now()} checklist = Map.put(ice_agent.checklist, pair_id, pair) ice_agent = %__MODULE__{ice_agent | checklist: checklist} @@ -1542,11 +1643,18 @@ defmodule ExICE.Priv.ICEAgent do defp close_candidate(ice_agent, local_cand) do local_cands = Map.delete(ice_agent.local_cands, local_cand.base.id) - selected_pair = - if ice_agent.selected_pair != nil and - ice_agent.selected_pair.local_cand_id == local_cand.base.id, - do: nil, - else: ice_agent.selected_pair + selected_pair_id = + if ice_agent.selected_pair_id != nil do + selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id) + + if selected_pair.local_cand_id == local_cand.base.id do + nil + else + ice_agent.selected_pair_id + end + else + ice_agent.selected_pair_id + end nominating? = case ice_agent.nominating? do @@ -1566,7 +1674,7 @@ defmodule ExICE.Priv.ICEAgent do %{ ice_agent | local_cands: local_cands, - selected_pair: selected_pair, + selected_pair_id: selected_pair_id, checklist: Checklist.prune(ice_agent.checklist, local_cand), nominating?: nominating? } @@ -1639,12 +1747,16 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end - ice_agent = change_gathering_state(ice_agent, :new) + ice_agent = + ice_agent + |> change_gathering_state(:new) + |> cancel_eoc_timer() + |> start_eoc_timer() %__MODULE__{ ice_agent | gathering_transactions: %{}, - selected_pair: nil, + selected_pair_id: nil, conn_checks: %{}, checklist: %{}, local_cands: %{}, @@ -1845,7 +1957,7 @@ defmodule ExICE.Priv.ICEAgent do # This seems to be compliant with libwebrtc. ice_agent.role == :controlled and ice_agent.eoc == true and ice_agent.gathering_state == :complete and - ice_agent.selected_pair != nil and Checklist.finished?(ice_agent.checklist) -> + ice_agent.selected_pair_id != nil and Checklist.finished?(ice_agent.checklist) -> Logger.debug(""" Finished all conn checks, there won't be any further local or remote candidates and we have selected pair. Changing connection state to completed. @@ -1853,7 +1965,7 @@ defmodule ExICE.Priv.ICEAgent do change_connection_state(ice_agent, :completed) - ice_agent.role == :controlling and ice_agent.selected_pair != nil -> + ice_agent.role == :controlling and ice_agent.selected_pair_id != nil -> change_connection_state(ice_agent, :completed) ice_agent.role == :controlling and match?({true, _pair_id}, ice_agent.nominating?) and @@ -1886,10 +1998,10 @@ defmodule ExICE.Priv.ICEAgent do end defp update_connection_state(%__MODULE__{state: :completed} = ice_agent) do - if ice_agent.selected_pair == nil do + if ice_agent.selected_pair_id == nil do Logger.debug(""" No selected pair in state completed. Looks like we lost the selected pair. - Changing connection state to failed. + Changing connection state to failed.\ """) change_connection_state(ice_agent, :failed) @@ -1923,6 +2035,29 @@ defmodule ExICE.Priv.ICEAgent do end end + defp start_pair_timer() do + Process.send_after(self(), :pair_timeout, div(@pair_timeout, 2)) + end + + defp start_eoc_timer(ice_agent) do + timer = Process.send_after(self(), :eoc_timeout, @eoc_timeout) + %{ice_agent | eoc_timer: timer} + end + + defp cancel_eoc_timer(%{eoc_timer: nil} = ice_agent), do: ice_agent + + defp cancel_eoc_timer(ice_agent) do + Process.cancel_timer(ice_agent.eoc_timer) + + receive do + :eoc_timeout -> :ok + after + 0 -> :ok + end + + %{ice_agent | eoc_timer: nil} + end + defp work_to_do?(ice_agent) when ice_agent.state in [:completed, :failed], do: false defp work_to_do?(ice_agent) do @@ -2014,7 +2149,7 @@ defmodule ExICE.Priv.ICEAgent do conn_check = %{ pair_id: pair.id, - send_time: System.monotonic_time(:millisecond) + send_time: now() } conn_checks = Map.put(ice_agent.conn_checks, req.transaction_id, conn_check) @@ -2044,4 +2179,6 @@ defmodule ExICE.Priv.ICEAgent do defp notify(nil, _msg), do: :ok defp notify(dst, msg), do: send(dst, {:ex_ice, self(), msg}) + + defp now(), do: System.monotonic_time(:millisecond) end diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index 68936f6..dc1028c 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -425,7 +425,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "request", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) assert packet = Transport.Mock.recv(socket) assert is_binary(packet) @@ -447,7 +447,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "success response", %{ice_agent: ice_agent, remote_cand: remote_cand} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_binding_request(socket, ice_agent.remote_pwd) @@ -477,7 +477,7 @@ defmodule ExICE.Priv.ICEAgentTest do } do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) <> = ice_agent.remote_pwd invalid_remote_pwd = <> @@ -509,7 +509,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "bad request error response", %{ice_agent: ice_agent, remote_cand: remote_cand} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_binding_request(socket, ice_agent.remote_pwd) @@ -535,7 +535,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "unauthenticated error response", %{ice_agent: ice_agent, remote_cand: remote_cand} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_binding_request(socket, ice_agent.remote_pwd) @@ -561,7 +561,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "response from non-symmetric address", %{ice_agent: ice_agent, remote_cand: remote_cand} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_binding_request(socket, ice_agent.remote_pwd) @@ -595,6 +595,51 @@ defmodule ExICE.Priv.ICEAgentTest do end end + test "pair timeout" do + # 1. make ice agent connected + # 2. mock the time a pair has received something from the peer + # 3. trigger pair timeout + # 4. assert that the pair has been marked as failed + # 5. trigger eoc timeout and assert that ice agent moved to the failed state + remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) + + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + if_discovery_module: IfDiscovery.Mock, + transport_module: Transport.Mock + ) + |> ICEAgent.set_remote_credentials("someufrag", "somepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(ExICE.Candidate.marshal(remote_cand)) + + # Make sure we are not gathering local candidates. + # That's important for moving to the failed state later on. + assert ice_agent.gathering_state == :complete + + # make ice_agent connected + ice_agent = connect(ice_agent) + + # mock last_seen field + [pair] = Map.values(ice_agent.checklist) + last_seen = System.monotonic_time(:millisecond) - 5_000 + pair = %{pair | last_seen: last_seen} + ice_agent = put_in(ice_agent.checklist[pair.id], pair) + + # trigger pair timeout + ice_agent = ICEAgent.handle_pair_timeout(ice_agent) + + # assert that the pair is marked as failed + assert [%CandidatePair{state: :failed}] = Map.values(ice_agent.checklist) + + # trigger eoc timeout + ice_agent = ICEAgent.handle_eoc_timeout(ice_agent) + + # assert ice agent moved to the failed state + assert ice_agent.state == :failed + end + @stun_ip {192, 168, 0, 3} @stun_ip_str :inet.ntoa(@stun_ip) @stun_port 19_302 @@ -614,7 +659,7 @@ defmodule ExICE.Priv.ICEAgentTest do [socket] = ice_agent.sockets - # assert no transactions are started until handle_timeout is called + # assert no transactions are started until handle_ta_timeout is called assert nil == Transport.Mock.recv(socket) %{ice_agent: ice_agent} @@ -627,7 +672,7 @@ defmodule ExICE.Priv.ICEAgentTest do srflx_port = sock_port + 1 # assert ice agent started gathering transaction by sending a binding request - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) assert packet = Transport.Mock.recv(socket) assert {:ok, req} = ExSTUN.Message.decode(packet) assert req.type.class == :request @@ -658,7 +703,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "error response", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) assert packet = Transport.Mock.recv(socket) assert {:ok, req} = ExSTUN.Message.decode(packet) @@ -714,7 +759,7 @@ defmodule ExICE.Priv.ICEAgentTest do [socket] = ice_agent.sockets - # assert no transactions are started until handle_timeout is called + # assert no transactions are started until handle_ta_timeout is called assert nil == Transport.Mock.recv(socket) %{ice_agent: ice_agent} @@ -724,7 +769,7 @@ defmodule ExICE.Priv.ICEAgentTest do [socket] = ice_agent.sockets # assert ice agent started gathering transaction by sending an allocate request - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_allocate_request(socket) # TURN uses long-term authentication mechanism @@ -758,7 +803,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "error response", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_allocate_request(socket) @@ -793,7 +838,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "invalid response", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_allocate_request(socket) @@ -829,7 +874,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "ex_turn timeout", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_allocate_request(socket) @@ -891,33 +936,10 @@ defmodule ExICE.Priv.ICEAgentTest do |> ICEAgent.gather_candidates() |> ICEAgent.add_remote_candidate(ExICE.Candidate.marshal(remote_cand)) - [socket] = ice_agent.sockets - assert ice_agent.gathering_state == :complete # make ice_agent connected - ice_agent = ICEAgent.handle_timeout(ice_agent) - req = read_binding_request(socket, ice_agent.remote_pwd) - - resp = - binding_response( - req.transaction_id, - ice_agent.transport_module, - socket, - ice_agent.remote_pwd - ) - - ice_agent = - ICEAgent.handle_udp( - ice_agent, - socket, - remote_cand.address, - remote_cand.port, - resp - ) - - assert [%CandidatePair{state: :succeeded}] = Map.values(ice_agent.checklist) - assert ice_agent.state == :connected + ice_agent = connect(ice_agent) # replace candidate with the mock one [local_cand] = Map.values(ice_agent.local_cands) @@ -960,7 +982,7 @@ defmodule ExICE.Priv.ICEAgentTest do [socket] = ice_agent.sockets # create relay candidate - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) req = read_allocate_request(socket) resp = allocate_error_response(req.transaction_id) ice_agent = ICEAgent.handle_udp(ice_agent, socket, @turn_ip, @turn_port, resp) @@ -975,7 +997,7 @@ defmodule ExICE.Priv.ICEAgentTest do |> Enum.find(&(&1.base.type == :relay)) # assert client sends create permission request - ice_agent = ICEAgent.handle_timeout(ice_agent) + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) assert packet = Transport.Mock.recv(socket) assert {:ok, req} = ExSTUN.Message.decode(packet) assert req.type.class == :request @@ -1072,6 +1094,35 @@ defmodule ExICE.Priv.ICEAgentTest do assert <<_channel_number::16, _len::16, "somedata">> = packet end + defp connect(ice_agent) do + [socket] = ice_agent.sockets + [remote_cand] = Map.values(ice_agent.remote_cands) + + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + req = read_binding_request(socket, ice_agent.remote_pwd) + + resp = + binding_response( + req.transaction_id, + ice_agent.transport_module, + socket, + ice_agent.remote_pwd + ) + + ice_agent = + ICEAgent.handle_udp( + ice_agent, + socket, + remote_cand.address, + remote_cand.port, + resp + ) + + assert [%CandidatePair{state: :succeeded}] = Map.values(ice_agent.checklist) + assert ice_agent.state == :connected + ice_agent + end + defp binding_response(t_id, transport_module, socket, remote_pwd) do {:ok, {sock_ip, sock_port}} = transport_module.sockname(socket) From 7ed7cac4ec4d5abf1eac977e8ee6293dc11fe458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Tue, 7 May 2024 10:02:09 +0200 Subject: [PATCH 2/2] Improve log --- lib/ex_ice/priv/ice_agent.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 40b1dab..e8ab3eb 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -506,9 +506,11 @@ defmodule ExICE.Priv.ICEAgent do end defp timeout_pairs(ice_agent, [pair | pairs], now) do - if now - pair.last_seen >= @pair_timeout do + diff = now - pair.last_seen + + if diff >= @pair_timeout do Logger.debug(""" - Pair: #{pair.id} didn't receive any data in #{@pair_timeout}ms. \ + Pair: #{pair.id} didn't receive any data in #{diff}ms. \ Marking as failed.\ """)