Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cluster communcation #112

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
include:
# minimum required versions
- otp: "23"
- otp: "25.1"
elixir: "1.13.0"
phoenix-version: "1.7.0"
phoenix-live-view-version: "0.20.2"
Expand Down
2 changes: 0 additions & 2 deletions lib/beacon/live_admin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ defmodule Beacon.LiveAdmin.Application do
Beacon.LiveAdmin.Cluster
]

:ets.new(:beacon_live_admin_sites, [:set, :named_table, :public, read_concurrency: true])

opts = [strategy: :one_for_one, name: Beacon.LiveAdmin.Supervisor]
Supervisor.start_link(children, opts)
end
Expand Down
159 changes: 60 additions & 99 deletions lib/beacon/live_admin/cluster.ex
Original file line number Diff line number Diff line change
@@ -1,43 +1,38 @@
defmodule Beacon.LiveAdmin.Cluster do
@moduledoc """
Cluster management. Discover all sites running in the cluster and executes functions globally.
"""

use GenServer
require Logger
alias Beacon.LiveAdmin.PubSub

@name __MODULE__
@one_minute :timer.minutes(1)
@ets_table :beacon_live_admin_sites
@scope :beacon_cluster

@doc false
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@doc false
@impl true
def init(opts) do
{:ok, _} = :pg.start_link(@scope)
:pg.monitor_scope(@scope)
:ok = :net_kernel.monitor_nodes(true, node_type: :all)
{:ok, opts}
end

def discover_sites do
GenServer.call(@name, :discover_sites, @one_minute)
end

def maybe_discover_sites() do
case running_sites() do
[] -> discover_sites()
_ -> :skip
end
end

@doc false
def nodes do
[Node.self()] ++ Node.list()
end
def nodes, do: [Node.self()] ++ Node.list()

@doc false
@doc """
Returns a list of sites running in all connected nodes in the cluster.

It doesn't try to refresh the list of running sites,
it only returns the results cached when calling `discover_sites/0`.
"""
def running_sites do
@ets_table
|> :ets.match({:"$1", :_})
|> List.flatten()
:pg.which_groups(@scope)
end

@doc """
Expand All @@ -47,33 +42,47 @@ defmodule Beacon.LiveAdmin.Cluster do

## Examples

iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :reload_site, [:my_site])
iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :boot, [:my_site])
:ok

"""
@spec call(atom(), module(), fun :: (-> any()), [any()]) :: any()
def call(site, mod, fun, args) when is_binary(site) do
site = String.to_existing_atom(site)
call(site, mod, fun, args)
site
|> String.to_existing_atom()
|> call(mod, fun, args)
end

def call(site, mod, fun, args)
when is_atom(site) and is_atom(mod) and is_atom(fun) and is_list(args) do
case find_node(site) do
nil ->
message = "no running node found for site #{inspect(site)}"
raise Beacon.LiveAdmin.ClusterError, message: message
nodes = find_nodes(site)

if nodes == [] do
message =
"no nodes available to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}"

Logger.debug(message)
raise Beacon.LiveAdmin.ClusterError, message: message
end

node ->
do_call(node, mod, fun, args)
node = pick_node(nodes)
do_call(site, node, mod, fun, args)
end

defp do_call(site, node, mod, fun, args) do
if node == Node.self() do
apply(mod, fun, args)
else
:erpc.call(node, mod, fun, args)
end
rescue
exception ->
Logger.debug(
"failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}"
"failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)}"
)

