Skip to content

Commit

Permalink
Tracker consists of shard-pool; exposes dirty_list operation
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Studnicki authored and Simon Zelazny committed Jul 4, 2017
1 parent 1ce6cb8 commit 1c9ab71
Show file tree
Hide file tree
Showing 11 changed files with 1,231 additions and 939 deletions.
542 changes: 89 additions & 453 deletions lib/phoenix/tracker.ex

Large diffs are not rendered by default.

497 changes: 497 additions & 0 deletions lib/phoenix/tracker/shard.ex

Large diffs are not rendered by default.

25 changes: 18 additions & 7 deletions lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Phoenix.Tracker.State do
@type context :: %{name => clock}
@type values :: ets_id | :extracted | %{tag => {pid, topic, key, meta}}
@type value :: {{topic, pid, key}, meta, tag}
@type key_meta :: {key, meta}
@type delta :: %State{mode: :delta}
@type pid_lookup :: {pid, topic, key}

Expand Down Expand Up @@ -50,13 +51,13 @@ defmodule Phoenix.Tracker.State do
%Phoenix.Tracker.State{...}
"""
@spec new(name) :: t
def new(replica) do
@spec new(name, atom) :: t
def new(replica, shard_name) do
reset_delta(%State{
replica: replica,
context: %{replica => 0},
mode: :normal,
values: :ets.new(:values, [:ordered_set]),
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
pids: :ets.new(:pids, [:duplicate_bag]),
replicas: %{replica => :up}})
end
Expand Down Expand Up @@ -113,12 +114,22 @@ defmodule Phoenix.Tracker.State do
@doc """
Returns a list of elements for the topic who belong to an online replica.
"""
@spec get_by_topic(t, topic) :: [value]
@spec get_by_topic(t, topic) :: [key_meta]
def get_by_topic(%State{values: values} = state, topic) do
replicas = down_replicas(state)
:ets.select(values, [{ {{topic, :_, :_}, :_, {:"$1", :_}},
not_in(:"$1", replicas), [:"$_"]}])
tracked_values(values, topic, down_replicas(state))
end

@doc """
Performs table lookup for tracked elements in the topic, filtering out
those present on downed replicas.
"""
def tracked_values(table, topic, down_replicas) do
:ets.select(table,
[{{{topic, :_, :"$1"}, :"$2", {:"$3", :_}},
not_in(:"$3", down_replicas),
[{{:"$1", :"$2"}}]}])
end

defp not_in(_pos, []), do: []
defp not_in(pos, replicas), do: [not: ors(pos, replicas)]
defp ors(pos, [rep]), do: {:"==", pos, {rep}}
Expand Down
18 changes: 10 additions & 8 deletions test/phoenix/pubsub/pg2_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule Phoenix.PubSub.PG2Test do
@node1 :"[email protected]"
@node2 :"[email protected]"

@receive_timeout 500

setup config do
size = config[:pool_size] || 1
if config[:pool_size] do
Expand All @@ -29,15 +31,15 @@ defmodule Phoenix.PubSub.PG2Test do

PubSub.subscribe(config.pubsub, config.topic)
:ok = PubSub.direct_broadcast(@node1, config.pubsub, config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout
:ok = PubSub.direct_broadcast!(@node1, config.pubsub, config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast(@node2, config.pubsub, config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast!(@node2, config.pubsub, config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout
end

@tag pool_size: size, topic: topic
Expand All @@ -46,15 +48,15 @@ defmodule Phoenix.PubSub.PG2Test do

PubSub.subscribe(config.pubsub, config.topic)
:ok = PubSub.direct_broadcast_from(@node1, config.pubsub, self(), config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout
:ok = PubSub.direct_broadcast_from!(@node1, config.pubsub, self(), config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast_from(@node2, config.pubsub, self(), config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast_from!(@node2, config.pubsub, self(), config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout
end
end

Expand Down
24 changes: 12 additions & 12 deletions test/phoenix/tracker/delta_generation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
|> Enum.sort()
end

defp new(node) do
State.new(node)
defp new(node, config) do
State.new(node, :"#{node} #{config.test}")
end

defp new_pid() do
Expand All @@ -23,9 +23,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
|> Enum.sort()
end

test "generations" do
s1 = new(:s1)
s2 = new(:s2)
test "generations", config do
s1 = new(:s1, config)
s2 = new(:s2, config)
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
assert [gen1, gen1, gen1] = gens = push(s1, [], s1.delta, [2, 5, 6])
assert keys(gen1) == ["user1"]
Expand Down Expand Up @@ -86,9 +86,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
assert sorted_clouds(gen3.clouds) == [{:s1, 3}, {:s1, 4}, {:s2, 1}, {:s2, 2}]
end

test "does not include non-contiguous deltas" do
s1 = new(:s1)
s3 = new(:s3)
test "does not include non-contiguous deltas", config do
s1 = new(:s1, config)
s3 = new(:s3, config)
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
old_s3 = s3 = State.join(s3, new_pid(), "lobby", "user3", %{})
s3 = State.reset_delta(s3)
Expand All @@ -99,10 +99,10 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
assert [^gen1, ^gen1, ^gen1] = push(s1, gens, s3.delta, [5, 10, 15])
end

test "remove_down_replicas" do
s1 = new(:s1)
s2 = new(:s2)
s3 = new(:s3)
test "remove_down_replicas", config do
s1 = new(:s1, config)
s2 = new(:s2, config)
s3 = new(:s3, config)
s2 = State.join(s2, new_pid(), "lobby", "user2", %{})
assert [gen1, gen1, gen1] = gens = push(s1, [], s2.delta, [5, 10, 15])
assert [pruned_gen1, pruned_gen1, pruned_gen1] = DeltaGeneration.remove_down_replicas(gens, :s2)
Expand Down
Loading

0 comments on commit 1c9ab71

Please sign in to comment.