Skip to content

Commit

Permalink
Ignore data frome stale sockets, timers, etc. Clean up state when con…
Browse files Browse the repository at this point in the history
…nection is failed or completed (#55)
  • Loading branch information
mickel8 authored Jul 26, 2024
1 parent bed3d9c commit 852c73a
Show file tree
Hide file tree
Showing 3 changed files with 348 additions and 40 deletions.
7 changes: 3 additions & 4 deletions lib/ex_ice/priv/conn_check_handler/controlling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,18 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlling do
@impl true
def update_nominated_flag(%ICEAgent{eoc: true} = ice_agent, pair_id, true) do
Logger.debug("Nomination succeeded. Selecting pair: #{inspect(pair_id)}")
ice_agent = ICEAgent.change_connection_state(ice_agent, :completed)

pair = Map.fetch!(ice_agent.checklist, pair_id)
pair = %CandidatePair{pair | nominate?: false, nominated?: true}
checklist = Map.put(ice_agent.checklist, pair.id, pair)
ice_agent = %ICEAgent{ice_agent | checklist: checklist}
ice_agent = put_in(ice_agent.checklist[pair.id], pair)

# the controlling agent could nominate only when eoc was set
# and checklist finished
unless Checklist.finished?(ice_agent.checklist) do
Logger.warning("Nomination succeeded but checklist hasn't finished.")
end

%ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair_id: pair.id}
ice_agent = %ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair_id: pair.id}
ICEAgent.change_connection_state(ice_agent, :completed)
end
end
210 changes: 174 additions & 36 deletions lib/ex_ice/priv/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec set_remote_credentials(t(), binary(), binary()) :: t()
def set_remote_credentials(%__MODULE__{state: :failed} = ice_agent, _, _) do
Logger.debug("Tried to set remote credentials in failed state. ICE restart needed. Ignoring.")
ice_agent
end

