Skip to content

Commit

Permalink
Ensure coordinators are supervised and pooled
Browse files Browse the repository at this point in the history
This serves two purposes: first, no process is left unsupervised, and second,
it removes the potential bottleneck that a single 'gen_event' process could
become when handling all the notifications.
  • Loading branch information
NelsonVides committed Dec 31, 2023
1 parent d4a1071 commit fcbdeda
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 141 deletions.
1 change: 1 addition & 0 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ init([]) ->
{ok, {{one_for_one, 5, 10},
[
?CHILD(amoc_users_sup, supervisor),
?CHILD(amoc_coordinator_sup_sup, supervisor),
?CHILD(amoc_controller, worker),
?CHILD(amoc_cluster, worker),
?CHILD(amoc_code_server, worker),
Expand Down
158 changes: 30 additions & 128 deletions src/coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
%%==============================================================================
-module(amoc_coordinator).

-behaviour(gen_event).

%% API
-export([start/3, start/2,
add/2, add/3,
stop/1, reset/1]).

%% gen_event callbacks
-export([init/1,
handle_event/2,
handle_call/2,
terminate/2]).
stop/1, reset/1,
notify/2]).

-define(DEFAULT_TIMEOUT, 30). %% 30 seconds

Expand All @@ -24,7 +17,6 @@
-define(IS_TIMEOUT(Timeout), (?IS_POS_INT(Timeout) orelse Timeout =:= infinity)).

-type name() :: atom().
-type state() :: {worker, pid()} | {timeout, name(), pid()}.

-type coordination_data() :: {pid(), Data :: any()}.

Expand Down Expand Up @@ -52,7 +44,8 @@
%% timeout in seconds
-type coordination_timeout_in_sec() :: pos_integer() | infinity.

-export_type([coordination_event_type/0,
-export_type([name/0,
coordination_event_type/0,
coordination_event/0,
coordination_action/0,
coordination_data/0,
Expand All @@ -72,33 +65,17 @@ start(Name, CoordinationPlan) ->
-spec start(name(), coordination_plan(), coordination_timeout_in_sec()) -> ok | error.
start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
Plan = normalize_coordination_plan(CoordinationPlan),
case gen_event:start({local, Name}) of
case amoc_coordinator_sup_sup:start_coordinator(Name, Plan, Timeout) of
{ok, _} ->
amoc_telemetry:execute([coordinator, start], #{count => 1}, #{name => Name}),
%% according to gen_event documentation:
%%
%% When the event is received, the event manager calls
%% handle_event(Event, State) for each installed event
%% handler, in the same order as they were added.
%%
%% in reality the order is reversed, the last added handler
%% is executed at first. so to ensure that all the items in
%% the plan with NoOfUsers =:= all are executed in the very
%% end, we need to add them first. Also, the timeout handler
%% must be added last, so gen_event triggers it first.
AllItemsHandlers = lists:reverse([Item || {all, _} = Item <- Plan]),
[gen_event:add_handler(Name, ?MODULE, Item) || Item <- AllItemsHandlers],
[gen_event:add_handler(Name, ?MODULE, Item) || {N, _} = Item <- Plan,
is_integer(N)],
gen_event:add_handler(Name, ?MODULE, {timeout, Name, Timeout}),
ok;
{error, _} -> error
end.

%% @doc Stops a coordinator.
-spec stop(name()) -> ok.
stop(Name) ->
gen_event:stop(Name),
amoc_coordinator_sup_sup:stop_coordinator(Name),
amoc_telemetry:execute([coordinator, stop], #{count => 1}, #{name => Name}).

%% @see add/3
Expand All @@ -109,106 +86,43 @@ add(Name, Data) ->
%% @doc Adds the current process data. Usually called in the `start/2' callback of an amoc scenario.
-spec add(name(), pid(), any()) -> ok.
add(Name, Pid, Data) ->
gen_event:notify(Name, {coordinate, {Pid, Data}}).
notify(Name, {coordinate, {Pid, Data}}).

%% @doc Resets a coordinator, that is, calls all coordination actions with `reset' as the coordination data.
-spec reset(name()) -> ok.
reset(Name) ->
gen_event:notify(Name, reset_coordinator).
notify(Name, reset_coordinator).

%% @doc Dispatches `Event' to every process of the coordinator called `Name'.
%%
%% A telemetry event is raised first, whether or not the coordinator is known.
%% The event is then sent as a plain message to the timeout process and handed
%% to each worker in turn through `notify_worker/2'. When nothing is stored
%% under `Name' the dispatch is skipped. Always returns `ok'.
-spec notify(name(), coordinator_timeout | reset_coordinator | {coordinate, {pid(), term()}}) -> ok.
notify(Name, Event) ->
    raise_telemetry_event(Name, Event),
    case amoc_coordinator_sup_sup:get_workers(Name) of
        {error, not_found} ->
            ok;
        {ok, TimerPid, WorkerPids} ->
            TimerPid ! Event,
            lists:foreach(fun(WorkerPid) -> notify_worker(WorkerPid, Event) end,
                          WorkerPids)
    end.

%%%===================================================================
%%% gen_event callbacks
%%% Internal functions
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @end
%%--------------------------------------------------------------------
-spec init({timeout, name(), coordination_timeout_in_sec()} | normalized_coordination_item()) ->
{ok, state()}.
init({timeout, Name, Timeout}) ->
Pid = spawn(fun() ->
case Timeout of
infinity ->
timeout_fn(Name, infinity, infinity);
Int when is_integer(Int), Int > 0 ->
timeout_fn(Name, timer:seconds(Timeout), infinity)
end
end),
{ok, {timeout, Name, Pid}};
init(CoordinationItem) ->
{ok, Pid} = amoc_coordinator_worker:start_link(CoordinationItem),
{ok, {worker, Pid}}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @end
%%--------------------------------------------------------------------
-spec handle_event(Event :: term(), state()) -> {ok, state()}.
handle_event(Event, {timeout, Name, Pid}) ->
%% there's only one "timeout" event handler for coordinator,
%% so calling amoc_telemetry:execute/3 here to ensure that it's
%% triggered just once per event.
%% Forwards one coordinator event to a single worker process.
%% `{coordinate, _}' is an asynchronous notification, while
%% `reset_coordinator' and `coordinator_timeout' are synchronous ones.
notify_worker(Worker, {coordinate, {UserPid, UserData}}) when is_pid(UserPid) ->
    amoc_coordinator_worker:add(Worker, {UserPid, UserData});
notify_worker(Worker, reset_coordinator) ->
    amoc_coordinator_worker:reset(Worker);
notify_worker(Worker, coordinator_timeout) ->
    amoc_coordinator_worker:timeout(Worker).

raise_telemetry_event(Name, Event) ->
TelemetryEvent = case Event of
{coordinate, _} -> add;
reset_coordinator -> reset;
coordinator_timeout -> timeout
end,
amoc_telemetry:execute([coordinator, TelemetryEvent], #{count => 1}, #{name => Name}),
erlang:send(Pid, Event),
{ok, {timeout, Name, Pid}};
handle_event(Event, {worker, WorkerPid}) ->
case Event of
coordinator_timeout -> %% synchronous notification
amoc_coordinator_worker:timeout(WorkerPid);
reset_coordinator -> %% synchronous notification
amoc_coordinator_worker:reset(WorkerPid);
{coordinate, {Pid, Data}} when is_pid(Pid) -> %% async notification
amoc_coordinator_worker:add(WorkerPid, {Pid, Data})
end,
{ok, {worker, WorkerPid}}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), state()) -> {ok, {error, not_implemented}, state()}.
handle_call(_Request, State) ->
{ok, {error, not_implemented}, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @end
%%--------------------------------------------------------------------
-spec terminate(any(), state()) -> ok.
terminate(_, {timeout, _Name, Pid}) ->
erlang:send(Pid, terminate), ok;
terminate(_, {worker, Pid}) ->
%% synchronous notification
amoc_coordinator_worker:stop(Pid), ok.
amoc_telemetry:execute([coordinator, TelemetryEvent], #{count => 1}, #{name => Name}).

%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec normalize_coordination_plan(coordination_plan()) -> [normalized_coordination_item()].
normalize_coordination_plan(CoordinationPlan) when is_tuple(CoordinationPlan) ->
normalize_coordination_plan([CoordinationPlan]);
Expand All @@ -230,15 +144,3 @@ assert_action(N, Action) when is_integer(N),
is_function(Action, 2) orelse
is_function(Action, 3)) ->
ok.

timeout_fn(Name, CoordinationTimeout, Timeout) ->
receive
terminate -> ok;
{coordinate, _} ->
timeout_fn(Name, CoordinationTimeout, CoordinationTimeout);
_ -> %% coordinator_timeout or reset_coordinator
timeout_fn(Name, CoordinationTimeout, infinity)
after Timeout ->
gen_event:notify(Name, coordinator_timeout),
timeout_fn(Name, CoordinationTimeout, infinity)
end.
51 changes: 51 additions & 0 deletions src/coordinator/amoc_coordinator_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
%% @private
%% @see amoc_coordinator
%% @copyright 2023 Erlang Solutions Ltd.
%% @doc Supervisor for a pool of handlers for a new supervision tree
-module(amoc_coordinator_sup).

-behaviour(supervisor).

-export([start_link/1, init/1]).

%% @doc Starts an unregistered supervisor for a single coordinator.
%% The `{Name, Plan, Timeout}' tuple is handed unchanged to `init/1'.
-spec start_link({amoc_coordinator:name(), amoc_coordinator:coordination_plan(), timeout()}) ->
    gen_server:start_ret().
start_link({_Name, _Plan, _Timeout} = Args) ->
    supervisor:start_link(?MODULE, Args).

%% @doc Supervisor callback: builds the child specs for one coordinator.
%%
%% The child list starts with the timeout process, followed by one worker
%% per plan item.
-spec init({amoc_coordinator:name(), amoc_coordinator:coordination_plan(), timeout()}) ->
    {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Plan, Timeout}) ->
    %% Order matters, as we need to ensure that:
    %%  - all the items in the plan with NoOfUsers =:= all are executed last
    %%  - the timeout handler is executed first
    {AllItems, OtherItems} = lists:partition(fun partitioner/1, Plan),
    %% Reversing the concatenation yields the reversed `all' items followed
    %% by the reversed remaining items.
    WorkerSpecs = lists:map(fun worker_spec/1, lists:reverse(OtherItems ++ AllItems)),
    ChildSpecs = [timeout_child(Name, Timeout) | WorkerSpecs],
    {ok, {#{strategy => one_for_one, intensity => 1, period => 5}, ChildSpecs}}.

%% Helpers
%% Child spec for the `amoc_coordinator_timeout' process of coordinator `Name'.
%% The child is a permanent worker with a five second shutdown allowance.
timeout_child(Name, Timeout) ->
    Module = amoc_coordinator_timeout,
    #{id => {Module, Name, Timeout},
      start => {Module, start_link, [Name, Timeout]},
      modules => [Module],
      type => worker,
      restart => permanent,
      shutdown => timer:seconds(5)}.

%% Child spec for the `amoc_coordinator_worker' process handling plan `Item'.
%% The child is a permanent worker with a five second shutdown allowance.
%% NOTE(review): the id embeds the whole item, so two identical items in one
%% plan would produce identical child ids - confirm plans never repeat items.
worker_spec(Item) ->
    Module = amoc_coordinator_worker,
    #{id => {Module, Item},
      start => {Module, start_link, [Item]},
      modules => [Module],
      type => worker,
      restart => permanent,
      shutdown => timer:seconds(5)}.

%% Predicate used with lists:partition/2: true only for `{all, _}' plan items.
partitioner(Item) ->
    case Item of
        {all, _} -> true;
        _ -> false
    end.
65 changes: 65 additions & 0 deletions src/coordinator/amoc_coordinator_sup_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
%% @private
%% @see amoc_coordinator
%% @copyright 2023 Erlang Solutions Ltd.
%% @doc Global supervisor for all started coordination actions
-module(amoc_coordinator_sup_sup).

-behaviour(supervisor).

-export([start_coordinator/3, stop_coordinator/1, get_workers/1]).

-export([start_link/0, init/1]).

%% @doc Starts a coordinator supervisor under this supervisor and records it.
%%
%% On success the children of the new supervisor are listed, the single
%% timeout process and all worker processes are picked out by their child
%% ids, and their pids are stored under `Name'. Any other result of
%% `supervisor:start_child/2' is returned unchanged.
-spec start_coordinator(amoc_coordinator:name(), amoc_coordinator:coordination_plan(), timeout()) ->
    {ok, pid()} | {error, term()}.
start_coordinator(Name, Plan, Timeout) ->
    case supervisor:start_child(?MODULE, [{Name, Plan, Timeout}]) of
        {ok, SupPid} = Started ->
            ChildInfo = supervisor:which_children(SupPid),
            %% exactly one timeout process is expected; anything else is a badmatch
            [TimerPid] = [P || {{amoc_coordinator_timeout, _, _}, P, _, _} <- ChildInfo],
            WorkerPids = [P || {{amoc_coordinator_worker, _}, P, _, _} <- ChildInfo],
            store_coordinator(Name, SupPid, TimerPid, WorkerPids),
            Started;
        NotStarted ->
            NotStarted
    end.

%% @doc Stops the coordinator stored under `Name'.
%%
%% The table entry is removed together with the lookup (`ets:take/2'), so
%% that once the coordinator is stopped `get_workers/1' answers
%% `{error, not_found}' instead of handing out pids of terminated processes.
%% The coordinator supervisor is then terminated.
%%
%% Returns the result of `supervisor:terminate_child/2', or
%% `{error, not_found}' when no coordinator is stored under `Name'.
-spec stop_coordinator(amoc_coordinator:name()) ->
    ok | {error, term()}.
stop_coordinator(Name) ->
    case ets:take(?MODULE, Name) of
        [{_, #{coordinator := Coordinator}}] ->
            supervisor:terminate_child(?MODULE, Coordinator);
        [] ->
            {error, not_found}
    end.

%% @doc Looks up the processes stored for coordinator `Name'.
%%
%% Returns `{ok, TimeoutWorker, Workers}' when an entry exists in the table,
%% and `{error, not_found}' otherwise.
-spec get_workers(amoc_coordinator:name()) ->
    {ok, pid(), [pid()]} | {error, term()}.
get_workers(Name) ->
    case ets:lookup(?MODULE, Name) of
        [] ->
            {error, not_found};
        [{_, #{timeout_worker := TimerPid, workers := WorkerPids}}] ->
            {ok, TimerPid, WorkerPids}
    end.

%% Inserts (or overwrites) the table entry for `Name', holding the coordinator
%% supervisor pid, the timeout process pid and the list of worker pids.
store_coordinator(Name, SupPid, TimerPid, WorkerPids) ->
    Entry = {Name, #{workers => WorkerPids,
                     timeout_worker => TimerPid,
                     coordinator => SupPid}},
    ets:insert(?MODULE, Entry).

%% @doc Starts this supervisor, registered locally under the module name.
-spec start_link() -> {ok, Pid :: pid()}.
start_link() ->
    SupName = {local, ?MODULE},
    supervisor:start_link(SupName, ?MODULE, []).

%% @doc Supervisor callback.
%%
%% Creates the public named table that maps coordinator names to their
%% processes, and declares a `simple_one_for_one' strategy whose children are
%% `amoc_coordinator_sup' supervisors.
-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
    TableOpts = [named_table, ordered_set, public, {read_concurrency, true}],
    ets:new(?MODULE, TableOpts),
    CoordinatorSpec = #{id => amoc_coordinator_sup,
                        start => {amoc_coordinator_sup, start_link, []},
                        type => supervisor,
                        modules => [amoc_coordinator_sup],
                        restart => transient,
                        shutdown => infinity},
    {ok, {#{strategy => simple_one_for_one}, [CoordinatorSpec]}}.
35 changes: 35 additions & 0 deletions src/coordinator/amoc_coordinator_timeout.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-module(amoc_coordinator_timeout).

-export([start_link/2, init/2]).

%% @doc Starts the timeout process for coordinator `Name' through `proc_lib',
%% with `init/2' as its entry point.
-spec start_link(amoc_coordinator:name(), timeout()) ->
    {ok, pid()}.
start_link(Name, Timeout) ->
    InitArgs = [Name, Timeout],
    proc_lib:start_link(?MODULE, init, InitArgs).

%% @doc Entry point of the timeout process.
%%
%% Acknowledges the start to the parent, converts a positive integer
%% `Timeout' with `timer:seconds/1' (leaving `infinity' as is), and enters the
%% receive loop with the timer disarmed.
-spec init(amoc_coordinator:name(), timeout()) ->
    no_return().
init(Name, Timeout) ->
    proc_lib:init_ack({ok, self()}),
    CoordinationTimeout =
        case Timeout of
            infinity ->
                infinity;
            Seconds when is_integer(Seconds), Seconds > 0 ->
                timer:seconds(Seconds)
        end,
    timeout_fn(Name, CoordinationTimeout, infinity).

%% Receive loop of the timeout process.
%%
%% `CoordinationTimeout' is the configured timeout; `Timeout' is the value
%% currently used in the `after' clause (`infinity' means the timer is
%% disarmed).
%%
%%  - `{coordinate, _}' (re)arms the timer with `CoordinationTimeout'.
%%  - `coordinator_timeout' and `reset_coordinator' disarm the timer.
%%    amoc_coordinator:notify/2 forwards every event, including
%%    `coordinator_timeout', to this very process, so re-arming on those
%%    events would make each timeout schedule the next one and fire
%%    repeatedly even though no new data has been added.
%%  - any other message ends the loop, and thereby the process.
%%  - when the armed timer expires, the coordinator is notified with
%%    `coordinator_timeout' and the timer is disarmed.
timeout_fn(Name, CoordinationTimeout, Timeout) ->
    receive
        {coordinate, _} ->
            timeout_fn(Name, CoordinationTimeout, CoordinationTimeout);
        Event when Event =:= coordinator_timeout; Event =:= reset_coordinator ->
            timeout_fn(Name, CoordinationTimeout, infinity);
        _ -> %% other, end
            ok
    after Timeout ->
        amoc_coordinator:notify(Name, coordinator_timeout),
        timeout_fn(Name, CoordinationTimeout, infinity)
    end.
Loading

0 comments on commit fcbdeda

Please sign in to comment.