Skip to content

Commit

Permalink
Handle case from async_runner when throttler worker dies
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jan 18, 2024
1 parent ce5dde6 commit 0798c8a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
15 changes: 9 additions & 6 deletions src/throttle/amoc_throttle_runner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
-module(amoc_throttle_runner).

-export([throttle/2, run/1]).
-export([async_runner/3]).
-export([async_runner/4]).

-type action() :: wait | {pid(), term()} | fun(() -> any()).

Expand All @@ -18,9 +18,9 @@ run(RunnerPid) ->
-spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}.
throttle(Name, Action) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Throttler} ->
RunnerPid = erlang:spawn_link(?MODULE, async_runner, [Name, self(), Action]),
amoc_throttle_process:run(Throttler, RunnerPid),
{ok, ThrottlerPid} ->
RunnerPid = erlang:spawn_link(?MODULE, async_runner, [Name, self(), ThrottlerPid, Action]),
amoc_throttle_process:run(ThrottlerPid, RunnerPid),
maybe_wait(Action, RunnerPid);
Error ->
Error
Expand All @@ -37,10 +37,13 @@ maybe_wait(wait, RunnerPid) ->
maybe_wait(_, _) ->
ok.

-spec async_runner(amoc_throttle:name(), pid(), term()) -> no_return().
async_runner(Name, Caller, Action) ->
-spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return().
async_runner(Name, Caller, ThrottlerPid, Action) ->
ThrottlerMonitor = erlang:monitor(process, ThrottlerPid),
amoc_throttle_controller:raise_event_on_slave_node(Name, request),
receive
{'DOWN', ThrottlerMonitor, process, ThrottlerPid, Reason} ->
exit({throttler_worker_died, ThrottlerPid, Reason});
'$scheduled' ->
execute(Caller, Action),
amoc_throttle_controller:raise_event_on_slave_node(Name, execute),
Expand Down
49 changes: 40 additions & 9 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ groups() ->
just_wait,
wait_for_process_to_die_sends_a_kill,
async_runner_dies_while_waiting_raises_exit,
async_runner_dies_when_throttler_dies,
run_with_interval_zero_limits_only_number_of_parallel_executions,
pause_and_resume,
get_state
Expand Down Expand Up @@ -162,6 +163,16 @@ async_runner_dies_while_waiting_raises_exit(_) ->
find_new_link_and_kill_it(self()),
?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)).

async_runner_dies_when_throttler_dies(_) ->
erlang:process_flag(trap_exit, true),
{links, OriginalLinks} = erlang:process_info(self(), links),
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 60000, 1)),
wait_until_one_throttle_worker(?FUNCTION_NAME),
amoc_throttle:send(?FUNCTION_NAME, receive_this),
wait_until_one_async_runner(self(), OriginalLinks),
kill_throttle_workers(?FUNCTION_NAME),
?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)).

run_with_interval_zero_limits_only_number_of_parallel_executions(_) ->
%% Start 10 actions at once in 10 processes
?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 10, 0, 1)),
Expand Down Expand Up @@ -206,16 +217,25 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) ->
end,
?assert(lists:any(IsLowRateEventFn, TelemetryEvents)).

get_throttle_workers(Name) ->
pg:get_members(amoc_throttle, Name).

get_number_of_workers(Name) ->
Processes = pg:get_members(amoc_throttle, Name),
Processes = get_throttle_workers(Name),
length(Processes).

get_state_of_one_process(Name) ->
Processes = pg:get_members(amoc_throttle, Name),
Processes = get_throttle_workers(Name),
?assertMatch([_ | _], Processes),
[Process | _] = Processes,
amoc_throttle_process:get_state(Process).

wait_until_one_throttle_worker(Name) ->
GetWorkers = fun() -> get_throttle_workers(Name) end,
Validator = fun(Res) -> 0 < length(Res) end,
{ok, [Worker | _]} = wait_helper:wait_until(GetWorkers, ok, #{validator => Validator}),
Worker.

fill_throttle(Name, Num) ->
Parent = self(),
spawn(fun() ->
Expand All @@ -226,16 +246,27 @@ fill_throttle(Name, Num) ->
continue -> ok
end.

find_new_link_and_kill_it(Self) ->
maybe_get_new_async_runners(Pid, OriginalLinks) ->
{links, Links} = erlang:process_info(Pid, links),
Links -- OriginalLinks.

wait_until_one_async_runner(Pid, OriginalLinks) ->
GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end,
Validator = fun(Res) -> 0 < length(Res) end,
{ok, [AsyncRunner | _]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}),
AsyncRunner.

find_new_link_and_kill_it(Pid) ->
erlang:process_flag(trap_exit, true),
{links, OriginalLinks} = erlang:process_info(Self, links),
spawn(?MODULE, kill_async_runner, [Self, OriginalLinks]).
{links, OriginalLinks} = erlang:process_info(Pid, links),
spawn(?MODULE, kill_async_runner, [Pid, OriginalLinks]).

kill_throttle_workers(Name) ->
Workers = get_throttle_workers(Name),
[ exit(Worker, kill) || Worker <- Workers ].

kill_async_runner(Pid, OriginalLinks) ->
GetLinksFun = fun() ->
{links, Links} = erlang:process_info(Pid, links),
Links -- OriginalLinks
end,
GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end,
Validator = fun(Res) -> 1 =:= length(Res) end,
{ok, [AsyncRunner]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}),
exit(AsyncRunner, kill).
Expand Down

0 comments on commit 0798c8a

Please sign in to comment.