Skip to content

Commit

Permalink
[pause_proc_timer][6/n] Add pause_proc_timer option to suspend_process/2
Browse files Browse the repository at this point in the history
We add a new `pause_proc_timer` option to the `erlang:suspend_process/2`
BIF. When given, the process is not only suspended, but its proc timer,
if set, will be paused, and resumed when the process is resumed.

This means that if the paused process is waiting on a `receive`, it will not
timeout even if suspended for long.

We add testcases for this functionality
  • Loading branch information
jcpetruzza committed Jul 19, 2024
1 parent 707550a commit 36bef84
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
1 change: 1 addition & 0 deletions erts/emulator/beam/atom.names
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ atom parent
atom Plus='+'
atom PlusPlus='++'
atom pause
atom pause_proc_timer
atom pending
atom pending_driver
atom pending_process
Expand Down
32 changes: 30 additions & 2 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -8895,6 +8895,14 @@ erts_start_schedulers(void)
}
}

static Eterm
rpc_pause_proc_timer(Process *c_p, void *, int *, ErlHeapFragment **)
{
erts_pause_proc_timer(c_p);
return am_ok;
}


BIF_RETTYPE
erts_internal_suspend_process_2(BIF_ALIST_2)
{
Expand All @@ -8905,6 +8913,8 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
int sync = 0;
int async = 0;
int unless_suspending = 0;
int pause_proc_timer = 0;
int proc_timer_already_paused = 0;
erts_aint64_t mstate;
ErtsMonitorSuspend *msp;
ErtsMonitorData *mdp;
Expand All @@ -8929,6 +8939,9 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
case am_asynchronous:
async = 1;
break;
case am_pause_proc_timer:
pause_proc_timer = 1;
break;
default: {
if (is_tuple_arity(arg, 2)) {
Eterm *tp = tuple_val(arg);
Expand Down Expand Up @@ -8994,6 +9007,13 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
}
}

if (pause_proc_timer) {
erts_aint64_t already_paused_flags = ERTS_MSUSPEND_STATE_FLG_PAUSE_TIMER
| ERTS_MSUSPEND_STATE_FLG_ACTIVE;
mstate = erts_atomic64_read_bor_nob(&msp->state, ERTS_MSUSPEND_STATE_FLG_PAUSE_TIMER);
proc_timer_already_paused = (mstate & already_paused_flags) == already_paused_flags;
}

if (suspend) {
erts_aint32_t state;
Process *rp;
Expand All @@ -9014,7 +9034,7 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
if (rp == ERTS_PROC_LOCK_BUSY)
send_sig = !0;
else {
send_sig = !suspend_process(BIF_P, rp, 0 /* no pause timer */);
send_sig = !suspend_process(BIF_P, rp, pause_proc_timer);
if (!send_sig) {
erts_monitor_list_insert(&ERTS_P_LT_MONITORS(rp), &mdp->u.target);
erts_atomic64_read_bor_relb(&msp->state,
Expand All @@ -9035,6 +9055,14 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
res = am_badarg;
}
}
} else if (pause_proc_timer && !proc_timer_already_paused) {
erts_proc_sig_send_rpc_request_prio(BIF_P,
BIF_ARG_1,
0, /* no reply */
rpc_pause_proc_timer,
NULL,
PRIORITY_HIGH);

}

if (sync) {
Expand Down Expand Up @@ -9237,7 +9265,7 @@ erts_process_status(Process *rp, Eterm rpid)
}

/*
** Suspend a currently executing process
** Suspend a currently executing process
** If we are to suspend on a port the busy_port is the thing
** otherwise busy_port is NIL
*/
Expand Down
60 changes: 59 additions & 1 deletion erts/emulator/test/process_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
%% exit/1
%% exit/2
%% process_info/1,2
%% suspend_process/2 (partially)
%% register/2 (partially)

-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").

-define(heap_binary_size, 64).

-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
init_per_group/2,end_per_group/2, spawn_with_binaries/1,
t_exit_1/1, t_exit_2_other/1, t_exit_2_other_normal/1,
self_exit/1, normal_suicide_exit/1, abnormal_suicide_exit/1,
Expand All @@ -55,6 +56,8 @@
process_info_self_msgq_len_more/1,
process_info_msgq_len_no_very_long_delay/1,
process_info_dict_lookup/1,
suspend_process_pausing_proc_timer/1,
suspend_process_pausing_proc_timer_after_suspended/1,
bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1,
otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1,
garbage_collect/1, otp_6237/1,
Expand Down Expand Up @@ -130,6 +133,7 @@ all() ->
otp_6237,
{group, spawn_request},
{group, process_info_bif},
{group, suspend_process_bif},
{group, processes_bif},
{group, otp_7738}, garb_other_running,
{group, system_task},
Expand Down Expand Up @@ -183,6 +187,9 @@ groups() ->
process_info_self_msgq_len_more,
process_info_msgq_len_no_very_long_delay,
process_info_dict_lookup]},
{suspend_process_bif, [],
[suspend_process_pausing_proc_timer,
suspend_process_pausing_proc_timer_after_suspended]},
{otp_7738, [],
[otp_7738_waiting, otp_7738_suspended,
otp_7738_resume]},
Expand Down Expand Up @@ -1754,6 +1761,57 @@ proc_dict_helper() ->
end,
proc_dict_helper().

