Skip to content

Commit

Permalink
Implement send_and_wait as a blocking gen_server:call/3
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jan 17, 2024
1 parent c198c0c commit 1b2e5f0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
13 changes: 4 additions & 9 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,12 @@ send(Name, Pid, Msg) ->
amoc_throttle_controller:send(Name, Pid, Msg).

%% @doc Sends and receives the given message `Msg'.
%% Can be used to halt execution if we want a process to be idle when waiting for rate increase or other processes finishing their tasks.
%%
%% Can be used to halt execution if we want a process to be idle when waiting for rate increase
%% or other processes finishing their tasks.
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
send_and_wait(Name, Msg) ->
case send(Name, Msg) of
ok ->
receive
Msg -> ok
end;
Error ->
Error
end.
amoc_throttle_controller:wait(Name, Msg).

%% @doc Stops the throttle mechanism for the given `Name'.
-spec stop(name()) -> ok | {error, any()}.
Expand Down
13 changes: 12 additions & 1 deletion src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ensure_throttle_processes_started/4,
pause/1, resume/1, stop/1,
change_rate/3, change_rate_gradually/6,
run/2, send/3, raise_event_on_slave_node/2, telemetry_event/2]).
run/2, send/3, wait/2, raise_event_on_slave_node/2, telemetry_event/2]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -75,6 +75,17 @@ send(Name, Pid, Msg) ->
Error
end.

-spec wait(name(), term()) -> ok | {error, any()}.
wait(Name, Msg) ->
case amoc_throttle_process:get_throttle_process(Name) of
{ok, Throttler} ->
raise_event_on_slave_node(Name, request),
Msg = amoc_throttle_process:wait(Throttler, Msg),
raise_event_on_slave_node(Name, execute);
Error ->
Error
end.

-spec pause(name()) -> ok | {error, any()}.
pause(Name) ->
gen_server:call(?MASTER_SERVER, {pause, Name}).
Expand Down
28 changes: 26 additions & 2 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-export([stop/1,
run/3,
send/4,
wait/2,
update/3,
pause/1,
resume/1,
Expand All @@ -28,6 +29,7 @@
-define(PG_SCOPE, amoc_throttle).
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute

-type schedule() :: [{gen_server:from(), term()} | pid()].
-record(state, {can_run_fn = true :: boolean(),
pause = false :: boolean(),
max_n :: non_neg_integer(),
Expand All @@ -36,8 +38,8 @@
interval = 0 :: amoc_throttle:interval(), %%ms
delay_between_executions = 0 :: non_neg_integer(), %%ms
tref :: timer:tref() | undefined,
schedule = [] :: [pid()],
schedule_reversed = [] :: [pid()]}).
schedule = [] :: schedule(),
schedule_reversed = [] :: schedule()}).

-type state() :: #state{}.
%%------------------------------------------------------------------------------
Expand All @@ -62,6 +64,21 @@ send(Pid, Name, ReqPid, Msg) ->
RunnerPid = amoc_throttle_runner:spawn_send(Name, ReqPid, Msg),
gen_server:cast(Pid, {schedule, RunnerPid}).

%% Notice that a normal send would spawn a local process, send a pid across distribution,
%% have the local process receive an atom across distribution, and have the `Msg' be delivered
%% to the requester locally.
%%
%% That is 1 pid and 1 atom across distribution and one local payload,
%% plus one spawn that copies the payload.
%%
%% Doing a call instead will only send back and forth the payload,
%% and already solve monitoring problems
%%
%% This is specially optimal when the payload is a literal, like an atom.
-spec wait(pid(), term()) -> ok.
wait(Pid, Msg) ->
gen_server:call(Pid, {schedule, Msg}, infinity).

-spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok.
update(Pid, Interval, Rate) ->
gen_server:cast(Pid, {update, Interval, Rate}).
Expand Down Expand Up @@ -138,6 +155,9 @@ handle_cast({update, Interval, Rate}, #state{name = Name} = State) ->

-spec handle_call(term(), term(), state()) ->
{reply, {error, not_implemented} | state(), state(), {continue, maybe_run_fn}}.
handle_call({schedule, Reply}, From, #state{schedule_reversed = SchRev, name = Name} = State) ->
amoc_throttle_controller:telemetry_event(Name, request),
{noreply, State#state{schedule_reversed = [{From, Reply} | SchRev]}, {continue, maybe_run_fn}};
handle_call(get_state, _, State) ->
{reply, printable_state(State), State, {continue, maybe_run_fn}};
handle_call(_, _, State) ->
Expand Down Expand Up @@ -220,6 +240,10 @@ maybe_run_fn(#state{can_run_fn = true, pause = false, n = N} = State) when N > 0
maybe_run_fn(State) ->
State.

run_fn(#state{schedule = [{From, Reply} | T], name = Name} = State) ->
gen_server:reply(From, Reply),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T};
run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
RunnerPid ! scheduled,
Expand Down

0 comments on commit 1b2e5f0

Please sign in to comment.