diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index 71deb7f2..19885869 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -9,6 +9,7 @@ send/3, send/2, send_and_wait/2, + wait/1, run/2, pause/1, resume/1, @@ -16,6 +17,8 @@ change_rate_gradually/6, stop/1]). +-deprecated([{send_and_wait, 2, "use wait/1 instead"}]). + -define(DEFAULT_NO_PROCESSES, 10). -define(DEFAULT_INTERVAL, 60000). %% one minute -define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)). @@ -123,40 +126,38 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSt %% deactivate Async runner %% Async runner ->Throttle process:'DOWN' %% destroy Async runner -%% ''' +%% ''' %% for the local execution, req/exec rates are increased only by throttle process. -spec run(name(), fun(() -> any())) -> ok | {error, any()}. run(Name, Fn) -> - amoc_throttle_controller:run(Name, Fn). + amoc_throttle_runner:throttle(Name, Fn). + +%% @see send/3 +%% @doc Sends a given message to `erlang:self()' +-spec send(name(), any()) -> ok | {error, any()}. +send(Name, Msg) -> + amoc_throttle_runner:throttle(Name, {self(), Msg}). %% @doc Sends a given message `Msg' to a given `Pid' when the rate for `Name' allows for that. %% %% May be used to schedule tasks. -spec send(name(), pid(), any()) -> ok | {error, any()}. send(Name, Pid, Msg) -> - run(Name, fun() -> Pid ! Msg end). - -%% @doc Sends a given message to `erlang:self()' -%% @see send/3 --spec send(name(), any()) -> ok | {error, any()}. -send(Name, Msg) -> - send(Name, self(), Msg). + amoc_throttle_runner:throttle(Name, {Pid, Msg}). %% @doc Sends and receives the given message `Msg'. -%% Can be used to halt execution if we want a process to be idle when waiting for rate increase or other processes finishing their tasks. +%% +%% Deprecated in favour of `wait/1' -spec send_and_wait(name(), any()) -> ok | {error, any()}. -send_and_wait(Name, Msg) -> - case send(Name, Msg) of - ok -> - receive - Msg -> ok - end; - Error -> - Error - end. +send_and_wait(Name, _) -> + amoc_throttle_runner:throttle(Name, wait). + +%% @doc Blocks the caller until the throttle mechanism allows. +-spec wait(name()) -> ok | {error, any()}. +wait(Name) -> + amoc_throttle_runner:throttle(Name, wait). %% @doc Stops the throttle mechanism for the given `Name'. -spec stop(name()) -> ok | {error, any()}. stop(Name) -> - amoc_throttle_controller:stop(Name), - ok. + amoc_throttle_controller:stop(Name). diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index b07bf64a..94e97a22 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -1,6 +1,7 @@ %% @private %% @see amoc_throttle %% @copyright 2024 Erlang Solutions Ltd. +%% @doc Manages throttle processes and rate changes. -module(amoc_throttle_controller). -behaviour(gen_server). @@ -10,7 +11,7 @@ ensure_throttle_processes_started/4, pause/1, resume/1, stop/1, change_rate/3, change_rate_gradually/6, - run/2, telemetry_event/2]). + raise_event_on_slave_node/2, telemetry_event/2]). %% gen_server callbacks -export([init/1, @@ -38,6 +39,7 @@ -type change_rate_plan() :: #change_rate_plan{}. -type throttle_info() :: #throttle_info{}. -type state() :: #{name() => throttle_info()}. +-type event() :: init | execute | request. %%%=================================================================== %%% API @@ -48,29 +50,14 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(ensure_throttle_processes_started(name(), amoc_throttle:rate(), +-spec ensure_throttle_processes_started(name(), amoc_throttle:rate(), amoc_throttle:interval(), pos_integer()) -> {ok, started | already_started} | - {error, wrong_reconfiguration | wrong_no_of_procs}). + {error, wrong_reconfiguration | wrong_no_of_procs}. ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses) -> - maybe_raise_event(Name, init), + raise_event_on_slave_node(Name, init), gen_server:call(?MASTER_SERVER, {start_processes, Name, Rate, Interval, NoOfProcesses}). --spec run(name(), fun(() -> any())) -> ok | {error, any()}. -run(Name, Fn) -> - case amoc_throttle_process:get_throttle_process(Name) of - {ok, Pid} -> - maybe_raise_event(Name, request), - Fun = - fun() -> - maybe_raise_event(Name, execute), - Fn() - end, - amoc_throttle_process:run(Pid, Fun), - ok; - Error -> Error - end. - -spec pause(name()) -> ok | {error, any()}. pause(Name) -> gen_server:call(?MASTER_SERVER, {pause, Name}). @@ -98,6 +85,15 @@ stop(Name) -> telemetry_event(Name, Event) when Event =:= request; Event =:= execute -> raise_event(Name, Event). +%% The purpose of this function is to ensure that there are no event duplicates if we are running in +%% a single (non-distributed) node, as the throttle process will already raise this event. +-spec raise_event_on_slave_node(name(), event()) -> ok. +raise_event_on_slave_node(Name, Event) -> + case amoc_cluster:master_node() =:= node() of + true -> ok; + _ -> raise_event(Name, Event) + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -204,12 +200,6 @@ handle_info({change_plan, Name}, State) -> %%% Internal functions %%%=================================================================== -maybe_raise_event(Name, Event) -> - case amoc_cluster:master_node() =:= node() of - true -> ok; - _ -> raise_event(Name, Event) - end. - raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init -> amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}). diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index a8aeeaf4..9a56b37e 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -1,6 +1,8 @@ %% @private %% @see amoc_throttle %% @copyright 2024 Erlang Solutions Ltd. +%% @doc This process's only responsibility is to notify runners that +%% they can run exactly when allowed by the throttling mechanism. -module(amoc_throttle_process). -behaviour(gen_server). @@ -35,8 +37,8 @@ interval = 0 :: amoc_throttle:interval(), %%ms delay_between_executions = 0 :: non_neg_integer(), %%ms tref :: timer:tref() | undefined, - schedule = [] :: [pid()], - schedule_reversed = [] :: [pid()]}). + schedule = [] :: [AmocThrottleRunnerProcess :: pid()], + schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}). -type state() :: #state{}. %%------------------------------------------------------------------------------ @@ -51,9 +53,8 @@ start_link(Name, Interval, Rate) -> stop(Pid) -> gen_server:cast(Pid, stop_process). --spec run(pid(), fun(() -> any())) -> ok. -run(Pid, Fun) -> - RunnerPid = spawn(fun() -> async_runner(Fun) end), +-spec run(pid(), pid()) -> ok. +run(Pid, RunnerPid) -> gen_server:cast(Pid, {schedule, RunnerPid}). -spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok. @@ -216,15 +217,10 @@ maybe_run_fn(State) -> run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) -> erlang:monitor(process, RunnerPid), - RunnerPid ! scheduled, + amoc_throttle_runner:run(RunnerPid), amoc_throttle_controller:telemetry_event(Name, execute), State#state{schedule = T, n = N - 1}. -async_runner(Fun) -> - receive - scheduled -> Fun() - end. - timeout(State) -> State#state.interval + ?DEFAULT_MSG_TIMEOUT. diff --git a/src/throttle/amoc_throttle_runner.erl b/src/throttle/amoc_throttle_runner.erl new file mode 100644 index 00000000..358d2f64 --- /dev/null +++ b/src/throttle/amoc_throttle_runner.erl @@ -0,0 +1,61 @@ +%% @private +%% @see amoc_throttle +%% @copyright 2024 Erlang Solutions Ltd. +%% @doc Asynchronous runner that always runs together with the requesting process. +%% +%% Knows how to distinguist between the different operations the caller needs. +-module(amoc_throttle_runner). + +-export([throttle/2, run/1]). +-export([async_runner/4]). + +-type action() :: wait | {pid(), term()} | fun(() -> any()). + +-spec run(pid()) -> term(). +run(RunnerPid) -> + RunnerPid ! '$scheduled'. + +-spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}. +throttle(Name, Action) -> + case amoc_throttle_process:get_throttle_process(Name) of + {ok, ThrottlerPid} -> + Args = [Name, self(), ThrottlerPid, Action], + RunnerPid = erlang:spawn_link(?MODULE, async_runner, Args), + amoc_throttle_process:run(ThrottlerPid, RunnerPid), + maybe_wait(Action, RunnerPid); + Error -> + Error + end. + +-spec maybe_wait(action(), pid()) -> ok. +maybe_wait(wait, RunnerPid) -> + receive + {'EXIT', RunnerPid, Reason} -> + exit({throttle_wait_died, RunnerPid, Reason}); + '$scheduled' -> + ok + end; +maybe_wait(_, _) -> + ok. + +-spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return(). +async_runner(Name, Caller, ThrottlerPid, Action) -> + ThrottlerMonitor = erlang:monitor(process, ThrottlerPid), + amoc_throttle_controller:raise_event_on_slave_node(Name, request), + receive + {'DOWN', ThrottlerMonitor, process, ThrottlerPid, Reason} -> + exit({throttler_worker_died, ThrottlerPid, Reason}); + '$scheduled' -> + execute(Caller, Action), + amoc_throttle_controller:raise_event_on_slave_node(Name, execute), + %% If Action failed, unlink won't be called and the caller will receive an exit signal + erlang:unlink(Caller) + end. + +-spec execute(pid(), action()) -> term(). +execute(Caller, wait) -> + Caller ! '$scheduled'; +execute(_Caller, Fun) when is_function(Fun, 0) -> + Fun(); +execute(_Caller, {Pid, Msg}) -> + Pid ! Msg. diff --git a/src/throttle/amoc_throttle_sup.erl b/src/throttle/amoc_throttle_sup.erl index 0d709ee8..01b182bc 100644 --- a/src/throttle/amoc_throttle_sup.erl +++ b/src/throttle/amoc_throttle_sup.erl @@ -49,9 +49,9 @@ init([]) -> restart => permanent, modules => [amoc_throttle_controller]}, Pooler = #{id => amoc_throttle_pooler, - start => {amoc_throttle_pooler, start_link, []}, - type => supervisor, - shutdown => infinity, - restart => permanent, - modules => [amoc_throttle_pooler]}, + start => {amoc_throttle_pooler, start_link, []}, + type => supervisor, + shutdown => infinity, + restart => permanent, + modules => [amoc_throttle_pooler]}, {ok, {SupFlags, [Pg, Controller, Pooler]}}. diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 587897a5..9eb65c9e 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -6,6 +6,7 @@ -define(DEFAULT_NO_PROCESSES, 10). -define(DEFAULT_INTERVAL, 60000). %% one minute +-define(RECV(Msg, Timeout), receive Msg -> ok after Timeout -> {error, not_received_yet} end). all() -> [ @@ -24,6 +25,10 @@ groups() -> change_rate, change_rate_gradually, send_and_wait, + just_wait, + wait_for_process_to_die_sends_a_kill, + async_runner_dies_while_waiting_raises_exit, + async_runner_dies_when_throttler_dies, run_with_interval_zero_limits_only_number_of_parallel_executions, pause_and_resume, get_state @@ -123,12 +128,50 @@ send_and_wait(_) -> ?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), %% One message is received sufficiently fast amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, receive_msg_in_timeout(receive_this, 100)), + ?assertMatch(ok, ?RECV(receive_this, 100)), %% If someone else fills the throttle heavily, %% it will take proportionally so long to execute for me fill_throttle(?FUNCTION_NAME, 100 * 10), amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch({error, not_received_yet}, receive_msg_in_timeout(receive_this, 200)). + ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). + +just_wait(_) -> + %% it failts if the throttle wasn't started yet + ?assertMatch({error, no_throttle_process_registered}, + amoc_throttle:wait(?FUNCTION_NAME)), + %% Start 100-per-10ms throttle with a single process + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + %% wait passes fine + ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), + %% One message is received sufficiently fast + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch(ok, ?RECV(receive_this, 100)), + %% If someone else fills the throttle heavily, + %% it will take proportionally so long to execute for me + fill_throttle(?FUNCTION_NAME, 100 * 10), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). + +wait_for_process_to_die_sends_a_kill(_) -> + erlang:process_flag(trap_exit, true), + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(?FUNCTION_NAME) end), + ?assertMatch(ok, ?RECV({'EXIT', _, ?FUNCTION_NAME}, 100)). + +async_runner_dies_while_waiting_raises_exit(_) -> + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)), + find_new_link_and_kill_it(self()), + ?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)). + +async_runner_dies_when_throttler_dies(_) -> + erlang:process_flag(trap_exit, true), + {links, OriginalLinks} = erlang:process_info(self(), links), + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 60000, 1)), + wait_until_one_throttle_worker(?FUNCTION_NAME), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + wait_until_one_async_runner(self(), OriginalLinks), + amoc_throttle:stop(?FUNCTION_NAME), + ?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)). run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> %% Start 10 actions at once in 10 processes @@ -137,7 +180,7 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> %% it will take proportionally so long to execute for me fill_throttle(?FUNCTION_NAME, 100), amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, receive_msg_in_timeout(receive_this, 200)). + ?assertMatch(ok, ?RECV(receive_this, 200)). pause_and_resume(_) -> %% Start 100-per-10ms throttle with a single process @@ -148,10 +191,10 @@ pause_and_resume(_) -> ?assertMatch(ok, amoc_throttle:pause(?FUNCTION_NAME)), %% It is paused, so messages aren't received amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch({error, not_received_yet}, receive_msg_in_timeout(receive_this, 200)), + ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)), %% After resume the message is then received ?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)), - ?assertMatch(ok, receive_msg_in_timeout(receive_this, 200)). + ?assertMatch(ok, ?RECV(receive_this, 200)). get_state(_) -> ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 60000, 1)), @@ -174,16 +217,25 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) -> end, ?assert(lists:any(IsLowRateEventFn, TelemetryEvents)). +get_throttle_workers(Name) -> + pg:get_members(amoc_throttle, Name). + get_number_of_workers(Name) -> - Processes = pg:get_members(amoc_throttle, Name), + Processes = get_throttle_workers(Name), length(Processes). get_state_of_one_process(Name) -> - Processes = pg:get_members(amoc_throttle, Name), + Processes = get_throttle_workers(Name), ?assertMatch([_ | _], Processes), [Process | _] = Processes, amoc_throttle_process:get_state(Process). +wait_until_one_throttle_worker(Name) -> + GetWorkers = fun() -> get_throttle_workers(Name) end, + Validator = fun(Res) -> 0 < length(Res) end, + {ok, [Worker | _]} = wait_helper:wait_until(GetWorkers, ok, #{validator => Validator}), + Worker. + fill_throttle(Name, Num) -> Parent = self(), spawn(fun() -> @@ -194,13 +246,26 @@ fill_throttle(Name, Num) -> continue -> ok end. -receive_msg_in_timeout(Msg, Timeout) -> - receive - Msg -> - ok - after Timeout -> - {error, not_received_yet} - end. +maybe_get_new_async_runners(Pid, OriginalLinks) -> + {links, Links} = erlang:process_info(Pid, links), + Links -- OriginalLinks. + +wait_until_one_async_runner(Pid, OriginalLinks) -> + GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end, + Validator = fun(Res) -> 0 < length(Res) end, + {ok, [AsyncRunner | _]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}), + AsyncRunner. + +find_new_link_and_kill_it(Pid) -> + erlang:process_flag(trap_exit, true), + {links, OriginalLinks} = erlang:process_info(Pid, links), + spawn(?MODULE, kill_async_runner, [Pid, OriginalLinks]). + +kill_async_runner(Pid, OriginalLinks) -> + GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end, + Validator = fun(Res) -> 1 =:= length(Res) end, + {ok, [AsyncRunner]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}), + exit(AsyncRunner, kill). %% Helpers amoc_do(Scenario) ->