diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index b19ff836..b76c9ee1 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -128,9 +128,13 @@ init({Name, Interval, Rate}) -> StateWithTimer = maybe_start_timer(InitialState), {ok, StateWithTimer#state{name = Name}, timeout(InitialState)}. --spec handle_info(term(), state()) -> {noreply, state(), {continue, maybe_run_fn}}. -handle_info({'DOWN', _, process, _, _}, State) -> +-spec handle_info(term(), state()) -> + {noreply, state(), {continue, maybe_run_fn}} | {stop, atom(), state()}. +handle_info({'DOWN', _, process, _, normal}, State) -> {noreply, inc_n(State), {continue, maybe_run_fn}}; +handle_info({'DOWN', _, process, Pid, Reason}, State) -> + %% The async_runner crashed and the operation will never happen, test is invalid + {stop, {error, {async_runner_died, Pid, Reason}}, State}; handle_info(delay_between_executions, State) -> {noreply, State#state{can_run_fn = true}, {continue, maybe_run_fn}}; handle_info(timeout, State) -> diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 587897a5..cd0e6a52 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -9,7 +9,8 @@ all() -> [ - {group, api} + {group, api}, + {group, crashes} ]. groups() -> @@ -27,25 +28,45 @@ groups() -> run_with_interval_zero_limits_only_number_of_parallel_executions, pause_and_resume, get_state + ]}, + {crashes, [sequence], + [ + async_runner_dies_ungrafecully ]} ]. init_per_suite(Config) -> + Config. + +end_per_suite(_) -> + ok. + +init_per_group(api, Config) -> application:ensure_all_started(amoc), amoc_cluster:set_master_node(node()), TelemetryEvents = [[amoc, throttle, Event] || Event <- [init, rate, request, execute, process]], telemetry_helpers:start(TelemetryEvents), + Config; +init_per_group(_, Config) -> Config. -end_per_suite(_) -> +end_per_group(api, _) -> application:stop(amoc), - telemetry_helpers:stop(), + telemetry_helpers:stop(); +end_per_group(_, _) -> ok. +init_per_testcase(async_runner_dies_ungrafecully, Config) -> + application:ensure_all_started(amoc), + amoc_cluster:set_master_node(node()), + Config; init_per_testcase(_, Config) -> Config. -end_per_testcase(_, _Config) -> +end_per_testcase(async_runner_dies_ungrafecully, _) -> + application:stop(amoc), + ok; +end_per_testcase(_, _) -> ok. %%----------------------------------------------------------------------------------- @@ -161,6 +182,12 @@ get_state(_) -> delay_between_executions := 600}, State). +async_runner_dies_ungrafecully(_) -> + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 10, 0, 1)), + Workers = get_workers(?FUNCTION_NAME), + amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(bad) end), + WaitUntilFun = fun() -> Workers =/= get_workers(?FUNCTION_NAME) end, + wait_helper:wait_until(WaitUntilFun, true). %% Helpers assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) -> @@ -174,6 +201,9 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) -> end, ?assert(lists:any(IsLowRateEventFn, TelemetryEvents)). +get_workers(Name) -> + pg:get_members(amoc_throttle, Name). + get_number_of_workers(Name) -> Processes = pg:get_members(amoc_throttle, Name), length(Processes).