def set_remote_credentials(
%__MODULE__{remote_ufrag: nil, remote_pwd: nil} = ice_agent,
ufrag,
Expand Down Expand Up @@ -249,6 +254,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec gather_candidates(t()) :: t()
def gather_candidates(%__MODULE__{state: :failed} = ice_agent) do
Logger.warning("Can't gather candidates in state failed. ICE restart needed. Ignoring.")
ice_agent
end

def gather_candidates(%__MODULE__{gathering_state: :gathering} = ice_agent) do
Logger.warning("Can't gather candidates. Gathering already in progress. Ignoring.")
ice_agent
Expand Down Expand Up @@ -311,6 +321,12 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec add_remote_candidate(t(), Candidate.t()) :: t()
def add_remote_candidate(%__MODULE__{state: :failed} = ice_agent, _) do
# Completed state will be caught by the next clause
Logger.debug("Can't add remote candidate in state failed. ICE restart needed. Ignoring.")
ice_agent
end

def add_remote_candidate(%__MODULE__{eoc: true} = ice_agent, remote_cand) do
Logger.warning("""
Received remote candidate after end-of-candidates. Ignoring.
Expand Down Expand Up @@ -361,6 +377,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec end_of_candidates(t()) :: t()
def end_of_candidates(%__MODULE__{state: :failed} = ice_agent) do
Logger.debug("Can't set end-of-candidates flag in state failed. Ignoring.")
ice_agent
end

def end_of_candidates(%__MODULE__{role: :controlled} = ice_agent) do
Logger.debug("Setting end-of-candidates flag.")
ice_agent = %{ice_agent | eoc: true}
Expand Down Expand Up @@ -418,7 +439,7 @@ defmodule ExICE.Priv.ICEAgent do

@spec handle_ta_timeout(t()) :: t()
def handle_ta_timeout(%__MODULE__{state: state} = ice_agent)
when state.state in [:completed, :failed] do
when state in [:completed, :failed] do
Logger.warning("""
Ta timer fired in unexpected state: #{state}.
Trying to update gathering and connection states.
Expand Down Expand Up @@ -477,6 +498,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_eoc_timeout(t()) :: t()
def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do
Logger.debug("EOC timer fired but we are in the failed state. Ignoring.")
%{ice_agent | eoc_timer: nil}
end

def handle_eoc_timeout(%{eoc: true} = ice_agent) do
Logger.debug("EOC timer fired but EOC flag is already set. Ignoring.")
%{ice_agent | eoc_timer: nil}
Expand Down Expand Up @@ -564,8 +590,16 @@ defmodule ExICE.Priv.ICEAgent do
{:ok, _pair} ->
ice_agent

:error when ice_agent.state in [:failed, :completed] ->
Logger.warning("""
Received keepalive request for non-existant candidate pair but we are in state: #{ice_agent.state}. \
Ignoring.\
""")

ice_agent

:error ->
Logger.warning("Received keepalive request for non-existant candidate pair")
Logger.warning("Received keepalive request for non-existant candidate pair. Ignoring.")
ice_agent
end
end
Expand All @@ -588,19 +622,58 @@ defmodule ExICE.Priv.ICEAgent do
handle_turn_gathering_transaction_response(ice_agent, turn_tr_id, turn_tr, packet)

from_turn?(ice_agent, src_ip, src_port) ->
handle_turn_message(ice_agent, socket, src_ip, src_port, packet)
local_cands = Map.values(ice_agent.local_cands)

case find_relay_cand_by_socket(local_cands, socket) do
nil ->
Logger.debug("""
Couldn't find relay candidate for:
socket: #{inspect(socket)}
src address: #{inspect({src_ip, src_port})}.
Ignoring incoming TURN message.
""")

ice_agent

relay_cand ->
handle_turn_message(ice_agent, relay_cand, src_ip, src_port, packet)
end

ExSTUN.stun?(packet) ->
handle_stun_message(ice_agent, socket, src_ip, src_port, packet)
local_cands = Map.values(ice_agent.local_cands)

case find_host_cand(local_cands, socket) do
nil ->
Logger.debug("""
Couldn't find host candidate for #{inspect(src_ip)}:#{src_port}. \
Ignoring incoming STUN message.\
""")

ice_agent

host_cand ->
handle_stun_message(ice_agent, host_cand, src_ip, src_port, packet)
end

true ->
local_cand = find_host_cand(Map.values(ice_agent.local_cands), socket)
remote_cand = find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port)
with remote_cands <- Map.values(ice_agent.remote_cands),
%_{} = local_cand <- find_host_cand(Map.values(ice_agent.local_cands), socket),
%_{} = remote_cand <- find_remote_cand(remote_cands, src_ip, src_port) do
%CandidatePair{} =
pair = Checklist.find_pair(ice_agent.checklist, local_cand.base.id, remote_cand.id)

%CandidatePair{} =
pair = Checklist.find_pair(ice_agent.checklist, local_cand.base.id, remote_cand.id)
handle_data_message(ice_agent, pair, packet)
else
_ ->
Logger.debug("""
Couldn't find host or remote candidate for:
socket: #{inspect(socket)}
src address: #{inspect({src_ip, src_port})}.
Ignoring incoming data message.
""")

handle_data_message(ice_agent, pair, packet)
ice_agent
end
end
end

Expand Down Expand Up @@ -900,9 +973,7 @@ defmodule ExICE.Priv.ICEAgent do
end
end

defp handle_turn_message(ice_agent, socket, src_ip, src_port, packet) do
%cand_mod{} = cand = find_relay_cand_by_socket(Map.values(ice_agent.local_cands), socket)

defp handle_turn_message(ice_agent, %cand_mod{} = cand, src_ip, src_port, packet) do
case cand_mod.receive_data(cand, src_ip, src_port, packet) do
{:ok, cand} ->
put_in(ice_agent.local_cands[cand.base.id], cand)
Expand Down Expand Up @@ -935,23 +1006,10 @@ defmodule ExICE.Priv.ICEAgent do
end
end

defp handle_stun_message(ice_agent, socket, src_ip, src_port, packet) do
defp handle_stun_message(ice_agent, host_cand, src_ip, src_port, packet) do
case ExSTUN.Message.decode(packet) do
{:ok, msg} ->
local_cands = Map.values(ice_agent.local_cands)

case find_host_cand(local_cands, socket) do
nil ->
Logger.debug("""
Couldn't find host candidate for #{inspect(src_ip)}:#{src_port}. \
Ignoring incoming STUN message.\
""")

ice_agent

local_cand ->
do_handle_stun_message(ice_agent, local_cand, src_ip, src_port, msg)
end
do_handle_stun_message(ice_agent, host_cand, src_ip, src_port, msg)

{:error, reason} ->
Logger.warning("Couldn't decode stun message: #{inspect(reason)}")
Expand Down Expand Up @@ -1787,14 +1845,10 @@ defmodule ExICE.Priv.ICEAgent do
end

defp do_restart(ice_agent) do
ice_agent.local_cands
|> Enum.uniq_by(fn {_id, cand} -> cand.base.socket end)
|> Enum.each(fn {_id, cand} ->
Logger.debug("""
Closing local candidate's socket: #{inspect(cand.base.base_address)}:#{cand.base.base_port}.
""")

:ok = ice_agent.transport_module.close(cand.base.socket)
Enum.each(ice_agent.sockets, fn socket ->
{:ok, {ip, port}} = ice_agent.transport_module.sockname(socket)
Logger.debug("Closing socket: #{inspect(ip)}:#{port}.")
:ok = ice_agent.transport_module.close(socket)
end)

{ufrag, pwd} = generate_credentials()
Expand All @@ -1816,7 +1870,8 @@ defmodule ExICE.Priv.ICEAgent do

%__MODULE__{
ice_agent
| gathering_transactions: %{},
| sockets: [],
gathering_transactions: %{},
selected_pair_id: nil,
conn_checks: %{},
checklist: %{},
Expand Down Expand Up @@ -1957,7 +2012,88 @@ defmodule ExICE.Priv.ICEAgent do

@doc false
@spec change_connection_state(t(), atom()) :: t()
def change_connection_state(ice_agent, :failed) do
Enum.each(ice_agent.sockets, fn socket ->
:ok = ice_agent.transport_module.close(socket)
end)

%{
ice_agent
| sockets: [],
gathering_transactions: %{},
selected_pair_id: nil,
conn_checks: %{},
checklist: %{},
local_cands: %{},
remote_cands: %{},
local_ufrag: nil,
local_pwd: nil,
remote_ufrag: nil,
remote_pwd: nil,
eoc: false,
nominating?: {false, nil}
}
|> disable_timer()
|> do_change_connection_state(:failed)
end

def change_connection_state(ice_agent, :completed) do
selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id)
succeeded_pair = Map.fetch!(ice_agent.checklist, selected_pair.succeeded_pair_id)

if selected_pair.id != selected_pair.discovered_pair_id do
raise """
Selected pair isn't also discovered pair. This should never happen.
Selected pair: #{inspect(selected_pair)}\
"""
end

local_cand = Map.fetch!(ice_agent.local_cands, succeeded_pair.local_cand_id)

Enum.each(ice_agent.sockets, fn socket ->
unless socket == local_cand.base.socket do
:ok = ice_agent.transport_module.close(socket)
end
end)

# We need to use Map.filter as selected_pair might not
# be the same as succeeded pair
local_cands =
Map.filter(
ice_agent.local_cands,
fn {cand_id, _cand} ->
cand_id in [selected_pair.local_cand_id, succeeded_pair.local_cand_id]
end
)

remote_cands =
Map.filter(
ice_agent.remote_cands,
fn {cand_id, _cand} ->
cand_id in [selected_pair.remote_cand_id, succeeded_pair.remote_cand_id]
end
)

checklist =
Map.filter(ice_agent.checklist, fn {pair_id, _pair} ->
pair_id in [selected_pair.id, succeeded_pair.id]
end)

%{
ice_agent
| sockets: [local_cand.base.socket],
local_cands: local_cands,
remote_cands: remote_cands,
checklist: checklist
}
|> do_change_connection_state(:completed)
end

def change_connection_state(ice_agent, new_conn_state) do
do_change_connection_state(ice_agent, new_conn_state)
end

defp do_change_connection_state(ice_agent, new_conn_state) do
Logger.debug("Connection state change: #{ice_agent.state} -> #{new_conn_state}")
notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state})
%__MODULE__{ice_agent | state: new_conn_state}
Expand Down Expand Up @@ -2135,6 +2271,8 @@ defmodule ExICE.Priv.ICEAgent do
%{ice_agent | ta_timer: timer}
end

defp disable_timer(%{ta_timer: nil} = ice_agent), do: ice_agent

defp disable_timer(ice_agent) do
Process.cancel_timer(ice_agent.ta_timer)

Expand Down
Loading

0 comments on commit 852c73a

Please sign in to comment.