Skip to content

Commit

Permalink
Improve cluster communcation
Browse files Browse the repository at this point in the history
Register sites using :pg on init and rely on its state to find nodes

Fix issues on net split
  • Loading branch information
leandrocp committed Apr 3, 2024
1 parent 956801d commit 782022d
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 126 deletions.
2 changes: 0 additions & 2 deletions lib/beacon/live_admin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ defmodule Beacon.LiveAdmin.Application do
Beacon.LiveAdmin.Cluster
]

:ets.new(:beacon_live_admin_sites, [:set, :named_table, :public, read_concurrency: true])

opts = [strategy: :one_for_one, name: Beacon.LiveAdmin.Supervisor]
Supervisor.start_link(children, opts)
end
Expand Down
168 changes: 69 additions & 99 deletions lib/beacon/live_admin/cluster.ex
Original file line number Diff line number Diff line change
@@ -1,43 +1,39 @@
defmodule Beacon.LiveAdmin.Cluster do
@moduledoc """
Cluster management. Discover all sites running in the cluster and executes functions globally.
"""

use GenServer
require Logger
alias Beacon.LiveAdmin.PubSub

@name __MODULE__
@one_minute :timer.minutes(1)
@ets_table :beacon_live_admin_sites
@scope :beacon_cluster
@remote_call_retries 3

@doc false
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@doc false
@impl true
def init(opts) do
{:ok, _} = :pg.start_link(@scope)
:pg.monitor_scope(@scope)

Check warning on line 21 in lib/beacon/live_admin/cluster.ex

View workflow job for this annotation

GitHub Actions / test: OTP 23 | Elixir 1.13.0 | Phoenix 1.7.0 | LiveView 0.20.2

:pg.monitor_scope/1 is undefined or private
:ok = :net_kernel.monitor_nodes(true, node_type: :all)
{:ok, opts}
end

def discover_sites do
GenServer.call(@name, :discover_sites, @one_minute)
end

def maybe_discover_sites() do
case running_sites() do
[] -> discover_sites()
_ -> :skip
end
end

@doc false
def nodes do
[Node.self()] ++ Node.list()
end
def nodes, do: [Node.self()] ++ Node.list()

@doc false
@doc """
Returns a list of sites running in all connected nodes in the cluster.
It doesn't try to refresh the list of running sites,
it only returns the results cached when calling `discover_sites/0`.
"""
def running_sites do
@ets_table
|> :ets.match({:"$1", :_})
|> List.flatten()
:pg.which_groups(@scope)
end

@doc """
Expand All @@ -47,33 +43,55 @@ defmodule Beacon.LiveAdmin.Cluster do
## Examples
iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :reload_site, [:my_site])
iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :boot, [:my_site])
:ok
"""
@spec call(atom(), module(), fun :: (-> any()), [any()]) :: any()
def call(site, mod, fun, args) when is_binary(site) do
site = String.to_existing_atom(site)
call(site, mod, fun, args)
site
|> String.to_existing_atom()
|> call(mod, fun, args)
end

def call(site, mod, fun, args)
when is_atom(site) and is_atom(mod) and is_atom(fun) and is_list(args) do
case find_node(site) do
nil ->
message = "no running node found for site #{inspect(site)}"
raise Beacon.LiveAdmin.ClusterError, message: message
id = Module.concat([mod, fun])
nodes = find_nodes(site)

if nodes == [] do
message =
"no nodes available to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}"

Logger.debug(message)
raise Beacon.LiveAdmin.ClusterError, message: message
end

node ->
do_call(node, mod, fun, args)
:global.trans(
{id, self()},
fn ->
node = pick_node(nodes)
do_call(site, node, mod, fun, args)
end,
nodes,
@remote_call_retries
)
end

defp do_call(site, node, mod, fun, args) do
if node == Node.self() do
apply(mod, fun, args)
else
:erpc.call(node, mod, fun, args)
end
rescue
exception ->
Logger.debug(
"failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}"
"failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)}"
)

