Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttles/rework run and sending logic #180

Merged
merged 13 commits into from
Jan 18, 2024
43 changes: 22 additions & 21 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
send/3,
send/2,
send_and_wait/2,
wait/1,
run/2,
pause/1,
resume/1,
change_rate/3,
change_rate_gradually/6,
stop/1]).

-deprecated([{send_and_wait, 2, "use wait/1 instead"}]).

-define(DEFAULT_NO_PROCESSES, 10).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)).
Expand Down Expand Up @@ -123,40 +126,38 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSt
%% deactivate Async runner
%% Async runner ->Throttle process:'DOWN'
%% destroy Async runner
%% '''
%% '''
%% for the local execution, req/exec rates are increased only by throttle process.
-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
amoc_throttle_controller:run(Name, Fn).
amoc_throttle_runner:throttle(Name, Fn).

%% @see send/3
%% @doc Sends a given message to `erlang:self()'
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
amoc_throttle_runner:throttle(Name, {self(), Msg}).

%% @doc Sends a given message `Msg' to a given `Pid' when the rate for `Name' allows for that.
%%
%% May be used to schedule tasks.
-spec send(name(), pid(), any()) -> ok | {error, any()}.
send(Name, Pid, Msg) ->
run(Name, fun() -> Pid ! Msg end).

%% @doc Sends a given message to `erlang:self()'
%% @see send/3
-spec send(name(), any()) -> ok | {error, any()}.
send(Name, Msg) ->
send(Name, self(), Msg).
amoc_throttle_runner:throttle(Name, {Pid, Msg}).