message = """
failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}
failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)}

Got:

Expand All @@ -84,86 +93,38 @@ defmodule Beacon.LiveAdmin.Cluster do
reraise Beacon.LiveAdmin.ClusterError, [message: message], __STACKTRACE__
end

defp do_call(node, mod, fun, args) do
if node == Node.self() do
apply(mod, fun, args)
else
:erpc.call(node, mod, fun, args)
end
@doc false
def find_nodes(site) when is_atom(site) do
Enum.map(:pg.get_members(@scope, site), &GenServer.call(&1, :current_node))
end

if Code.ensure_loaded?(Mix.Project) and Mix.env() == :test do
defp find_node(site) when is_atom(site) do
case :ets.match(@ets_table, {site, :"$1"}) do
[[nodes]] -> List.first(nodes)
_ -> nil
end
end
else
defp find_node(site) when is_atom(site) do
case :ets.match(@ets_table, {site, :"$1"}) do
# TODO: load balance and retry
[[nodes]] -> Enum.random(nodes)
_ -> nil
end
end
@doc false
defp pick_node(nodes) do
Enum.random(nodes)
end

## Callbacks

@doc false
@impl true
def handle_call(:discover_sites, _from, state) do
sites = do_discover_sites()
{:reply, sites, state}
end

defp do_discover_sites do
# TODO: add or remove nodes from ets state when nodes changes instead of recreating everything
:ets.delete_all_objects(@ets_table)

sites =
nodes()
|> Map.new(fn node ->
try do
sites = :erpc.call(node, Beacon.Registry, :running_sites, [], :timer.seconds(10))
{node, sites}
rescue
_exception ->
{node, []}
end
end)
|> group_sites()
|> Map.new(fn site ->
true = :ets.insert(@ets_table, site)
site
end)

PubSub.notify_sites_changed(__MODULE__)

sites
def handle_info({:nodeup, _, _}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
def group_sites(mapping) do
Enum.reduce(mapping, %{}, fn {node, sites}, acc ->
new = :maps.from_list(:lists.map(&{&1, [node]}, sites))

Map.merge(acc, new, fn _k, v1, v2 ->
Enum.dedup(v1 ++ v2)
end)
end)
def handle_info({:nodedown, _, _}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
@impl true
def handle_info({:nodeup, _, _}, state) do
do_discover_sites()
def handle_info({_ref, :join, _site, _members}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end

@doc false
def handle_info({:nodedown, _, _}, state) do
do_discover_sites()
def handle_info({_ref, :leave, _site, _members}, state) do
Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__)
{:noreply, state}
end
end
2 changes: 0 additions & 2 deletions lib/beacon/live_admin/live/home_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Beacon.LiveAdmin.HomeLive do
PubSub.subscribe()
end

Cluster.maybe_discover_sites()

{:ok, assign(socket, :running_sites, Cluster.running_sites())}
end

Expand Down
3 changes: 0 additions & 3 deletions lib/beacon/live_admin/page_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Beacon.LiveAdmin.PageLive do

use Beacon.LiveAdmin.Web, :live_view
require Logger
alias Beacon.LiveAdmin.Cluster
alias Beacon.LiveAdmin.PageBuilder.Menu
alias Beacon.LiveAdmin.PageBuilder.Page
alias Beacon.LiveAdmin.PageBuilder.Table
Expand All @@ -24,8 +23,6 @@ defmodule Beacon.LiveAdmin.PageLive do
# TODO: nodedow -> notify/alert user
end

Cluster.maybe_discover_sites()

sites = Beacon.LiveAdmin.Cluster.running_sites()

%{"pages" => pages} = session
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{
"accent": {:hex, :accent, "1.1.1", "20257356446d45078b19b91608f74669b407b39af891ee3db9ee6824d1cae19d", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.3", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6d5afa50d4886e3370e04fa501468cbaa6c4b5fe926f72ccfa844ad9e259adae"},
"beacon": {:git, "https://github.com/beaconCMS/beacon.git", "233893f8084e986a2a714d4cb43692bbb8117ea6", []},
"beacon": {:git, "https://github.com/beaconCMS/beacon.git", "963491986df5988a180e6fa81f705f25428dd377", []},
"castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
"cc_precompiler": {:hex, :cc_precompiler, "0.1.9", "e8d3364f310da6ce6463c3dd20cf90ae7bbecbf6c5203b98bf9b48035592649b", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "9dcab3d0f3038621f1601f13539e7a9ee99843862e66ad62827b0c42b2f58a54"},
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
Expand Down
20 changes: 9 additions & 11 deletions test/beacon/live_admin/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ defmodule Beacon.LiveAdmin.ClusterTest do

alias Beacon.LiveAdmin.Cluster

test "group_sites" do
mapping = %{
node_a: [:site_1, :site_2],
node_b: [],
node_c: [:site_1, :site_3]
}
test "running_sites" do
assert Enum.sort(Cluster.running_sites()) == [:site_a, :site_b, :site_c]
end

test "find nodes" do
assert Enum.sort(Cluster.find_nodes(:site_a)) == [:"[email protected]"]
end

assert Cluster.group_sites(mapping) == %{
site_1: [:node_a, :node_c],
site_2: [:node_a],
site_3: [:node_c]
}
test "call function on remote node" do
assert Cluster.call(:site_a, Node, :self, []) == :"[email protected]"
end
end
2 changes: 0 additions & 2 deletions test/support/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,5 @@ defmodule Beacon.LiveAdminTest.Cluster do

# rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :down, [all: true]])
rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :up, [all: true]])

Beacon.LiveAdmin.Cluster.discover_sites()
end
end
7 changes: 1 addition & 6 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,14 @@ Beacon.LiveAdminTest.Cluster.start_beacon(:"[email protected]",
extra_page_fields: [
MyApp.PageField.Type
]
],
[
site: :site_b,
skip_boot?: true,
endpoint: MyApp.Endpoint
]
]
)

Beacon.LiveAdminTest.Cluster.start_beacon(:"[email protected]",
sites: [
[
site: :site_a,
site: :site_b,
skip_boot?: true,
endpoint: MyApp.Endpoint,
authorization_source: MyApp.AuthorizationSource
Expand Down
Loading