message = """
failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}
failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)}
Got:
Expand All @@ -84,86 +102,38 @@ defmodule Beacon.LiveAdmin.Cluster do
reraise Beacon.LiveAdmin.ClusterError, [message: message], __STACKTRACE__
end

defp do_call(node, mod, fun, args) do
if node == Node.self() do
apply(mod, fun, args)
else
:erpc.call(node, mod, fun, args)
end
@doc false
def find_nodes(site) when is_atom(site) do
Enum.map(:pg.get_members(@scope, site), &GenServer.call(&1, :current_node))
end

if Code.ensure_loaded?(Mix.Project) and Mix.env() == :test do
defp find_node(site) when is_atom(site) do
case :ets.match(@ets_table, {site, :"$1"}) do
[[nodes]] -> List.first(nodes)
_ -> nil
end
end
else
defp find_node(site) when is_atom(site) do
case :ets.match(@ets_table, {site, :"$1"}) do
# TODO: load balance and retry
[[nodes]] -> Enum.random(nodes)
_ -> nil
end
end
@doc false
defp pick_node(nodes) do
Enum.random(nodes)
end

## Callbacks

@doc false
@impl true
def handle_call(:discover_sites, _from, state) do
sites = do_discover_sites()
{:reply, sites, state}
end

defp do_discover_sites do
# TODO: add or remove nodes from ets state when nodes changes instead of recreating everything
:ets.delete_all_objects(@ets_table)

sites =
nodes()
|> Map.new(fn node ->
try do
sites = :erpc.call(node, Beacon.Registry, :running_sites, [], :timer.seconds(10))
{node, sites}
rescue
_exception ->
{node, []}
end
end)
|> group_sites()
|> Map.new(fn site ->
true = :ets.insert(@ets_table, site)
site
end)

PubSub.notify_sites_changed(__MODULE__)

sites
def handle_info({:nodeup, _, _}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
def group_sites(mapping) do
Enum.reduce(mapping, %{}, fn {node, sites}, acc ->
new = :maps.from_list(:lists.map(&{&1, [node]}, sites))

Map.merge(acc, new, fn _k, v1, v2 ->
Enum.dedup(v1 ++ v2)
end)
end)
def handle_info({:nodedown, _, _}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
@impl true
def handle_info({:nodeup, _, _}, state) do
do_discover_sites()
def handle_info({_ref, :join, _site, _members}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
def handle_info({:nodedown, _, _}, state) do
do_discover_sites()
def handle_info({_ref, :leave, _site, _members}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end
end
2 changes: 0 additions & 2 deletions lib/beacon/live_admin/live/home_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Beacon.LiveAdmin.HomeLive do
PubSub.subscribe()
end

Cluster.maybe_discover_sites()

{:ok, assign(socket, :running_sites, Cluster.running_sites())}
end

Expand Down
3 changes: 0 additions & 3 deletions lib/beacon/live_admin/page_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Beacon.LiveAdmin.PageLive do

use Beacon.LiveAdmin.Web, :live_view
require Logger
alias Beacon.LiveAdmin.Cluster
alias Beacon.LiveAdmin.PageBuilder.Menu
alias Beacon.LiveAdmin.PageBuilder.Page
alias Beacon.LiveAdmin.PageBuilder.Table
Expand All @@ -24,8 +23,6 @@ defmodule Beacon.LiveAdmin.PageLive do
# TODO: nodedow -> notify/alert user
end

Cluster.maybe_discover_sites()

sites = Beacon.LiveAdmin.Cluster.running_sites()

%{"pages" => pages} = session
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{
"accent": {:hex, :accent, "1.1.1", "20257356446d45078b19b91608f74669b407b39af891ee3db9ee6824d1cae19d", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.3", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6d5afa50d4886e3370e04fa501468cbaa6c4b5fe926f72ccfa844ad9e259adae"},
"beacon": {:git, "https://github.com/beaconCMS/beacon.git", "233893f8084e986a2a714d4cb43692bbb8117ea6", []},
"beacon": {:git, "https://github.com/beaconCMS/beacon.git", "963491986df5988a180e6fa81f705f25428dd377", []},
"castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
"cc_precompiler": {:hex, :cc_precompiler, "0.1.9", "e8d3364f310da6ce6463c3dd20cf90ae7bbecbf6c5203b98bf9b48035592649b", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "9dcab3d0f3038621f1601f13539e7a9ee99843862e66ad62827b0c42b2f58a54"},
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
Expand Down
20 changes: 9 additions & 11 deletions test/beacon/live_admin/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ defmodule Beacon.LiveAdmin.ClusterTest do

alias Beacon.LiveAdmin.Cluster

test "group_sites" do
mapping = %{
node_a: [:site_1, :site_2],
node_b: [],
node_c: [:site_1, :site_3]
}
test "running_sites" do
assert Enum.sort(Cluster.running_sites()) == [:site_a, :site_b, :site_c]
end

test "find nodes" do
assert Enum.sort(Cluster.find_nodes(:site_a)) == [:"[email protected]"]
end

assert Cluster.group_sites(mapping) == %{
site_1: [:node_a, :node_c],
site_2: [:node_a],
site_3: [:node_c]
}
test "call function on remote node" do
assert Cluster.call(:site_a, Node, :self, []) == :"[email protected]"
end
end
2 changes: 0 additions & 2 deletions test/support/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,5 @@ defmodule Beacon.LiveAdminTest.Cluster do

# rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :down, [all: true]])
rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :up, [all: true]])

Beacon.LiveAdmin.Cluster.discover_sites()
end
end
7 changes: 1 addition & 6 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,14 @@ Beacon.LiveAdminTest.Cluster.start_beacon(:"[email protected]",
extra_page_fields: [
MyApp.PageField.Type
]
],
[
site: :site_b,
skip_boot?: true,
endpoint: MyApp.Endpoint
]
]
)

Beacon.LiveAdminTest.Cluster.start_beacon(:"[email protected]",
sites: [
[
site: :site_a,
site: :site_b,
skip_boot?: true,
endpoint: MyApp.Endpoint,
authorization_source: MyApp.AuthorizationSource
Expand Down

0 comments on commit 782022d

Please sign in to comment.