Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to pause proc timer to erlang:suspend_process/2 #8670

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions erts/emulator/beam/atom.names
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ atom parent
atom Plus='+'
atom PlusPlus='++'
atom pause
atom pause_proc_timer
atom pending
atom pending_driver
atom pending_process
Expand Down Expand Up @@ -615,6 +616,7 @@ atom reset
atom reset_seq_trace
atom restart
atom resume
atom resume_proc_timer
atom return_from
atom return_to
atom return_to_trace
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/bif.tab
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ bif erlang:seq_trace_info/1
bif erlang:seq_trace_print/1
bif erlang:seq_trace_print/2
bif erts_internal:suspend_process/2
bif erlang:resume_process/1
bif erlang:resume_process/2
bif erts_internal:process_display/2

bif erlang:bump_reductions/1
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_alloc.types
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type TIMER_SERVICE LONG_LIVED SYSTEM timer_service
type LL_PTIMER FIXED_SIZE PROCESSES ll_ptimer
type HL_PTIMER FIXED_SIZE PROCESSES hl_ptimer
type BIF_TIMER FIXED_SIZE PROCESSES bif_timer
type PAUSED_TIMER STANDARD PROCESSES paused_timer
type TIMER_REQUEST SHORT_LIVED PROCESSES timer_request
type BTM_YIELD_STATE SHORT_LIVED PROCESSES btm_yield_state
type REG_TABLE STANDARD SYSTEM reg_tab
Expand Down
113 changes: 113 additions & 0 deletions erts/emulator/beam/erl_hl_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ typedef enum {
#define ERTS_TMR_ROFLG_PROC (((Uint32) 1) << 15)
#define ERTS_TMR_ROFLG_PORT (((Uint32) 1) << 16)
#define ERTS_TMR_ROFLG_CALLBACK (((Uint32) 1) << 17)
#define ERTS_TMR_ROFLG_PAUSED (((Uint32) 1) << 18)

#define ERTS_TMR_ROFLG_SID_MASK \
(ERTS_TMR_ROFLG_HLT - (Uint32) 1)
Expand Down Expand Up @@ -205,6 +206,12 @@ typedef union {
ErtsBifTimer btm;
} ErtsTimer;

typedef struct {
ErtsTmrHead head; /* NEED to be first! */
Sint64 time_left_in_msec;
int count;
} ErtsPausedProcTimer;

typedef ErtsTimer *(*ErtsCreateTimerFunc)(ErtsSchedulerData *esdp,
ErtsMonotonicTime timeout_pos,
int short_time, ErtsTmrType type,
Expand Down Expand Up @@ -950,6 +957,54 @@ create_tw_timer(ErtsSchedulerData *esdp,
return (ErtsTimer *) tmr;
}

/*
* Paused proc timers
*/
static ERTS_INLINE ErtsPausedProcTimer *
create_paused_proc_timer(Process *c_p)
{
ErtsPausedProcTimer *result = NULL;
erts_aint_t itmr = erts_atomic_read_nob(&c_p->common.timer);

if (itmr != ERTS_PTMR_NONE && itmr != ERTS_PTMR_TIMEDOUT) {
ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
ErtsTimer *tmr = (ErtsTimer *)itmr;

if (tmr->head.roflgs & ERTS_TMR_ROFLG_PAUSED) {
// The process timer was already paused, reuse the paused timer
ErtsPausedProcTimer *pptmr = (ErtsPausedProcTimer*) tmr;
pptmr->count++;
} else {
int is_hlt = !!(tmr->head.roflgs & ERTS_TMR_ROFLG_HLT);
ErtsMonotonicTime timeout_pos;

ASSERT(tmr->head.roflgs & ERTS_TMR_ROFLG_PROC);

result = erts_alloc(ERTS_ALC_T_PAUSED_TIMER,
sizeof(ErtsPausedProcTimer));
result->head.roflgs = tmr->head.roflgs | ERTS_TMR_ROFLG_PAUSED;
erts_atomic32_init_nob(&result->head.refc, 1);
result->head.receiver.proc = tmr->head.receiver.proc;

timeout_pos = (is_hlt
? tmr->hlt.timeout
: erts_tweel_read_timeout(&tmr->twt.u.tw_tmr));
result->time_left_in_msec = get_time_left(esdp, timeout_pos);
result->count = 1;
}
}

return result;
}

static ERTS_INLINE void
paused_proc_timer_dec_refc(ErtsPausedProcTimer *pptmr)
{
if (erts_atomic32_dec_read_relb(&pptmr->head.refc) == 0) {
erts_free(ERTS_ALC_T_PAUSED_TIMER, (void *) pptmr);
}
}

/*
* Basic high level timer stuff
*/
Expand Down Expand Up @@ -1665,6 +1720,11 @@ continue_cancel_ptimer(ErtsSchedulerData *esdp, ErtsTimer *tmr)
{
Uint32 sid = (tmr->head.roflgs & ERTS_TMR_ROFLG_SID_MASK);

if (tmr->head.roflgs & ERTS_TMR_ROFLG_PAUSED) {
paused_proc_timer_dec_refc((ErtsPausedProcTimer*) tmr);
return;
}

if (esdp->no != sid)
queue_canceled_timer(esdp, sid, tmr);
else
Expand Down Expand Up @@ -2714,6 +2774,59 @@ erts_cancel_proc_timer(Process *c_p)
(ErtsTimer *) tval);
}

void
erts_pause_proc_timer(Process *c_p)
{
ErtsPausedProcTimer *pptmr;

ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_STATUS)
& erts_proc_lc_my_proc_locks(c_p));

