From 0798c8a133a19ac5b5cc4df5671bd3d4098242df Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Thu, 18 Jan 2024 15:23:27 +0100 Subject: [PATCH] Handle case from async_runner when throttler worker dies --- src/throttle/amoc_throttle_runner.erl | 15 ++++---- test/throttle_SUITE.erl | 49 ++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/throttle/amoc_throttle_runner.erl b/src/throttle/amoc_throttle_runner.erl index e756c367..369f4b86 100644 --- a/src/throttle/amoc_throttle_runner.erl +++ b/src/throttle/amoc_throttle_runner.erl @@ -7,7 +7,7 @@ -module(amoc_throttle_runner). -export([throttle/2, run/1]). --export([async_runner/3]). +-export([async_runner/4]). -type action() :: wait | {pid(), term()} | fun(() -> any()). @@ -18,9 +18,9 @@ run(RunnerPid) -> -spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}. throttle(Name, Action) -> case amoc_throttle_process:get_throttle_process(Name) of - {ok, Throttler} -> - RunnerPid = erlang:spawn_link(?MODULE, async_runner, [Name, self(), Action]), - amoc_throttle_process:run(Throttler, RunnerPid), + {ok, ThrottlerPid} -> + RunnerPid = erlang:spawn_link(?MODULE, async_runner, [Name, self(), ThrottlerPid, Action]), + amoc_throttle_process:run(ThrottlerPid, RunnerPid), maybe_wait(Action, RunnerPid); Error -> Error @@ -37,10 +37,13 @@ maybe_wait(wait, RunnerPid) -> maybe_wait(_, _) -> ok. --spec async_runner(amoc_throttle:name(), pid(), term()) -> no_return(). -async_runner(Name, Caller, Action) -> +-spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return(). +async_runner(Name, Caller, ThrottlerPid, Action) -> + ThrottlerMonitor = erlang:monitor(process, ThrottlerPid), amoc_throttle_controller:raise_event_on_slave_node(Name, request), receive + {'DOWN', ThrottlerMonitor, process, ThrottlerPid, Reason} -> + exit({throttler_worker_died, ThrottlerPid, Reason}); '$scheduled' -> execute(Caller, Action), amoc_throttle_controller:raise_event_on_slave_node(Name, execute), diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 92795a90..c4a171f0 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -28,6 +28,7 @@ groups() -> just_wait, wait_for_process_to_die_sends_a_kill, async_runner_dies_while_waiting_raises_exit, + async_runner_dies_when_throttler_dies, run_with_interval_zero_limits_only_number_of_parallel_executions, pause_and_resume, get_state @@ -162,6 +163,16 @@ async_runner_dies_while_waiting_raises_exit(_) -> find_new_link_and_kill_it(self()), ?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)). +async_runner_dies_when_throttler_dies(_) -> + erlang:process_flag(trap_exit, true), + {links, OriginalLinks} = erlang:process_info(self(), links), + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 60000, 1)), + wait_until_one_throttle_worker(?FUNCTION_NAME), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + wait_until_one_async_runner(self(), OriginalLinks), + kill_throttle_workers(?FUNCTION_NAME), + ?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)). + run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> %% Start 10 actions at once in 10 processes ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 10, 0, 1)), @@ -206,16 +217,25 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) -> end, ?assert(lists:any(IsLowRateEventFn, TelemetryEvents)). +get_throttle_workers(Name) -> + pg:get_members(amoc_throttle, Name). + get_number_of_workers(Name) -> - Processes = pg:get_members(amoc_throttle, Name), + Processes = get_throttle_workers(Name), length(Processes). get_state_of_one_process(Name) -> - Processes = pg:get_members(amoc_throttle, Name), + Processes = get_throttle_workers(Name), ?assertMatch([_ | _], Processes), [Process | _] = Processes, amoc_throttle_process:get_state(Process). +wait_until_one_throttle_worker(Name) -> + GetWorkers = fun() -> get_throttle_workers(Name) end, + Validator = fun(Res) -> 0 < length(Res) end, + {ok, [Worker | _]} = wait_helper:wait_until(GetWorkers, ok, #{validator => Validator}), + Worker. + fill_throttle(Name, Num) -> Parent = self(), spawn(fun() -> @@ -226,16 +246,27 @@ fill_throttle(Name, Num) -> continue -> ok end. -find_new_link_and_kill_it(Self) -> +maybe_get_new_async_runners(Pid, OriginalLinks) -> + {links, Links} = erlang:process_info(Pid, links), + Links -- OriginalLinks. + +wait_until_one_async_runner(Pid, OriginalLinks) -> + GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end, + Validator = fun(Res) -> 0 < length(Res) end, + {ok, [AsyncRunner | _]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}), + AsyncRunner. + +find_new_link_and_kill_it(Pid) -> erlang:process_flag(trap_exit, true), - {links, OriginalLinks} = erlang:process_info(Self, links), - spawn(?MODULE, kill_async_runner, [Self, OriginalLinks]). + {links, OriginalLinks} = erlang:process_info(Pid, links), + spawn(?MODULE, kill_async_runner, [Pid, OriginalLinks]). + +kill_throttle_workers(Name) -> + Workers = get_throttle_workers(Name), + [ exit(Worker, kill) || Worker <- Workers ]. kill_async_runner(Pid, OriginalLinks) -> - GetLinksFun = fun() -> - {links, Links} = erlang:process_info(Pid, links), - Links -- OriginalLinks - end, + GetLinksFun = fun() -> maybe_get_new_async_runners(Pid, OriginalLinks) end, Validator = fun(Res) -> 1 =:= length(Res) end, {ok, [AsyncRunner]} = wait_helper:wait_until(GetLinksFun, ok, #{validator => Validator}), exit(AsyncRunner, kill).