Skip to content

Commit

Permalink
Merge pull request #180 from esl/throttles/rework_run_and_sending_logic
Browse files Browse the repository at this point in the history
Throttles/rework run and sending logic
  • Loading branch information
DenysGonchar authored Jan 18, 2024
2 parents 9870612 + 5060d89 commit e63ae4b
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 76 deletions.
43 changes: 22 additions & 21 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
send/3,
send/2,
send_and_wait/2,
wait/1,
run/2,
pause/1,
resume/1,
change_rate/3,
change_rate_gradually/6,
stop/1]).

-deprecated([{send_and_wait, 2, "use wait/1 instead"}]).

-define(DEFAULT_NO_PROCESSES, 10).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)).
Expand Down Expand Up @@ -123,40 +126,38 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSt
%% deactivate Async runner
%% Async runner ->Throttle process:'DOWN'
%% destroy Async runner
%% '''
%% '''
%% for the local execution, req/exec rates are increased only by throttle process.
-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
amoc_throttle_controller:run(Name, Fn).
amoc_throttle_runner:throttle(Name, Fn).

%% @see send/3
%% @doc Sends a given message to `erlang:self()'
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
amoc_throttle_runner:throttle(Name, {self(), Msg}).

%% @doc Sends a given message `Msg' to a given `Pid' when the rate for `Name' allows for that.
%%
%% May be used to schedule tasks.
-spec send(name(), pid(), any()) -> ok | {error, any()}.
send(Name, Pid, Msg) ->
run(Name, fun() -> Pid ! Msg end).

%% @doc Sends a given message to `erlang:self()'
%% @see send/3
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
send(Name, self(), Msg).
amoc_throttle_runner:throttle(Name, {Pid, Msg}).

