Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttles/rework run and sending logic #180

Merged
merged 13 commits into from
Jan 18, 2024
43 changes: 22 additions & 21 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
send/3,
send/2,
send_and_wait/2,
wait/1,
run/2,
pause/1,
resume/1,
change_rate/3,
change_rate_gradually/6,
stop/1]).

-deprecated([{send_and_wait, 2, "use wait/1 instead"}]).

-define(DEFAULT_NO_PROCESSES, 10).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)).
Expand Down Expand Up @@ -123,40 +126,38 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSt
%% deactivate Async runner
%% Async runner ->Throttle process:'DOWN'
%% destroy Async runner
%% '''
%% '''
%% for the local execution, req/exec rates are increased only by throttle process.
-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
amoc_throttle_controller:run(Name, Fn).
amoc_throttle_runner:throttle(Name, Fn).

%% @see send/3
%% @doc Sends a given message to `erlang:self()'
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
amoc_throttle_runner:throttle(Name, {self(), Msg}).

%% @doc Sends a given message `Msg' to a given `Pid' when the rate for `Name' allows for that.
%%
%% May be used to schedule tasks.
-spec send(name(), pid(), any()) -> ok | {error, any()}.
send(Name, Pid, Msg) ->
run(Name, fun() -> Pid ! Msg end).

%% @doc Sends a given message to `erlang:self()'
%% @see send/3
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
send(Name, self(), Msg).
amoc_throttle_runner:throttle(Name, {Pid, Msg}).

