From 1a3051607925f3df6368d7f45f79231ab83a86c3 Mon Sep 17 00:00:00 2001 From: Brujo Benavides Date: Mon, 16 Jul 2018 18:16:56 -0300 Subject: [PATCH 1/4] Since we use OTP21 we don't need to care about other random modules than rand --- src/wpool_pool.erl | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index 63f331d..67b6d92 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -88,7 +88,7 @@ random_worker(Sup) -> case wpool_size(Sup) of undefined -> exit(no_workers); WpoolSize -> - WorkerNumber = rnd(WpoolSize), + WorkerNumber = rand:uniform(WpoolSize), worker_name(Sup, WorkerNumber) end. @@ -345,7 +345,7 @@ worker_with_no_task(Wpool) -> %% Moving the beginning of the list to a random point to ensure that clients %% do not always start asking for process_info to the processes that are most %% likely to have bigger message queues - First = rnd(Wpool#wpool.size), + First = rand:uniform(Wpool#wpool.size), worker_with_no_task(0, Wpool#wpool{next = First}). worker_with_no_task(Size, #wpool{size = Size}) -> undefined; @@ -370,7 +370,7 @@ min_message_queue(Wpool) -> %% Moving the beginning of the list to a random point to ensure that clients %% do not always start asking for process_info to the processes that are most %% likely to have bigger message queues - First = rnd(Wpool#wpool.size), + First = rand:uniform(Wpool#wpool.size), min_message_queue(0, Wpool#wpool{next = First}, []). min_message_queue(Size, #wpool{size = Size}, Found) -> {_, Worker} = lists:min(Found), @@ -458,24 +458,3 @@ build_wpool(Name) -> next_wpool(Wpool) -> Wpool#wpool{next = (Wpool#wpool.next rem Wpool#wpool.size) + 1}. - -rnd(WpoolSize) -> - case application:get_env(worker_pool, random_fun) of - undefined -> - set_random_fun(), - rnd(WpoolSize); - {ok, RndFun} -> - RndFun(WpoolSize) - end. - -set_random_fun() -> - RndFun = - case code:ensure_loaded(rand) of - {module, rand} -> fun rand:uniform/1; - {error, _} -> - fun(Size) -> - _ = erlang:apply(random, seed, [os:timestamp()]), - erlang:apply(random, uniform, [Size]) - end - end, - application:set_env(worker_pool, random_fun, RndFun). From fc7699153ebe9e7c95ff6adcaba3bfc41f5c060d Mon Sep 17 00:00:00 2001 From: Brujo Benavides Date: Mon, 16 Jul 2018 18:52:58 -0300 Subject: [PATCH 2/4] Contemplate handle_continue/2 --- src/wpool_process.erl | 52 ++++++++++++++++++++++------------ test/echo_server.erl | 4 +++ test/wpool_process_SUITE.erl | 54 ++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 17 deletions(-) diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 4c9ba13..c5268e6 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -29,6 +29,7 @@ -type state() :: #state{}. -type from() :: {pid(), reference()}. +-type next_step() :: timeout() | hibernate | {continue, term()}. %% api -export([ start_link/4 @@ -44,6 +45,7 @@ , handle_call/3 , handle_cast/2 , handle_info/2 + , handle_continue/2 ]). %%%=================================================================== @@ -74,7 +76,8 @@ cast_call(Process, From, Call) -> %%% init, terminate, code_change, info callbacks %%%=================================================================== %% @private --spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}. +-spec init({atom(), atom(), term(), [wpool:option()]}) -> + {ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}. init({Name, Mod, InitArgs, Options}) -> case Mod:init(InitArgs) of {ok, ModState} -> @@ -84,13 +87,13 @@ init({Name, Mod, InitArgs, Options}) -> , state = ModState , options = Options }}; - {ok, ModState, Timeout} -> + {ok, ModState, NextStep} -> ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), {ok, #state{ name = Name , mod = Mod , state = ModState , options = Options - }, Timeout}; + }, NextStep}; ignore -> {stop, can_not_ignore}; Error -> Error end. @@ -112,14 +115,28 @@ code_change(OldVsn, State, Extra) -> %% @private -spec handle_info(any(), state()) -> - {noreply, state()} | {stop, term(), state()}. + {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. handle_info(Info, State) -> case wpool_utils:do_try( fun() -> (State#state.mod):handle_info(Info, State#state.state) end) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; - {noreply, NewState, Timeout} -> - {noreply, State#state{state = NewState}, Timeout}; + {noreply, NewState, NextStep} -> + {noreply, State#state{state = NewState}, NextStep}; + {stop, Reason, NewState} -> + {stop, Reason, State#state{state = NewState}} + end. + +%% @private +-spec handle_continue(any(), state()) -> + {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. +handle_continue(Continue, State) -> + case wpool_utils:do_try( + fun() -> (State#state.mod):handle_continue(Continue, State#state.state) end) of + {noreply, NewState} -> + {noreply, State#state{state = NewState}}; + {noreply, NewState, NextStep} -> + {noreply, State#state{state = NewState}, NextStep}; {stop, Reason, NewState} -> {stop, Reason, State#state{state = NewState}} end. @@ -128,15 +145,16 @@ handle_info(Info, State) -> %%% real (i.e. interesting) callbacks %%%=================================================================== %% @private --spec handle_cast(term(), state()) -> {noreply, state()}. +-spec handle_cast(term(), state()) -> + {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. handle_cast({call, From, Call}, State) -> case handle_call(Call, From, State) of {reply, Response, NewState} -> gen_server:reply(From, Response), {noreply, NewState}; - {reply, Response, NewState, Timeout} -> + {reply, Response, NewState, NextStep} -> gen_server:reply(From, Response), - {noreply, NewState, Timeout}; + {noreply, NewState, NextStep}; {stop, Reason, Response, NewState} -> gen_server:reply(From, Response), {stop, Reason, NewState}; @@ -156,8 +174,8 @@ handle_cast({cast, Cast}, State) -> fun() -> (State#state.mod):handle_cast(Cast, State#state.state) end) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; - {noreply, NewState, Timeout} -> - {noreply, State#state{state = NewState}, Timeout}; + {noreply, NewState, NextStep} -> + {noreply, State#state{state = NewState}, NextStep}; {stop, Reason, NewState} -> {stop, Reason, State#state{state = NewState}} end, @@ -171,9 +189,9 @@ handle_cast({cast, Cast}, State) -> %% @private -spec handle_call(term(), from(), state()) -> {reply, term(), state()} - | {reply, term(), state(), timeout() | hibernate} + | {reply, term(), state(), next_step()} | {noreply, state()} - | {noreply, state(), timeout() | hibernate} + | {noreply, state(), next_step()} | {stop, term(), term(), state()} | {stop, term(), state()}. handle_call(Call, From, State) -> @@ -191,12 +209,12 @@ handle_call(Call, From, State) -> end) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; - {noreply, NewState, Timeout} -> - {noreply, State#state{state = NewState}, Timeout}; + {noreply, NewState, NextStep} -> + {noreply, State#state{state = NewState}, NextStep}; {reply, Response, NewState} -> {reply, Response, State#state{state = NewState}}; - {reply, Response, NewState, Timeout} -> - {reply, Response, State#state{state = NewState}, Timeout}; + {reply, Response, NewState, NextStep} -> + {reply, Response, State#state{state = NewState}, NextStep}; {stop, Reason, NewState} -> {stop, Reason, State#state{state = NewState}}; {stop, Reason, Response, NewState} -> diff --git a/test/echo_server.erl b/test/echo_server.erl index eea8e72..05c9988 100644 --- a/test/echo_server.erl +++ b/test/echo_server.erl @@ -24,6 +24,7 @@ , handle_call/3 , handle_cast/2 , handle_info/2 + , handle_continue/2 ]). -dialyzer([no_behaviours]). @@ -51,3 +52,6 @@ handle_cast(Cast, _State) -> Cast. -spec handle_call(state | Call, from(), State) -> {reply, State, State} | Call. handle_call(state, _From, State) -> {reply, State, State}; handle_call(Call, _From, _State) -> Call. + +-spec handle_continue(Continue, term()) -> Continue. +handle_continue(Continue, _State) -> Continue. diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 520a7ce..2088a82 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -29,6 +29,7 @@ , info/1 , cast/1 , call/1 + , continue/1 , stop/1 ]). -export([ pool_restart_crash/1 @@ -114,6 +115,59 @@ cast(_Config) -> {comment, []}. +-spec continue(config()) -> {comment, []}. +continue(_Config) -> + C = fun(ContinueState) -> {noreply, ContinueState} end, + %% init/1 returns {continue, continue_state} + {ok, Pid} = + wpool_process:start_link( + ?MODULE, echo_server, {ok, state, {continue, C(continue_state)}}, []), + continue_state = wpool_process:call(?MODULE, state, 5000), + + %% handle_call/3 returns {continue, ...} + ok = wpool_process:call(Pid, {reply, ok, state, {continue, C(continue_state_2)}}, 5000), + continue_state_2 = wpool_process:call(?MODULE, state, 5000), + try wpool_process:call(Pid, {noreply, state, {continue, C(continue_state_3)}}, 100) of + Result -> ct:fail("Unexpected Result: ~p", [Result]) + catch + _:{timeout, _} -> + continue_state_3 = wpool_process:call(?MODULE, state, 5000) + end, + + %% handle_cast/2 returns {continue, ...} + wpool_process:cast(Pid, {noreply, state, {continue, C(continue_state_4)}}), + continue_state_4 = wpool_process:call(?MODULE, state, 5000), + + %% handle_continue/2 returns {continue, ...} + SecondContinueResponse = C(continue_state_5), + FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}}, + CastResponse = {noreply, state, {continue, FirstContinueResponse}}, + wpool_process:cast(Pid, CastResponse), + continue_state_5 = wpool_process:call(?MODULE, state, 5000), + + %% handle_info/2 returns {continue, ...} + Pid ! {noreply, state, {continue, C(continue_state_6)}}, + continue_state_6 = wpool_process:call(?MODULE, state, 5000), + + %% handle_continue/2 returns {continue, ...} + SecondContinueResponse = C(continue_state_5), + FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}}, + CastResponse = {noreply, state, {continue, FirstContinueResponse}}, + wpool_process:cast(Pid, CastResponse), + continue_state_5 = wpool_process:call(?MODULE, state, 5000), + + %% handle_continue/2 returns timeout = 0 + wpool_process:cast(Pid, {noreply, state, {continue, {noreply, continue_state_7, 0}}}), + timer:sleep(100), + timeout = wpool_process:call(?MODULE, state, 5000), + + %% handle_continue/2 returns {stop, normal, state} + wpool_process:cast(Pid, {noreply, state, {continue, {stop, normal, state}}}), + timer:sleep(1000), + false = erlang:is_process_alive(Pid), + + {comment, []}. + -spec call(config()) -> {comment, []}. call(_Config) -> {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), From 5063f068ec9b93be2c612490d389c8962cf44371 Mon Sep 17 00:00:00 2001 From: Brujo Benavides Date: Mon, 16 Jul 2018 19:29:23 -0300 Subject: [PATCH 3/4] Support Module:format_status/2 --- src/wpool_process.erl | 15 +++++++++++ test/echo_server.erl | 7 ++++-- test/wpool_process_SUITE.erl | 48 +++++++++++++++++++++++++----------- 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/wpool_process.erl b/src/wpool_process.erl index c5268e6..1ad60d3 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -46,6 +46,7 @@ , handle_cast/2 , handle_info/2 , handle_continue/2 + , format_status/2 ]). %%%=================================================================== @@ -141,6 +142,20 @@ handle_continue(Continue, State) -> {stop, Reason, State#state{state = NewState}} end. +%% @private +-spec format_status(normal | terminate, [[{_, _}] | state(), ...]) -> term(). +format_status(Opt, [PDict, State]) -> + case erlang:function_exported(State#state.mod, format_status, 2) of + false -> + case Opt of % This is copied from gen_server:format_status/4 + terminate -> State#state.state; + normal -> [{data, [{"State", State#state.state}]}] + end; + true -> + wpool_utils:do_try( + fun() -> (State#state.mod):format_status(Opt, [PDict, State#state.state]) end) + end. + %%%=================================================================== %%% real (i.e. interesting) callbacks %%%=================================================================== diff --git a/test/echo_server.erl b/test/echo_server.erl index 05c9988..1bfd176 100644 --- a/test/echo_server.erl +++ b/test/echo_server.erl @@ -25,6 +25,7 @@ , handle_cast/2 , handle_info/2 , handle_continue/2 + , format_status/2 ]). -dialyzer([no_behaviours]). @@ -49,9 +50,11 @@ handle_info(Info, _State) -> Info. handle_cast(Cast, _State) -> Cast. -type from() :: {pid(), reference()}. --spec handle_call(state | Call, from(), State) -> {reply, State, State} | Call. -handle_call(state, _From, State) -> {reply, State, State}; +-spec handle_call(Call, from(), term()) -> Call. handle_call(Call, _From, _State) -> Call. -spec handle_continue(Continue, term()) -> Continue. handle_continue(Continue, _State) -> Continue. + +-spec format_status(normal | terminate, [[{_, _}] | State, ...]) -> {formatted_state, State}. +format_status(_, [_PDict, State]) -> {formatted_state, State}. diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 2088a82..2e2026b 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -30,6 +30,7 @@ , cast/1 , call/1 , continue/1 + , no_format_status/1 , stop/1 ]). -export([ pool_restart_crash/1 @@ -80,7 +81,7 @@ init_timeout(_Config) -> {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, []), timer:sleep(1), - timeout = wpool_process:call(?MODULE, state, 5000), + timeout = get_state(?MODULE), Pid ! {stop, normal, state}, timer:sleep(1000), false = erlang:is_process_alive(Pid), @@ -91,10 +92,10 @@ init_timeout(_Config) -> info(_Config) -> {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), Pid ! {noreply, newstate}, - newstate = wpool_process:call(?MODULE, state, 5000), + newstate = get_state(?MODULE), Pid ! {noreply, newerstate, 1}, timer:sleep(1), - timeout = wpool_process:call(?MODULE, state, 5000), + timeout = get_state(?MODULE), Pid ! {stop, normal, state}, timer:sleep(1000), false = erlang:is_process_alive(Pid), @@ -105,10 +106,10 @@ info(_Config) -> cast(_Config) -> {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), wpool_process:cast(Pid, {noreply, newstate}), - newstate = wpool_process:call(?MODULE, state, 5000), + newstate = get_state(?MODULE), wpool_process:cast(Pid, {noreply, newerstate, 0}), timer:sleep(100), - timeout = wpool_process:call(?MODULE, state, 5000), + timeout = get_state(?MODULE), wpool_process:cast(Pid, {stop, normal, state}), timer:sleep(1000), false = erlang:is_process_alive(Pid), @@ -122,44 +123,44 @@ continue(_Config) -> {ok, Pid} = wpool_process:start_link( ?MODULE, echo_server, {ok, state, {continue, C(continue_state)}}, []), - continue_state = wpool_process:call(?MODULE, state, 5000), + continue_state = get_state(Pid), %% handle_call/3 returns {continue, ...} ok = wpool_process:call(Pid, {reply, ok, state, {continue, C(continue_state_2)}}, 5000), - continue_state_2 = wpool_process:call(?MODULE, state, 5000), + continue_state_2 = get_state(Pid), try wpool_process:call(Pid, {noreply, state, {continue, C(continue_state_3)}}, 100) of Result -> ct:fail("Unexpected Result: ~p", [Result]) catch _:{timeout, _} -> - continue_state_3 = wpool_process:call(?MODULE, state, 5000) + continue_state_3 = get_state(Pid) end, %% handle_cast/2 returns {continue, ...} wpool_process:cast(Pid, {noreply, state, {continue, C(continue_state_4)}}), - continue_state_4 = wpool_process:call(?MODULE, state, 5000), + continue_state_4 = get_state(Pid), %% handle_continue/2 returns {continue, ...} SecondContinueResponse = C(continue_state_5), FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}}, CastResponse = {noreply, state, {continue, FirstContinueResponse}}, wpool_process:cast(Pid, CastResponse), - continue_state_5 = wpool_process:call(?MODULE, state, 5000), + continue_state_5 = get_state(Pid), %% handle_info/2 returns {continue, ...} Pid ! {noreply, state, {continue, C(continue_state_6)}}, - continue_state_6 = wpool_process:call(?MODULE, state, 5000), + continue_state_6 = get_state(Pid), %% handle_continue/2 returns {continue, ...} SecondContinueResponse = C(continue_state_5), FirstContinueResponse = {noreply, another_state, {continue, SecondContinueResponse}}, CastResponse = {noreply, state, {continue, FirstContinueResponse}}, wpool_process:cast(Pid, CastResponse), - continue_state_5 = wpool_process:call(?MODULE, state, 5000), + continue_state_5 = get_state(Pid), %% handle_continue/2 returns timeout = 0 wpool_process:cast(Pid, {noreply, state, {continue, {noreply, continue_state_7, 0}}}), timer:sleep(100), - timeout = wpool_process:call(?MODULE, state, 5000), + timeout = get_state(Pid), %% handle_continue/2 returns {stop, normal, state} wpool_process:cast(Pid, {noreply, state, {continue, {stop, normal, state}}}), @@ -168,14 +169,24 @@ continue(_Config) -> {comment, []}. +-spec no_format_status(config()) -> {comment, []}. +no_format_status(_Config) -> + %% crashy_server doesn't implement format_status/2 + {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, []), + %% therefore it uses the default format for the stauts (but with the status of the gen_server, + %% not wpool_process) + {status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid), + [state] = [S || SItemList = [_|_] <- SItems, {data, Data} <- SItemList, {"State", S} <- Data], + {comment, []}. + -spec call(config()) -> {comment, []}. call(_Config) -> {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), ok1 = wpool_process:call(Pid, {reply, ok1, newstate}, 5000), - newstate = wpool_process:call(?MODULE, state, 5000), + newstate = get_state(?MODULE), ok2 = wpool_process:call(Pid, {reply, ok2, newerstate, 1}, 5000), timer:sleep(1), - timeout = wpool_process:call(?MODULE, state, 5000), + timeout = get_state(?MODULE), ok3 = wpool_process:call(Pid, {stop, normal, ok3, state}, 5000), timer:sleep(1000), false = erlang:is_process_alive(Pid), @@ -294,3 +305,10 @@ complete_coverage(_Config) -> {error, bad} = wpool_process:code_change("oldvsn", State, bad), {comment, []}. + +get_state(Atom) when is_atom(Atom) -> + get_state(whereis(Atom)); +get_state(Pid) -> + {status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid), + [State] = [S || SItemList = [_|_] <- SItems, {formatted_state, S} <- SItemList], + State. From 9ce28d5112acc514a40e73a950e855b9e6b5fad5 Mon Sep 17 00:00:00 2001 From: Brujo Benavides Date: Tue, 17 Jul 2018 09:08:41 -0300 Subject: [PATCH 4/4] Add proper test and docs for format_status/2 support --- test/wpool_process_SUITE.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 2e2026b..cb53c42 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -30,6 +30,7 @@ , cast/1 , call/1 , continue/1 + , format_status/1 , no_format_status/1 , stop/1 ]). @@ -169,6 +170,17 @@ continue(_Config) -> {comment, []}. +-spec format_status(config()) -> {comment, []}. +format_status(_Config) -> + %% echo_server implements format_status/2 + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + %% therefore it returns {formatted_state, State} as its status and we just pass it through + {status, Pid, {module, gen_server}, SItems} = sys:get_status(Pid), + [state] = [S || SItemList = [_|_] <- SItems, {formatted_state, S} <- SItemList], + %% this code is actually what we use to retrieve the state in other tests + state = get_state(Pid), + {comment, []}. + -spec no_format_status(config()) -> {comment, []}. no_format_status(_Config) -> %% crashy_server doesn't implement format_status/2 @@ -306,6 +318,12 @@ complete_coverage(_Config) -> {comment, []}. + +%% @doc We can use this function in tests since echo_server implements format_status/2 +%% by returning the state as a tuple {formatted_state, S}. +%% We can safely grab it from the result of sys:get_status/1 +%% @see gen_server:format_status/2 +%% @see sys:get_status/2 get_state(Atom) when is_atom(Atom) -> get_state(whereis(Atom)); get_state(Pid) ->