Skip to content

Commit

Permalink
Ensure throttlers are supervised and pooled
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 31, 2023
1 parent fcbdeda commit 9613c17
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 28 deletions.
15 changes: 8 additions & 7 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(WORKER(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(SUP(I, Type), {I, {I, start_link, []}, permanent, infinity, Type, [I]}).

%% ===================================================================
%% API functions
Expand All @@ -32,10 +33,10 @@ start_link() ->
init([]) ->
{ok, {{one_for_one, 5, 10},
[
?CHILD(amoc_users_sup, supervisor),
?CHILD(amoc_coordinator_sup_sup, supervisor),
?CHILD(amoc_controller, worker),
?CHILD(amoc_cluster, worker),
?CHILD(amoc_code_server, worker),
?CHILD(amoc_throttle_controller, worker)
?SUP(amoc_users_sup, supervisor),
?SUP(amoc_throttle_sup, supervisor),
?SUP(amoc_coordinator_sup_sup, supervisor),
?WORKER(amoc_controller, worker),
?WORKER(amoc_cluster, worker),
?WORKER(amoc_code_server, worker)
]}}.
23 changes: 8 additions & 15 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
handle_cast/2,
handle_info/2]).

-define(PG_SCOPE, amoc_throttle).
-define(SERVER, ?MODULE).
-define(MASTER_SERVER, {?SERVER, amoc_cluster:master_node()}).

Expand Down Expand Up @@ -46,7 +47,6 @@
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
pg:start_link(),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(),
Expand Down Expand Up @@ -125,7 +125,7 @@ init([]) ->
{reply, ok, state()} |
{reply, {error, any()}, state()}.
handle_call({start_processes, Name, Rate, Interval, NoOfProcesses}, _From, State) ->
case pg:get_members(Name) of
case pg:get_members(?PG_SCOPE, Name) of
[] ->
RealNoOfProcesses = start_processes(Name, Rate, Interval, NoOfProcesses),
{reply, {ok, started},
Expand Down Expand Up @@ -261,7 +261,7 @@ start_processes(Name, Rate, Interval, NoOfProcesses) ->
-spec get_throttle_process(name()) -> {error, {no_throttle_process_registered, name()}} |
{error, any()} | {ok, pid()}.
get_throttle_process(Name) ->
case pg:get_members(Name) of
case pg:get_members(?PG_SCOPE, Name) of
[] ->
{error, {no_throttle_process_registered, Name}};
List -> %% nonempty list
Expand All @@ -283,7 +283,7 @@ maybe_change_rate(Name, Rate, Interval, Info) ->
-spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) ->
{ok, non_neg_integer()} | {error, any()}.
do_change_rate(Name, Rate, Interval) ->
case pg:get_members(Name) of
case pg:get_members(?PG_SCOPE, Name) of
[] -> {error, no_processes_in_group};
List when is_list(List) ->
RatePerMinute = rate_per_minute(Rate, Interval),
Expand All @@ -302,16 +302,9 @@ start_gradual_rate_change(Name, LowRate, HighRate, RateInterval, StepInterval, N
Plan = #change_rate_plan{high_rate = HighRate, no_of_steps = NoOfSteps, timer = Timer},
Info#throttle_info{rate = LowRate, interval = RateInterval, change_plan = Plan}.

start_throttle_processes(Name, Interval, Rate, 1) ->
start_throttle_process(Name, Interval, Rate);
start_throttle_processes(Name, Interval, Rate, N) when is_integer(N), N > 1 ->
ProcessRate = Rate div N,
start_throttle_process(Name, Interval, ProcessRate),
start_throttle_processes(Name, Interval, Rate - ProcessRate, N - 1).

start_throttle_process(Name, Interval, Rate) ->
{ok, Pid} = amoc_throttle_process:start(Name, Interval, Rate),
pg:join(Name, Pid).
start_throttle_processes(Name, Interval, Rate, N) ->
{ok, Workers} = amoc_throttle_pool:start_process_pool(Name, Interval, Rate, N),
pg:join(?PG_SCOPE, Name, Workers).

update_throttle_processes([Pid], Interval, Rate, 1) ->
amoc_throttle_process:update(Pid, Interval, Rate);
Expand All @@ -321,7 +314,7 @@ update_throttle_processes([Pid | Tail], Interval, Rate, N) when N > 1 ->
update_throttle_processes(Tail, Interval, Rate - ProcessRate, N - 1).

all_processes(Name, Cmd) ->
case pg:get_members(Name) of
case pg:get_members(?PG_SCOPE, Name) of
[] -> {error, no_processes_in_group};
Ps -> [run_cmd(P, Cmd) || P <- Ps], ok
end.
Expand Down
56 changes: 56 additions & 0 deletions src/throttle/amoc_throttle_pool.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
%% @private
%% @see amoc_throttle
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_throttle_pool).