%% @doc Sends and receives the given message `Msg'.
%% Can be used to halt execution if we want a process to be idle when waiting for rate increase or other processes finishing their tasks.
%%
%% Deprecated in favour of `wait/1'
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
send_and_wait(Name, Msg) ->
case send(Name, Msg) of
ok ->
receive
Msg -> ok
end;
Error ->
Error
end.
send_and_wait(Name, _) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Blocks the caller until the throttle mechanism allows.
-spec wait(name()) -> ok | {error, any()}.
wait(Name) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Stops the throttle mechanism for the given `Name'.
-spec stop(name()) -> ok | {error, any()}.
stop(Name) ->
amoc_throttle_controller:stop(Name),
ok.
amoc_throttle_controller:stop(Name).
40 changes: 15 additions & 25 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Manages throttle processes and rate changes.
-module(amoc_throttle_controller).

-behaviour(gen_server).
Expand All @@ -10,7 +11,7 @@
ensure_throttle_processes_started/4,
pause/1, resume/1, stop/1,
change_rate/3, change_rate_gradually/6,
run/2, telemetry_event/2]).
raise_event_on_slave_node/2, telemetry_event/2]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -38,6 +39,7 @@
-type change_rate_plan() :: #change_rate_plan{}.
-type throttle_info() :: #throttle_info{}.
-type state() :: #{name() => throttle_info()}.
-type event() :: init | execute | request.

%%%===================================================================
%%% API
Expand All @@ -48,29 +50,14 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(),
-spec ensure_throttle_processes_started(name(), amoc_throttle:rate(),
amoc_throttle:interval(), pos_integer()) ->
{ok, started | already_started} |
{error, wrong_reconfiguration | wrong_no_of_procs}).
{error, wrong_reconfiguration | wrong_no_of_procs}.
ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses) ->
maybe_raise_event(Name, init),
raise_event_on_slave_node(Name, init),
gen_server:call(?MASTER_SERVER, {start_processes, Name, Rate, Interval, NoOfProcesses}).

-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Pid} ->
maybe_raise_event(Name, request),
Fun =
fun() ->
maybe_raise_event(Name, execute),
Fn()
end,
amoc_throttle_process:run(Pid, Fun),
ok;
Error -> Error
end.

-spec pause(name()) -> ok | {error, any()}.
pause(Name) ->
gen_server:call(?MASTER_SERVER, {pause, Name}).
Expand Down Expand Up @@ -98,6 +85,15 @@ stop(Name) ->
telemetry_event(Name, Event) when Event =:= request; Event =:= execute ->
raise_event(Name, Event).

%% The purpose of this function is to ensure that there are no event duplicates if we are running in
%% a single (non-distributed) node, as the throttle process will already raise this event.
-spec raise_event_on_slave_node(name(), event()) -> ok.
raise_event_on_slave_node(Name, Event) ->
DenysGonchar marked this conversation as resolved.
Show resolved Hide resolved
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -204,12 +200,6 @@ handle_info({change_plan, Name}, State) ->
%%% Internal functions
%%%===================================================================

maybe_raise_event(Name, Event) ->
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}).

Expand Down
18 changes: 7 additions & 11 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc This process's only responsibility is to notify runners that
%% they can run exactly when allowed by the throttling mechanism.
-module(amoc_throttle_process).
-behaviour(gen_server).

Expand Down Expand Up @@ -35,8 +37,8 @@
interval = 0 :: amoc_throttle:interval(), %%ms
delay_between_executions = 0 :: non_neg_integer(), %%ms
tref :: timer:tref() | undefined,
schedule = [] :: [pid()],
schedule_reversed = [] :: [pid()]}).
schedule = [] :: [AmocThrottleRunnerProcess :: pid()],
schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}).

-type state() :: #state{}.
%%------------------------------------------------------------------------------
Expand All @@ -51,9 +53,8 @@ start_link(Name, Interval, Rate) ->
stop(Pid) ->
gen_server:cast(Pid, stop_process).

-spec run(pid(), fun(() -> any())) -> ok.
run(Pid, Fun) ->
RunnerPid = spawn(fun() -> async_runner(Fun) end),
-spec run(pid(), pid()) -> ok.
run(Pid, RunnerPid) ->
gen_server:cast(Pid, {schedule, RunnerPid}).

-spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok.
Expand Down Expand Up @@ -216,15 +217,10 @@ maybe_run_fn(State) ->

run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
RunnerPid ! scheduled,
amoc_throttle_runner:run(RunnerPid),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T, n = N - 1}.

async_runner(Fun) ->
receive
scheduled -> Fun()
end.

timeout(State) ->
State#state.interval + ?DEFAULT_MSG_TIMEOUT.

Expand Down
61 changes: 61 additions & 0 deletions src/throttle/amoc_throttle_runner.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Asynchronous runner that always runs together with the requesting process.
%%
%% Knows how to distinguist between the different operations the caller needs.
-module(amoc_throttle_runner).

-export([throttle/2, run/1]).
-export([async_runner/4]).

-type action() :: wait | {pid(), term()} | fun(() -> any()).

-spec run(pid()) -> term().
run(RunnerPid) ->
RunnerPid ! '$scheduled'.

-spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}.
throttle(Name, Action) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, ThrottlerPid} ->
Args = [Name, self(), ThrottlerPid, Action],
RunnerPid = erlang:spawn_link(?MODULE, async_runner, Args),
amoc_throttle_process:run(ThrottlerPid, RunnerPid),
maybe_wait(Action, RunnerPid);
Error ->
Error
end.

-spec maybe_wait(action(), pid()) -> ok.
maybe_wait(wait, RunnerPid) ->
receive
{'EXIT', RunnerPid, Reason} ->
exit({throttle_wait_died, RunnerPid, Reason});
'$scheduled' ->
ok
end;
maybe_wait(_, _) ->
ok.

-spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return().
async_runner(Name, Caller, ThrottlerPid, Action) ->
ThrottlerMonitor = erlang:monitor(process, ThrottlerPid),
amoc_throttle_controller:raise_event_on_slave_node(Name, request),
receive
{'DOWN', ThrottlerMonitor, process, ThrottlerPid, Reason} ->
exit({throttler_worker_died, ThrottlerPid, Reason});
'$scheduled' ->
execute(Caller, Action),
amoc_throttle_controller:raise_event_on_slave_node(Name, execute),
%% If Action failed, unlink won't be called and the caller will receive an exit signal
erlang:unlink(Caller)
end.

-spec execute(pid(), action()) -> term().
execute(Caller, wait) ->
Caller ! '$scheduled';
execute(_Caller, Fun) when is_function(Fun, 0) ->
Fun();
execute(_Caller, {Pid, Msg}) ->
Pid ! Msg.
10 changes: 5 additions & 5 deletions src/throttle/amoc_throttle_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ init([]) ->
restart => permanent,
modules => [amoc_throttle_controller]},
Pooler = #{id => amoc_throttle_pooler,
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
{ok, {SupFlags, [Pg, Controller, Pooler]}}.
Loading