From 782022da43157946a2e5c3a62ff01230ece26f74 Mon Sep 17 00:00:00 2001 From: Leandro Pereira Date: Wed, 3 Apr 2024 17:14:53 -0400 Subject: [PATCH 1/3] Improve cluster communcation Register sites using :pg on init and rely on its state to find nodes Fix issues on net split --- lib/beacon/live_admin/application.ex | 2 - lib/beacon/live_admin/cluster.ex | 168 ++++++++++-------------- lib/beacon/live_admin/live/home_live.ex | 2 - lib/beacon/live_admin/page_live.ex | 3 - mix.lock | 2 +- test/beacon/live_admin/cluster_test.exs | 20 ++- test/support/cluster.ex | 2 - test/test_helper.exs | 7 +- 8 files changed, 80 insertions(+), 126 deletions(-) diff --git a/lib/beacon/live_admin/application.ex b/lib/beacon/live_admin/application.ex index 1bbc182d..a89e4711 100644 --- a/lib/beacon/live_admin/application.ex +++ b/lib/beacon/live_admin/application.ex @@ -10,8 +10,6 @@ defmodule Beacon.LiveAdmin.Application do Beacon.LiveAdmin.Cluster ] - :ets.new(:beacon_live_admin_sites, [:set, :named_table, :public, read_concurrency: true]) - opts = [strategy: :one_for_one, name: Beacon.LiveAdmin.Supervisor] Supervisor.start_link(children, opts) end diff --git a/lib/beacon/live_admin/cluster.ex b/lib/beacon/live_admin/cluster.ex index db66af2c..d1ffb9a0 100644 --- a/lib/beacon/live_admin/cluster.ex +++ b/lib/beacon/live_admin/cluster.ex @@ -1,43 +1,39 @@ defmodule Beacon.LiveAdmin.Cluster do + @moduledoc """ + Cluster management. Discover all sites running in the cluster and executes functions globally. + """ + use GenServer require Logger - alias Beacon.LiveAdmin.PubSub - @name __MODULE__ - @one_minute :timer.minutes(1) - @ets_table :beacon_live_admin_sites + @scope :beacon_cluster + @remote_call_retries 3 + @doc false def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: @name) + GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + @doc false @impl true def init(opts) do + {:ok, _} = :pg.start_link(@scope) + :pg.monitor_scope(@scope) :ok = :net_kernel.monitor_nodes(true, node_type: :all) {:ok, opts} end - def discover_sites do - GenServer.call(@name, :discover_sites, @one_minute) - end - - def maybe_discover_sites() do - case running_sites() do - [] -> discover_sites() - _ -> :skip - end - end - @doc false - def nodes do - [Node.self()] ++ Node.list() - end + def nodes, do: [Node.self()] ++ Node.list() - @doc false + @doc """ + Returns a list of sites running in all connected nodes in the cluster. + + It doesn't try to refresh the list of running sites, + it only returns the results cached when calling `discover_sites/0`. + """ def running_sites do - @ets_table - |> :ets.match({:"$1", :_}) - |> List.flatten() + :pg.which_groups(@scope) end @doc """ @@ -47,33 +43,55 @@ defmodule Beacon.LiveAdmin.Cluster do ## Examples - iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :reload_site, [:my_site]) + iex> Beacon.LiveAdmin.Cluster.call(:my_site, Beacon, :boot, [:my_site]) :ok """ + @spec call(atom(), module(), fun :: (-> any()), [any()]) :: any() def call(site, mod, fun, args) when is_binary(site) do - site = String.to_existing_atom(site) - call(site, mod, fun, args) + site + |> String.to_existing_atom() + |> call(mod, fun, args) end def call(site, mod, fun, args) when is_atom(site) and is_atom(mod) and is_atom(fun) and is_list(args) do - case find_node(site) do - nil -> - message = "no running node found for site #{inspect(site)}" - raise Beacon.LiveAdmin.ClusterError, message: message + id = Module.concat([mod, fun]) + nodes = find_nodes(site) + + if nodes == [] do + message = + "no nodes available to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}" + + Logger.debug(message) + raise Beacon.LiveAdmin.ClusterError, message: message + end - node -> - do_call(node, mod, fun, args) + :global.trans( + {id, self()}, + fn -> + node = pick_node(nodes) + do_call(site, node, mod, fun, args) + end, + nodes, + @remote_call_retries + ) + end + + defp do_call(site, node, mod, fun, args) do + if node == Node.self() do + apply(mod, fun, args) + else + :erpc.call(node, mod, fun, args) end rescue exception -> Logger.debug( - "failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)}" + "failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)}" ) message = """ - failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} + failed to call #{Exception.format_mfa(mod, fun, args)} for site #{inspect(site)} on node #{inspect(node)} Got: @@ -84,86 +102,38 @@ defmodule Beacon.LiveAdmin.Cluster do reraise Beacon.LiveAdmin.ClusterError, [message: message], __STACKTRACE__ end - defp do_call(node, mod, fun, args) do - if node == Node.self() do - apply(mod, fun, args) - else - :erpc.call(node, mod, fun, args) - end + @doc false + def find_nodes(site) when is_atom(site) do + Enum.map(:pg.get_members(@scope, site), &GenServer.call(&1, :current_node)) end - if Code.ensure_loaded?(Mix.Project) and Mix.env() == :test do - defp find_node(site) when is_atom(site) do - case :ets.match(@ets_table, {site, :"$1"}) do - [[nodes]] -> List.first(nodes) - _ -> nil - end - end - else - defp find_node(site) when is_atom(site) do - case :ets.match(@ets_table, {site, :"$1"}) do - # TODO: load balance and retry - [[nodes]] -> Enum.random(nodes) - _ -> nil - end - end + @doc false + defp pick_node(nodes) do + Enum.random(nodes) end ## Callbacks + @doc false @impl true - def handle_call(:discover_sites, _from, state) do - sites = do_discover_sites() - {:reply, sites, state} - end - - defp do_discover_sites do - # TODO: add or remove nodes from ets state when nodes changes instead of recreating everything - :ets.delete_all_objects(@ets_table) - - sites = - nodes() - |> Map.new(fn node -> - try do - sites = :erpc.call(node, Beacon.Registry, :running_sites, [], :timer.seconds(10)) - {node, sites} - rescue - _exception -> - {node, []} - end - end) - |> group_sites() - |> Map.new(fn site -> - true = :ets.insert(@ets_table, site) - site - end) - - PubSub.notify_sites_changed(__MODULE__) - - sites + def handle_info({:nodeup, _, _}, state) do + Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__) + {:noreply, state} end @doc false - def group_sites(mapping) do - Enum.reduce(mapping, %{}, fn {node, sites}, acc -> - new = :maps.from_list(:lists.map(&{&1, [node]}, sites)) - - Map.merge(acc, new, fn _k, v1, v2 -> - Enum.dedup(v1 ++ v2) - end) - end) + def handle_info({:nodedown, _, _}, state) do + Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__) + {:noreply, state} end - @doc false - @impl true - def handle_info({:nodeup, _, _}, state) do - do_discover_sites() + def handle_info({_ref, :join, _site, _members}, state) do + Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__) {:noreply, state} end - @doc false - def handle_info({:nodedown, _, _}, state) do - do_discover_sites() + def handle_info({_ref, :leave, _site, _members}, state) do + Beacon.LiveAdmin.PubSub.notify_sites_changed(__MODULE__) {:noreply, state} end end diff --git a/lib/beacon/live_admin/live/home_live.ex b/lib/beacon/live_admin/live/home_live.ex index 17affa9b..ab259875 100644 --- a/lib/beacon/live_admin/live/home_live.ex +++ b/lib/beacon/live_admin/live/home_live.ex @@ -11,8 +11,6 @@ defmodule Beacon.LiveAdmin.HomeLive do PubSub.subscribe() end - Cluster.maybe_discover_sites() - {:ok, assign(socket, :running_sites, Cluster.running_sites())} end diff --git a/lib/beacon/live_admin/page_live.ex b/lib/beacon/live_admin/page_live.ex index 3c15cc6b..333bc979 100644 --- a/lib/beacon/live_admin/page_live.ex +++ b/lib/beacon/live_admin/page_live.ex @@ -8,7 +8,6 @@ defmodule Beacon.LiveAdmin.PageLive do use Beacon.LiveAdmin.Web, :live_view require Logger - alias Beacon.LiveAdmin.Cluster alias Beacon.LiveAdmin.PageBuilder.Menu alias Beacon.LiveAdmin.PageBuilder.Page alias Beacon.LiveAdmin.PageBuilder.Table @@ -24,8 +23,6 @@ defmodule Beacon.LiveAdmin.PageLive do # TODO: nodedow -> notify/alert user end - Cluster.maybe_discover_sites() - sites = Beacon.LiveAdmin.Cluster.running_sites() %{"pages" => pages} = session diff --git a/mix.lock b/mix.lock index 0bafa435..8fe59c32 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "accent": {:hex, :accent, "1.1.1", "20257356446d45078b19b91608f74669b407b39af891ee3db9ee6824d1cae19d", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.3", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6d5afa50d4886e3370e04fa501468cbaa6c4b5fe926f72ccfa844ad9e259adae"}, - "beacon": {:git, "https://github.com/beaconCMS/beacon.git", "233893f8084e986a2a714d4cb43692bbb8117ea6", []}, + "beacon": {:git, "https://github.com/beaconCMS/beacon.git", "963491986df5988a180e6fa81f705f25428dd377", []}, "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.9", "e8d3364f310da6ce6463c3dd20cf90ae7bbecbf6c5203b98bf9b48035592649b", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "9dcab3d0f3038621f1601f13539e7a9ee99843862e66ad62827b0c42b2f58a54"}, "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, diff --git a/test/beacon/live_admin/cluster_test.exs b/test/beacon/live_admin/cluster_test.exs index 5e847e6a..a4c81202 100644 --- a/test/beacon/live_admin/cluster_test.exs +++ b/test/beacon/live_admin/cluster_test.exs @@ -3,17 +3,15 @@ defmodule Beacon.LiveAdmin.ClusterTest do alias Beacon.LiveAdmin.Cluster - test "group_sites" do - mapping = %{ - node_a: [:site_1, :site_2], - node_b: [], - node_c: [:site_1, :site_3] - } + test "running_sites" do + assert Enum.sort(Cluster.running_sites()) == [:site_a, :site_b, :site_c] + end + + test "find nodes" do + assert Enum.sort(Cluster.find_nodes(:site_a)) == [:"node1@127.0.0.1"] + end - assert Cluster.group_sites(mapping) == %{ - site_1: [:node_a, :node_c], - site_2: [:node_a], - site_3: [:node_c] - } + test "call function on remote node" do + assert Cluster.call(:site_a, Node, :self, []) == :"node1@127.0.0.1" end end diff --git a/test/support/cluster.ex b/test/support/cluster.ex index 0d622b6d..416c8f9b 100644 --- a/test/support/cluster.ex +++ b/test/support/cluster.ex @@ -133,7 +133,5 @@ defmodule Beacon.LiveAdminTest.Cluster do # rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :down, [all: true]]) rpc(node, Ecto.Migrator, :run, [Beacon.Repo, :up, [all: true]]) - - Beacon.LiveAdmin.Cluster.discover_sites() end end diff --git a/test/test_helper.exs b/test/test_helper.exs index dc33db5c..d8743229 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -92,11 +92,6 @@ Beacon.LiveAdminTest.Cluster.start_beacon(:"node1@127.0.0.1", extra_page_fields: [ MyApp.PageField.Type ] - ], - [ - site: :site_b, - skip_boot?: true, - endpoint: MyApp.Endpoint ] ] ) @@ -104,7 +99,7 @@ Beacon.LiveAdminTest.Cluster.start_beacon(:"node1@127.0.0.1", Beacon.LiveAdminTest.Cluster.start_beacon(:"node2@127.0.0.1", sites: [ [ - site: :site_a, + site: :site_b, skip_boot?: true, endpoint: MyApp.Endpoint, authorization_source: MyApp.AuthorizationSource From 040a9fb6f0bd681ac8b408535a58ed9fce84bc21 Mon Sep 17 00:00:00 2001 From: Leandro Pereira Date: Wed, 3 Apr 2024 17:18:01 -0400 Subject: [PATCH 2/3] do not block nodes --- lib/beacon/live_admin/cluster.ex | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/lib/beacon/live_admin/cluster.ex b/lib/beacon/live_admin/cluster.ex index d1ffb9a0..f98d8899 100644 --- a/lib/beacon/live_admin/cluster.ex +++ b/lib/beacon/live_admin/cluster.ex @@ -7,7 +7,6 @@ defmodule Beacon.LiveAdmin.Cluster do require Logger @scope :beacon_cluster - @remote_call_retries 3 @doc false def start_link(opts) do @@ -56,7 +55,6 @@ defmodule Beacon.LiveAdmin.Cluster do def call(site, mod, fun, args) when is_atom(site) and is_atom(mod) and is_atom(fun) and is_list(args) do - id = Module.concat([mod, fun]) nodes = find_nodes(site) if nodes == [] do @@ -67,15 +65,8 @@ defmodule Beacon.LiveAdmin.Cluster do raise Beacon.LiveAdmin.ClusterError, message: message end - :global.trans( - {id, self()}, - fn -> - node = pick_node(nodes) - do_call(site, node, mod, fun, args) - end, - nodes, - @remote_call_retries - ) + node = pick_node(nodes) + do_call(site, node, mod, fun, args) end defp do_call(site, node, mod, fun, args) do From ec402072cc99dc2962b7ebf3ab55da78fa79eb65 Mon Sep 17 00:00:00 2001 From: Leandro Pereira Date: Wed, 3 Apr 2024 17:23:26 -0400 Subject: [PATCH 3/3] bump min required OTP version due to :pg.monitor_scope/1 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6da7bfaf..ecbe6dfa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: matrix: include: # minimum required versions - - otp: "23" + - otp: "25.1" elixir: "1.13.0" phoenix-version: "1.7.0" phoenix-live-view-version: "0.20.2"