diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 27956caa1..4e20b4791 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -68,23 +68,24 @@ defmodule Phoenix.Tracker.Shard do end @spec list(pid | atom, topic) :: [presence] - def list(server_pid, topic) do + def list(server_pid, topic) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_topic(topic) end - - @doc false - def dirty_list(shard_name, topic) do - State.tracked_values(shard_name, topic, []) + def list(shard_name, topic) when is_atom(shard_name) do + State.get_by_topic(shard_name, topic) end @spec get_by_key(pid | atom, topic, term) :: [presence] - def get_by_key(server_pid, topic, key) do + def get_by_key(server_pid, topic, key) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_key(topic, key) end + def get_by_key(shard_name, topic, key) when is_atom(shard_name) do + State.get_by_key(shard_name, topic, key) + end @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 1d2d0291d..7d44d4e4b 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -21,15 +21,15 @@ defmodule Phoenix.Tracker.State do @type pid_lookup :: {pid, topic, key} @type t :: %State{ - replica: name, - context: context, - clouds: clouds, - values: values, - pids: ets_id, - mode: :unset | :delta | :normal, - delta: :unset | delta, - replicas: %{name => :up | :down}, - range: {context, context} + replica: name, + context: context, + clouds: clouds, + values: values, + pids: ets_id, + mode: :unset | :delta | :normal, + delta: :unset | delta, + down_replicas: ets_id, + range: {context, context} } defstruct replica: nil, @@ -39,7 +39,7 @@ defmodule Phoenix.Tracker.State do pids: nil, mode: :unset, delta: :unset, - replicas: %{}, + down_replicas: nil, range: {%{}, %{}} @compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2} @@ -61,7 +61,7 @@ defmodule Phoenix.Tracker.State do mode: :normal, values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), - replicas: %{replica => :up}}) + down_replicas: :ets.new(down_replicas_table(shard_name), [:named_table, :protected, :bag])}) end @doc """ @@ -120,6 +120,9 @@ defmodule Phoenix.Tracker.State do def get_by_topic(%State{values: values} = state, topic) do tracked_values(values, topic, down_replicas(state)) end + def get_by_topic(shard_name, topic) do + tracked_values(shard_name, topic, down_replicas(shard_name)) + end @doc """ Returns a list of elements for the topic who belong to an online replica. @@ -131,6 +134,12 @@ defmodule Phoenix.Tracker.State do [_|_] = metas -> metas end end + def get_by_key(shard_name, topic, key) do + case tracked_key(shard_name, topic, key, down_replicas(shard_name)) do + [] -> [] + [_|_] = metas -> metas + end + end @doc """ Performs table lookup for tracked elements in the topic. @@ -393,18 +402,18 @@ defmodule Phoenix.Tracker.State do Marks a replica as up in the set and returns rejoined users. """ @spec replica_up(t, name) :: {t, joins :: [values], leaves :: []} - def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do - {%State{state | - context: Map.put_new(ctx, replica, 0), - replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []} + def replica_up(%State{down_replicas: down_replicas, context: ctx} = state, replica) do + :ets.delete_object(down_replicas, replica) + {%State{state | context: Map.put_new(ctx, replica, 0)}, replica_users(state, replica), []} end @doc """ Marks a replica as down in the set and returns left users. """ @spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]} - def replica_down(%State{replicas: replicas} = state, replica) do - {%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)} + def replica_down(%State{down_replicas: down_replicas} = state, replica) do + :ets.insert(down_replicas, replica) + {state, [], replica_users(state, replica)} end @doc """ @@ -556,9 +565,8 @@ defmodule Phoenix.Tracker.State do end @spec down_replicas(t) :: [name] - defp down_replicas(%State{replicas: replicas}) do - for {replica, :down} <- replicas, do: replica - end + defp down_replicas(%State{down_replicas: down_replicas}), do: :ets.tab2list(down_replicas) + defp down_replicas(shard_name), do: :ets.tab2list(down_replicas_table(shard_name)) @spec replica_users(t, name) :: [value] defp replica_users(%State{values: values}, replica) do @@ -575,4 +583,8 @@ defmodule Phoenix.Tracker.State do defp foldl({objects, cont}, acc, func) do foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func) end + + defp down_replicas_table(shard_name) do + :"#{shard_name}.down_replicas" + end end diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 88473717d..b9e2ddac4 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -42,6 +42,9 @@ defmodule Phoenix.Tracker.ShardReplicationTest do # node1 fulfills tranfer request and sends transfer_ack to primary assert_transfer_ack ref, from: @node1 assert_heartbeat to: @node1, from: @primary + + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}] = list(shard, topic) end @@ -87,6 +90,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_heartbeat from: @node1 assert_heartbeat from: @node2 + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic) end @@ -237,7 +242,6 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_join ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) # nodedown Process.unlink(node_pid) @@ -245,13 +249,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_leave ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) - - :timer.sleep(@permdown + 2*@heartbeat) - assert [{"local1", _}] = dirty_list(shard, topic) end - test "untrack with no tracked topic is a noop", %{shard: shard, topic: topic} do assert Shard.untrack(shard, self(), topic, "foo") == :ok @@ -382,8 +381,4 @@ defmodule Phoenix.Tracker.ShardReplicationTest do defp list(shard, topic) do Enum.sort(Shard.list(shard, topic)) end - - defp dirty_list(shard, topic) do - Enum.sort(Shard.dirty_list(shard, topic)) - end end