-behaviour(supervisor).

-export([start_process_pool/4]).
-export([start_link/4, init/1]).

-spec start_process_pool(
amoc_throttle:name(),
amoc_throttle:interval(),
amoc_throttle:rate(),
pos_integer()
) -> {ok, [Pid :: pid()]}.
start_process_pool(Name, Interval, Rate, NoOfProcesses) ->
{ok, Sup} = supervisor:start_child(amoc_throttle_sup, [Name, Interval, Rate, NoOfProcesses]),
Children = supervisor:which_children(Sup),
{ok, [ Pid || {_, Pid, _, _} <- Children ]}.

-spec start_link(
amoc_throttle:name(),
amoc_throttle:interval(),
amoc_throttle:rate(),
pos_integer()
) -> {ok, Pid :: pid()}.
start_link(Name, Interval, Rate, NoOfProcesses) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, {Name, Interval, Rate, NoOfProcesses}).

-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Interval, Rate, NoOfProcesses}) ->
Children = [
#{id => amoc_throttle_process,
start => {amoc_throttle_process, start_link, [Name, Interval, RatePerProcess]},
type => worker,
shutdown => timer:seconds(5),
restart => permanent,
modules => [amoc_throttle_process]
}
|| RatePerProcess <- calculate_rate_per_process(Rate, NoOfProcesses)
],

SupFlags = #{strategy => one_for_one},
{ok, {SupFlags, Children}}.

%% Helpers
calculate_rate_per_process(Rate, NoOfProcesses) ->
calculate_rate_per_process([], Rate, NoOfProcesses).

calculate_rate_per_process(Acc, Rate, 1) ->
[Rate | Acc];
calculate_rate_per_process(Acc, Rate, N) when is_integer(N), N > 1 ->
ProcessRate = Rate div N,
calculate_rate_per_process([ProcessRate | Acc], Rate - ProcessRate, N - 1).

23 changes: 23 additions & 0 deletions src/throttle/amoc_throttle_pooler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%% @private
%% @see amoc_throttle
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_throttle_pooler).

-behaviour(supervisor).

-export([start_link/0, init/1]).

-spec start_link() -> {ok, Pid :: pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
ChildSpec = #{id => amoc_throttle_pool,
start => {amoc_throttle_pool, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pool] },
SupFlags = #{strategy => simple_one_for_one},
{ok, {SupFlags, [ChildSpec]}}.
12 changes: 6 additions & 6 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
-behaviour(gen_server).

%% API
-export([start/3,
stop/1,
-export([stop/1,
run/2,
update/3,
pause/1,
resume/1,
get_state/1]).

%% gen_server behaviour
-export([init/1,
-export([start_link/3,
init/1,
handle_call/3,
handle_info/2,
handle_cast/2,
Expand All @@ -39,9 +39,9 @@
%% Exported functions
%%------------------------------------------------------------------------------

-spec start(atom(), amoc_throttle:interval(), amoc_throttle:rate()) -> {ok, pid()}.
start(Name, Interval, Rate) ->
gen_server:start(?MODULE, [Name, Interval, Rate], []).
-spec start_link(atom(), amoc_throttle:interval(), amoc_throttle:rate()) -> {ok, pid()}.
start_link(Name, Interval, Rate) ->
gen_server:start_link(?MODULE, [Name, Interval, Rate], []).

-spec stop(pid()) -> ok.
stop(Pid) ->
Expand Down
57 changes: 57 additions & 0 deletions src/throttle/amoc_throttle_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
%% @private
%% @see amoc_throttle
%% @copyright 2023 Erlang Solutions Ltd.
%% @doc Top supervisor for the controller, the pooler, and the process group
%%
%% The supervision tree is as follows
%%
%% amoc_sup
%% |
%% amoc_throttle_sup
%% / | \
%% / | \
%% / | \
%% pooler controller pg
%% ||
%% (dynamically)
%% ||
%% pool
%% / | \
%% [processes()]
%%
%% Where the pool, on creation, subscribes all its children to the named process group
%% @end
-module(amoc_throttle_sup).

-behaviour(supervisor).

-define(PG_SCOPE, amoc_throttle).

-export([start_link/0, init/1]).

-spec start_link() -> {ok, Pid :: pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
SupFlags = #{strategy => one_for_one},
Pooler = #{id => amoc_throttle_pooler,
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
Controller = #{id => amoc_throttle_controller,
start => {amoc_throttle_controller, start_link, []},
type => worker,
shutdown => timer:seconds(5),
restart => permanent,
modules => [amoc_throttle_controller]},
Pg = #{id => pg,
start => {pg, start_link, [?PG_SCOPE]},
type => worker,
shutdown => timer:seconds(5),
restart => permanent,
modules => [pg]},
{ok, {SupFlags, [Pooler, Controller, Pg]}}.

0 comments on commit 9613c17

Please sign in to comment.