From b180b17e856287e30a8cc4b8c1847208f51ff1a1 Mon Sep 17 00:00:00 2001 From: Daniel Gorin Date: Wed, 7 Aug 2024 20:04:38 +0100 Subject: [PATCH] [pause_proc_timer][6/n] Add pause_proc_timer option to suspend_process/2 We add a new `pause_proc_timer` option to the `erlang:suspend_process/2` BIF. When given, the process is not only suspended, but its proc timer, if set, will be paused, and resumed when the process is resumed. This means that if the paused process is waiting on a `receive`, it will not timeout even if suspended for long. We add testcases for this functionality --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/erl_process.c | 23 ++++++++++- erts/emulator/test/process_SUITE.erl | 60 +++++++++++++++++++++++++++- erts/preloaded/src/erlang.erl | 7 +++- erts/preloaded/src/erts_internal.erl | 3 +- 5 files changed, 89 insertions(+), 5 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 758c84d1c1d7..07aaa3cfb7ee 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -553,6 +553,7 @@ atom parent atom Plus='+' atom PlusPlus='++' atom pause +atom pause_proc_timer atom pending atom pending_driver atom pending_process diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index ab7dfbdb96d4..152f5d1bd747 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -8946,6 +8946,8 @@ erts_internal_suspend_process_2(BIF_ALIST_2) int sync = 0; int async = 0; int unless_suspending = 0; + int pause_proc_timer = 0; + int proc_timer_already_paused = 0; erts_aint64_t mstate; ErtsMonitorSuspend *msp; ErtsMonitorData *mdp; @@ -8970,6 +8972,9 @@ erts_internal_suspend_process_2(BIF_ALIST_2) case am_asynchronous: async = 1; break; + case am_pause_proc_timer: + pause_proc_timer = 1; + break; default: { if (is_tuple_arity(arg, 2)) { Eterm *tp = tuple_val(arg); @@ -9035,6 +9040,13 @@ erts_internal_suspend_process_2(BIF_ALIST_2) } } + if (pause_proc_timer) { + erts_aint64_t already_paused_flags = ERTS_MSUSPEND_STATE_FLG_PAUSE_TIMER + | ERTS_MSUSPEND_STATE_FLG_ACTIVE; + mstate = erts_atomic64_read_bor_nob(&msp->state, ERTS_MSUSPEND_STATE_FLG_PAUSE_TIMER); + proc_timer_already_paused = (mstate & already_paused_flags) == already_paused_flags; + } + if (suspend) { erts_aint32_t state; Process *rp; @@ -9055,7 +9067,7 @@ erts_internal_suspend_process_2(BIF_ALIST_2) if (rp == ERTS_PROC_LOCK_BUSY) send_sig = !0; else { - send_sig = !suspend_process(BIF_P, rp, 0 /* no pause timer */); + send_sig = !suspend_process(BIF_P, rp, pause_proc_timer); if (!send_sig) { erts_monitor_list_insert(&ERTS_P_LT_MONITORS(rp), &mdp->u.target); erts_atomic64_read_bor_relb(&msp->state, @@ -9076,6 +9088,13 @@ erts_internal_suspend_process_2(BIF_ALIST_2) res = am_badarg; } } + } else if (pause_proc_timer && !proc_timer_already_paused) { + Process *rp = erts_proc_lookup(BIF_ARG_1); + erts_proc_lock(rp, ERTS_PROC_LOCK_STATUS); + + schedule_pause_proc_timer(rp, ERTS_PROC_LOCK_STATUS); + + erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); } if (sync) { @@ -9278,7 +9297,7 @@ erts_process_status(Process *rp, Eterm rpid) } /* -** Suspend a currently executing process +** Suspend a currently executing process ** If we are to suspend on a port the busy_port is the thing ** otherwise busy_port is NIL */ diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index c8b86e4e740b..91eca77564d1 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -24,6 +24,7 @@ %% exit/1 %% exit/2 %% process_info/1,2 +%% suspend_process/2 (partially) %% register/2 (partially) -include_lib("stdlib/include/assert.hrl"). @@ -31,7 +32,7 @@ -define(heap_binary_size, 64). --export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, +-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, spawn_with_binaries/1, t_exit_1/1, t_exit_2_other/1, t_exit_2_other_normal/1, self_exit/1, normal_suicide_exit/1, abnormal_suicide_exit/1, @@ -55,6 +56,8 @@ process_info_self_msgq_len_more/1, process_info_msgq_len_no_very_long_delay/1, process_info_dict_lookup/1, + suspend_process_pausing_proc_timer/1, + suspend_process_pausing_proc_timer_after_suspended/1, bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1, otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1, garbage_collect/1, otp_6237/1, @@ -130,6 +133,7 @@ all() -> otp_6237, {group, spawn_request}, {group, process_info_bif}, + {group, suspend_process_bif}, {group, processes_bif}, {group, otp_7738}, garb_other_running, {group, system_task}, @@ -183,6 +187,9 @@ groups() -> process_info_self_msgq_len_more, process_info_msgq_len_no_very_long_delay, process_info_dict_lookup]}, + {suspend_process_bif, [], + [suspend_process_pausing_proc_timer, + suspend_process_pausing_proc_timer_after_suspended]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, otp_7738_resume]}, @@ -1754,6 +1761,57 @@ proc_dict_helper() -> end, proc_dict_helper(). +suspend_process_pausing_proc_timer(_Config) -> + BeforeSuspend = fun(_Pid) -> ok end, + AfterResume = fun(_Pid) -> ok end, + suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume), + ok. + +suspend_process_pausing_proc_timer_after_suspended(_Config) -> + % We suspend the process once before using pause_proc_timer + BeforeSuspend = fun(Pid) -> true = erlang:suspend_process(Pid) end, + AfterResume = fun(Pid) -> true = erlang:resume_process(Pid) end, + suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume), + ok. + +suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume) -> + TcProc = self(), + Pid = erlang:spawn_link( + fun() -> + TcProc ! {sync, self()}, + receive go -> ok + after 2_000 -> exit(timer_not_paused) + end, + TcProc ! {sync, self()}, + receive _ -> error(unexpected) + after 2_000 -> ok + end, + TcProc ! {sync, self()} + end + ), + + WaitForSync = fun () -> + receive {sync, Pid} -> ok + after 10_000 -> error(timeout) + end + end, + + WaitForSync(), + BeforeSuspend(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + timer:sleep(5_000), + true = erlang:resume_process(Pid), + AfterResume(Pid), + timer:sleep(1_000), + Pid ! go, + WaitForSync(), + BeforeSuspend(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + true = erlang:resume_process(Pid), + AfterResume(Pid), + WaitForSync(), + ok. + %% Tests erlang:bump_reductions/1. bump_reductions(Config) when is_list(Config) -> erlang:garbage_collect(), diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 5095998225bc..fc77b518212a 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -5845,6 +5845,10 @@ Options (`Opt`s): Apart from the reply message, the `{asynchronous, ReplyTag}` option behaves exactly the same as the `asynchronous` option without reply tag. +- **`pause_proc_timer`** - If `Suspendee` is waiting on a message, pause the timer + associated with the `after` clause. The timer will be resumed when the process + is unsuspended. + - **`unless_suspending`** - The process identified by `Suspendee` is suspended unless the calling process already is suspending `Suspendee`. If `unless_suspending` is combined with option `asynchronous`, a suspend request @@ -5891,7 +5895,8 @@ Failures: -spec suspend_process(Suspendee, OptList) -> boolean() when Suspendee :: pid(), OptList :: [Opt], - Opt :: unless_suspending | asynchronous | {asynchronous, term()}. + Opt :: unless_suspending | pause_proc_timer + | asynchronous | {asynchronous, term()}. suspend_process(Suspendee, OptList) -> case case erts_internal:suspend_process(Suspendee, OptList) of Ref when erlang:is_reference(Ref) -> diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 8e9ebc6b4d93..d7ddb95de6cf 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -835,7 +835,8 @@ gather_carrier_info(_) -> Result :: boolean() | 'badarg' | reference(), Suspendee :: pid(), OptList :: [Opt], - Opt :: unless_suspending | asynchronous | {asynchronous, term()}. + Opt :: unless_suspending | pause_proc_timer + | asynchronous | {asynchronous, term()}. suspend_process(_Suspendee, _OptList) -> erlang:nif_error(undefined).