%% @doc Sends and receives the given message `Msg'.
%% Can be used to halt execution if we want a process to be idle when waiting for rate increase or other processes finishing their tasks.
%%
%% Deprecated in favour of `wait/1'
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
send_and_wait(Name, Msg) ->
case send(Name, Msg) of
ok ->
receive
Msg -> ok
end;
Error ->
Error
end.
send_and_wait(Name, _) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Blocks the caller until the throttle mechanism allows.
-spec wait(name()) -> ok | {error, any()}.
wait(Name) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Stops the throttle mechanism for the given `Name'.
-spec stop(name()) -> ok | {error, any()}.
stop(Name) ->
amoc_throttle_controller:stop(Name),
ok.
amoc_throttle_controller:stop(Name).
38 changes: 13 additions & 25 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Manages throttle processes and rate changes.
-module(amoc_throttle_controller).

-behaviour(gen_server).
Expand All @@ -10,7 +11,7 @@
ensure_throttle_processes_started/4,
pause/1, resume/1, stop/1,
change_rate/3, change_rate_gradually/6,
run/2, telemetry_event/2]).
raise_event_on_slave_node/2, telemetry_event/2]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -38,6 +39,7 @@
-type change_rate_plan() :: #change_rate_plan{}.
-type throttle_info() :: #throttle_info{}.
-type state() :: #{name() => throttle_info()}.
-type event() :: init | execute | request.

%%%===================================================================
%%% API
Expand All @@ -48,29 +50,14 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(),
-spec ensure_throttle_processes_started(name(), amoc_throttle:rate(),
amoc_throttle:interval(), pos_integer()) ->
{ok, started | already_started} |
{error, wrong_reconfiguration | wrong_no_of_procs}).
{error, wrong_reconfiguration | wrong_no_of_procs}.
ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses) ->
maybe_raise_event(Name, init),
raise_event_on_slave_node(Name, init),
gen_server:call(?MASTER_SERVER, {start_processes, Name, Rate, Interval, NoOfProcesses}).

-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Pid} ->
maybe_raise_event(Name, request),
Fun =
fun() ->
maybe_raise_event(Name, execute),
Fn()
end,
amoc_throttle_process:run(Pid, Fun),
ok;
Error -> Error
end.

-spec pause(name()) -> ok | {error, any()}.
pause(Name) ->
gen_server:call(?MASTER_SERVER, {pause, Name}).
Expand Down Expand Up @@ -98,6 +85,13 @@ stop(Name) ->
telemetry_event(Name, Event) when Event =:= request; Event =:= execute ->
raise_event(Name, Event).

-spec raise_event_on_slave_node(name(), event()) -> ok.
raise_event_on_slave_node(Name, Event) ->
DenysGonchar marked this conversation as resolved.
Show resolved Hide resolved
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -204,12 +198,6 @@ handle_info({change_plan, Name}, State) ->
%%% Internal functions
%%%===================================================================

maybe_raise_event(Name, Event) ->
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> raise_event(Name, Event)
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}).

Expand Down
18 changes: 7 additions & 11 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc This process's only responsibility is to notify runners that
%% they can run exactly when allowed by the throttling mechanism.
-module(amoc_throttle_process).
-behaviour(gen_server).

Expand Down Expand Up @@ -35,8 +37,8 @@
interval = 0 :: amoc_throttle:interval(), %%ms
delay_between_executions = 0 :: non_neg_integer(), %%ms
tref :: timer:tref() | undefined,
schedule = [] :: [pid()],
schedule_reversed = [] :: [pid()]}).
schedule = [] :: [amoc_throttle_runner:runner()],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid introduction of unneeded types. it is pid(), it has to be pid() and it cannot be anything else. since we set up process monitoring with erlang:monitor/2 for the items in this list. if you want to highlight that it's a process created by amoc_trottle_runner, then the spec could look like [AmocTrottleRunnerProcess::pid()]

schedule_reversed = [] :: [amoc_throttle_runner:runner()]}).

-type state() :: #state{}.
%%------------------------------------------------------------------------------
Expand All @@ -51,9 +53,8 @@ start_link(Name, Interval, Rate) ->
stop(Pid) ->
gen_server:cast(Pid, stop_process).

-spec run(pid(), fun(() -> any())) -> ok.
run(Pid, Fun) ->
RunnerPid = spawn(fun() -> async_runner(Fun) end),
-spec run(pid(), amoc_throttle_runner:runner()) -> ok.
run(Pid, RunnerPid) ->
gen_server:cast(Pid, {schedule, RunnerPid}).

-spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok.
Expand Down Expand Up @@ -216,15 +217,10 @@ maybe_run_fn(State) ->

run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
RunnerPid ! scheduled,
amoc_throttle_runner:run(RunnerPid),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T, n = N - 1}.

async_runner(Fun) ->
receive
scheduled -> Fun()
end.

timeout(State) ->
State#state.interval + ?DEFAULT_MSG_TIMEOUT.

Expand Down
61 changes: 61 additions & 0 deletions src/throttle/amoc_throttle_runner.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
%% @private
%% @see amoc_throttle
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Asynchronous runner that always runs together with the requesting process.
%%
%% Knows how to distinguist between the different operations the caller needs.
-module(amoc_throttle_runner).

-export([throttle/2, run/1]).
-export([async_runner/3]).

-type action() :: wait | {pid(), term()} | fun(() -> any()).
-type runner() :: pid().
-export_type([runner/0]).

-spec run(runner()) -> reference().
run(RunnerPid) ->
Ref = erlang:monitor(process, RunnerPid),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by the time this function is executed, the monitoring is already set up. 2 monitors is definetly a bug, please remove this call.

since amoc_throttle_process relies on monitor messages I would expect it to explicitely create that monitors

RunnerPid ! '$scheduled',
Ref.

-spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}.
throttle(Name, Action) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Throttler} ->
RunnerPid = erlang:spawn_link(?MODULE, async_runner, [Name, self(), Action]),
amoc_throttle_process:run(Throttler, RunnerPid),
maybe_wait(Action, RunnerPid);
Error ->
Error
end.

-spec maybe_wait(action(), runner()) -> ok.
maybe_wait(wait, RunnerPid) ->
receive
{'EXIT', RunnerPid, Reason} ->
exit({throttle_wait_died, RunnerPid, Reason});
'$scheduled' ->
ok
end;
maybe_wait(_, _) ->
ok.

-spec async_runner(amoc_throttle:name(), pid(), term()) -> no_return().
async_runner(Name, Caller, Action) ->
DenysGonchar marked this conversation as resolved.
Show resolved Hide resolved
amoc_throttle_controller:raise_event_on_slave_node(Name, request),
receive
'$scheduled' ->
execute(Caller, Action),
amoc_throttle_controller:raise_event_on_slave_node(Name, execute),
%% If Action failed, unlink won't be called and the caller will receive an exit signal
erlang:unlink(Caller)
end.

-spec execute(pid(), action()) -> term().
execute(Caller, wait) ->
Caller ! '$scheduled';
execute(_Caller, Fun) when is_function(Fun, 0) ->
Fun();
execute(_Caller, {Pid, Msg}) ->
Pid ! Msg.
10 changes: 5 additions & 5 deletions src/throttle/amoc_throttle_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ init([]) ->
restart => permanent,
modules => [amoc_throttle_controller]},
Pooler = #{id => amoc_throttle_pooler,
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
start => {amoc_throttle_pooler, start_link, []},
type => supervisor,
shutdown => infinity,
restart => permanent,
modules => [amoc_throttle_pooler]},
{ok, {SupFlags, [Pg, Controller, Pooler]}}.
62 changes: 50 additions & 12 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

-define(DEFAULT_NO_PROCESSES, 10).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(RECV(Msg, Timeout), receive Msg -> ok after Timeout -> {error, not_received_yet} end).

all() ->
[
Expand All @@ -24,6 +25,9 @@ groups() ->
change_rate,
change_rate_gradually,
send_and_wait,
just_wait,
wait_for_process_to_die_sends_a_kill,
async_runner_dies_while_waiting_raises_exit,
run_with_interval_zero_limits_only_number_of_parallel_executions,
pause_and_resume,
get_state
Expand Down Expand Up @@ -123,12 +127,40 @@ send_and_wait(_) ->
?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)),
%% One message is received sufficiently fast
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch(ok, receive_msg_in_timeout(receive_this, 100)),
?assertMatch(ok, ?RECV(receive_this, 100)),
%% If someone else fills the throttle heavily,
%% it will take proportionally so long to execute for me
fill_throttle(?FUNCTION_NAME, 100 * 10),
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch({error, not_received_yet}, receive_msg_in_timeout(receive_this, 200)).
?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)).

just_wait(_) ->
%% it failts if the throttle wasn't started yet
?assertMatch({error, no_throttle_process_registered},
amoc_throttle:wait(?FUNCTION_NAME)),
%% Start 100-per-10ms throttle with a single process
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)),
%% wait passes fine
?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)),
%% One message is received sufficiently fast
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch(ok, ?RECV(receive_this, 100)),
%% If someone else fills the throttle heavily,
%% it will take proportionally so long to execute for me
fill_throttle(?FUNCTION_NAME, 100 * 10),
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)).

wait_for_process_to_die_sends_a_kill(_) ->
erlang:process_flag(trap_exit, true),
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)),
amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(?FUNCTION_NAME) end),
?assertMatch(ok, ?RECV({'EXIT', _, ?FUNCTION_NAME}, 100)).

async_runner_dies_while_waiting_raises_exit(_) ->
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)),
find_new_link_and_kill_it(self()),
?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)).

run_with_interval_zero_limits_only_number_of_parallel_executions(_) ->
%% Start 10 actions at once in 10 processes
Expand All @@ -137,7 +169,7 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) ->
%% it will take proportionally so long to execute for me
fill_throttle(?FUNCTION_NAME, 100),
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch(ok, receive_msg_in_timeout(receive_this, 200)).
?assertMatch(ok, ?RECV(receive_this, 200)).

pause_and_resume(_) ->
%% Start 100-per-10ms throttle with a single process
Expand All @@ -148,10 +180,10 @@ pause_and_resume(_) ->
?assertMatch(ok, amoc_throttle:pause(?FUNCTION_NAME)),
%% It is paused, so messages aren't received
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch({error, not_received_yet}, receive_msg_in_timeout(receive_this, 200)),
?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)),
%% After resume the message is then received
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
?assertMatch(ok, receive_msg_in_timeout(receive_this, 200)).
?assertMatch(ok, ?RECV(receive_this, 200)).

get_state(_) ->
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 60000, 1)),
Expand Down Expand Up @@ -194,13 +226,19 @@ fill_throttle(Name, Num) ->
continue -> ok
end.

receive_msg_in_timeout(Msg, Timeout) ->
receive
Msg ->
ok
after Timeout ->
{error, not_received_yet}
end.
find_new_link_and_kill_it(Self) ->
erlang:process_flag(trap_exit, true),
{links, OriginalLinks} = erlang:process_info(Self, links),
spawn(?MODULE, kill_async_runner, [Self, OriginalLinks]).

kill_async_runner(Pid, OriginalLinks) ->
GetLinksFun = fun() ->
{links, Links} = erlang:process_info(Pid, links),
Links -- OriginalLinks
end,
Validator = fun(Res) -> 1 =:= length(Res) end,
{ok, [AsyncRunner]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}),
exit(AsyncRunner, kill).

%% Helpers
amoc_do(Scenario) ->
Expand Down