From c3538f8406504ddeaf5f18128477d7dbf2f86fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 19 Sep 2024 13:04:02 +0200 Subject: [PATCH] Fix sending and handling keepalives --- lib/ex_ice/ice_agent.ex | 4 +- lib/ex_ice/priv/candidate_pair.ex | 2 +- lib/ex_ice/priv/ice_agent.ex | 166 ++++++++++++++++++++---------- test/priv/ice_agent_test.exs | 144 ++++++++++++++++++++++++-- 4 files changed, 251 insertions(+), 65 deletions(-) diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index c65149e..686900f 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -381,8 +381,8 @@ defmodule ExICE.ICEAgent do end @impl true - def handle_info({:keepalive, id}, state) do - ice_agent = ExICE.Priv.ICEAgent.handle_keepalive(state.ice_agent, id) + def handle_info({:keepalive_timeout, id}, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_keepalive_timeout(state.ice_agent, id) {:noreply, %{state | ice_agent: ice_agent}} end diff --git a/lib/ex_ice/priv/candidate_pair.ex b/lib/ex_ice/priv/candidate_pair.ex index eeabb33..51f19cf 100644 --- a/lib/ex_ice/priv/candidate_pair.ex +++ b/lib/ex_ice/priv/candidate_pair.ex @@ -66,7 +66,7 @@ defmodule ExICE.Priv.CandidatePair do end def schedule_keepalive(pair, dest) do - ref = Process.send_after(dest, {:keepalive, pair.id}, @tr_timeout) + ref = Process.send_after(dest, {:keepalive_timeout, pair.id}, @tr_timeout) %{pair | keepalive_timer: ref} end diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 979cc88..b6ad6dc 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -673,8 +673,8 @@ defmodule ExICE.Priv.ICEAgent do end end - @spec handle_keepalive(t(), integer()) :: t() - def handle_keepalive(%__MODULE__{selected_pair_id: id} = ice_agent, id) do + @spec handle_keepalive_timeout(t(), integer()) :: t() + def handle_keepalive_timeout(%__MODULE__{selected_pair_id: id} = ice_agent, id) do # if pair was selected, send keepalives only on that pair s_pair = Map.fetch!(ice_agent.checklist, id) pair = CandidatePair.schedule_keepalive(s_pair) @@ -682,7 +682,7 @@ defmodule ExICE.Priv.ICEAgent do send_keepalive(ice_agent, ice_agent.checklist[id]) end - def handle_keepalive(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, _id) + def handle_keepalive_timeout(%__MODULE__{selected_pair_id: s_pair_id} = ice_agent, _id) when not is_nil(s_pair_id) do # note: current implementation assumes that, if selected pair exists, none of the already existing # valid pairs will ever become selected (only new appearing valid pairs) @@ -690,7 +690,7 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end - def handle_keepalive(ice_agent, id) do + def handle_keepalive_timeout(ice_agent, id) do # TODO: keepalives should be sent only if no data has been sent for @tr_timeout # atm, we send keepalives anyways, also it might be better to pace them with ta_timer # TODO: candidates not in a valid pair also should be kept alive (RFC 8445, sect 5.1.1.4) @@ -1246,16 +1246,7 @@ defmodule ExICE.Priv.ICEAgent do %Type{class: class, method: :binding} when is_response(class) and is_map_key(ice_agent.keepalives, msg.transaction_id) -> # TODO: this a good basis to implement consent freshness - Logger.debug(""" - Received keepalive response from from #{inspect({src_ip, src_port})}, \ - on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})} \ - """) - - {pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id]) - - pair = Map.fetch!(ice_agent.checklist, pair_id) - pair = %CandidatePair{pair | last_seen: now()} - put_in(ice_agent.checklist[pair.id], pair) + handle_keepalive_response(ice_agent, local_cand, src_ip, src_port, msg) %Type{class: class, method: :binding} when is_response(class) -> Logger.warning(""" @@ -1475,7 +1466,6 @@ defmodule ExICE.Priv.ICEAgent do end ## BINDING RESPONSE HANDLING ## - defp handle_conn_check_response(ice_agent, local_cand, src_ip, src_port, msg) do {%{pair_id: pair_id}, conn_checks} = Map.pop!(ice_agent.conn_checks, msg.transaction_id) ice_agent = %__MODULE__{ice_agent | conn_checks: conn_checks} @@ -1647,6 +1637,66 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end + defp handle_keepalive_response( + ice_agent, + local_cand, + src_ip, + src_port, + %Message{type: %Type{class: :success_response}} = msg + ) do + {pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id]) + pair = Map.fetch!(ice_agent.checklist, pair_id) + + with true <- symmetric?(ice_agent, local_cand.base.socket, {src_ip, src_port}, pair), + :ok <- authenticate_msg(msg, ice_agent.remote_pwd) do + Logger.debug("Received keepalive success response on: #{pair_info(ice_agent, pair)}") + pair = %CandidatePair{pair | last_seen: now()} + put_in(ice_agent.checklist[pair.id], pair) + else + false -> + ka_local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + ka_remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + + Logger.warning(""" + Ignoring keepalive success response, non-symmetric src and dst addresses. + Sent from: #{inspect({ka_local_cand.base.base_address, ka_local_cand.base.base_port})}, \ + to: #{inspect({ka_remote_cand.address, ka_remote_cand.port})} + Recv from: #{inspect({src_ip, src_port})}, on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})} \ + Not refreshing last_seen time. \ + """) + + ice_agent + + {:error, reason} -> + Logger.debug(""" + Couldn't authenticate keepalive success response, reason: #{reason}. \ + Not refreshing last_seen time.\ + """) + + ice_agent + end + end + + defp handle_keepalive_response( + ice_agent, + local_cand, + src_ip, + src_port, + %Message{type: %Type{class: :error_response}} = msg + ) do + {pair_id, ice_agent} = pop_in(ice_agent.keepalives[msg.transaction_id]) + pair = Map.fetch!(ice_agent.checklist, pair_id) + + Logger.debug(""" + Received keepalive error response from #{inspect({src_ip, src_port})}, \ + on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})}. \ + pair: #{pair_info(ice_agent, pair)} \ + Not refreshing last_seen time. \ + """) + + ice_agent + end + # Adds valid pair according to sec 7.2.5.3.2 # TODO sec. 7.2.5.3.3 # The agent MUST set the states for all other Frozen candidate pairs in @@ -2090,8 +2140,19 @@ defmodule ExICE.Priv.ICEAgent do {ufrag, pwd} end - defp authenticate_msg(msg, local_pwd) do - with :ok <- Message.authenticate(msg, local_pwd), + defp pair_info(ice_agent, pair) do + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + + """ + #{pair.id} \ + l: #{:inet.ntoa(local_cand.base.address)}:#{local_cand.base.port} \ + r: #{:inet.ntoa(remote_cand.address)}:#{remote_cand.port} \ + """ + end + + defp authenticate_msg(msg, pwd) do + with :ok <- Message.authenticate(msg, pwd), :ok <- Message.check_fingerprint(msg) do :ok else @@ -2402,17 +2463,11 @@ defmodule ExICE.Priv.ICEAgent do end defp send_keepalive(ice_agent, pair) do - Logger.debug("Sending keepalive") + Logger.debug("Sending keepalive on #{pair_info(ice_agent, pair)}") local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) - type = %Type{class: :request, method: :binding} - - req = - type - |> Message.new() - |> Message.with_integrity(ice_agent.remote_pwd) - |> Message.with_fingerprint() + req = binding_request(ice_agent, false) dst = {remote_cand.address, remote_cand.port} @@ -2430,39 +2485,10 @@ defmodule ExICE.Priv.ICEAgent do local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) - type = %Type{class: :request, method: :binding} - - role_attr = - if ice_agent.role == :controlling do - %ICEControlling{tiebreaker: ice_agent.tiebreaker} - else - %ICEControlled{tiebreaker: ice_agent.tiebreaker} - end - - # priority sent to the other side has to be - # computed with the candidate type preference of - # peer-reflexive; refer to sec 7.1.1 - priority = Candidate.priority(:prflx) - - attrs = [ - %Username{value: "#{ice_agent.remote_ufrag}:#{ice_agent.local_ufrag}"}, - %Priority{priority: priority}, - role_attr - ] - # we can nominate only when being the controlling agent # the controlled agent uses nominate? flag according to 7.3.1.5 - attrs = - if pair.nominate? and ice_agent.role == :controlling do - attrs ++ [%UseCandidate{}] - else - attrs - end - - req = - Message.new(type, attrs) - |> Message.with_integrity(ice_agent.remote_pwd) - |> Message.with_fingerprint() + nominate = pair.nominate? and ice_agent.role == :controlling + req = binding_request(ice_agent, nominate) raw_req = Message.encode(req) @@ -2489,6 +2515,34 @@ defmodule ExICE.Priv.ICEAgent do end end + defp binding_request(ice_agent, nominate) do + type = %Type{class: :request, method: :binding} + + role_attr = + if ice_agent.role == :controlling do + %ICEControlling{tiebreaker: ice_agent.tiebreaker} + else + %ICEControlled{tiebreaker: ice_agent.tiebreaker} + end + + # priority sent to the other side has to be + # computed with the candidate type preference of + # peer-reflexive; refer to sec 7.1.1 + priority = Candidate.priority(:prflx) + + attrs = [ + %Username{value: "#{ice_agent.remote_ufrag}:#{ice_agent.local_ufrag}"}, + %Priority{priority: priority}, + role_attr + ] + + attrs = if nominate, do: attrs ++ [%UseCandidate{}], else: attrs + + Message.new(type, attrs) + |> Message.with_integrity(ice_agent.remote_pwd) + |> Message.with_fingerprint() + end + defp do_send(ice_agent, %cand_mod{} = local_cand, dst, data, retry \\ true) do {dst_ip, dst_port} = dst diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index 1cc627a..2b663a3 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -277,7 +277,7 @@ defmodule ExICE.Priv.ICEAgentTest do end end - describe "sends keepalives" do + describe "keepalive" do setup do remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) @@ -295,27 +295,159 @@ defmodule ExICE.Priv.ICEAgentTest do %{ice_agent: ice_agent} end - test "on connected pair", %{ice_agent: ice_agent} do + test "timeout on connected pair", %{ice_agent: ice_agent} do ice_agent = connect(ice_agent) [socket] = ice_agent.sockets [pair] = Map.values(ice_agent.checklist) - ice_agent = ICEAgent.handle_keepalive(ice_agent, pair.id) + ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) assert packet = Transport.Mock.recv(socket) assert {:ok, msg} = ExSTUN.Message.decode(packet) assert msg.type == %ExSTUN.Message.Type{class: :request, method: :binding} + + # assert there are required attributes + username = "#{ice_agent.remote_ufrag}:#{ice_agent.local_ufrag}" + assert length(msg.attributes) == 5 + + assert {:ok, %Username{value: ^username}} = ExSTUN.Message.get_attribute(msg, Username) + assert {:ok, %ICEControlling{}} = ExSTUN.Message.get_attribute(msg, ICEControlling) + assert {:ok, %Priority{}} = ExSTUN.Message.get_attribute(msg, Priority) + + # authenticate and check fingerprint assert :ok == ExSTUN.Message.check_fingerprint(msg) assert :ok == ExSTUN.Message.authenticate(msg, ice_agent.remote_pwd) end - test "on unconnected pair", %{ice_agent: ice_agent} do + test "timeout on unconnected pair", %{ice_agent: ice_agent} do [socket] = ice_agent.sockets [pair] = Map.values(ice_agent.checklist) - ICEAgent.handle_keepalive(ice_agent, pair.id) + ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) assert nil == Transport.Mock.recv(socket) end + + test "success response", %{ice_agent: ice_agent} do + ice_agent = connect(ice_agent) + + [socket] = ice_agent.sockets + [remote_cand] = Map.values(ice_agent.remote_cands) + [pair] = Map.values(ice_agent.checklist) + + # trigger keepalive request + ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) + assert packet = Transport.Mock.recv(socket) + assert {:ok, req} = ExSTUN.Message.decode(packet) + + # create a response + resp = + binding_response( + req.transaction_id, + ice_agent.transport_module, + socket, + ice_agent.remote_pwd + ) + + # wait so that we can observe a change in last_seen later on + Process.sleep(1) + + ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, resp) + + [new_pair] = Map.values(ice_agent.checklist) + assert new_pair.last_seen > pair.last_seen + end + + test "invalid success response", %{ice_agent: ice_agent} do + ice_agent = connect(ice_agent) + + [socket] = ice_agent.sockets + [remote_cand] = Map.values(ice_agent.remote_cands) + [pair] = Map.values(ice_agent.checklist) + + # trigger keepalive request + ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) + assert packet = Transport.Mock.recv(socket) + assert {:ok, req} = ExSTUN.Message.decode(packet) + + # create a response using wrong password + resp = + binding_response( + req.transaction_id, + ice_agent.transport_module, + socket, + ice_agent.local_pwd + ) + + # wait so there will be a change in last_seen if something went wrong + Process.sleep(1) + + ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, resp) + + [new_pair] = Map.values(ice_agent.checklist) + assert new_pair.last_seen == pair.last_seen + end + + test "non-symmetric success response", %{ice_agent: ice_agent} do + ice_agent = connect(ice_agent) + + [socket] = ice_agent.sockets + [remote_cand] = Map.values(ice_agent.remote_cands) + [pair] = Map.values(ice_agent.checklist) + + # trigger keepalive request + ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) + assert packet = Transport.Mock.recv(socket) + assert {:ok, req} = ExSTUN.Message.decode(packet) + + # create a response using wrong password + resp = + binding_response( + req.transaction_id, + ice_agent.transport_module, + socket, + ice_agent.remote_pwd + ) + + # wait so there will be a change in last_seen if something went wrong + Process.sleep(1) + + # modify port so that addresses are non-symmetic + ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port + 1, resp) + + [new_pair] = Map.values(ice_agent.checklist) + assert new_pair.last_seen == pair.last_seen + end + + test "error response", %{ice_agent: ice_agent} do + ice_agent = connect(ice_agent) + + [socket] = ice_agent.sockets + [remote_cand] = Map.values(ice_agent.remote_cands) + [pair] = Map.values(ice_agent.checklist) + + # trigger keepalive request + ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) + assert packet = Transport.Mock.recv(socket) + assert {:ok, req} = ExSTUN.Message.decode(packet) + + resp = + Message.new(req.transaction_id, %Type{class: :error_response, method: :binding}, [ + %ErrorCode{code: 400} + ]) + |> Message.encode() + + # wait so there will be a change in last_seen if something went wrong + Process.sleep(1) + + ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, resp) + + [new_pair] = Map.values(ice_agent.checklist) + assert new_pair.last_seen == pair.last_seen + end end describe "incoming binding request" do @@ -1274,7 +1406,7 @@ defmodule ExICE.Priv.ICEAgentTest do assert ice_agent == new_ice_agent # try to handle keepalive on the srflx pair - new_ice_agent = ICEAgent.handle_keepalive(ice_agent, srflx_pair.id) + new_ice_agent = ICEAgent.handle_keepalive_timeout(ice_agent, srflx_pair.id) assert ice_agent == new_ice_agent end