From 9613c17414c787d4cd23554f6474c9345ab07052 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sun, 31 Dec 2023 10:19:57 +0100 Subject: [PATCH] Ensure throttlers are supervised and pooled --- src/amoc_sup.erl | 15 +++--- src/throttle/amoc_throttle_controller.erl | 23 ++++----- src/throttle/amoc_throttle_pool.erl | 56 ++++++++++++++++++++++ src/throttle/amoc_throttle_pooler.erl | 23 +++++++++ src/throttle/amoc_throttle_process.erl | 12 ++--- src/throttle/amoc_throttle_sup.erl | 57 +++++++++++++++++++++++ 6 files changed, 158 insertions(+), 28 deletions(-) create mode 100644 src/throttle/amoc_throttle_pool.erl create mode 100644 src/throttle/amoc_throttle_pooler.erl create mode 100644 src/throttle/amoc_throttle_sup.erl diff --git a/src/amoc_sup.erl b/src/amoc_sup.erl index 92fb9eb1..c0d08f3a 100644 --- a/src/amoc_sup.erl +++ b/src/amoc_sup.erl @@ -11,7 +11,8 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(WORKER(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(SUP(I, Type), {I, {I, start_link, []}, permanent, infinity, Type, [I]}). %% =================================================================== %% API functions @@ -32,10 +33,10 @@ start_link() -> init([]) -> {ok, {{one_for_one, 5, 10}, [ - ?CHILD(amoc_users_sup, supervisor), - ?CHILD(amoc_coordinator_sup_sup, supervisor), - ?CHILD(amoc_controller, worker), - ?CHILD(amoc_cluster, worker), - ?CHILD(amoc_code_server, worker), - ?CHILD(amoc_throttle_controller, worker) + ?SUP(amoc_users_sup, supervisor), + ?SUP(amoc_throttle_sup, supervisor), + ?SUP(amoc_coordinator_sup_sup, supervisor), + ?WORKER(amoc_controller, worker), + ?WORKER(amoc_cluster, worker), + ?WORKER(amoc_code_server, worker) ]}}. diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index 3e62ad49..5c956fca 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -18,6 +18,7 @@ handle_cast/2, handle_info/2]). +-define(PG_SCOPE, amoc_throttle). -define(SERVER, ?MODULE). -define(MASTER_SERVER, {?SERVER, amoc_cluster:master_node()}). @@ -46,7 +47,6 @@ -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link() -> - pg:start_link(), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(), @@ -125,7 +125,7 @@ init([]) -> {reply, ok, state()} | {reply, {error, any()}, state()}. handle_call({start_processes, Name, Rate, Interval, NoOfProcesses}, _From, State) -> - case pg:get_members(Name) of + case pg:get_members(?PG_SCOPE, Name) of [] -> RealNoOfProcesses = start_processes(Name, Rate, Interval, NoOfProcesses), {reply, {ok, started}, @@ -261,7 +261,7 @@ start_processes(Name, Rate, Interval, NoOfProcesses) -> -spec get_throttle_process(name()) -> {error, {no_throttle_process_registered, name()}} | {error, any()} | {ok, pid()}. get_throttle_process(Name) -> - case pg:get_members(Name) of + case pg:get_members(?PG_SCOPE, Name) of [] -> {error, {no_throttle_process_registered, Name}}; List -> %% nonempty list @@ -283,7 +283,7 @@ maybe_change_rate(Name, Rate, Interval, Info) -> -spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> {ok, non_neg_integer()} | {error, any()}. do_change_rate(Name, Rate, Interval) -> - case pg:get_members(Name) of + case pg:get_members(?PG_SCOPE, Name) of [] -> {error, no_processes_in_group}; List when is_list(List) -> RatePerMinute = rate_per_minute(Rate, Interval), @@ -302,16 +302,9 @@ start_gradual_rate_change(Name, LowRate, HighRate, RateInterval, StepInterval, N Plan = #change_rate_plan{high_rate = HighRate, no_of_steps = NoOfSteps, timer = Timer}, Info#throttle_info{rate = LowRate, interval = RateInterval, change_plan = Plan}. -start_throttle_processes(Name, Interval, Rate, 1) -> - start_throttle_process(Name, Interval, Rate); -start_throttle_processes(Name, Interval, Rate, N) when is_integer(N), N > 1 -> - ProcessRate = Rate div N, - start_throttle_process(Name, Interval, ProcessRate), - start_throttle_processes(Name, Interval, Rate - ProcessRate, N - 1). - -start_throttle_process(Name, Interval, Rate) -> - {ok, Pid} = amoc_throttle_process:start(Name, Interval, Rate), - pg:join(Name, Pid). +start_throttle_processes(Name, Interval, Rate, N) -> + {ok, Workers} = amoc_throttle_pool:start_process_pool(Name, Interval, Rate, N), + pg:join(?PG_SCOPE, Name, Workers). update_throttle_processes([Pid], Interval, Rate, 1) -> amoc_throttle_process:update(Pid, Interval, Rate); @@ -321,7 +314,7 @@ update_throttle_processes([Pid | Tail], Interval, Rate, N) when N > 1 -> update_throttle_processes(Tail, Interval, Rate - ProcessRate, N - 1). all_processes(Name, Cmd) -> - case pg:get_members(Name) of + case pg:get_members(?PG_SCOPE, Name) of [] -> {error, no_processes_in_group}; Ps -> [run_cmd(P, Cmd) || P <- Ps], ok end. diff --git a/src/throttle/amoc_throttle_pool.erl b/src/throttle/amoc_throttle_pool.erl new file mode 100644 index 00000000..5a7564a7 --- /dev/null +++ b/src/throttle/amoc_throttle_pool.erl @@ -0,0 +1,56 @@ +%% @private +%% @see amoc_throttle +%% @copyright 2023 Erlang Solutions Ltd. +-module(amoc_throttle_pool). + +-behaviour(supervisor). + +-export([start_process_pool/4]). +-export([start_link/4, init/1]). + +-spec start_process_pool( + amoc_throttle:name(), + amoc_throttle:interval(), + amoc_throttle:rate(), + pos_integer() + ) -> {ok, [Pid :: pid()]}. +start_process_pool(Name, Interval, Rate, NoOfProcesses) -> + {ok, Sup} = supervisor:start_child(amoc_throttle_sup, [Name, Interval, Rate, NoOfProcesses]), + Children = supervisor:which_children(Sup), + {ok, [ Pid || {_, Pid, _, _} <- Children ]}. + +-spec start_link( + amoc_throttle:name(), + amoc_throttle:interval(), + amoc_throttle:rate(), + pos_integer() + ) -> {ok, Pid :: pid()}. +start_link(Name, Interval, Rate, NoOfProcesses) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, {Name, Interval, Rate, NoOfProcesses}). + +-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +init({Name, Interval, Rate, NoOfProcesses}) -> + Children = [ + #{id => amoc_throttle_process, + start => {amoc_throttle_process, start_link, [Name, Interval, RatePerProcess]}, + type => worker, + shutdown => timer:seconds(5), + restart => permanent, + modules => [amoc_throttle_process] + } + || RatePerProcess <- calculate_rate_per_process(Rate, NoOfProcesses) + ], + + SupFlags = #{strategy => one_for_one}, + {ok, {SupFlags, Children}}. + +%% Helpers +calculate_rate_per_process(Rate, NoOfProcesses) -> + calculate_rate_per_process([], Rate, NoOfProcesses). + +calculate_rate_per_process(Acc, Rate, 1) -> + [Rate | Acc]; +calculate_rate_per_process(Acc, Rate, N) when is_integer(N), N > 1 -> + ProcessRate = Rate div N, + calculate_rate_per_process([ProcessRate | Acc], Rate - ProcessRate, N - 1). + diff --git a/src/throttle/amoc_throttle_pooler.erl b/src/throttle/amoc_throttle_pooler.erl new file mode 100644 index 00000000..7cf73766 --- /dev/null +++ b/src/throttle/amoc_throttle_pooler.erl @@ -0,0 +1,23 @@ +%% @private +%% @see amoc_throttle +%% @copyright 2023 Erlang Solutions Ltd. +-module(amoc_throttle_pooler). + +-behaviour(supervisor). + +-export([start_link/0, init/1]). + +-spec start_link() -> {ok, Pid :: pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +init([]) -> + ChildSpec = #{id => amoc_throttle_pool, + start => {amoc_throttle_pool, start_link, []}, + type => supervisor, + shutdown => infinity, + restart => permanent, + modules => [amoc_throttle_pool] }, + SupFlags = #{strategy => simple_one_for_one}, + {ok, {SupFlags, [ChildSpec]}}. diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index 9a526384..27289b2d 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -5,8 +5,7 @@ -behaviour(gen_server). %% API --export([start/3, - stop/1, +-export([stop/1, run/2, update/3, pause/1, @@ -14,7 +13,8 @@ get_state/1]). %% gen_server behaviour --export([init/1, +-export([start_link/3, + init/1, handle_call/3, handle_info/2, handle_cast/2, @@ -39,9 +39,9 @@ %% Exported functions %%------------------------------------------------------------------------------ --spec start(atom(), amoc_throttle:interval(), amoc_throttle:rate()) -> {ok, pid()}. -start(Name, Interval, Rate) -> - gen_server:start(?MODULE, [Name, Interval, Rate], []). +-spec start_link(atom(), amoc_throttle:interval(), amoc_throttle:rate()) -> {ok, pid()}. +start_link(Name, Interval, Rate) -> + gen_server:start_link(?MODULE, [Name, Interval, Rate], []). -spec stop(pid()) -> ok. stop(Pid) -> diff --git a/src/throttle/amoc_throttle_sup.erl b/src/throttle/amoc_throttle_sup.erl new file mode 100644 index 00000000..7247fdb9 --- /dev/null +++ b/src/throttle/amoc_throttle_sup.erl @@ -0,0 +1,57 @@ +%% @private +%% @see amoc_throttle +%% @copyright 2023 Erlang Solutions Ltd. +%% @doc Top supervisor for the controller, the pooler, and the process group +%% +%% The supervision tree is as follows +%% +%% amoc_sup +%% | +%% amoc_throttle_sup +%% / | \ +%% / | \ +%% / | \ +%% pooler controller pg +%% || +%% (dynamically) +%% || +%% pool +%% / | \ +%% [processes()] +%% +%% Where the pool, on creation, subscribes all its children to the named process group +%% @end +-module(amoc_throttle_sup). + +-behaviour(supervisor). + +-define(PG_SCOPE, amoc_throttle). + +-export([start_link/0, init/1]). + +-spec start_link() -> {ok, Pid :: pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +init([]) -> + SupFlags = #{strategy => one_for_one}, + Pooler = #{id => amoc_throttle_pooler, + start => {amoc_throttle_pooler, start_link, []}, + type => supervisor, + shutdown => infinity, + restart => permanent, + modules => [amoc_throttle_pooler]}, + Controller = #{id => amoc_throttle_controller, + start => {amoc_throttle_controller, start_link, []}, + type => worker, + shutdown => timer:seconds(5), + restart => permanent, + modules => [amoc_throttle_controller]}, + Pg = #{id => pg, + start => {pg, start_link, [?PG_SCOPE]}, + type => worker, + shutdown => timer:seconds(5), + restart => permanent, + modules => [pg]}, + {ok, {SupFlags, [Pooler, Controller, Pg]}}.