Skip to content

Commit

Permalink
Merge pull request #137 from inaka/elbrujohalcon.137.adjust_to_otp21
Browse files Browse the repository at this point in the history
Adjust to OTP21
  • Loading branch information
michalwski authored Jul 18, 2018
2 parents 5d71ef6 + 9ce28d5 commit ff46635
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 50 deletions.
27 changes: 3 additions & 24 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ random_worker(Sup) ->
case wpool_size(Sup) of
undefined -> exit(no_workers);
WpoolSize ->
WorkerNumber = rnd(WpoolSize),
WorkerNumber = rand:uniform(WpoolSize),
worker_name(Sup, WorkerNumber)
end.

Expand Down Expand Up @@ -345,7 +345,7 @@ worker_with_no_task(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
First = rnd(Wpool#wpool.size),
First = rand:uniform(Wpool#wpool.size),
worker_with_no_task(0, Wpool#wpool{next = First}).
worker_with_no_task(Size, #wpool{size = Size}) ->
undefined;
Expand All @@ -370,7 +370,7 @@ min_message_queue(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
First = rnd(Wpool#wpool.size),
First = rand:uniform(Wpool#wpool.size),
min_message_queue(0, Wpool#wpool{next = First}, []).
min_message_queue(Size, #wpool{size = Size}, Found) ->
{_, Worker} = lists:min(Found),
Expand Down Expand Up @@ -458,24 +458,3 @@ build_wpool(Name) ->

next_wpool(Wpool) ->
Wpool#wpool{next = (Wpool#wpool.next rem Wpool#wpool.size) + 1}.

rnd(WpoolSize) ->
case application:get_env(worker_pool, random_fun) of
undefined ->
set_random_fun(),
rnd(WpoolSize);
{ok, RndFun} ->
RndFun(WpoolSize)
end.

set_random_fun() ->
RndFun =
case code:ensure_loaded(rand) of
{module, rand} -> fun rand:uniform/1;
{error, _} ->
fun(Size) ->
_ = erlang:apply(random, seed, [os:timestamp()]),
erlang:apply(random, uniform, [Size])
end
end,
application:set_env(worker_pool, random_fun, RndFun).
67 changes: 50 additions & 17 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-type state() :: #state{}.

-type from() :: {pid(), reference()}.
-type next_step() :: timeout() | hibernate | {continue, term()}.

%% api
-export([ start_link/4
Expand All @@ -44,6 +45,8 @@
, handle_call/3
, handle_cast/2
, handle_info/2
, handle_continue/2
, format_status/2
]).

%%%===================================================================
Expand Down Expand Up @@ -74,7 +77,8 @@ cast_call(Process, From, Call) ->
%%% init, terminate, code_change, info callbacks
%%%===================================================================
%% @private
-spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}.
-spec init({atom(), atom(), term(), [wpool:option()]}) ->
{ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}.
init({Name, Mod, InitArgs, Options}) ->
case Mod:init(InitArgs) of
{ok, ModState} ->
Expand All @@ -84,13 +88,13 @@ init({Name, Mod, InitArgs, Options}) ->
, state = ModState
, options = Options
}};
{ok, ModState, Timeout} ->
{ok, ModState, NextStep} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}, Timeout};
}, NextStep};
ignore -> {stop, can_not_ignore};
Error -> Error
end.
Expand All @@ -112,31 +116,60 @@ code_change(OldVsn, State, Extra) ->

%% @private
-spec handle_info(any(), state()) ->
{noreply, state()} | {stop, term(), state()}.
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_info(Info, State) ->
case wpool_utils:do_try(
fun() -> (State#state.mod):handle_info(Info, State#state.state) end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, Timeout} ->
{noreply, State#state{state = NewState}, Timeout};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end.

%% @private
-spec handle_continue(any(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_continue(Continue, State) ->
case wpool_utils:do_try(
fun() -> (State#state.mod):handle_continue(Continue, State#state.state) end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end.

%% @private
-spec format_status(normal | terminate, [[{_, _}] | state(), ...]) -> term().
format_status(Opt, [PDict, State]) ->
case erlang:function_exported(State#state.mod, format_status, 2) of
false ->
case Opt of % This is copied from gen_server:format_status/4
terminate -> State#state.state;
normal -> [{data, [{"State", State#state.state}]}]
end;
true ->
wpool_utils:do_try(
fun() -> (State#state.mod):format_status(Opt, [PDict, State#state.state]) end)
end.

%%%===================================================================
%%% real (i.e. interesting) callbacks
%%%===================================================================
%% @private
-spec handle_cast(term(), state()) -> {noreply, state()}.
-spec handle_cast(term(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_cast({call, From, Call}, State) ->
case handle_call(Call, From, State) of
{reply, Response, NewState} ->
gen_server:reply(From, Response),
{noreply, NewState};
{reply, Response, NewState, Timeout} ->
{reply, Response, NewState, NextStep} ->
gen_server:reply(From, Response),
{noreply, NewState, Timeout};
{noreply, NewState, NextStep};
{stop, Reason, Response, NewState} ->
gen_server:reply(From, Response),
{stop, Reason, NewState};
Expand All @@ -156,8 +189,8 @@ handle_cast({cast, Cast}, State) ->
fun() -> (State#state.mod):handle_cast(Cast, State#state.state) end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, Timeout} ->
{noreply, State#state{state = NewState}, Timeout};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end,
Expand All @@ -171,9 +204,9 @@ handle_cast({cast, Cast}, State) ->
%% @private
-spec handle_call(term(), from(), state()) ->
{reply, term(), state()}
| {reply, term(), state(), timeout() | hibernate}
| {reply, term(), state(), next_step()}
| {noreply, state()}
| {noreply, state(), timeout() | hibernate}
| {noreply, state(), next_step()}
| {stop, term(), term(), state()}
| {stop, term(), state()}.
handle_call(Call, From, State) ->
Expand All @@ -191,12 +224,12 @@ handle_call(Call, From, State) ->
end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, Timeout} ->
{noreply, State#state{state = NewState}, Timeout};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{reply, Response, NewState} ->
{reply, Response, State#state{state = NewState}};
{reply, Response, NewState, Timeout} ->
{reply, Response, State#state{state = NewState}, Timeout};
{reply, Response, NewState, NextStep} ->
{reply, Response, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}};
{stop, Reason, Response, NewState} ->
Expand Down
11 changes: 9 additions & 2 deletions test/echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
, handle_call/3
, handle_cast/2
, handle_info/2
, handle_continue/2
, format_status/2
]).

-dialyzer([no_behaviours]).
Expand All @@ -48,6 +50,11 @@ handle_info(Info, _State) -> Info.
handle_cast(Cast, _State) -> Cast.

-type from() :: {pid(), reference()}.
-spec handle_call(state | Call, from(), State) -> {reply, State, State} | Call.
handle_call(state, _From, State) -> {reply, State, State};
-spec handle_call(Call, from(), term()) -> Call.
handle_call(Call, _From, _State) -> Call.

-spec handle_continue(Continue, term()) -> Continue.
handle_continue(Continue, _State) -> Continue.

-spec format_status(normal | terminate, [[{_, _}] | State, ...]) -> {formatted_state, State}.
format_status(_, [_PDict, State]) -> {formatted_state, State}.
104 changes: 97 additions & 7 deletions test/wpool_process_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
, info/1
, cast/1
, call/1
, continue/1
, format_status/1
, no_format_status/1
, stop/1
]).
-export([ pool_restart_crash/1
Expand Down Expand Up @@ -79,7 +82,7 @@ init_timeout(_Config) ->
{ok, Pid} =
wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, []),
timer:sleep(1),
timeout = wpool_process:call(?MODULE, state, 5000),
timeout = get_state(?MODULE),
Pid ! {stop, normal, state},
timer:sleep(1000),
false = erlang:is_process_alive(Pid),
Expand All @@ -90,10 +93,10 @@ init_timeout(_Config) ->
info(_Config) ->
{ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
Pid ! {noreply, newstate},
newstate = wpool_process:call(?MODULE, state, 5000),
newstate = get_state(?MODULE),
Pid ! {noreply, newerstate, 1},
timer:sleep(1),
timeout = wpool_process:call(?MODULE, state, 5000),
timeout = get_state(?MODULE),
Pid ! {stop, normal, state},
timer:sleep(1000),
false = erlang:is_process_alive(Pid),
Expand All @@ -104,24 +107,98 @@ info(_Config) ->
cast(_Config) ->
{ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
wpool_process:cast(Pid, {noreply, newstate}),
newstate = wpool_process:call(?MODULE, state, 5000),
newstate = get_state(?MODULE),
wpool_process:cast(Pid, {noreply, newerstate, 0}),
timer:sleep(100),
timeout = wpool_process:call(?MODULE, state, 5000),
timeout = get_state(?MODULE),
wpool_process:cast(Pid, {stop, normal, state}),
timer:sleep(1000),
false = erlang:is_process_alive(Pid),

{comment, []}.

-spec continue(config()) -> {comment, []}.
continue(_Config) ->
C = fun(ContinueState) -> {noreply, ContinueState} end,
%% init/1 returns {continue, continue_state}
{ok, Pid} =
wpool_process:start_link(
?MODULE, echo_server, {ok, state, {continue, C(continue_state)}}, []),
continue_state = get_state(Pid),

%% handle_call/3 returns {continue, ...}
ok = wpool_process:call(Pid, {reply, ok, state, {continue, C(continue_state_2)}}, 5000),
continue_state_2 = get_state(Pid),
try wpool_process:call(Pid, {noreply, state, {continue, C(continue_state_3)}}, 100) of
Result -> ct:fail("Unexpected Result: ~p", [Result])
catch
_:{timeout, _} ->
continue_state_3 = get_state(Pid)
end,

%% handle_cast/2 returns {continue, ...}
wpool_process:cast(Pid, {noreply, state, {continue, C(continue_state_4)}}),
continue_state_4 = get_state(Pid),

%% handle_continue/2 returns {continue, ...}
SecondContinueResponse = C(continue_state_5),
FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}},
CastResponse = {noreply, state, {continue, FirstContinueResponse}},
wpool_process:cast(Pid, CastResponse),
continue_state_5 = get_state(Pid),

%% handle_info/2 returns {continue, ...}
Pid ! {noreply, state, {continue, C(continue_state_6)}},
continue_state_6 = get_state(Pid),

%% handle_continue/2 returns {continue, ...}
SecondContinueResponse = C(continue_state_5),
FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}},
CastResponse = {noreply, state, {continue, FirstContinueResponse}},
wpool_process:cast(Pid, CastResponse),
continue_state_5 = get_state(Pid),

%% handle_continue/2 returns timeout = 0
wpool_process:cast(Pid, {noreply, state, {continue, {noreply, continue_state_7, 0}}}),
timer:sleep(100),
timeout = get_state(Pid),

%% handle_continue/2 returns {stop, normal, state}
wpool_process:cast(Pid, {noreply, state, {continue, {stop, normal, state}}}),
timer:sleep(1000),
false = erlang:is_process_alive(Pid),

{comment, []}.

-spec format_status(config()) -> {comment, []}.
format_status(_Config) ->
%% echo_server implements format_status/2
{ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
%% therefore it returns {formatted_state, State} as its status and we just pass it through
{status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid),
[state] = [S || SItemList = [_|_] <- SItems, {formatted_state, S} <- SItemList],
%% this code is actually what we use to retrieve the state in other tests
state = get_state(Pid),
{comment, []}.

-spec no_format_status(config()) -> {comment, []}.
no_format_status(_Config) ->
%% crashy_server doesn't implement format_status/2
{ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, []),
%% therefore it uses the default format for the stauts (but with the status of the gen_server,
%% not wpool_process)
{status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid),
[state] = [S || SItemList = [_|_] <- SItems, {data, Data} <- SItemList, {"State", S} <- Data],
{comment, []}.

-spec call(config()) -> {comment, []}.
call(_Config) ->
{ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
ok1 = wpool_process:call(Pid, {reply, ok1, newstate}, 5000),
newstate = wpool_process:call(?MODULE, state, 5000),
newstate = get_state(?MODULE),
ok2 = wpool_process:call(Pid, {reply, ok2, newerstate, 1}, 5000),
timer:sleep(1),
timeout = wpool_process:call(?MODULE, state, 5000),
timeout = get_state(?MODULE),
ok3 = wpool_process:call(Pid, {stop, normal, ok3, state}, 5000),
timer:sleep(1000),
false = erlang:is_process_alive(Pid),
Expand Down Expand Up @@ -240,3 +317,16 @@ complete_coverage(_Config) ->
{error, bad} = wpool_process:code_change("oldvsn", State, bad),

{comment, []}.


%% @doc We can use this function in tests since echo_server implements format_status/2
%% by returning the state as a tuple {formatted_state, S}.
%% We can safely grab it from the result of sys:get_status/1
%% @see gen_server:format_status/2
%% @see sys:get_status/2
get_state(Atom) when is_atom(Atom) ->
get_state(whereis(Atom));
get_state(Pid) ->
{status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid),
[State] = [S || SItemList = [_|_] <- SItems, {formatted_state, S} <- SItemList],
State.

0 comments on commit ff46635

Please sign in to comment.