pptmr = create_paused_proc_timer(c_p);
if (!pptmr) {
return;
}

CANCEL_TIMER(c_p);

erts_atomic_set_nob(&c_p->common.timer, (erts_aint_t) pptmr);
}

int
erts_resume_paused_proc_timer(Process *c_p)
{
erts_aint_t timer;
int resumed_timer = 0;

ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_STATUS)
& erts_proc_lc_my_proc_locks(c_p));

timer = erts_atomic_xchg_nob(&c_p->common.timer, ERTS_PTMR_NONE);

ASSERT(timer != ERTS_PTMR_TIMEDOUT);

if (timer != ERTS_PTMR_NONE) {
UWord tmo = 0;
ErtsPausedProcTimer *pptmr = (ErtsPausedProcTimer *)timer;

ASSERT(pptmr->head.roflgs & ERTS_TMR_ROFLG_PAUSED);

pptmr->count -= 1;
if (pptmr->count == 0) {
if (pptmr->time_left_in_msec > 0) {
ASSERT((pptmr->time_left_in_msec >> 32) == 0);
tmo = (UWord) pptmr->time_left_in_msec;
}

erts_set_proc_timer_uword(c_p, tmo);
paused_proc_timer_dec_refc(pptmr);
resumed_timer = 1;
}
}

return resumed_timer;
}

void
erts_set_port_timer(Port *c_prt, Sint64 tmo)
{
Expand Down
2 changes: 2 additions & 0 deletions erts/emulator/beam/erl_hl_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ size_t erts_timer_type_size(ErtsAlcType_t type);
int erts_set_proc_timer_term(Process *, Eterm);
void erts_set_proc_timer_uword(Process *, UWord);
void erts_cancel_proc_timer(Process *);
void erts_pause_proc_timer(Process *);
int erts_resume_paused_proc_timer(Process *);
void erts_set_port_timer(Port *, Sint64);
void erts_cancel_port_timer(Port *);
Sint64 erts_read_port_timer(Port *);
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_monitor_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name,

msp->next = NULL;
erts_atomic_init_relb(&msp->state, 0);
msp->ptimer_count = 0;
erts_atomic32_init_nob(&mdp->refc, 2);
break;
}
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_monitor_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ struct ErtsMonitorSuspend__ {
ErtsMonitorData md; /* origin = suspender; target = suspendee */
ErtsMonitorSuspend *next;
erts_atomic_t state;
int ptimer_count;
};
#define ERTS_MSUSPEND_STATE_FLG_ACTIVE ((erts_aint_t) (((Uint) 1) << (sizeof(Uint)*8 - 1)))
#define ERTS_MSUSPEND_STATE_COUNTER_MASK (~ERTS_MSUSPEND_STATE_FLG_ACTIVE)
Expand Down
30 changes: 16 additions & 14 deletions erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4992,6 +4992,19 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
return ((int) reds)*4 + 8;
}

static void
activate_suspend_monitor(Process *c_p, ErtsMonitorSuspend *msp)
{
erts_aint_t mstate;

ASSERT(msp->ptimer_count == 0);

mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
}

static void
handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
{
Expand All @@ -5000,14 +5013,8 @@ handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);

if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) {
ErtsMonitorSuspend *msp;
erts_aint_t mstate;

msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
ErtsMonitorSuspend *msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
activate_suspend_monitor(c_p, msp);
*yieldp = !0;
}
else {
Expand Down Expand Up @@ -5213,12 +5220,7 @@ erts_proc_sig_handle_pending_suspend(Process *c_p)
msp->next = NULL;
if (!(state & ERTS_PSFLG_EXITING)
&& erts_monitor_is_in_table(&msp->md.u.target)) {
erts_aint_t mstate;

mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
activate_suspend_monitor(c_p, msp);
}

erts_monitor_release(&msp->md.u.target);
Expand Down
Loading