%% @doc Sends and receives the given message `Msg'.
%% Can be used to halt execution if we want a process to be idle when waiting for rate increase or other processes finishing their tasks.
%%
%% Deprecated in favour of `wait/1'
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
send_and_wait(Name, Msg) ->
case send(Name, Msg) of
ok ->
receive
Msg -> ok
end;
Error ->
Error
end.
send_and_wait(Name, _) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Blocks the caller until the throttle mechanism allows.
-spec wait(name()) -> ok | {error, any()}.
wait(Name) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Stops the throttle mechanism for the given `Name'.
-spec stop(name()) -> ok | {error, any()}.
stop(Name) ->
amoc_throttle_controller:stop(Name),
ok.
amoc_throttle_controller:stop(Name).
40 changes: 15 additions & 25 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Manages throttle processes and rate changes.
-module(amoc_throttle_controller).

-behaviour(gen_server).
Expand All @@ -10,7 +11,7 @@
ensure_throttle_processes_started/4,
pause/1, resume/1, stop/1,
change_rate/3, change_rate_gradually/6,
run/2, telemetry_event/2]).
raise_event_on_slave_node/2, telemetry_event/2]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -38,6 +39,7 @@
-type change_rate_plan() :: #change_rate_plan{}.
-type throttle_info() :: #throttle_info{}.
-type state() :: #{name() => throttle_info()}.
-type event() :: init | execute | request.

%%%===================================================================
%%% API
Expand All @@ -48,29 +50,14 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(),
-spec ensure_throttle_processes_started(name(), amoc_throttle:rate(),
amoc_throttle:interval(), pos_integer()) ->
{ok, started | already_started} |
{error, wrong_reconfiguration | wrong_no_of_procs}).
{error, wrong_reconfiguration | wrong_no_of_procs}.
ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses) ->
maybe_raise_event(Name, init),
raise_event_on_slave_node(Name, init),
gen_server:call(?MASTER_SERVER, {start_processes, Name, Rate, Interval, NoOfProcesses}).

-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Pid} ->
maybe_raise_event(Name, request),
Fun =
fun() ->
maybe_raise_event(Name, execute),
Fn()
end,
amoc_throttle_process:run(Pid, Fun),
ok;
Error -> Error
end.

-spec pause(name()) -> ok | {error, any()}.
pause(Name) ->
gen_server:call(?MASTER_SERVER, {pause, Name}).
Expand Down Expand Up @@ -98,6 +85,15 @@ stop(Name) ->
telemetry_event(Name, Event) when Event =:= request; Event =:= execute ->
raise_event(Name, Event).

%% The purpose of this function is to ensure that there are no event duplicates if we are running in
%% a single (non-distributed) node, as the throttle process will already raise this event.
-spec raise_event_on_slave_node(name(), event()) -> ok.
raise_event_on_slave_node(Name, Event) ->
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -204,12 +200,6 @@ handle_info({change_plan, Name}, State) ->
%%% Internal functions
%%%===================================================================

maybe_raise_event(Name, Event) ->
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}).

Expand Down
18 changes: 7 additions & 11 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc This process's only responsibility is to notify runners that
%% they can run exactly when allowed by the throttling mechanism.
-module(amoc_throttle_process).
-behaviour(gen_server).

Expand Down Expand Up @@ -35,8 +37,8 @@
interval = 0 :: amoc_throttle:interval(), %%ms
delay_between_executions = 0 :: non_neg_integer(), %%ms
tref :: timer:tref() | undefined,
schedule = [] :: [pid()],
schedule_reversed = [] :: [pid()]}).
schedule = [] :: [AmocThrottleRunnerProcess :: pid()],
schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}).

-type state() :: #state{}.
%%------------------------------------------------------------------------------
Expand All @@ -51,9 +53,8 @@ start_link(Name, Interval, Rate) ->
stop(Pid) ->
gen_server:cast(Pid, stop_process).

-spec run(pid(), fun(() -> any())) -> ok.
run(Pid, Fun) ->
RunnerPid = spawn(fun() -> async_runner(Fun) end),
-spec run(pid(), pid()) -> ok.
run(Pid, RunnerPid) ->
gen_server:cast(Pid, {schedule, RunnerPid}).

-spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok.
Expand Down Expand Up @@ -216,15 +217,10 @@ maybe_run_fn(State) ->

run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
RunnerPid ! scheduled,
amoc_throttle_runner:run(RunnerPid),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T, n = N - 1}.

async_runner(Fun) ->
receive
scheduled -> Fun()
end.

timeout(State) ->
State#state.interval + ?DEFAULT_MSG_TIMEOUT.

Expand Down
61 changes: 61 additions & 0 deletions src/throttle/amoc_throttle_runner.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Asynchronous runner that always runs together with the requesting process.
%%
%% Knows how to distinguist between the different operations the caller needs.
-module(amoc_throttle_runner).

-export([throttle/2, run/1]).
-export([async_runner/4]).

-type action() :: wait | {pid(), term()} | fun(() -> any()).

-spec run(pid()) -> term().
run(RunnerPid) ->
RunnerPid ! '$scheduled'.

-spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}.
throttle(Name, Action) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, ThrottlerPid} ->
Args = [Name, self(), ThrottlerPid, Action],
RunnerPid = erlang:spawn_link(?MODULE, async_runner, Args),
amoc_throttle_process:run(ThrottlerPid, RunnerPid),
maybe_wait(Action, RunnerPid);
Error ->
Error
end.

-spec maybe_wait(action(), pid()) -> ok.
maybe_wait(wait, RunnerPid) ->
receive
{'EXIT', RunnerPid, Reason} ->
exit({throttle_wait_died, RunnerPid, Reason});
'$scheduled' ->
ok
end;
maybe_wait(_, _) ->
ok.

-spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return().
async_runner(Name, Caller, ThrottlerPid, Action) ->
ThrottlerMonitor = erlang:monitor(process, ThrottlerPid),
amoc_throttle_controller:raise_event_on_slave_node(Name, request),
receive
{'DOWN', ThrottlerMonitor, process, ThrottlerPid, Reason} ->
exit({throttler_worker_died, ThrottlerPid, Reason});
'$scheduled' ->
execute(Caller, Action),
amoc_throttle_controller:raise_event_on_slave_node(Name, execute),
%% If Action failed, unlink won't be called and the caller will receive an exit signal
erlang:unlink(Caller)
end.

-spec execute(pid(), action()) -> term().
execute(Caller, wait) ->
Caller ! '$scheduled';
execute(_Caller, Fun) when is_function(Fun, 0) ->
Fun();
execute(_Caller, {Pid, Msg}) ->
Pid ! Msg.
10 changes: 5 additions & 5 deletions src/throttle/amoc_throttle_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ init([]) ->
restart => permanent,
modules => [amoc_throttle_controller]},
Pooler = #{id => amoc_throttle_pooler,
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
{ok, {SupFlags, [Pg, Controller, Pooler]}}.
Loading

0 comments on commit e63ae4b

Please sign in to comment.