Skip to content

Commit

Permalink
Merge pull request #162 from inaka/elbrujohalcon.161.forward_port_new…
Browse files Browse the repository at this point in the history
…_changes_in_re

[Fixes #161] Forward-port new changes in rel-3.1 to master
  • Loading branch information
elbrujohalcon authored Oct 25, 2018
2 parents df2769c + 1f17c53 commit fc4ae97
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 15 deletions.
12 changes: 2 additions & 10 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
.erlang.mk
worker_pool.d
codecov.json
_build/
.rebar/
all.coverdata
.classpath
bin
*.plt
doc
ebin
deps
.DS_Store
log
logs
erl_crash.dump
*.beam
*.log
*~
.idea
*.iml
*.orig
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t
* **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}`
* **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`.
* **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`.
* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules.
Defaults to `false`.
* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events.
This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`.

#### Using the Workers
Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`.
Expand Down
42 changes: 40 additions & 2 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
-export([ next/2
, wpool_get/2
]).
-export([ add_callback_module/2
, remove_callback_module/2
]).

%% Supervisor callbacks
-export([ init/1
Expand Down Expand Up @@ -253,6 +256,16 @@ wpool_size(Name) ->
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, WPool) -> WPool#wpool{next = Next}.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:add_callback_module(EventManager, Module).

-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:remove_callback_module(EventManager, Module).

%% @doc Get values from the worker pool record. Useful when using a custom
%% strategy function.
-spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any().
Expand Down Expand Up @@ -288,6 +301,7 @@ init({Name, Options}) ->
TimeChecker = time_checker_name(Name),
QueueManager = queue_manager_name(Name),
ProcessSup = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool =
store_wpool(
#wpool{ name = Name
Expand Down Expand Up @@ -315,9 +329,19 @@ init({Name, Options}) ->
, [wpool_queue_manager]
},

EventManagerSpec =
{ EventManagerName
, {gen_event, start_link, [{local, EventManagerName}]}
, permanent
, brutal_kill
, worker
, dynamic
},

SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
WorkerOpts =
[{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options],
[{queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options] ++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
{ ProcessSup
, {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]}
Expand All @@ -327,10 +351,14 @@ init({Name, Options}) ->
, [wpool_process_sup]
},

Children = [TimeCheckerSpec, QueueManagerSpec] ++
maybe_event_manager(Options, EventManagerSpec) ++
[ProcessSupSpec],

SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
SupStrategy = {one_for_all, SupIntensity, SupPeriod},
{ok, {SupStrategy, [TimeCheckerSpec, QueueManagerSpec, ProcessSupSpec]}}.
{ok, {SupStrategy, Children}}.

%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
Expand All @@ -345,6 +373,8 @@ process_sup_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-process-sup").
queue_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-queue-manager").
event_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-event-manager").

worker_with_no_task(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
Expand Down Expand Up @@ -463,3 +493,11 @@ build_wpool(Name) ->

next_wpool(Wpool) ->
Wpool#wpool{next = (Wpool#wpool.next rem Wpool#wpool.size) + 1}.

maybe_event_manager(Options, Item) ->
EnableEventManager = proplists:get_value(enable_callbacks, Options, false),
case EnableEventManager of
true ->
[Item];
_ -> []
end.
5 changes: 5 additions & 0 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,20 @@ cast_call(Process, From, Call) ->
-spec init({atom(), atom(), term(), [wpool:option()]}) ->
{ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}.
init({Name, Mod, InitArgs, Options}) ->
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),

case Mod:init(InitArgs) of
{ok, ModState} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}};
{ok, ModState, NextStep} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
Expand All @@ -104,6 +108,7 @@ init({Name, Mod, InitArgs, Options}) ->
terminate(Reason, State) ->
#state{mod=Mod, state=ModState, name=Name, options=Options} = State,
ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
Mod:terminate(Reason, ModState).

%% @private
Expand Down
84 changes: 84 additions & 0 deletions src/wpool_process_callbacks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
-module(wpool_process_callbacks).

-behaviour(gen_event).

%% gen_event callbacks

-export([ init/1
, handle_event/2
, handle_call/2
]).

-export([ notify/3
, add_callback_module/2
, remove_callback_module/2
]).
-type state() :: module().

-type event() :: handle_init_start | handle_worker_creation | handle_worker_death.

-callback handle_init_start(wpool:name()) -> any().
-callback handle_worker_creation(wpool:name()) -> any().
-callback handle_worker_death(wpool:name(), term()) -> any().

-optional_callbacks([handle_init_start/1, handle_worker_creation/1, handle_worker_death/2]).

-spec init(module()) -> {ok, state()}.
init(Module) ->
{ok, Module}.

-spec handle_event({event(), [any()]}, state()) -> {ok, state()}.
handle_event({Event, Args}, Module) ->
call(Module, Event, Args),
{ok, Module}.

-spec handle_call(Msg, state()) -> {ok, {error, {unexpected_call, Msg}}, state()}.
handle_call(Msg, State) ->
{ok, {error, {unexpected_call, Msg}}, State}.

-spec notify(event(), [wpool:option()], [any()]) -> ok.
notify(Event, Options, Args) ->
case lists:keyfind(event_manager, 1, Options) of
{event_manager, EventMgr} ->
gen_event:notify(EventMgr, {Event, Args});
_ ->
ok
end.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}.
add_callback_module(EventManager, Module) ->
case ensure_loaded(Module) of
ok ->
gen_event:add_handler(EventManager,
{wpool_process_callbacks, Module}, Module);
Other ->
Other
end.


-spec remove_callback_module(wpool:name(), module()) -> ok | {error, any()}.
remove_callback_module(EventManager, Module) ->
gen_event:delete_handler(EventManager, {wpool_process_callbacks, Module}, Module).

call(Module, Event, Args) ->
try
case erlang:function_exported(Module, Event, length(Args)) of
true ->
erlang:apply(Module, Event, Args);
_ ->
ok
end
catch
E:R ->
error_logger:warning_msg("Could not call callback module, error:~p, reason:~p", [E, R])
end.

ensure_loaded(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
ok;
{error, embedded} -> %% We are in embedded mode so the module was loaded if exists
ok;
Other ->
Other
end.
20 changes: 20 additions & 0 deletions src/wpool_process_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ start_link(Parent, Name, Options) ->
init({Name, Options}) ->
Workers = proplists:get_value(workers, Options, 100),
Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}),
maybe_add_event_handler(Options),
{WorkerType, Worker, InitArgs} =
case proplists:get_value(worker_type, Options, gen_server) of
gen_server ->
Expand All @@ -55,3 +56,22 @@ init({Name, Options}) ->
, [Worker]
} || I <- lists:seq(1, Workers)],
{ok, {Strategy, WorkerSpecs}}.

maybe_add_event_handler(Options) ->
case proplists:get_value(event_manager, Options, undefined) of
undefined ->
ok;
EventMgr ->
lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end,
proplists:get_value(callbacks, Options, []))
end.

add_initial_callback(EventManager, Module) ->
case wpool_process_callbacks:add_callback_module(EventManager, Module) of
ok ->
ok;
Other ->
error_logger:warning_msg("The callback module:~p could not be loaded, reason:~p",
[Module, Other])
end.

6 changes: 4 additions & 2 deletions src/wpool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
%% PUBLIC API
%%-------------------------------------------------------------------
%% @doc Starts the supervisor
-spec start_link() -> {ok, pid()}.
-spec start_link() ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% @doc Starts a new pool
-spec start_pool(wpool:name(), [wpool:option()]) -> {ok, pid()}.
-spec start_pool(wpool:name(), [wpool:option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_pool(Name, Options) -> supervisor:start_child(?MODULE, [Name, Options]).

%% @doc Stops a pool
Expand Down
1 change: 0 additions & 1 deletion test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ broadcast(_Config) ->
meck:unload(x),
{comment, []}.


%% =============================================================================
%% Helpers
%% =============================================================================
Expand Down
Loading

0 comments on commit fc4ae97

Please sign in to comment.