suspend_process_pausing_proc_timer(_Config) ->
BeforeSuspend = fun(_Pid) -> ok end,
AfterResume = fun(_Pid) -> ok end,
suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume),
ok.

suspend_process_pausing_proc_timer_after_suspended(_Config) ->
% We suspend the process once before using pause_proc_timer
BeforeSuspend = fun(Pid) -> true = erlang:suspend_process(Pid) end,
AfterResume = fun(Pid) -> true = erlang:resume_process(Pid) end,
suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume),
ok.

suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume) ->
TcProc = self(),
Pid = erlang:spawn_link(
fun() ->
TcProc ! {sync, self()},
receive go -> ok
after 2_000 -> exit(timer_not_paused)
end,
TcProc ! {sync, self()},
receive _ -> error(unexpected)
after 2_000 -> ok
end,
TcProc ! {sync, self()}
end
),

WaitForSync = fun () ->
receive {sync, Pid} -> ok
after 10_000 -> error(timeout)
end
end,

WaitForSync(),
BeforeSuspend(Pid),
true = erlang:suspend_process(Pid, [pause_proc_timer]),
timer:sleep(5_000),
true = erlang:resume_process(Pid),
AfterResume(Pid),
timer:sleep(1_000),
Pid ! go,
WaitForSync(),
BeforeSuspend(Pid),
true = erlang:suspend_process(Pid, [pause_proc_timer]),
true = erlang:resume_process(Pid),
AfterResume(Pid),
WaitForSync(),
ok.

%% Tests erlang:bump_reductions/1.
bump_reductions(Config) when is_list(Config) ->
erlang:garbage_collect(),
Expand Down
7 changes: 6 additions & 1 deletion erts/preloaded/src/erlang.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5845,6 +5845,10 @@ Options (`Opt`s):
Apart from the reply message, the `{asynchronous, ReplyTag}` option behaves
exactly the same as the `asynchronous` option without reply tag.
- **`pause_proc_timer`** - If `Suspendee` is waiting on a message, pause the timer
associated with the `after` clause. The timer will be resumed when the process
is unsuspended.
- **`unless_suspending`** - The process identified by `Suspendee` is suspended
unless the calling process already is suspending `Suspendee`. If
`unless_suspending` is combined with option `asynchronous`, a suspend request
Expand Down Expand Up @@ -5891,7 +5895,8 @@ Failures:
-spec suspend_process(Suspendee, OptList) -> boolean() when
Suspendee :: pid(),
OptList :: [Opt],
Opt :: unless_suspending | asynchronous | {asynchronous, term()}.
Opt :: unless_suspending | pause_proc_timer
| asynchronous | {asynchronous, term()}.
suspend_process(Suspendee, OptList) ->
case case erts_internal:suspend_process(Suspendee, OptList) of
Ref when erlang:is_reference(Ref) ->
Expand Down
3 changes: 2 additions & 1 deletion erts/preloaded/src/erts_internal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ gather_carrier_info(_) ->
Result :: boolean() | 'badarg' | reference(),
Suspendee :: pid(),
OptList :: [Opt],
Opt :: unless_suspending | asynchronous | {asynchronous, term()}.
Opt :: unless_suspending | pause_proc_timer
| asynchronous | {asynchronous, term()}.

suspend_process(_Suspendee, _OptList) ->
erlang:nif_error(undefined).
Expand Down

0 comments on commit 36bef84

Please sign in to comment.