From 1c9ab71e871796bc03a491bb20b2a9ff7d3bb7eb Mon Sep 17 00:00:00 2001
From: Rafal Studnicki
Date: Tue, 14 Mar 2017 15:40:35 +0000
Subject: [PATCH] Tracker consists of shard-pool; exposes dirty_list operation

---
 lib/phoenix/tracker.ex | 542 +++---------------
 lib/phoenix/tracker/shard.ex | 497 ++++++++++++++++
 lib/phoenix/tracker/state.ex | 25 +-
 test/phoenix/pubsub/pg2_test.exs | 18 +-
 .../phoenix/tracker/delta_generation_test.exs | 24 +-
 test/phoenix/tracker/integration_test.exs | 390 -------------
 test/phoenix/tracker/pool_test.exs | 133 +++++
 .../tracker/shard_replication_test.exs | 389 +++++++++++++
 test/phoenix/tracker/state_test.exs | 85 ++-
 test/phoenix/tracker_test.exs | 8 +-
 test/support/node_case.ex | 59 +-
 11 files changed, 1231 insertions(+), 939 deletions(-)
 create mode 100644 lib/phoenix/tracker/shard.ex
 delete mode 100644 test/phoenix/tracker/integration_test.exs
 create mode 100644 test/phoenix/tracker/pool_test.exs
 create mode 100644 test/phoenix/tracker/shard_replication_test.exs

diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex
index 92daca8a0..f4694fb09 100644
--- a/lib/phoenix/tracker.ex
+++ b/lib/phoenix/tracker.ex
@@ -2,42 +2,48 @@ defmodule Phoenix.Tracker do
   @moduledoc ~S"""
   Provides distributed Presence tracking to processes.

-  Tracker servers use a heartbeat protocol and CRDT to replicate presence
+  The `Tracker` API is used as a facade for a pool of `Phoenix.Tracker.Shard`s.
+  Which `Shard` handles a given call is determined by the topic that the
+  call operates on.
+
+  Tracker shards use a heartbeat protocol and CRDT to replicate presence
   information across a cluster in an eventually consistent, conflict-free
   manner. Under this design, there is no single source of truth or global
-  process. Instead, each node runs one or more `Phoenix.Tracker` servers and
-  node-local changes are replicated across the cluster and handled locally as
-  a diff of changes.
+  process. Each node runs a pool of `Phoenix.Tracker.Shard`s and node-local
+  changes are replicated across the cluster and handled locally as a diff of
+  changes.

     * `tracker` - The name of the tracker handler module implementing the
       `Phoenix.Tracker` behaviour
     * `tracker_opts` - The list of options to pass to the tracker handler
-    * `server_opts` - The list of options to pass to the tracker server
+    * `pool_opts` - The list of options used to construct the shard pool

-  ## Required `server_opts`:
+  ## Required `pool_opts`:

     * `:name` - The name of the server, such as: `MyApp.Tracker`
+      This will also form the common prefix for all shard names
     * `:pubsub_server` - The name of the PubSub server, such as: `MyApp.PubSub`

-  ## Optional `server_opts`:
+  ## Optional `pool_opts`:

-    * `broadcast_period` - The interval in milliseconds to send delta broadcats
+    * `:broadcast_period` - The interval in milliseconds to send delta broadcasts
       across the cluster. Default `1500`
-    * `max_silent_periods` - The max integer of broadcast periods for which no
+    * `:max_silent_periods` - The max integer of broadcast periods for which no
       delta broadcasts have been sent. Defaults `10` (15s heartbeat)
-    * `down_period` - The interval in milliseconds to flag a replica
+    * `:down_period` - The interval in milliseconds to flag a replica
      as down temporarily down. Default `broadcast_period * max_silent_periods * 2`
      (30s down detection). Note: This must be at least 2x the `broadcast_period`.
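(Illustrative sketch, not part of the patch: `MyTracker` and `MyApp.PubSub` are
assumed names and the `pool_size` of 4 is an arbitrary example value. It shows
how a handler module might start the pooled tracker described above and
implement the two tracker callbacks; the same keyword list is reused as both
`tracker_opts` and `pool_opts` for brevity.)

    defmodule MyTracker do
      def start_link(opts) do
        opts = Keyword.merge([name: __MODULE__, pool_size: 4], opts)
        Phoenix.Tracker.start_link(__MODULE__, opts, opts)
      end

      # Receives tracker_opts; the returned value becomes the handler state.
      def init(opts) do
        {:ok, %{pubsub_server: Keyword.fetch!(opts, :pubsub_server)}}
      end

      # Called with %{topic => {joins, leaves}} whenever tracked presences change.
      def handle_diff(_diff, state), do: {:ok, state}
    end

    # After MyTracker.start_link(pubsub_server: MyApp.PubSub), a call such as
    # Phoenix.Tracker.track(MyTracker, self(), "lobby", "u1", %{}) is routed to
    # one of the 4 shards by hashing the "lobby" topic, so every operation on a
    # given topic is handled by the same shard process.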
- * `permdown_period` - The interval in milliseconds to flag a replica + * `:permdown_period` - The interval in milliseconds to flag a replica as permanently down, and discard its state. Note: This must be at least greater than the `down_period`. Default `1_200_000` (20 minutes) - * `clock_sample_periods` - The numbers of heartbeat windows to sample + * `:clock_sample_periods` - The numbers of heartbeat windows to sample remote clocks before collapsing and requesting transfer. Default `2` - * `max_delta_sizes` - The list of delta generation sizes to keep before + * `:max_delta_sizes` - The list of delta generation sizes to keep before falling back to sending entire state. Defaults `[100, 1000, 10_000]`. - * log_level - The log level to log events, defaults `:debug` and can be + * `:log_level` - The log level to log events, defaults `:debug` and can be disabled with `false` + * `:pool_size` - The number of tracker shards to launch. Default `1` ## Implementing a Tracker @@ -95,8 +101,9 @@ defmodule Phoenix.Tracker do crash the tracker server, so operations that may crash the server should be offloaded with a `Task.Supervisor` spawned process. """ - use GenServer - alias Phoenix.Tracker.{Clock, State, Replica, DeltaGeneration} + use Supervisor + import Supervisor.Spec + alias Phoenix.Tracker.Shard require Logger @type presence :: {key :: String.t, meta :: Map.t} @@ -128,8 +135,9 @@ defmodule Phoenix.Tracker do {:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}} """ @spec track(atom, pid, topic, term, Map.t) :: {:ok, ref :: binary} | {:error, reason :: term} - def track(server_name, pid, topic, key, meta) when is_pid(pid) and is_map(meta) do - GenServer.call(server_name, {:track, pid, topic, key, meta}) + def track(tracker_name, pid, topic, key, meta) when is_pid(pid) and is_map(meta) do + Shard.name_for_topic(tracker_name, topic, pool_size(tracker_name)) + |> GenServer.call({:track, pid, topic, key, meta}) end @doc """ @@ -151,11 +159,16 @@ defmodule Phoenix.Tracker do :ok """ @spec untrack(atom, pid, topic, term) :: :ok - def untrack(server_name, pid, topic, key) when is_pid(pid) do - GenServer.call(server_name, {:untrack, pid, topic, key}) - end - def untrack(server_name, pid) when is_pid(pid) do - GenServer.call(server_name, {:untrack, pid}) + def untrack(tracker_name, pid, topic, key) when is_pid(pid) do + Shard.name_for_topic(tracker_name, topic, pool_size(tracker_name)) + |> GenServer.call({:untrack, pid, topic, key}) + end + def untrack(tracker_name, pid) when is_pid(pid) do + for shard_number <- 0..(pool_size(tracker_name)-1) do + Shard.name_for_number(tracker_name, shard_number) + |> GenServer.call({:untrack, pid}) + end + :ok end @doc """ @@ -175,8 +188,9 @@ defmodule Phoenix.Tracker do {:ok, "1WpAofWYIAA="} """ @spec update(atom, pid, topic, term, Map.t | (Map.t -> Map.t)) :: {:ok, ref :: binary} | {:error, reason :: term} - def update(server_name, pid, topic, key, meta) when is_pid(pid) and (is_map(meta) or is_function(meta)) do - GenServer.call(server_name, {:update, pid, topic, key, meta}) + def update(tracker_name, pid, topic, key, meta) when is_pid(pid) and (is_map(meta) or is_function(meta)) do + Shard.name_for_topic(tracker_name, topic, pool_size(tracker_name)) + |> GenServer.call({:update, pid, topic, key, meta}) end @doc """ @@ -193,456 +207,78 @@ defmodule Phoenix.Tracker do [{123, %{name: "user 123"}}, {456, %{name: "user 456"}}] """ @spec list(atom, topic) :: [presence] - def list(server_name, topic) do - # TODO avoid extra map (ideally crdt does an ets select 
only returning {key, meta}) - server_name - |> GenServer.call({:list, topic}) - |> State.get_by_topic(topic) - |> Enum.map(fn {{_topic, _pid, key}, meta, _tag} -> {key, meta} end) + def list(tracker_name, topic) do + Shard.name_for_topic(tracker_name, topic, pool_size(tracker_name)) + |> Phoenix.Tracker.Shard.list(topic) end @doc """ - Gracefully shuts down by broadcasting permdown to all replicas. - - ## Examples - - iex> Phoenix.Tracker.graceful_permdown(MyTracker) - :ok - """ - @spec graceful_permdown(atom) :: :ok - def graceful_permdown(server_name) do - GenServer.call(server_name, :graceful_permdown) - end - - - ## Server - - def start_link(tracker, tracker_opts, server_opts) do - name = Keyword.fetch!(server_opts, :name) - GenServer.start_link(__MODULE__, [tracker, tracker_opts, server_opts], name: name) - end - - def init([tracker, tracker_opts, opts]) do - Process.flag(:trap_exit, true) - pubsub_server = Keyword.fetch!(opts, :pubsub_server) - server_name = Keyword.fetch!(opts, :name) - broadcast_period = opts[:broadcast_period] || 1500 - max_silent_periods = opts[:max_silent_periods] || 10 - down_period = opts[:down_period] || (broadcast_period * max_silent_periods * 2) - permdown_period = opts[:permdown_period] || 1_200_000 - clock_sample_periods = opts[:clock_sample_periods] || 2 - log_level = Keyword.get(opts, :log_level, false) - max_delta_sizes = opts[:max_delta_sizes] || [100, 1000, 10_000] - - with :ok <- validate_down_period(down_period, broadcast_period), - :ok <- validate_permdown_period(permdown_period, down_period), - {:ok, tracker_state} <- tracker.init(tracker_opts) do - - node_name = Phoenix.PubSub.node_name(pubsub_server) - namespaced_topic = namespaced_topic(server_name) - replica = Replica.new(node_name) - - subscribe(pubsub_server, namespaced_topic) - send_stuttered_heartbeat(self(), broadcast_period) - - {:ok, %{server_name: server_name, - pubsub_server: pubsub_server, - tracker: tracker, - tracker_state: tracker_state, - replica: replica, - report_events_to: opts[:report_events_to], - namespaced_topic: namespaced_topic, - log_level: log_level, - replicas: %{}, - pending_clockset: [], - presences: State.new(Replica.ref(replica)), - broadcast_period: broadcast_period, - max_silent_periods: max_silent_periods, - silent_periods: max_silent_periods, - down_period: down_period, - permdown_period: permdown_period, - clock_sample_periods: clock_sample_periods, - deltas: [], - max_delta_sizes: max_delta_sizes, - current_sample_count: clock_sample_periods}} - end - end - - def validate_down_period(d_period, b_period) when d_period < (2 * b_period) do - {:error, "down_period must be at least twice as large as the broadcast_period"} - end - def validate_down_period(_d_period, _b_period), do: :ok - - def validate_permdown_period(p_period, d_period) when p_period <= d_period do - {:error, "permdown_period must be at least larger than the down_period"} - end - def validate_permdown_period(_p_period, _d_period), do: :ok - - - defp send_stuttered_heartbeat(pid, interval) do - Process.send_after(pid, :heartbeat, Enum.random(0..trunc(interval * 0.25))) - end - - def handle_info(:heartbeat, state) do - {:noreply, state - |> broadcast_delta_heartbeat() - |> request_transfer_from_replicas_needing_synced() - |> detect_downs() - |> schedule_next_heartbeat()} - end - - def handle_info({:pub, :heartbeat, {name, vsn}, :empty, clocks}, state) do - {:noreply, state - |> put_pending_clock(clocks) - |> handle_heartbeat({name, vsn})} - end - def handle_info({:pub, :heartbeat, {name, vsn}, 
delta, clocks}, state) do - state = handle_heartbeat(state, {name, vsn}) - {presences, joined, left} = State.merge(state.presences, delta) - - {:noreply, state - |> report_diff(joined, left) - |> put_presences(presences) - |> put_pending_clock(clocks) - |> push_delta_generation(delta)} - end - - def handle_info({:pub, :transfer_req, ref, {name, _vsn}, {_, clocks}}, state) do - log state, fn -> "#{state.replica.name}: transfer_req from #{inspect name}" end - delta = DeltaGeneration.extract(state.presences, state.deltas, name, clocks) - msg = {:pub, :transfer_ack, ref, Replica.ref(state.replica), delta} - direct_broadcast(state, name, msg) - - {:noreply, state} - end - - def handle_info({:pub, :transfer_ack, _ref, {name, _vsn}, remote_presences}, state) do - log(state, fn -> "#{state.replica.name}: transfer_ack from #{inspect name}" end) - {presences, joined, left} = State.merge(state.presences, remote_presences) - - {:noreply, state - |> report_diff(joined, left) - |> push_delta_generation(remote_presences) - |> put_presences(presences)} - end - - def handle_info({:pub, :graceful_permdown, {_name, _vsn} = ref}, state) do - case Replica.fetch_by_ref(state.replicas, ref) do - {:ok, replica} -> {:noreply, state |> down(replica) |> permdown(replica)} - :error -> {:noreply, state} - end - end - - def handle_info({:EXIT, pid, _reason}, state) do - {:noreply, drop_presence(state, pid)} - end - - def handle_call(:values, _from, state) do - {:reply, :ets.match(state.presences.values, :"$1"), state} - end - - def handle_call({:track, pid, topic, key, meta}, _from, state) do - case State.get_by_pid(state.presences, pid, topic, key) do - nil -> - {state, ref} = put_presence(state, pid, topic, key, meta) - {:reply, {:ok, ref}, state} - _ -> - {:reply, {:error, {:already_tracked, pid, topic, key}}, state} - end - end - - def handle_call({:untrack, pid, topic, key}, _from, state) do - new_state = drop_presence(state, pid, topic, key) - if State.get_by_pid(new_state.presences, pid) == [] do - Process.unlink(pid) - end - {:reply, :ok, new_state} - end - - def handle_call({:untrack, pid}, _from, state) do - Process.unlink(pid) - {:reply, :ok, drop_presence(state, pid)} - end - - def handle_call({:update, pid, topic, key, meta_updater}, _from, state) when is_function(meta_updater) do - handle_update({pid, topic, key, meta_updater}, state) - end - - def handle_call({:update, pid, topic, key, new_meta}, _from, state) do - handle_update({pid, topic, key, fn _ -> new_meta end}, state) - end - - def handle_call(:graceful_permdown, _from, state) do - broadcast_from(state, self(), {:pub, :graceful_permdown, Replica.ref(state.replica)}) - {:stop, :normal, :ok, state} - end - - def handle_call({:list, _topic}, _from, state) do - {:reply, state.presences, state} - end - - def handle_call(:replicas, _from, state) do - {:reply, state.replicas, state} - end - - def handle_call(:unsubscribe, _from, state) do - Phoenix.PubSub.unsubscribe(state.pubsub_server, state.namespaced_topic) - {:reply, :ok, state} - end - - def handle_call(:resubscribe, _from, state) do - subscribe(state.pubsub_server, state.namespaced_topic) - {:reply, :ok, state} - end - - defp subscribe(pubsub_server, namespaced_topic) do - Phoenix.PubSub.subscribe(pubsub_server, namespaced_topic, link: true) - end - - defp put_update(state, pid, topic, key, meta, %{phx_ref: ref} = prev_meta) do - state - |> put_presences(State.leave(state.presences, pid, topic, key)) - |> put_presence(pid, topic, key, Map.put(meta, :phx_ref_prev, ref), prev_meta) - end - defp 
put_presence(state, pid, topic, key, meta, prev_meta \\ nil) do - Process.link(pid) - ref = random_ref() - meta = Map.put(meta, :phx_ref, ref) - new_state = - state - |> report_diff_join(topic, key, meta, prev_meta) - |> put_presences(State.join(state.presences, pid, topic, key, meta)) - - {new_state, ref} - end - - defp put_presences(state, %State{} = presences), do: %{state | presences: presences} - - defp drop_presence(state, pid, topic, key) do - if leave = State.get_by_pid(state.presences, pid, topic, key) do - state - |> report_diff([], [leave]) - |> put_presences(State.leave(state.presences, pid, topic, key)) - else - state - end - end - defp drop_presence(state, pid) do - leaves = State.get_by_pid(state.presences, pid) - - state - |> report_diff([], leaves) - |> put_presences(State.leave(state.presences, pid)) - end - - defp handle_heartbeat(state, {name, vsn}) do - case Replica.put_heartbeat(state.replicas, {name, vsn}) do - {replicas, nil, %Replica{status: :up} = upped} -> - up(%{state | replicas: replicas}, upped) - - {replicas, %Replica{vsn: ^vsn, status: :up}, %Replica{vsn: ^vsn, status: :up}} -> - %{state | replicas: replicas} - - {replicas, %Replica{vsn: ^vsn, status: :down}, %Replica{vsn: ^vsn, status: :up} = upped} -> - up(%{state | replicas: replicas}, upped) - - {replicas, %Replica{vsn: old, status: :up} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> - %{state | replicas: replicas} |> down(downed) |> permdown(downed) |> up(upped) - - {replicas, %Replica{vsn: old, status: :down} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> - %{state | replicas: replicas} |> permdown(downed) |> up(upped) - end - end - - defp request_transfer_from_replicas_needing_synced(%{current_sample_count: 1} = state) do - needs_synced = clockset_to_sync(state) - for replica <- needs_synced, do: request_transfer(state, replica) - - %{state | pending_clockset: [], current_sample_count: state.clock_sample_periods} - end - defp request_transfer_from_replicas_needing_synced(state) do - %{state | current_sample_count: state.current_sample_count - 1} - end - - defp request_transfer(state, {name, _vsn}) do - log state, fn -> "#{state.replica.name}: transfer_req from #{name}" end - ref = make_ref() - msg = {:pub, :transfer_req, ref, Replica.ref(state.replica), clock(state)} - direct_broadcast(state, name, msg) - end - - defp detect_downs(%{permdown_period: perm_int, down_period: temp_int} = state) do - Enum.reduce(state.replicas, state, fn {_name, replica}, acc -> - case Replica.detect_down(acc.replicas, replica, temp_int, perm_int) do - {replicas, %Replica{status: :up}, %Replica{status: :permdown} = down_rep} -> - %{acc | replicas: replicas} |> down(down_rep) |> permdown(down_rep) - - {replicas, %Replica{status: :down}, %Replica{status: :permdown} = down_rep} -> - permdown(%{acc | replicas: replicas}, down_rep) - - {replicas, %Replica{status: :up}, %Replica{status: :down} = down_rep} -> - down(%{acc | replicas: replicas}, down_rep) - - {replicas, %Replica{status: unchanged}, %Replica{status: unchanged}} -> - %{acc | replicas: replicas} - end - end) - end - - defp schedule_next_heartbeat(state) do - Process.send_after(self(), :heartbeat, state.broadcast_period) - state - end - - defp clock(state), do: State.clocks(state.presences) - - @spec clockset_to_sync(%{pending_clockset: [State.replica_context]}) :: [State.replica_name] - defp clockset_to_sync(state) do - my_ref = Replica.ref(state.replica) - - state.pending_clockset - |> Clock.append_clock(clock(state)) 
- |> Clock.clockset_replicas() - |> Enum.filter(fn ref -> ref != my_ref end) - end - - defp put_pending_clock(state, clocks) do - %{state | pending_clockset: Clock.append_clock(state.pending_clockset, clocks)} - end + Lists all presences tracked under a given topic, by directly consulting + the underlying tracker_state's ets table. - defp up(state, remote_replica) do - report_event(state, {:replica_up, remote_replica.name}) - log state, fn -> "#{state.replica.name}: replica up from #{inspect remote_replica.name}" end - {presences, joined, []} = State.replica_up(state.presences, Replica.ref(remote_replica)) + This function may return records from replicas that are temporarily + down, but is guaranteed not to return records from permdown replicas. - state - |> report_diff(joined, []) - |> put_presences(presences) - end - - defp down(state, remote_replica) do - report_event(state, {:replica_down, remote_replica.name}) - log state, fn -> "#{state.replica.name}: replica down from #{inspect remote_replica.name}" end - {presences, [], left} = State.replica_down(state.presences, Replica.ref(remote_replica)) - - state - |> report_diff([], left) - |> put_presences(presences) - end - - defp permdown(state, %Replica{name: name} = remote_replica) do - report_event(state, {:replica_permdown, name}) - log state, fn -> "#{state.replica.name}: permanent replica down detected #{name}" end - replica_ref = Replica.ref(remote_replica) - presences = State.remove_down_replicas(state.presences, replica_ref) - deltas = DeltaGeneration.remove_down_replicas(state.deltas, replica_ref) - - case Replica.fetch_by_ref(state.replicas, replica_ref) do - {:ok, _replica} -> - replicas = Map.delete(state.replicas, name) - %{state | presences: presences, replicas: replicas, deltas: deltas} - _ -> - %{state | presences: presences, deltas: deltas} - end - end - - defp report_event(%{report_events_to: nil}, _event), do: :ok - defp report_event(%{report_events_to: pid} = state, event) do - send(pid, {event, state.replica.name}) - end + * `server_name` - The registered name of the tracker server + * `topic` - The `Phoenix.PubSub` topic to update for this presence - defp namespaced_topic(server_name) do - "phx_presence:#{server_name}" - end + Returns a lists of presences in key/metadata tuple pairs. - defp broadcast_from(state, from, msg) do - Phoenix.PubSub.broadcast_from!(state.pubsub_server, from, state.namespaced_topic, msg) - end + ## Examples - defp direct_broadcast(state, target_node, msg) do - Phoenix.PubSub.direct_broadcast!(target_node, state.pubsub_server, state.namespaced_topic, msg) + iex> Phoenix.Tracker.list(MyTracker, "lobby") + [{123, %{name: "user 123"}}, {456, %{name: "user 456"}}] + """ + @spec dirty_list(atom, topic) :: [presence] + def dirty_list(tracker_name, topic) do + Shard.name_for_topic(tracker_name, topic, pool_size(tracker_name)) + |> Shard.dirty_list(topic) end - defp broadcast_delta_heartbeat(%{presences: presences} = state) do - cond do - State.has_delta?(presences) -> - delta = presences.delta - new_presences = presences |> State.reset_delta() |> State.compact() - - broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), delta, clock(state)}) - %{state | presences: new_presences, silent_periods: 0} - |> push_delta_generation(delta) + @doc """ + Gracefully shuts down by broadcasting permdown to all replicas. 
- state.silent_periods >= state.max_silent_periods -> - broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), :empty, clock(state)}) - %{state | silent_periods: 0} + ## Examples - true -> update_in(state.silent_periods, &(&1 + 1)) + iex> Phoenix.Tracker.graceful_permdown(MyTracker) + :ok + """ + @spec graceful_permdown(atom) :: :ok + def graceful_permdown(tracker_name) do + for shard_number <- 0..(pool_size(tracker_name)-1) do + Shard.name_for_number(tracker_name, shard_number) + |> GenServer.call(:graceful_permdown) end + :ok end - defp report_diff(state, [], []), do: state - defp report_diff(state, joined, left) do - join_diff = Enum.reduce(joined, %{}, fn {{topic, _pid, key}, meta, _}, acc -> - Map.update(acc, topic, {[{key, meta}], []}, fn {joins, leaves} -> - {[{key, meta} | joins], leaves} - end) - end) - full_diff = Enum.reduce(left, join_diff, fn {{topic, _pid, key}, meta, _}, acc -> - Map.update(acc, topic, {[], [{key, meta}]}, fn {joins, leaves} -> - {joins, [{key, meta} | leaves]} - end) - end) - - full_diff - |> state.tracker.handle_diff(state.tracker_state) - |> handle_tracker_result(state) + def start_link(tracker, tracker_opts, pool_opts) do + name = Keyword.fetch!(pool_opts, :name) + Supervisor.start_link(__MODULE__, + [tracker, tracker_opts, pool_opts, name], + name: name) end - defp report_diff_join(state, topic, key, meta, nil = _prev_meta) do - %{topic => {[{key, meta}], []}} - |> state.tracker.handle_diff(state.tracker_state) - |> handle_tracker_result(state) - end - defp report_diff_join(state, topic, key, meta, prev_meta) do - %{topic => {[{key, meta}], [{key, prev_meta}]}} - |> state.tracker.handle_diff(state.tracker_state) - |> handle_tracker_result(state) - end - - defp handle_tracker_result({:ok, tracker_state}, state) do - %{state | tracker_state: tracker_state} - end - defp handle_tracker_result(other, state) do - raise ArgumentError, """ - expected #{state.tracker} to return {:ok, state}, but got: + def init([tracker, tracker_opts, opts, name]) do + pool_size = Keyword.get(opts, :pool_size, 1) + ^name = :ets.new(name, [:set, :named_table, read_concurrency: true]) + true = :ets.insert(name, {:pool_size, pool_size}) - #{inspect other} - """ - end - - defp handle_update({pid, topic, key, meta_updater}, state) do - case State.get_by_pid(state.presences, pid, topic, key) do - nil -> - {:reply, {:error, :nopresence}, state} - {{_topic, _pid, ^key}, prev_meta, {_replica, _}} -> - {state, ref} = put_update(state, pid, topic, key, meta_updater.(prev_meta), prev_meta) - {:reply, {:ok, ref}, state} + shards = for n <- 0..(pool_size-1) do + shard_name = Shard.name_for_number(name, n) + shard_opts = Keyword.put(opts, :shard_number, n) + worker(Phoenix.Tracker.Shard, [tracker, tracker_opts, shard_opts], + id: shard_name) end - end - defp push_delta_generation(state, {%State{mode: :normal}, _}) do - %{state | deltas: []} - end - defp push_delta_generation(%{deltas: deltas} = state, %State{mode: :delta} = delta) do - new_deltas = DeltaGeneration.push(state.presences, deltas, delta, state.max_delta_sizes) - %{state | deltas: new_deltas} + supervise(shards, strategy: :one_for_one, + max_restarts: pool_size * 2, + max_seconds: 1) end - defp random_ref() do - :crypto.strong_rand_bytes(8) |> Base.encode64() + defp pool_size(tracker_name) do + [{:pool_size, n}] = :ets.lookup(tracker_name, :pool_size) + n end - defp log(%{log_level: false}, _msg_func), do: :ok - defp log(%{log_level: level}, msg), do: Logger.log(level, msg) end diff --git 
a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex new file mode 100644 index 000000000..e26cc91f6 --- /dev/null +++ b/lib/phoenix/tracker/shard.ex @@ -0,0 +1,497 @@ +defmodule Phoenix.Tracker.Shard do + @moduledoc """ + Keeps track of presences in a single shard + """ + use GenServer + alias Phoenix.Tracker.{Clock, State, Replica, DeltaGeneration} + require Logger + + @type presence :: {key :: String.t, meta :: Map.t} + @type topic :: String.t + + @callback init(Keyword.t) :: {:ok, pid} | {:error, reason :: term} + @callback handle_diff(%{topic => {joins :: [presence], leaves :: [presence]}}, state :: term) :: {:ok, state :: term} + + + ## Used by Phoenix.Tracker for dispatching to appropriate shard + @spec name_for_number(atom, non_neg_integer) :: atom + def name_for_number(prefix, n) when is_number(n) do + :"#{prefix}_shard#{n}" + end + + @spec name_for_topic(atom, topic, non_neg_integer) :: atom + def name_for_topic(prefix, topic, pool_size) do + shard_number = :erlang.phash2(topic, pool_size) + name_for_number(prefix, shard_number) + end + + ## Client + + @spec track(pid, pid, topic, term, Map.t) :: {:ok, ref :: binary} | {:error, reason :: term} + def track(server_pid, pid, topic, key, meta) when is_pid(pid) and is_map(meta) do + GenServer.call(server_pid, {:track, pid, topic, key, meta}) + end + + @spec untrack(pid, pid, topic, term) :: :ok + def untrack(server_pid, pid, topic, key) when is_pid(pid) do + GenServer.call(server_pid, {:untrack, pid, topic, key}) + end + def untrack(server_pid, pid) when is_pid(pid) do + GenServer.call(server_pid, {:untrack, pid}) + end + + @spec update(pid, pid, topic, term, Map.t | (Map.t -> Map.t)) :: {:ok, ref :: binary} | {:error, reason :: term} + def update(server_pid, pid, topic, key, meta) when is_pid(pid) and (is_map(meta) or is_function(meta)) do + GenServer.call(server_pid, {:update, pid, topic, key, meta}) + end + + @spec list(pid, topic) :: [presence] + def list(server_pid, topic) do + GenServer.call(server_pid, {:list, topic}) + |> State.get_by_topic(topic) + end + def dirty_list(shard_name, topic) do + State.tracked_values(shard_name, topic, []) + end + + @spec graceful_permdown(pid) :: :ok + def graceful_permdown(server_pid) do + GenServer.call(server_pid, :graceful_permdown) + end + + ## Server + + def start_link(tracker, tracker_opts, pool_opts) do + number = Keyword.fetch!(pool_opts, :shard_number) + tracker_name = Keyword.fetch!(pool_opts, :name) + name = name_for_number(tracker_name, number) + shard_opts = Keyword.put(pool_opts, :name, name) + GenServer.start_link(__MODULE__, + [tracker, tracker_opts, shard_opts], name: name) + end + + def init([tracker, tracker_opts, shard_opts]) do + Process.flag(:trap_exit, true) + shard_name = Keyword.fetch!(shard_opts, :name) + pubsub_server = Keyword.fetch!(shard_opts, :pubsub_server) + broadcast_period = shard_opts[:broadcast_period] || 1500 + max_silent_periods = shard_opts[:max_silent_periods] || 10 + down_period = shard_opts[:down_period] + || (broadcast_period * max_silent_periods * 2) + permdown_period = shard_opts[:permdown_period] || 1_200_000 + clock_sample_periods = shard_opts[:clock_sample_periods] || 2 + log_level = Keyword.get(shard_opts, :log_level, false) + max_delta_sizes = shard_opts[:max_delta_sizes] || [100, 1000, 10_000] + + with :ok <- validate_down_period(down_period, broadcast_period), + :ok <- validate_permdown_period(permdown_period, down_period), + {:ok, tracker_state} <- tracker.init(tracker_opts) do + + node_name = Phoenix.PubSub.node_name(pubsub_server) 
+ namespaced_topic = namespaced_topic(shard_name) + replica = Replica.new(node_name) + + subscribe(pubsub_server, namespaced_topic) + send_stuttered_heartbeat(self(), broadcast_period) + + {:ok, %{shard_name: shard_name, + pubsub_server: pubsub_server, + tracker: tracker, + tracker_state: tracker_state, + replica: replica, + report_events_to: shard_opts[:report_events_to], + namespaced_topic: namespaced_topic, + log_level: log_level, + replicas: %{}, + pending_clockset: [], + presences: State.new(Replica.ref(replica), shard_name), + broadcast_period: broadcast_period, + max_silent_periods: max_silent_periods, + silent_periods: max_silent_periods, + down_period: down_period, + permdown_period: permdown_period, + clock_sample_periods: clock_sample_periods, + deltas: [], + max_delta_sizes: max_delta_sizes, + current_sample_count: clock_sample_periods}} + end + end + + def validate_down_period(d_period, b_period) when d_period < (2 * b_period) do + {:error, "down_period must be at least twice as large as the broadcast_period"} + end + def validate_down_period(_d_period, _b_period), do: :ok + + def validate_permdown_period(p_period, d_period) when p_period <= d_period do + {:error, "permdown_period must be at least larger than the down_period"} + end + def validate_permdown_period(_p_period, _d_period), do: :ok + + + defp send_stuttered_heartbeat(pid, interval) do + Process.send_after(pid, :heartbeat, Enum.random(0..trunc(interval * 0.25))) + end + + def handle_info(:heartbeat, state) do + {:noreply, state + |> broadcast_delta_heartbeat() + |> request_transfer_from_replicas_needing_synced() + |> detect_downs() + |> schedule_next_heartbeat()} + end + + def handle_info({:pub, :heartbeat, {name, vsn}, :empty, clocks}, state) do + {:noreply, state + |> put_pending_clock(clocks) + |> handle_heartbeat({name, vsn})} + end + def handle_info({:pub, :heartbeat, {name, vsn}, delta, clocks}, state) do + state = handle_heartbeat(state, {name, vsn}) + {presences, joined, left} = State.merge(state.presences, delta) + + {:noreply, state + |> report_diff(joined, left) + |> put_presences(presences) + |> put_pending_clock(clocks) + |> push_delta_generation(delta)} + end + + def handle_info({:pub, :transfer_req, ref, {name, _vsn}, {_, clocks}}, state) do + log state, fn -> "#{state.replica.name}: transfer_req from #{inspect name}" end + delta = DeltaGeneration.extract(state.presences, state.deltas, name, clocks) + msg = {:pub, :transfer_ack, ref, Replica.ref(state.replica), delta} + direct_broadcast(state, name, msg) + + {:noreply, state} + end + + def handle_info({:pub, :transfer_ack, _ref, {name, _vsn}, remote_presences}, state) do + log(state, fn -> "#{state.replica.name}: transfer_ack from #{inspect name}" end) + {presences, joined, left} = State.merge(state.presences, remote_presences) + + {:noreply, state + |> report_diff(joined, left) + |> push_delta_generation(remote_presences) + |> put_presences(presences)} + end + + def handle_info({:pub, :graceful_permdown, {_name, _vsn} = ref}, state) do + case Replica.fetch_by_ref(state.replicas, ref) do + {:ok, replica} -> {:noreply, state |> down(replica) |> permdown(replica)} + :error -> {:noreply, state} + end + end + + def handle_info({:EXIT, pid, _reason}, state) do + {:noreply, drop_presence(state, pid)} + end + + def handle_call(:values, _from, state) do + {:reply, :ets.match(state.presences.values, :"$1"), state} + end + + def handle_call({:track, pid, topic, key, meta}, _from, state) do + case State.get_by_pid(state.presences, pid, topic, key) do + nil -> + 
{state, ref} = put_presence(state, pid, topic, key, meta) + {:reply, {:ok, ref}, state} + _ -> + {:reply, {:error, {:already_tracked, pid, topic, key}}, state} + end + end + + def handle_call({:untrack, pid, topic, key}, _from, state) do + new_state = drop_presence(state, pid, topic, key) + if State.get_by_pid(new_state.presences, pid) == [] do + Process.unlink(pid) + end + {:reply, :ok, new_state} + end + + def handle_call({:untrack, pid}, _from, state) do + Process.unlink(pid) + {:reply, :ok, drop_presence(state, pid)} + end + + def handle_call({:update, pid, topic, key, meta_updater}, _from, state) when is_function(meta_updater) do + handle_update({pid, topic, key, meta_updater}, state) + end + + def handle_call({:update, pid, topic, key, new_meta}, _from, state) do + handle_update({pid, topic, key, fn _ -> new_meta end}, state) + end + + def handle_call(:graceful_permdown, _from, state) do + broadcast_from(state, self(), {:pub, :graceful_permdown, Replica.ref(state.replica)}) + {:stop, :normal, :ok, state} + end + + def handle_call({:list, _topic}, _from, state) do + {:reply, state.presences, state} + end + + def handle_call(:replicas, _from, state) do + {:reply, state.replicas, state} + end + + def handle_call(:unsubscribe, _from, state) do + Phoenix.PubSub.unsubscribe(state.pubsub_server, state.namespaced_topic) + {:reply, :ok, state} + end + + def handle_call(:resubscribe, _from, state) do + subscribe(state.pubsub_server, state.namespaced_topic) + {:reply, :ok, state} + end + + defp subscribe(pubsub_server, namespaced_topic) do + Phoenix.PubSub.subscribe(pubsub_server, namespaced_topic, link: true) + end + + defp put_update(state, pid, topic, key, meta, %{phx_ref: ref} = prev_meta) do + state + |> put_presences(State.leave(state.presences, pid, topic, key)) + |> put_presence(pid, topic, key, Map.put(meta, :phx_ref_prev, ref), prev_meta) + end + defp put_presence(state, pid, topic, key, meta, prev_meta \\ nil) do + Process.link(pid) + ref = random_ref() + meta = Map.put(meta, :phx_ref, ref) + new_state = + state + |> report_diff_join(topic, key, meta, prev_meta) + |> put_presences(State.join(state.presences, pid, topic, key, meta)) + + {new_state, ref} + end + + defp put_presences(state, %State{} = presences), do: %{state | presences: presences} + + defp drop_presence(state, pid, topic, key) do + if leave = State.get_by_pid(state.presences, pid, topic, key) do + state + |> report_diff([], [leave]) + |> put_presences(State.leave(state.presences, pid, topic, key)) + else + state + end + end + defp drop_presence(state, pid) do + leaves = State.get_by_pid(state.presences, pid) + + state + |> report_diff([], leaves) + |> put_presences(State.leave(state.presences, pid)) + end + + defp handle_heartbeat(state, {name, vsn}) do + case Replica.put_heartbeat(state.replicas, {name, vsn}) do + {replicas, nil, %Replica{status: :up} = upped} -> + up(%{state | replicas: replicas}, upped) + + {replicas, %Replica{vsn: ^vsn, status: :up}, %Replica{vsn: ^vsn, status: :up}} -> + %{state | replicas: replicas} + + {replicas, %Replica{vsn: ^vsn, status: :down}, %Replica{vsn: ^vsn, status: :up} = upped} -> + up(%{state | replicas: replicas}, upped) + + {replicas, %Replica{vsn: old, status: :up} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> + %{state | replicas: replicas} |> down(downed) |> permdown(downed) |> up(upped) + + {replicas, %Replica{vsn: old, status: :down} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> + %{state | replicas: replicas} |> 
permdown(downed) |> up(upped) + end + end + + defp request_transfer_from_replicas_needing_synced(%{current_sample_count: 1} = state) do + needs_synced = clockset_to_sync(state) + for replica <- needs_synced, do: request_transfer(state, replica) + + %{state | pending_clockset: [], current_sample_count: state.clock_sample_periods} + end + defp request_transfer_from_replicas_needing_synced(state) do + %{state | current_sample_count: state.current_sample_count - 1} + end + + defp request_transfer(state, {name, _vsn}) do + log state, fn -> "#{state.replica.name}: transfer_req from #{name}" end + ref = make_ref() + msg = {:pub, :transfer_req, ref, Replica.ref(state.replica), clock(state)} + direct_broadcast(state, name, msg) + end + + defp detect_downs(%{permdown_period: perm_int, down_period: temp_int} = state) do + Enum.reduce(state.replicas, state, fn {_name, replica}, acc -> + case Replica.detect_down(acc.replicas, replica, temp_int, perm_int) do + {replicas, %Replica{status: :up}, %Replica{status: :permdown} = down_rep} -> + %{acc | replicas: replicas} |> down(down_rep) |> permdown(down_rep) + + {replicas, %Replica{status: :down}, %Replica{status: :permdown} = down_rep} -> + permdown(%{acc | replicas: replicas}, down_rep) + + {replicas, %Replica{status: :up}, %Replica{status: :down} = down_rep} -> + down(%{acc | replicas: replicas}, down_rep) + + {replicas, %Replica{status: unchanged}, %Replica{status: unchanged}} -> + %{acc | replicas: replicas} + end + end) + end + + defp schedule_next_heartbeat(state) do + Process.send_after(self(), :heartbeat, state.broadcast_period) + state + end + + defp clock(state), do: State.clocks(state.presences) + + @spec clockset_to_sync(%{pending_clockset: [State.replica_context]}) :: [State.replica_name] + defp clockset_to_sync(state) do + my_ref = Replica.ref(state.replica) + + state.pending_clockset + |> Clock.append_clock(clock(state)) + |> Clock.clockset_replicas() + |> Enum.filter(fn ref -> ref != my_ref end) + end + + defp put_pending_clock(state, clocks) do + %{state | pending_clockset: Clock.append_clock(state.pending_clockset, clocks)} + end + + defp up(state, remote_replica) do + report_event(state, {:replica_up, remote_replica.name}) + log state, fn -> "#{state.replica.name}: replica up from #{inspect remote_replica.name}" end + {presences, joined, []} = State.replica_up(state.presences, Replica.ref(remote_replica)) + + state + |> report_diff(joined, []) + |> put_presences(presences) + end + + defp down(state, remote_replica) do + report_event(state, {:replica_down, remote_replica.name}) + log state, fn -> "#{state.replica.name}: replica down from #{inspect remote_replica.name}" end + {presences, [], left} = State.replica_down(state.presences, Replica.ref(remote_replica)) + + state + |> report_diff([], left) + |> put_presences(presences) + end + + defp permdown(state, %Replica{name: name} = remote_replica) do + report_event(state, {:replica_permdown, name}) + log state, fn -> "#{state.replica.name}: permanent replica down detected #{name}" end + replica_ref = Replica.ref(remote_replica) + presences = State.remove_down_replicas(state.presences, replica_ref) + deltas = DeltaGeneration.remove_down_replicas(state.deltas, replica_ref) + + case Replica.fetch_by_ref(state.replicas, replica_ref) do + {:ok, _replica} -> + replicas = Map.delete(state.replicas, name) + %{state | presences: presences, replicas: replicas, deltas: deltas} + _ -> + %{state | presences: presences, deltas: deltas} + end + end + + defp report_event(%{report_events_to: nil}, _event), 
do: :ok + defp report_event(%{report_events_to: pid} = state, event) do + send(pid, {event, state.replica.name}) + end + + defp namespaced_topic(shard_name) do + "phx_presence:#{shard_name}" + end + + defp broadcast_from(state, from, msg) do + Phoenix.PubSub.broadcast_from!(state.pubsub_server, from, state.namespaced_topic, msg) + end + + defp direct_broadcast(state, target_node, msg) do + Phoenix.PubSub.direct_broadcast!(target_node, state.pubsub_server, state.namespaced_topic, msg) + end + + defp broadcast_delta_heartbeat(%{presences: presences} = state) do + cond do + State.has_delta?(presences) -> + delta = presences.delta + new_presences = presences |> State.reset_delta() |> State.compact() + + broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), delta, clock(state)}) + %{state | presences: new_presences, silent_periods: 0} + |> push_delta_generation(delta) + + state.silent_periods >= state.max_silent_periods -> + broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), :empty, clock(state)}) + %{state | silent_periods: 0} + + true -> update_in(state.silent_periods, &(&1 + 1)) + end + end + + defp report_diff(state, [], []), do: state + defp report_diff(state, joined, left) do + join_diff = Enum.reduce(joined, %{}, fn {{topic, _pid, key}, meta, _}, acc -> + Map.update(acc, topic, {[{key, meta}], []}, fn {joins, leaves} -> + {[{key, meta} | joins], leaves} + end) + end) + full_diff = Enum.reduce(left, join_diff, fn {{topic, _pid, key}, meta, _}, acc -> + Map.update(acc, topic, {[], [{key, meta}]}, fn {joins, leaves} -> + {joins, [{key, meta} | leaves]} + end) + end) + + full_diff + |> state.tracker.handle_diff(state.tracker_state) + |> handle_tracker_result(state) + end + + defp report_diff_join(state, topic, key, meta, nil = _prev_meta) do + %{topic => {[{key, meta}], []}} + |> state.tracker.handle_diff(state.tracker_state) + |> handle_tracker_result(state) + end + defp report_diff_join(state, topic, key, meta, prev_meta) do + %{topic => {[{key, meta}], [{key, prev_meta}]}} + |> state.tracker.handle_diff(state.tracker_state) + |> handle_tracker_result(state) + end + + defp handle_tracker_result({:ok, tracker_state}, state) do + %{state | tracker_state: tracker_state} + end + defp handle_tracker_result(other, state) do + raise ArgumentError, """ + expected #{state.tracker} to return {:ok, state}, but got: + + #{inspect other} + """ + end + + defp handle_update({pid, topic, key, meta_updater}, state) do + case State.get_by_pid(state.presences, pid, topic, key) do + nil -> + {:reply, {:error, :nopresence}, state} + {{_topic, _pid, ^key}, prev_meta, {_replica, _}} -> + {state, ref} = put_update(state, pid, topic, key, meta_updater.(prev_meta), prev_meta) + {:reply, {:ok, ref}, state} + end + end + + defp push_delta_generation(state, {%State{mode: :normal}, _}) do + %{state | deltas: []} + end + defp push_delta_generation(%{deltas: deltas} = state, %State{mode: :delta} = delta) do + new_deltas = DeltaGeneration.push(state.presences, deltas, delta, state.max_delta_sizes) + %{state | deltas: new_deltas} + end + + defp random_ref() do + :crypto.strong_rand_bytes(8) |> Base.encode64() + end + + defp log(%{log_level: false}, _msg_func), do: :ok + defp log(%{log_level: level}, msg), do: Logger.log(level, msg) +end diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 0173134f1..5360a9f25 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -16,6 +16,7 @@ defmodule Phoenix.Tracker.State do @type 
context :: %{name => clock} @type values :: ets_id | :extracted | %{tag => {pid, topic, key, meta}} @type value :: {{topic, pid, key}, meta, tag} + @type key_meta :: {key, meta} @type delta :: %State{mode: :delta} @type pid_lookup :: {pid, topic, key} @@ -50,13 +51,13 @@ defmodule Phoenix.Tracker.State do %Phoenix.Tracker.State{...} """ - @spec new(name) :: t - def new(replica) do + @spec new(name, atom) :: t + def new(replica, shard_name) do reset_delta(%State{ replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(:values, [:ordered_set]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end @@ -113,12 +114,22 @@ defmodule Phoenix.Tracker.State do @doc """ Returns a list of elements for the topic who belong to an online replica. """ - @spec get_by_topic(t, topic) :: [value] + @spec get_by_topic(t, topic) :: [key_meta] def get_by_topic(%State{values: values} = state, topic) do - replicas = down_replicas(state) - :ets.select(values, [{ {{topic, :_, :_}, :_, {:"$1", :_}}, - not_in(:"$1", replicas), [:"$_"]}]) + tracked_values(values, topic, down_replicas(state)) end + + @doc """ + Performs table lookup for tracked elements in the topic, filtering out + those present on downed replicas. + """ + def tracked_values(table, topic, down_replicas) do + :ets.select(table, + [{{{topic, :_, :"$1"}, :"$2", {:"$3", :_}}, + not_in(:"$3", down_replicas), + [{{:"$1", :"$2"}}]}]) + end + defp not_in(_pos, []), do: [] defp not_in(pos, replicas), do: [not: ors(pos, replicas)] defp ors(pos, [rep]), do: {:"==", pos, {rep}} diff --git a/test/phoenix/pubsub/pg2_test.exs b/test/phoenix/pubsub/pg2_test.exs index cca2ec447..03e53b583 100644 --- a/test/phoenix/pubsub/pg2_test.exs +++ b/test/phoenix/pubsub/pg2_test.exs @@ -11,6 +11,8 @@ defmodule Phoenix.PubSub.PG2Test do @node1 :"node1@127.0.0.1" @node2 :"node2@127.0.0.1" + @receive_timeout 500 + setup config do size = config[:pool_size] || 1 if config[:pool_size] do @@ -29,15 +31,15 @@ defmodule Phoenix.PubSub.PG2Test do PubSub.subscribe(config.pubsub, config.topic) :ok = PubSub.direct_broadcast(@node1, config.pubsub, config.topic, :ping) - assert_receive {@node1, :ping} + assert_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast!(@node1, config.pubsub, config.topic, :ping) - assert_receive {@node1, :ping} + assert_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast(@node2, config.pubsub, config.topic, :ping) - refute_receive {@node1, :ping} + refute_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast!(@node2, config.pubsub, config.topic, :ping) - refute_receive {@node1, :ping} + refute_receive {@node1, :ping}, @receive_timeout end @tag pool_size: size, topic: topic @@ -46,15 +48,15 @@ defmodule Phoenix.PubSub.PG2Test do PubSub.subscribe(config.pubsub, config.topic) :ok = PubSub.direct_broadcast_from(@node1, config.pubsub, self(), config.topic, :ping) - assert_receive {@node1, :ping} + assert_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast_from!(@node1, config.pubsub, self(), config.topic, :ping) - assert_receive {@node1, :ping} + assert_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast_from(@node2, config.pubsub, self(), config.topic, :ping) - refute_receive {@node1, :ping} + refute_receive {@node1, :ping}, @receive_timeout :ok = PubSub.direct_broadcast_from!(@node2, config.pubsub, self(), config.topic, :ping) - refute_receive {@node1, :ping} 
+ refute_receive {@node1, :ping}, @receive_timeout end end diff --git a/test/phoenix/tracker/delta_generation_test.exs b/test/phoenix/tracker/delta_generation_test.exs index 6c15e53e4..30894a47a 100644 --- a/test/phoenix/tracker/delta_generation_test.exs +++ b/test/phoenix/tracker/delta_generation_test.exs @@ -9,8 +9,8 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do |> Enum.sort() end - defp new(node) do - State.new(node) + defp new(node, config) do + State.new(node, :"#{node} #{config.test}") end defp new_pid() do @@ -23,9 +23,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do |> Enum.sort() end - test "generations" do - s1 = new(:s1) - s2 = new(:s2) + test "generations", config do + s1 = new(:s1, config) + s2 = new(:s2, config) s1 = State.join(s1, new_pid(), "lobby", "user1", %{}) assert [gen1, gen1, gen1] = gens = push(s1, [], s1.delta, [2, 5, 6]) assert keys(gen1) == ["user1"] @@ -86,9 +86,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do assert sorted_clouds(gen3.clouds) == [{:s1, 3}, {:s1, 4}, {:s2, 1}, {:s2, 2}] end - test "does not include non-contiguous deltas" do - s1 = new(:s1) - s3 = new(:s3) + test "does not include non-contiguous deltas", config do + s1 = new(:s1, config) + s3 = new(:s3, config) s1 = State.join(s1, new_pid(), "lobby", "user1", %{}) old_s3 = s3 = State.join(s3, new_pid(), "lobby", "user3", %{}) s3 = State.reset_delta(s3) @@ -99,10 +99,10 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do assert [^gen1, ^gen1, ^gen1] = push(s1, gens, s3.delta, [5, 10, 15]) end - test "remove_down_replicas" do - s1 = new(:s1) - s2 = new(:s2) - s3 = new(:s3) + test "remove_down_replicas", config do + s1 = new(:s1, config) + s2 = new(:s2, config) + s3 = new(:s3, config) s2 = State.join(s2, new_pid(), "lobby", "user2", %{}) assert [gen1, gen1, gen1] = gens = push(s1, [], s2.delta, [5, 10, 15]) assert [pruned_gen1, pruned_gen1, pruned_gen1] = DeltaGeneration.remove_down_replicas(gens, :s2) diff --git a/test/phoenix/tracker/integration_test.exs b/test/phoenix/tracker/integration_test.exs deleted file mode 100644 index 9597e3f8d..000000000 --- a/test/phoenix/tracker/integration_test.exs +++ /dev/null @@ -1,390 +0,0 @@ -defmodule Phoenix.Tracker.IntegrationTest do - use Phoenix.PubSub.NodeCase - alias Phoenix.Tracker - alias Phoenix.Tracker.{Replica, State} - - @primary :"primary@127.0.0.1" - @node1 :"node1@127.0.0.1" - @node2 :"node2@127.0.0.1" - - setup config do - tracker = config.test - {:ok, tracker_pid} = start_tracker(name: tracker) - {:ok, topic: to_string(config.test), tracker: tracker, tracker_pid: tracker_pid} - end - - test "heartbeats", %{tracker: tracker} do - subscribe_to_tracker(tracker) - assert_heartbeat from: @primary - flush() - assert_heartbeat from: @primary - flush() - assert_heartbeat from: @primary - end - - test "gossip from unseen node triggers nodeup and transfer request", - %{tracker: tracker, topic: topic} do - - assert list(tracker, topic) == [] - subscribe_to_tracker(tracker) - drop_gossips(tracker) - spy_on_tracker(@node1, self(), tracker) - start_tracker(@node1, name: tracker) - track_presence(@node1, tracker, spawn_pid(), topic, "node1", %{}) - flush() - assert_heartbeat from: @node1 - - resume_gossips(tracker) - # primary sends transfer_req to node1 after seeing behind - ref = assert_transfer_req to: @node1, from: @primary - # node1 fulfills tranfer request and sends transfer_ack to primary - assert_transfer_ack ref, from: @node1 - assert_heartbeat to: @node1, from: @primary - assert [{"node1", _}] = list(tracker, topic) - end - - test 
"requests for transfer collapses clocks", - %{tracker: tracker, topic: topic} do - - subscribe_to_tracker(tracker) - subscribe(topic) - for node <- [@node1, @node2] do - spy_on_tracker(node, self(), tracker) - start_tracker(node, name: tracker) - assert_receive {{:replica_up, ^node}, @primary}, @timeout - assert_receive {{:replica_up, @primary}, ^node}, @timeout - end - assert_receive {{:replica_up, @node2}, @node1}, @timeout - assert_receive {{:replica_up, @node1}, @node2}, @timeout - - flush() - drop_gossips(tracker) - track_presence(@node1, tracker, spawn_pid(), topic, "node1", %{}) - track_presence(@node1, tracker, spawn_pid(), topic, "node1.2", %{}) - track_presence(@node2, tracker, spawn_pid(), topic, "node2", %{}) - - # node1 sends delta broadcast to node2 - assert_receive {@node1, {:pub, :heartbeat, {@node2, _vsn}, %State{mode: :delta}, _clocks}}, @timeout - - # node2 sends delta broadcast to node1 - assert_receive {@node2, {:pub, :heartbeat, {@node1, _vsn}, %State{mode: :delta}, _clocks}}, @timeout - - flush() - resume_gossips(tracker) - # primary sends transfer_req to node with most dominance - assert_receive {node, {:pub, :transfer_req, ref, {@primary, _vsn}, _state}}, @timeout * 2 - # primary does not send transfer_req to other node, since in dominant node's future - refute_received {_other, {:pub, :transfer_req, _ref, {@primary, _vsn}, _state}}, @timeout * 2 - # dominant node fulfills transfer request and sends transfer_ack to primary - assert_receive {:pub, :transfer_ack, ^ref, {^node, _vsn}, _state}, @timeout - - # wait for local sync - assert_join ^topic, "node1", %{} - assert_join ^topic, "node1.2", %{} - assert_join ^topic, "node2", %{} - assert_heartbeat from: @node1 - assert_heartbeat from: @node2 - - assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(tracker, topic) - end - - test "old pids from a node are permdowned when the node comes back up", - %{tracker: tracker, tracker_pid: tracker_pid, topic: topic} do - track_presence(@primary, tracker, self(), topic, @primary, %{}) - {node1_node, {:ok, node1_tracker}} = start_tracker(@node1, name: tracker) - track_presence(@node1, tracker, node1_node, topic, @node1, %{}) - - spy_on_tracker(@node1, self(), tracker) - assert_heartbeat to: @node1, from: @primary - - {node2_node, {:ok, node2_tracker}} = start_tracker(@node2, name: tracker) - track_presence(@node2, tracker, node2_node, topic, @node2, %{}) - - Process.unlink(node1_node) - Process.exit(node1_tracker, :kill) - assert_receive {{:replica_permdown, @node1}, @primary}, @permdown * 2 - - spy_on_tracker(@node2, self(), tracker) - - {node1_node_new, {:ok, node1_tracker}} = start_tracker(@node1, name: tracker) - track_presence(@node1, tracker, node1_node_new, topic, @node1, %{}) - - flush() - spy_on_tracker(@node1, self(), tracker) - assert_heartbeat to: @node2, from: @primary - assert_heartbeat to: @node1, from: @node2 - - refute {@node1, node1_node} in get_values(@primary, tracker_pid) - refute {@node1, node1_node} in get_values(@node1, node1_tracker) - refute {@node1, node1_node} in get_values(@node2, node2_tracker) - end - - # TODO split into multiple testscases - test "tempdowns with nodeups of new vsn, and permdowns", - %{tracker: tracker, topic: topic} do - - subscribe_to_tracker(tracker) - subscribe(topic) - - {node1_node, {:ok, node1_tracker}} = start_tracker(@node1, name: tracker) - {_node2_node, {:ok, _node2_tracker}} = start_tracker(@node2, name: tracker) - assert_receive {{:replica_up, @node1}, @node2}, @timeout - assert_receive {{:replica_up, @node2}, 
@node1}, @timeout - for node <- [@node1, @node2] do - track_presence(node, tracker, spawn_pid(), topic, node, %{}) - assert_join ^topic, ^node, %{} - end - assert_map %{@node1 => %Replica{status: :up, vsn: vsn_before}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - - # tempdown netsplit - flush() - :ok = :sys.suspend(node1_tracker) - assert_leave ^topic, @node1, %{} - assert_map %{@node1 => %Replica{status: :down, vsn: ^vsn_before}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - flush() - :ok = :sys.resume(node1_tracker) - assert_join ^topic, @node1, %{} - assert_heartbeat from: @node1 - assert_map %{@node1 => %Replica{status: :up, vsn: ^vsn_before}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - - # tempdown crash - Process.unlink(node1_node) - Process.exit(node1_tracker, :kill) - assert_leave ^topic, @node1, %{} - assert_map %{@node1 => %Replica{status: :down}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - - # tempdown => nodeup with new vsn - flush() - {node1_node, {:ok, node1_tracker}} = start_tracker(@node1, name: tracker) - assert_receive {{:replica_up, @node1}, @primary}, @timeout - assert_receive {{:replica_up, @node1}, @node2}, @timeout - track_presence(@node1, tracker, spawn_pid(), topic, "node1-back", %{}) - assert_join ^topic, "node1-back", %{} - assert [{@node2, _}, {"node1-back", _}] = list(tracker, topic) - assert_map %{@node1 => %Replica{status: :up, vsn: new_vsn}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - assert vsn_before != new_vsn - - # tempdown again - Process.unlink(node1_node) - Process.exit(node1_tracker, :kill) - assert_receive {{:replica_down, @node1}, @primary}, @permdown - assert_leave ^topic, "node1-back", %{} - assert_map %{@node1 => %Replica{status: :down}, - @node2 => %Replica{status: :up}}, replicas(tracker), 2 - - # tempdown => permdown - flush() - for _ <- 0..trunc(@permdown / @heartbeat), do: assert_heartbeat(from: @primary) - assert_map %{@node2 => %Replica{status: :up}}, replicas(tracker), 1 - end - - test "node detects and locally broadcasts presence_join/leave", - %{tracker: tracker, topic: topic} do - - local_presence = spawn_pid() - remote_pres = spawn_pid() - - # local joins - subscribe(topic) - assert list(tracker, topic) == [] - {:ok, _ref} = Tracker.track(tracker, self(), topic, "me", %{name: "me"}) - assert_join ^topic, "me", %{name: "me"} - assert [{"me", %{name: "me", phx_ref: _}}] = list(tracker, topic) - - {:ok, _ref} = Tracker.track(tracker, local_presence , topic, "me2", %{name: "me2"}) - assert_join ^topic, "me2", %{name: "me2"} - assert [{"me", %{name: "me", phx_ref: _}}, - {"me2",%{name: "me2", phx_ref: _}}] = - list(tracker, topic) - - # remote joins - assert replicas(tracker) == %{} - start_tracker(@node1, name: tracker) - assert_receive {{:replica_up, @node1}, @primary}, @timeout - assert_receive {{:replica_up, @primary}, @node1}, @timeout - track_presence(@node1, tracker, remote_pres, topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} - assert_map %{@node1 => %Replica{status: :up}}, replicas(tracker), 1 - assert [{"me", %{name: "me", phx_ref: _}}, - {"me2",%{name: "me2", phx_ref: _}}, - {"node1", %{name: "s1", phx_ref: _}}] = - list(tracker, topic) - - # local leaves - Process.exit(local_presence, :kill) - assert_leave ^topic, "me2", %{name: "me2"} - assert [{"me", %{name: "me", phx_ref: _}}, - {"node1", %{name: "s1", phx_ref: _}}] = - list(tracker, topic) - - # remote leaves - Process.exit(remote_pres, :kill) - assert_leave ^topic, "node1", 
%{name: "s1"} - assert [{"me", %{name: "me", phx_ref: _}}] = list(tracker, topic) - end - - test "detects nodedown and locally broadcasts leaves", - %{tracker: tracker, topic: topic} do - - local_presence = spawn_pid() - subscribe(topic) - {node_pid, {:ok, node1_tracker}} = start_tracker(@node1, name: tracker) - assert_receive {{:replica_up, @node1}, @primary}, @timeout - assert_receive {{:replica_up, @primary}, @node1}, @timeout - assert list(tracker, topic) == [] - - {:ok, _ref} = Tracker.track(tracker, local_presence , topic, "local1", %{name: "l1"}) - assert_join ^topic, "local1", %{} - - track_presence(@node1, tracker, spawn_pid(), topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} - assert %{@node1 => %Replica{status: :up}} = replicas(tracker) - assert [{"local1", _}, {"node1", _}] = list(tracker, topic) - - # nodedown - Process.unlink(node_pid) - Process.exit(node1_tracker, :kill) - assert_receive {{:replica_down, @node1}, @primary}, @permdown - assert_leave ^topic, "node1", %{name: "s1"} - assert %{@node1 => %Replica{status: :down}} = replicas(tracker) - assert [{"local1", _}] = list(tracker, topic) - end - - test "untrack with no tracked topic is a noop", - %{tracker: tracker, topic: topic} do - assert Tracker.untrack(tracker, self(), topic, "foo") == :ok - end - - test "untrack with topic", - %{tracker: tracker, topic: topic} do - - Tracker.track(tracker, self(), topic, "user1", %{name: "user1"}) - Tracker.track(tracker, self(), "another:topic", "user2", %{name: "user2"}) - assert [{"user1", %{name: "user1"}}] = list(tracker, topic) - assert [{"user2", %{name: "user2"}}] = list(tracker, "another:topic") - assert Tracker.untrack(tracker, self(), topic, "user1") == :ok - assert [] = list(tracker, topic) - assert [{"user2", %{name: "user2"}}] = list(tracker, "another:topic") - assert Process.whereis(tracker) in Process.info(self())[:links] - assert Tracker.untrack(tracker, self(), "another:topic", "user2") == :ok - assert [] = list(tracker, "another:topic") - refute Process.whereis(tracker) in Process.info(self())[:links] - end - - test "untrack from all topics", - %{tracker: tracker, topic: topic} do - - Tracker.track(tracker, self(), topic, "user1", %{name: "user1"}) - Tracker.track(tracker, self(), "another:topic", "user2", %{name: "user2"}) - assert [{"user1", %{name: "user1"}}] = list(tracker, topic) - assert [{"user2", %{name: "user2"}}] = list(tracker, "another:topic") - assert Process.whereis(tracker) in Process.info(self())[:links] - assert Tracker.untrack(tracker, self()) == :ok - assert [] = list(tracker, topic) - assert [] = list(tracker, "another:topic") - refute Process.whereis(tracker) in Process.info(self())[:links] - end - - test "updating presence sends join/leave and phx_ref_prev", - %{tracker: tracker, topic: topic} do - - subscribe(topic) - {:ok, _ref} = Tracker.track(tracker, self(), topic, "u1", %{name: "u1"}) - assert [{"u1", %{name: "u1", phx_ref: ref}}] = list(tracker, topic) - {:ok, _ref} = Tracker.update(tracker, self(), topic, "u1", %{name: "u1-updated"}) - assert_leave ^topic, "u1", %{name: "u1", phx_ref: ^ref} - assert_join ^topic, "u1", %{name: "u1-updated", phx_ref_prev: ^ref} - end - - test "updating presence sends join/leave and phx_ref_prev with profer diffs if function for update used", - %{tracker: tracker, topic: topic} do - - subscribe(topic) - {:ok, _ref} = Tracker.track(tracker, self(), topic, "u1", %{browser: "Chrome", status: "online"}) - assert [{"u1", %{browser: "Chrome", status: "online", phx_ref: ref}}] = 
list(tracker, topic) - {:ok, _ref} = Tracker.update(tracker, self(), topic, "u1", fn meta -> Map.put(meta, :status, "away") end) - assert_leave ^topic, "u1", %{browser: "Chrome", status: "online", phx_ref: ^ref} - assert_join ^topic, "u1", %{browser: "Chrome", status: "away", phx_ref_prev: ^ref} - end - - test "updating with no prior presence", %{tracker: tracker, topic: topic} do - assert {:error, :nopresence} = Tracker.update(tracker, self(), topic, "u1", %{}) - end - - test "duplicate tracking", %{tracker: tracker, topic: topic} do - pid = self() - assert {:ok, _ref} = Tracker.track(tracker, pid, topic, "u1", %{}) - assert {:error, {:already_tracked, ^pid, ^topic, "u1"}} = - Tracker.track(tracker, pid, topic, "u1", %{}) - assert {:ok, _ref} = Tracker.track(tracker, pid, "another:topic", "u1", %{}) - assert {:ok, _ref} = Tracker.track(tracker, pid, topic, "anotherkey", %{}) - - assert :ok = Tracker.untrack(tracker, pid, topic, "u1") - assert :ok = Tracker.untrack(tracker, pid, "another:topic", "u1") - assert :ok = Tracker.untrack(tracker, pid, topic, "anotherkey") - end - - test "graceful exits with permdown", %{tracker: tracker, topic: topic} do - subscribe(topic) - {_node_pid, {:ok, _node1_tracker}} = start_tracker(@node1, name: tracker) - assert_receive {{:replica_up, @node1}, @primary}, @timeout - assert_receive {{:replica_up, @primary}, @node1}, @timeout - track_presence(@node1, tracker, spawn_pid(), topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} - assert %{@node1 => %Replica{status: :up}} = replicas(tracker) - assert [{"node1", _}] = list(tracker, topic) - - # graceful permdown - {_, :ok} = graceful_permdown(@node1, tracker) - assert_leave ^topic, "node1", %{name: "s1"} - assert [] = list(tracker, topic) - assert replicas(tracker) == %{} - end - - - ## Helpers - - def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end) - - def replicas(tracker), do: GenServer.call(tracker, :replicas) - - def refute_transfer_req(opts) do - to = Keyword.fetch!(opts, :to) - from = Keyword.fetch!(opts, :from) - refute_receive {^to, {:pub, :transfer_req, _, {^from, _vsn}, _}}, @timeout * 2 - end - - def assert_transfer_req(opts) do - to = Keyword.fetch!(opts, :to) - from = Keyword.fetch!(opts, :from) - assert_receive {^to, {:pub, :transfer_req, ref, {^from, _vsn}, _}}, @timeout * 2 - ref - end - - def assert_transfer_ack(ref, opts) do - from = Keyword.fetch!(opts, :from) - if to = opts[:to] do - assert_receive {^to, {:pub, :transfer_ack, ^ref, {^from, _vsn}, _state}}, @timeout - else - assert_receive {:pub, :transfer_ack, ^ref, {^from, _vsn}, _state}, @timeout - end - end - - def assert_heartbeat(opts) do - from = Keyword.fetch!(opts, :from) - if to = opts[:to] do - assert_receive {^to, {:pub, :heartbeat, {^from, _vsn}, _delta, _clocks}}, @timeout - else - assert_receive {:pub, :heartbeat, {^from, _vsn}, _delta, _clocks}, @timeout - end - end - - defp list(tracker, topic) do - Enum.sort(Tracker.list(tracker, topic)) - end -end diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs new file mode 100644 index 000000000..d5362dd38 --- /dev/null +++ b/test/phoenix/tracker/pool_test.exs @@ -0,0 +1,133 @@ +defmodule Phoenix.Tracker.PoolTest do + use Phoenix.PubSub.NodeCase + alias Phoenix.Tracker + + setup config do + server = config.test + {:ok, _pid} = start_pool(name: server, pool_size: config.pool_size) + {:ok, server: server} + end + + for n <- [1,2,8,512] do + + @tag pool_size: n + test "pool #{n}: A track/5 call results in the id being 
tracked", + %{server: server} do + {:ok, ref} = Tracker.track(server, self(), "topic", "me", %{name: "me"}) + assert [{"me", %{name: "me", phx_ref: ^ref}}] + = Tracker.list(server, "topic") + end + + @tag pool_size: n + test "pool #{n}: dirty_list/2 returns tracked ids", %{server: server} do + {:ok, ref} = Tracker.track(server, self(), "topic", "me", %{name: "me"}) + assert [{"me", %{name: "me", phx_ref: ^ref}}] + = Tracker.dirty_list(server, "topic") + end + + @tag pool_size: n + test "pool #{n}: Track/5 results in all ids being tracked", + %{server: server} do + + topics = for i <- 1..100, do: "topic_#{i}" + + refs = for topic <- topics do + {:ok, ref} = Tracker.track(server, self(), topic, "me", %{name: "me"}) + ref + end + + for {t, ref} <- List.zip([topics, refs]) do + assert Tracker.list(server, t) == [{"me", %{name: "me", phx_ref: ref}}] + end + end + + @tag pool_size: n + test "pool #{n}: Untrack/4 results in all ids being untracked", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + for t <- topics do + {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + end + for t <- topics, do: :ok = Tracker.untrack(server, self(), t, "me") + + for t <- topics, do: assert Tracker.list(server, t) == [] + end + + @tag pool_size: n + test "pool #{n}: Untrack/2 results in all ids being untracked", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + for t <- topics do + {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + end + :ok = Tracker.untrack(server, self()) + + for t <- topics, do: assert Tracker.list(server, t) == [] + end + + + @tag pool_size: n + test "pool #{n}: Update/5 updates a given trackees metas", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + old_refs = for t <- topics do + {:ok, ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + ref + end + new_refs = for t <- topics do + {:ok, new_ref} = Tracker.update(server, self(), t, "me", %{new: "thing"}) + new_ref + end + + expected_changes = List.zip([topics, old_refs, new_refs]) + + for {t, old_ref, new_ref} <- expected_changes do + assert [{"me", %{new: "thing", + phx_ref: ^new_ref, + phx_ref_prev: ^old_ref}}] + = Tracker.list(server, t) + end + end + + @tag pool_size: n + test "pool #{n}: Update/5 applies fun to given trackees metas", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + old_refs = for t <- topics do + {:ok, ref} = Tracker.track(server, self(), t, "me", %{a: "oldval"}) + ref + end + + update_fun = fn(m) -> Map.put(m, :a, "newval") end + + new_refs = for t <- topics do + {:ok, new_ref} = Tracker.update(server, self(), t, "me", update_fun) + new_ref + end + + expected_changes = List.zip([topics, old_refs, new_refs]) + + for {t, old_ref, new_ref} <- expected_changes do + assert [{"me", %{a: "newval", + phx_ref: ^new_ref, + phx_ref_prev: ^old_ref}}] + = Tracker.list(server, t) + end + end + + @tag pool_size: n + test "pool #{n}: Graceful_permdown/2 results in all ids being untracked", + %{server: server, pool_size: n} do + topics = for i <- 1..100, do: "topic_#{i}" + for t <- topics do + {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + end + :ok = Tracker.graceful_permdown(server) + :timer.sleep(n) + + for t <- topics, do: assert Tracker.list(server, t) == [] + end + + end + +end diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs new file mode 100644 index 000000000..88473717d --- /dev/null +++ 
b/test/phoenix/tracker/shard_replication_test.exs
@@ -0,0 +1,389 @@
+defmodule Phoenix.Tracker.ShardReplicationTest do
+  use Phoenix.PubSub.NodeCase
+  alias Phoenix.Tracker.{Replica, Shard, State}
+
+  @primary :"primary@127.0.0.1"
+  @node1 :"node1@127.0.0.1"
+  @node2 :"node2@127.0.0.1"
+
+  setup config do
+    tracker = config.test
+    {:ok, shard_pid} = start_shard(name: tracker)
+    {:ok, topic: to_string(config.test),
+          shard: shard_name(tracker),
+          shard_pid: shard_pid,
+          tracker: tracker}
+  end
+
+  test "heartbeats", %{shard: shard} do
+    subscribe_to_server(shard)
+    assert_heartbeat from: @primary
+    flush()
+    assert_heartbeat from: @primary
+    flush()
+    assert_heartbeat from: @primary
+  end
+
+  test "gossip from unseen node triggers nodeup and transfer request",
+    %{shard: shard, topic: topic, tracker: tracker} do
+
+    assert list(shard, topic) == []
+    subscribe_to_server(shard)
+    drop_gossips(shard)
+    spy_on_server(@node1, self(), shard)
+    start_shard(@node1, name: tracker)
+    track_presence(@node1, shard, spawn_pid(), topic, "node1", %{})
+    flush()
+    assert_heartbeat from: @node1
+
+    resume_gossips(shard)
+    # primary sends transfer_req to node1 after seeing it is behind
+    ref = assert_transfer_req to: @node1, from: @primary
+    # node1 fulfills transfer request and sends transfer_ack to primary
+    assert_transfer_ack ref, from: @node1
+    assert_heartbeat to: @node1, from: @primary
+    assert [{"node1", _}] = list(shard, topic)
+  end
+
+  test "requests for transfer collapses clocks",
+    %{shard: shard, topic: topic, tracker: tracker} do
+
+    subscribe_to_server(shard)
+    subscribe(topic)
+    for node <- [@node1, @node2] do
+      spy_on_server(node, self(), shard)
+      start_shard(node, name: tracker)
+      assert_receive {{:replica_up, ^node}, @primary}, @timeout
+      assert_receive {{:replica_up, @primary}, ^node}, @timeout
+    end
+    assert_receive {{:replica_up, @node2}, @node1}, @timeout
+    assert_receive {{:replica_up, @node1}, @node2}, @timeout
+
+    flush()
+    drop_gossips(shard)
+    track_presence(@node1, shard, spawn_pid(), topic, "node1", %{})
+    track_presence(@node1, shard, spawn_pid(), topic, "node1.2", %{})
+    track_presence(@node2, shard, spawn_pid(), topic, "node2", %{})
+
+    # node1 sends delta broadcast to node2
+    assert_receive {@node1, {:pub, :heartbeat, {@node2, _vsn}, %State{mode: :delta}, _clocks}}, @timeout
+
+    # node2 sends delta broadcast to node1
+    assert_receive {@node2, {:pub, :heartbeat, {@node1, _vsn}, %State{mode: :delta}, _clocks}}, @timeout
+
+    flush()
+    resume_gossips(shard)
+    # primary sends transfer_req to node with most dominance
+    assert_receive {node, {:pub, :transfer_req, ref, {@primary, _vsn}, _state}}, @timeout * 2
+    # primary does not send transfer_req to the other node, since it is in the dominant node's future
+    refute_received {_other, {:pub, :transfer_req, _ref, {@primary, _vsn}, _state}}, @timeout * 2
+    # dominant node fulfills transfer request and sends transfer_ack to primary
+    assert_receive {:pub, :transfer_ack, ^ref, {^node, _vsn}, _state}, @timeout
+
+    # wait for local sync
+    assert_join ^topic, "node1", %{}
+    assert_join ^topic, "node1.2", %{}
+    assert_join ^topic, "node2", %{}
+    assert_heartbeat from: @node1
+    assert_heartbeat from: @node2
+
+    assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic)
+  end
+
+  test "old pids from a node are permdowned when the node comes back up",
+    %{shard: shard, shard_pid: shard_pid, topic: topic, tracker: tracker} do
+    track_presence(@primary, shard, self(), topic, @primary, %{})
+    {node1_node, {:ok, node1_shard}} = start_shard(@node1, name:
tracker)
+    track_presence(@node1, shard, node1_node, topic, @node1, %{})
+
+    spy_on_server(@node1, self(), shard)
+    assert_heartbeat to: @node1, from: @primary
+
+    {node2_node, {:ok, node2_shard}} = start_shard(@node2, name: tracker)
+    track_presence(@node2, shard, node2_node, topic, @node2, %{})
+
+    Process.unlink(node1_node)
+    Process.exit(node1_shard, :kill)
+    assert_receive {{:replica_permdown, @node1}, @primary}, @permdown * 2
+
+    spy_on_server(@node2, self(), shard)
+
+    {node1_node_new, {:ok, node1_shard}} = start_shard(@node1, name: tracker)
+    track_presence(@node1, shard, node1_node_new, topic, @node1, %{})
+
+    flush()
+    spy_on_server(@node1, self(), shard)
+    assert_heartbeat to: @node2, from: @primary
+    assert_heartbeat to: @node1, from: @node2
+
+    refute {@node1, node1_node} in get_values(@primary, shard_pid)
+    refute {@node1, node1_node} in get_values(@node1, node1_shard)
+    refute {@node1, node1_node} in get_values(@node2, node2_shard)
+  end
+
+  # TODO: split into multiple test cases
+  test "tempdowns with nodeups of new vsn, and permdowns",
+    %{shard: shard, topic: topic, tracker: tracker} do
+
+    subscribe_to_server(shard)
+    subscribe(topic)
+
+    {node1_node, {:ok, node1_server}} = start_shard(@node1, name: tracker)
+    {_node2_node, {:ok, _node2_server}} = start_shard(@node2, name: tracker)
+    for node <- [@node1, @node2] do
+      track_presence(node, shard, spawn_pid(), topic, node, %{})
+      assert_join ^topic, ^node, %{}
+    end
+    assert_map %{@node1 => %Replica{status: :up, vsn: vsn_before},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+
+    # tempdown netsplit
+    flush()
+    :ok = :sys.suspend(node1_server)
+    assert_leave ^topic, @node1, %{}
+    assert_map %{@node1 => %Replica{status: :down, vsn: ^vsn_before},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+    flush()
+    :ok = :sys.resume(node1_server)
+    assert_join ^topic, @node1, %{}
+    assert_heartbeat from: @node1
+    assert_map %{@node1 => %Replica{status: :up, vsn: ^vsn_before},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+
+    # tempdown crash
+    Process.unlink(node1_node)
+    Process.exit(node1_server, :kill)
+    assert_leave ^topic, @node1, %{}
+    assert_map %{@node1 => %Replica{status: :down},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+
+    # tempdown => nodeup with new vsn
+    {node1_node, {:ok, node1_server}} = start_shard(@node1, name: tracker)
+    track_presence(@node1, shard, spawn_pid(), topic, "node1-back", %{})
+    assert_join ^topic, "node1-back", %{}
+    assert [{@node2, _}, {"node1-back", _}] = list(shard, topic)
+    assert_map %{@node1 => %Replica{status: :up, vsn: new_vsn},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+    assert vsn_before != new_vsn
+
+    # tempdown again
+    Process.unlink(node1_node)
+    Process.exit(node1_server, :kill)
+    assert_leave ^topic, "node1-back", %{}
+    assert_map %{@node1 => %Replica{status: :down},
+                 @node2 => %Replica{status: :up}}, replicas(shard), 2
+
+    # tempdown => permdown
+    flush()
+    for _ <- 0..trunc(@permdown / @heartbeat), do: assert_heartbeat(from: @primary)
+    assert_map %{@node2 => %Replica{status: :up}}, replicas(shard), 1
+  end
+
+  test "node detects and locally broadcasts presence_join/leave",
+    %{shard: shard, topic: topic, tracker: tracker} do
+
+    local_presence = spawn_pid()
+    remote_pres = spawn_pid()
+
+    # local joins
+    subscribe(topic)
+    assert list(shard, topic) == []
+    {:ok, _ref} = Shard.track(shard, self(), topic, "me", %{name: "me"})
+    assert_join ^topic, "me", %{name: "me"}
+    assert [{"me", %{name: "me", phx_ref: _}}] = list(shard, topic)
+
+
{:ok, _ref} = Shard.track(shard, local_presence , topic, "me2", %{name: "me2"}) + assert_join ^topic, "me2", %{name: "me2"} + assert [{"me", %{name: "me", phx_ref: _}}, + {"me2",%{name: "me2", phx_ref: _}}] = + list(shard, topic) + + # remote joins + assert replicas(shard) == %{} + start_shard(@node1, name: tracker) + track_presence(@node1, shard, remote_pres, topic, "node1", %{name: "s1"}) + assert_join ^topic, "node1", %{name: "s1"} + assert_map %{@node1 => %Replica{status: :up}}, replicas(shard), 1 + assert [{"me", %{name: "me", phx_ref: _}}, + {"me2",%{name: "me2", phx_ref: _}}, + {"node1", %{name: "s1", phx_ref: _}}] = + list(shard, topic) + + # local leaves + Process.exit(local_presence, :kill) + assert_leave ^topic, "me2", %{name: "me2"} + assert [{"me", %{name: "me", phx_ref: _}}, + {"node1", %{name: "s1", phx_ref: _}}] = + list(shard, topic) + + # remote leaves + Process.exit(remote_pres, :kill) + assert_leave ^topic, "node1", %{name: "s1"} + assert [{"me", %{name: "me", phx_ref: _}}] = list(shard, topic) + end + + test "detects nodedown and locally broadcasts leaves", + %{shard: shard, topic: topic, tracker: tracker} do + + local_presence = spawn_pid() + subscribe(topic) + {node_pid, {:ok, node1_server}} = start_shard(@node1, name: tracker) + assert list(shard, topic) == [] + + {:ok, _ref} = Shard.track(shard, local_presence , topic, "local1", %{name: "l1"}) + assert_join ^topic, "local1", %{} + + track_presence(@node1, shard, spawn_pid(), topic, "node1", %{name: "s1"}) + assert_join ^topic, "node1", %{name: "s1"} + assert %{@node1 => %Replica{status: :up}} = replicas(shard) + assert [{"local1", _}, {"node1", _}] = list(shard, topic) + assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + + # nodedown + Process.unlink(node_pid) + Process.exit(node1_server, :kill) + assert_leave ^topic, "node1", %{name: "s1"} + assert %{@node1 => %Replica{status: :down}} = replicas(shard) + assert [{"local1", _}] = list(shard, topic) + assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + + :timer.sleep(@permdown + 2*@heartbeat) + assert [{"local1", _}] = dirty_list(shard, topic) + end + + + test "untrack with no tracked topic is a noop", + %{shard: shard, topic: topic} do + assert Shard.untrack(shard, self(), topic, "foo") == :ok + end + + test "untrack with topic", + %{shard: shard, topic: topic} do + + Shard.track(shard, self(), topic, "user1", %{name: "user1"}) + Shard.track(shard, self(), "another:topic", "user2", %{name: "user2"}) + assert [{"user1", %{name: "user1"}}] = list(shard, topic) + assert [{"user2", %{name: "user2"}}] = list(shard, "another:topic") + assert Shard.untrack(shard, self(), topic, "user1") == :ok + assert [] = list(shard, topic) + assert [{"user2", %{name: "user2"}}] = list(shard, "another:topic") + assert Process.whereis(shard) in Process.info(self())[:links] + assert Shard.untrack(shard, self(), "another:topic", "user2") == :ok + assert [] = list(shard, "another:topic") + refute Process.whereis(shard) in Process.info(self())[:links] + end + + test "untrack from all topics", + %{shard: shard, topic: topic} do + + Shard.track(shard, self(), topic, "user1", %{name: "user1"}) + Shard.track(shard, self(), "another:topic", "user2", %{name: "user2"}) + assert [{"user1", %{name: "user1"}}] = list(shard, topic) + assert [{"user2", %{name: "user2"}}] = list(shard, "another:topic") + assert Process.whereis(shard) in Process.info(self())[:links] + assert Shard.untrack(shard, self()) == :ok + assert [] = list(shard, topic) + assert [] = list(shard, 
"another:topic") + refute Process.whereis(shard) in Process.info(self())[:links] + end + + test "updating presence sends join/leave and phx_ref_prev", + %{shard: shard, topic: topic} do + + subscribe(topic) + {:ok, _ref} = Shard.track(shard, self(), topic, "u1", %{name: "u1"}) + assert [{"u1", %{name: "u1", phx_ref: ref}}] = list(shard, topic) + {:ok, _ref} = Shard.update(shard, self(), topic, "u1", %{name: "u1-updated"}) + assert_leave ^topic, "u1", %{name: "u1", phx_ref: ^ref} + assert_join ^topic, "u1", %{name: "u1-updated", phx_ref_prev: ^ref} + end + + test "updating presence sends join/leave and phx_ref_prev with profer diffs if function for update used", + %{shard: shard, topic: topic} do + + subscribe(topic) + {:ok, _ref} = Shard.track(shard, self(), topic, "u1", %{browser: "Chrome", status: "online"}) + assert [{"u1", %{browser: "Chrome", status: "online", phx_ref: ref}}] = list(shard, topic) + {:ok, _ref} = Shard.update(shard, self(), topic, "u1", fn meta -> Map.put(meta, :status, "away") end) + assert_leave ^topic, "u1", %{browser: "Chrome", status: "online", phx_ref: ^ref} + assert_join ^topic, "u1", %{browser: "Chrome", status: "away", phx_ref_prev: ^ref} + end + + test "updating with no prior presence", %{shard: shard, topic: topic} do + assert {:error, :nopresence} = Shard.update(shard, self(), topic, "u1", %{}) + end + + test "duplicate tracking", %{shard: shard, topic: topic} do + pid = self() + assert {:ok, _ref} = Shard.track(shard, pid, topic, "u1", %{}) + assert {:error, {:already_tracked, ^pid, ^topic, "u1"}} = + Shard.track(shard, pid, topic, "u1", %{}) + assert {:ok, _ref} = Shard.track(shard, pid, "another:topic", "u1", %{}) + assert {:ok, _ref} = Shard.track(shard, pid, topic, "anotherkey", %{}) + + assert :ok = Shard.untrack(shard, pid, topic, "u1") + assert :ok = Shard.untrack(shard, pid, "another:topic", "u1") + assert :ok = Shard.untrack(shard, pid, topic, "anotherkey") + end + + test "graceful exits with permdown", + %{shard: shard, topic: topic, tracker: tracker} do + subscribe(topic) + {_node_pid, {:ok, _node1_server}} = start_shard(@node1, name: tracker) + track_presence(@node1, shard, spawn_pid(), topic, "node1", %{name: "s1"}) + assert_join ^topic, "node1", %{name: "s1"} + assert %{@node1 => %Replica{status: :up}} = replicas(shard) + assert [{"node1", _}] = list(shard, topic) + + # graceful permdown + {_, :ok} = graceful_permdown(@node1, shard) + assert_leave ^topic, "node1", %{name: "s1"} + assert [] = list(shard, topic) + assert replicas(shard) == %{} + end + + ## Helpers + + def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end) + + def replicas(server), do: GenServer.call(server, :replicas) + + def refute_transfer_req(opts) do + to = Keyword.fetch!(opts, :to) + from = Keyword.fetch!(opts, :from) + refute_receive {^to, {:pub, :transfer_req, _, {^from, _vsn}, _}}, @timeout * 2 + end + + def assert_transfer_req(opts) do + to = Keyword.fetch!(opts, :to) + from = Keyword.fetch!(opts, :from) + assert_receive {^to, {:pub, :transfer_req, ref, {^from, _vsn}, _}}, @timeout * 2 + ref + end + + def assert_transfer_ack(ref, opts) do + from = Keyword.fetch!(opts, :from) + if to = opts[:to] do + assert_receive {^to, {:pub, :transfer_ack, ^ref, {^from, _vsn}, _state}}, @timeout + else + assert_receive {:pub, :transfer_ack, ^ref, {^from, _vsn}, _state}, @timeout + end + end + + def assert_heartbeat(opts) do + from = Keyword.fetch!(opts, :from) + if to = opts[:to] do + assert_receive {^to, {:pub, :heartbeat, {^from, _vsn}, _delta, _clocks}}, @timeout + else + 
assert_receive {:pub, :heartbeat, {^from, _vsn}, _delta, _clocks}, @timeout + end + end + + defp list(shard, topic) do + Enum.sort(Shard.list(shard, topic)) + end + + defp dirty_list(shard, topic) do + Enum.sort(Shard.dirty_list(shard, topic)) + end +end diff --git a/test/phoenix/tracker/state_test.exs b/test/phoenix/tracker/state_test.exs index de8825622..ed849cd01 100644 --- a/test/phoenix/tracker/state_test.exs +++ b/test/phoenix/tracker/state_test.exs @@ -8,8 +8,8 @@ defmodule Phoenix.Tracker.StateTest do |> Enum.sort() end - defp new(node) do - State.new({node, 1}) + defp new(node, config) do + State.new({node, 1}, :"#{node} #{config.test}") end defp new_pid() do @@ -24,24 +24,24 @@ defmodule Phoenix.Tracker.StateTest do defp tab2list(tab), do: tab |> :ets.tab2list() |> Enum.sort() - test "that this is set up correctly" do - a = new(:a) + test "that this is set up correctly", config do + a = new(:a, config) assert {_a, map} = State.extract(a, :a, a.context) assert map == %{} end - test "user added online is online" do - a = new(:a) + test "user added online is online", config do + a = new(:a, config) john = new_pid() a = State.join(a, john, "lobby", :john) - assert [{{_, _, :john}, _, _}] = State.get_by_topic(a, "lobby") + assert [{:john, _meta}] = State.get_by_topic(a, "lobby") a = State.leave(a, john, "lobby", :john) assert [] = State.get_by_topic(a, "lobby") end - test "users from other servers merge" do - a = new(:a) - b = new(:b) + test "users from other servers merge", config do + a = new(:a, config) + b = new(:b, config) {a, _, _} = State.replica_up(a, b.replica) {b, _, _} = State.replica_up(b, a.replica) @@ -89,9 +89,9 @@ defmodule Phoenix.Tracker.StateTest do assert (State.online_list(b) |> Enum.sort) == (State.online_list(a) |> Enum.sort) end - test "basic netsplit" do - a = new(:a) - b = new(:b) + test "basic netsplit", config do + a = new(:a, config) + b = new(:b, config) {a, _, _} = State.replica_up(a, b.replica) {b, _, _} = State.replica_up(b, a.replica) @@ -123,9 +123,9 @@ defmodule Phoenix.Tracker.StateTest do assert [:bob, :carol, :david] = keys(State.online_list(a)) end - test "get_by_pid" do + test "get_by_pid", config do pid = self() - state = new(:node1) + state = new(:node1, config) assert State.get_by_pid(state, pid) == [] state = State.join(state, pid, "topic", "key1", %{}) @@ -139,11 +139,11 @@ defmodule Phoenix.Tracker.StateTest do assert State.get_by_pid(state, pid, "notopic", "nokey") == nil end - test "get_by_topic" do + test "get_by_topic", config do pid = self() - state = new(:node1) - state2 = new(:node2) - state3 = new(:node3) + state = new(:node1, config) + state2 = new(:node2, config) + state3 = new(:node3, config) {state, _, _} = State.replica_up(state, {:node2, 1}) {state, _, _} = State.replica_up(state, {:node3, 1}) @@ -170,37 +170,29 @@ defmodule Phoenix.Tracker.StateTest do state3 = State.join(state3, user3, "topic", "user3", %{}) # all replicas online - assert [{{"topic", ^pid, "key1"}, %{}, {{:node1, 1}, 1}}, - {{"topic", ^pid, "key2"}, %{}, {{:node1, 1}, 2}}] = + assert [{"key1", %{}}, {"key2", %{}}] = State.get_by_topic(state, "topic") {state, _, _} = State.merge(state, State.extract(state2, :node1, state.context)) {state, _, _} = State.merge(state, State.extract(state3, :node1, state.context)) - assert [{{"topic", ^pid, "key1"}, %{}, {{:node1, 1}, 1}}, - {{"topic", ^pid, "key2"}, %{}, {{:node1, 1}, 2}}, - {{"topic", ^user2, "user2"}, %{}, {{:node2, 1}, 1}}, - {{"topic", ^user3, "user3"}, %{}, {{:node3, 1}, 1}}] = - State.get_by_topic(state, 
"topic") + assert [{"key1", %{}}, {"key2", %{}}, {"user2", %{}}, {"user3", %{}}] = + State.get_by_topic(state, "topic") # one replica offline {state, _, _} = State.replica_down(state, state2.replica) - assert [{{"topic", ^pid, "key1"}, %{}, {{:node1, 1}, 1}}, - {{"topic", ^pid, "key2"}, %{}, {{:node1, 1}, 2}}, - {{"topic", ^user3, "user3"}, %{}, {{:node3, 1}, 1}}] = - State.get_by_topic(state, "topic") + assert [{"key1", %{}}, {"key2", %{}}, {"user3", %{}}] = + State.get_by_topic(state, "topic") # two replicas offline {state, _, _} = State.replica_down(state, state3.replica) - assert [{{"topic", ^pid, "key1"}, %{}, {{:node1, 1}, 1}}, - {{"topic", ^pid, "key2"}, %{}, {{:node1, 1}, 2}}] = - State.get_by_topic(state, "topic") + assert [{"key1", %{}}, {"key2", %{}}] = State.get_by_topic(state, "topic") assert [] = State.get_by_topic(state, "another:topic") end - test "remove_down_replicas" do - state1 = new(:node1) - state2 = new(:node2) + test "remove_down_replicas", config do + state1 = new(:node1, config) + state2 = new(:node2, config) {state1, _, _} = State.replica_up(state1, state2.replica) {state2, _, _} = State.replica_up(state2, state1.replica) @@ -222,9 +214,10 @@ defmodule Phoenix.Tracker.StateTest do assert keys(State.online_list(state2)) == [:bob] end - test "basic deltas" do - a = new(:a) - b = new(:b) + test "basic deltas", config do + a = new(:a, config) + b = new(:b, config) + {a, _, _} = State.replica_up(a, b.replica) {b, _, _} = State.replica_up(b, a.replica) @@ -249,9 +242,9 @@ defmodule Phoenix.Tracker.StateTest do assert Enum.all?(Enum.map(b.clouds, fn {_, cloud} -> Enum.empty?(cloud) end)) end - test "merging deltas" do - s1 = new(:s1) - s2 = new(:s2) + test "merging deltas", config do + s1 = new(:s1, config) + s2 = new(:s2, config) user1 = new_pid() user2 = new_pid() @@ -271,12 +264,12 @@ defmodule Phoenix.Tracker.StateTest do [{{:s1, 1}, 1}, {{:s1, 1}, 2}, {{:s2, 1}, 1}, {{:s2, 1}, 2}] end - test "merging deltas with removes" do - s1 = new(:s1) - s2 = new(:s2) + test "merging deltas with removes", config do + s1 = new(:s1, config) + s2 = new(:s2, config) + user1 = new_pid() {s1, _, _} = State.replica_up(s1, s2.replica) {s2, _, _} = State.replica_up(s2, s1.replica) - user1 = new_pid() # concurrent add wins s1 = State.join(s1, user1, "lobby", "user1", %{}) diff --git a/test/phoenix/tracker_test.exs b/test/phoenix/tracker_test.exs index 2122f81cf..f6338af2f 100644 --- a/test/phoenix/tracker_test.exs +++ b/test/phoenix/tracker_test.exs @@ -1,14 +1,16 @@ -defmodule Phoenix.TrackerTest do +defmodule Phoenix.Tracker.ShardTest do use ExUnit.Case, async: true @opts [pubsub_server: nil, name: nil] test "validates down_period" do opts = Keyword.merge(@opts, [down_period: 1]) - assert Phoenix.Tracker.init([nil, nil, opts]) == {:error, "down_period must be at least twice as large as the broadcast_period"} + assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == + {:error, "down_period must be at least twice as large as the broadcast_period"} end test "validates permdown_period" do opts = Keyword.merge(@opts, [permdown_period: 1_200_00, down_period: 1_200_000]) - assert Phoenix.Tracker.init([nil, nil, opts]) == {:error, "permdown_period must be at least larger than the down_period"} + assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == + {:error, "permdown_period must be at least larger than the down_period"} end end diff --git a/test/support/node_case.ex b/test/support/node_case.ex index a73b142a8..4063d9116 100644 --- a/test/support/node_case.ex +++ b/test/support/node_case.ex 
@@ -46,43 +46,60 @@ defmodule Phoenix.PubSub.NodeCase do :ok = Phoenix.PubSub.subscribe(@pubsub, topic) end - def subscribe_to_tracker(tracker) do - :ok = Phoenix.PubSub.subscribe(@pubsub, namespaced_topic(tracker)) + def subscribe_to_server(server) do + :ok = Phoenix.PubSub.subscribe(@pubsub, namespaced_topic(server)) end - defp namespaced_topic(tracker), do: "phx_presence:#{tracker}" + defp namespaced_topic(server), do: "phx_presence:#{server}" - def start_tracker(node_name, opts) do + def shard_name(server), do: :"#{server}_shard0" + + def start_shard(node_name, opts) do opts = Keyword.put_new(opts, :report_events_to, self()) - call_node(node_name, fn -> start_tracker(opts) end) + call_node(node_name, fn -> start_shard(opts) end) end - def graceful_permdown(node_name, tracker) do - call_node(node_name, fn -> Phoenix.Tracker.graceful_permdown(tracker) end) + def graceful_permdown(node_name, server) do + call_node(node_name, + fn -> Phoenix.Tracker.Shard.graceful_permdown(server) end) end - def drop_gossips(tracker) do - :ok = GenServer.call(tracker, :unsubscribe) + def drop_gossips(server) do + :ok = GenServer.call(server, :unsubscribe) end - def resume_gossips(tracker) do - :ok = GenServer.call(tracker, :resubscribe) + def resume_gossips(server) do + :ok = GenServer.call(server, :resubscribe) end - def start_tracker(opts) do - opts = Keyword.put_new(opts, :report_events_to, self()) - opts = Keyword.merge([ + def start_shard(opts) do + opts = Keyword.merge(default_tracker_opts(), + Keyword.put_new(opts, :report_events_to, self())) + Phoenix.Tracker.Shard.start_link(TestTracker, opts, opts) + end + + def start_pool(opts) do + opts = Keyword.merge(default_pool_opts(), opts) + Phoenix.Tracker.start_link(TestTracker, opts, opts) + end + + defp default_pool_opts do + Keyword.merge([shard_number: 0], default_tracker_opts()) + end + + defp default_tracker_opts do + [ pubsub_server: @pubsub, broadcast_period: @heartbeat, max_silent_periods: 2, permdown_period: @permdown, - ], opts) - Phoenix.Tracker.start_link(TestTracker, opts, opts) + shard_number: 0, + ] end - def track_presence(node_name, tracker, pid, topic, user_id, meta) do + def track_presence(node_name, server, pid, topic, user_id, meta) do call_node(node_name, fn -> - Phoenix.Tracker.track(tracker, pid, topic, user_id, meta) + Phoenix.Tracker.Shard.track(server, pid, topic, user_id, meta) end) end @@ -92,8 +109,10 @@ defmodule Phoenix.PubSub.NodeCase do end) end - def spy_on_tracker(node_name, server \\ @pubsub, target_pid, tracker) do - spy_on_pubsub(node_name, server, target_pid, "phx_presence:#{tracker}") + def spy_on_server(node_name, pubsub_server \\ @pubsub, + target_pid, tracker_server) do + spy_on_pubsub(node_name, pubsub_server, target_pid, + "phx_presence:#{tracker_server}") end def spy_on_pubsub(node_name, server \\ @pubsub, target_pid, topic) do