diff --git a/README.md b/README.md index f0ace19..f21351a 100644 --- a/README.md +++ b/README.md @@ -6,116 +6,93 @@ A pool of gen servers. ## Abstract -The goal of **worker pool** is pretty straightforward: To provide a transparent way to manage a pool of workers and _do the best effort_ in balancing the load among them distributing the tasks requested to the pool. +The goal of **worker pool** is pretty straightforward: to provide a transparent way to manage a pool of workers and _do the best effort_ in balancing the load among them, distributing the tasks requested to the pool. -## Documentation - -The documentation can be generated from code using [rebar3_ex_doc](https://github.com/starbelly/rebar3_ex_doc) with `rebar3 ex_doc`. It is also available online [here](https://hexdocs.pm/worker_pool/) - -## Usage - -All user functions are exposed through the [wpool module](https://hexdocs.pm/worker_pool/wpool.html). - -### Starting the Application - -**Worker Pool** is an erlang application that can be started using the functions in the [`application`](https://erldocs.com/current/kernel/application.html) module. For convenience, `wpool:start/0` and `wpool:stop/0` are also provided. - -### Starting a Pool - -To start a new worker pool, you can either use `wpool:start_pool` (if you want to supervise it yourself) or `wpool:start_sup_pool` (if you want the pool to live under wpool's supervision tree). You can provide several options on any of those calls: - -* **overrun_warning**: The number of milliseconds after which a task is considered *overrun* (i.e. delayed) so a warning is emitted using **overrun_handler**. The task is monitored until it is finished, thus more than one warning might be emitted for a single task. The rounds of warnings are not equally timed, an exponential backoff algorithm is used instead: after each warning the overrun time is doubled (i.e. with `overrun_warning = 1000` warnings would be emitted after 1000, 2000, 4000, 8000 ...) The default value for this setting is `infinity` (i.e. no warnings are emitted) -* **max_overrun_warnings**: The maximum number of overrun warnings emitted before killing a delayed task: that is, killing the worker running the task. If this parameter is set to a value other than `infinity` the rounds of warnings becomes equally timed (i.e. with `overrun_warning = 1000` and `max_overrun_warnings = 5` the task would be killed after 5 seconds of execution) The default value for this setting is `infinity` (i.e. delayed tasks are not killed) +You can just replace your calls to the `gen_server` module with calls to the `wpool` module, i.e., wherever you had a `gen_server:call/N` now you put a `wpool:call/N`, and that’s it! - **NOTE:** As the worker is being killed it might cause worker's messages to be missing if you are using a worker stategy other than `available_worker` (see worker strategies below) +## Installation -* **overrun_handler**: The module and function to call when a task is *overrun*. The default value for this setting is `{error_logger, warning_report}`. Repor values are: - - * *{alert, AlertType}*: Where `AlertType` is `overrun` on regular warnings, or `max_overrun_limit` when the worker is about to be killed. - * *{pool, Pool}*: The poolname - * *{worker, Pid}*: Pid of the worker - * *{task, Task}*: A description of the task - * *{runtime, Runtime}*: The runtime of the current round - -* **workers**: The number of workers in the pool. The default value for this setting is `100` -* **worker**: The [`gen_server`](https://erldocs.com/current/stdlib/gen_server.html) module that each worker will run and the `InitArgs` to use on the corresponding `start_link` call used to initiate it. The default value for this setting is `{wpool_worker, undefined}`. That means that if you don't provide a worker implementation, the pool will be generated with this default one. [`wpool_worker`](https://hexdocs.pm/worker_pool/wpool_worker.html) is a module that implements a very simple RPC-like interface. -* **worker_opt**: Options that will be passed to each `gen_server` worker. This are the same as described at `gen_server` documentation. -* **worker_shutdown**: The `shutdown` option to be used in the child specs of the workers. Defaults to `5000`. -* **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}` -* **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`. -* **pool_sup_shutdown**: The `shutdown` option to be used for the supervisor over the individual workers (`wpool_process_sup`). That is, the value set in the child spec for this supervisor, which is specified in its parent supervisor (`wpool_pool`). -* **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`. -* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules. - Defaults to `false`. -* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events. - This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`. - -### Using the Workers - -Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`. - -#### Choosing a Strategy - -Beyond the regular parameters for `gen_server`, wpool also provides an extra optional parameter: **Strategy**. -The strategy used to pick up the worker to perform the task. If not provided, the result of `wpool:default_strategy/0` is used. The available strategies are defined in the `t:wpool:strategy/0` type and also described below: - -##### best_worker - -Picks the worker with the smaller queue of messages. Loosely based on [this article](https://lethain.com/load-balancing-across-erlang-process-groups/). This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes. - -##### random_worker +Worker Pool is available on [Hex.pm](https://hex.pm/packages/worker_pool). To install, just add it to your dependencies in `rebar.config`: +```erlang +{deps, [{worker_pool, "~> 6.1"}]}. +``` +or in `mix.ers` +```elixir +defp deps() do + [{:worker_pool, "~> 6.1"}] +end +``` -Just picks a random worker. This strategy is the fastest one when to select a worker. It's ideal if your workers will perform many short tasks. +## Documentation -##### next_worker +The documentation can be generated from code using [rebar3_ex_doc](https://github.com/starbelly/rebar3_ex_doc) with `rebar3 ex_doc`. It is also available online in [Hexdocs](https://hexdocs.pm/worker_pool/). -Picks the next worker in a round-robin fashion. That ensures evenly distribution of tasks. +All user functions are exposed through the [wpool module](https://hexdocs.pm/worker_pool/wpool.html). -##### available_worker +Detailed usage is also documented in the same [wpool module summary](https://hexdocs.pm/worker_pool/doc/wpool.html#content). -Instead of just picking one of the workers in the queue and sending the request to it, this strategy queues the request and waits until a worker is available to perform it. That may render the worker selection part of the process much slower (thus generating the need for an additional parameter: **Worker_Timeout** that controls how many milliseconds is the client willing to spend in that, regardless of the global **Timeout** for the call). -This strategy ensures that, if a worker crashes, no messages are lost in its message queue. -It also ensures that, if a task takes too long, that doesn't block other tasks since, as soon as other worker is free it can pick up the next task in the list. +## Examples -##### next_available_worker +Say your application needs a connection to a third-party service that is frequently used. You implement a `gen_server` called `my_server` that knows how to talk the third-party protocol and keeps the connection open, and your business logic uses this `my_server` as the API to interact with. But this server is not only a single-point-of-failure, but also a bottleneck. -In a way, this strategy behaves like `available_worker` in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy. +Let's pool this server! -##### hash_worker +#### Starting the pool -This strategy takes a key and selects a worker using [`erlang:phash2/2`](https://www.erlang.org/doc/man/erlang.html#phash-2). This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers. +First we need to start the pool, instead of starting a single server. If your server was part of your supervision tree, and your supervisor had a child-spec like: +```erlang + ChildSpec = #{id => my_server_name, + start => {my_server, start_link, Arguments}, + restart => permanent, + shutdown => 5000, + type => worker}. +``` -### Broadcasting a Pool +You can now replace it by +```erlang + WPoolOpts = [{worker, {my_server, Arguments}}], + ChildSpec = wpool:child_spec(my_server_name, WPoolOpts), +``` -Wpool provides a way to `broadcast` a message to every worker within the given Pool. +#### Using the pool +Now that the pool is in place, wherever you've called the server, now you can simply call the pool: all code like the following ```erlang -1> wpool:start(). -ok -2> wpool:start_pool(my_pool, [{workers, 3}]). -{ok,<0.299.0>} -3> wpool:broadcast(my_pool, {io, format, ["I got a message.~n"]}). -I got a message. -I got a message. -I got a message. -ok + %% ... + gen_server:call(my_server, Request), + %% ... + gen_server:cast(my_server, Notify), + %% ... +``` +can simply be replaced by +```erlang + %% ... + wpool:call(my_server, Request), + %% ... + wpool:cast(my_server, Notify), + %% ... ``` -**NOTE:** This messages don't get queued, they go straight to the worker's message queues, so if you're using available_worker strategy to balance the charge and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers **before** the queued up tasks. - -### Watching a Pool +If you want all the workers to get notified of an event (for example for consistency reasons), you can use: +```erlang + wpool:broadcast(my_server, Request) +``` -Wpool provides a way to get live statistics about a pool. To do that, you can use `wpool:stats/1`. +And if events have a partial ordering, that is, there is a subset of them were they should be processed in a strict ordering, for example requests by user `X` should be processed sequentially but how they interleave with other requests is irrelevant, you can use: +```erlang + wpool:call(my_server, Request, {hash_worker, X}) +``` +and requests for `X` will always be sent to the same worker. -### Stopping a Pool +And just like that, all your requests are now pooled! -To stop a pool, just use `wpool:stop_pool/1`. +By passing a more complex configuration in the `WPoolOpts` parameter, you can tweak many things, for example the number of workers (`t:wpool:workers()`), options to pass to OTP's the `gen_server` engine behind your code `t:wpool:worker_opt()`, the strategy to supervise all the workers (`t:wpool:strategy()`), register callbacks you want to be triggered on worker's events (`t:wpool:callbacks()`), and many more. See `t:wpool:option()` for all options available. -## Examples +#### Case studies used in production To see how `wpool` is used you can check the [test](test) folder where you'll find many different scenarios exercised in the different suites. -If you want to see **worker_pool** in a _real life_ project, I recommend you to check [sumo_db](https://github.com/inaka/sumo_db), another open-source library from [Inaka](https://inaka.github.io/) that uses wpool intensively. +If you want to see **worker_pool** in a _real life_ project, we recommend you to check [sumo_db](https://github.com/inaka/sumo_db), another open-source library from [Inaka](https://inaka.github.io/) that uses wpool intensively, or [MongooseIM](https://github.com/esl/MongooseIM), an Erlang Solutions' Messaging server that uses wpool in many different ways. ## Benchmarks @@ -125,10 +102,6 @@ If you want to see **worker_pool** in a _real life_ project, I recommend you to If you find any **bugs** or have a **problem** while using this library, please [open an issue](https://github.com/inaka/worker_pool/issues/new) in this repo (or a pull request :)). -## On Hex.pm - -Worker Pool is available on [Hex.pm](https://hex.pm/packages/worker_pool). - ## Requirements **Required OTP version 25** or higher. We only provide guarantees that the system runs on `OTP25+` since that's what we're testing it in, but the `minimum_otp_vsn` is `"21"` because some systems where **worker_pool** is integrated do require it. diff --git a/src/wpool.erl b/src/wpool.erl index 6fff1ca..2b7b04c 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -11,35 +11,185 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @author Fernando Benavides %%% @doc Worker pool main interface. -%%% Use functions provided by this module to manage your pools of workers +%%% +%%% Use functions provided by this module to manage your pools of workers. +%%% +%%%

Starting the application

+%%% Worker Pool is an Erlang application that can be started using the functions in the +%%% `application' module. For convenience, `wpool:start/0' and `wpool:stop/0' are also provided. +%%% +%%%

Starting a Pool

+%%% +%%% To start a new worker pool, you can either +%%% +%%% +%%%

Stopping a Pool

+%%% To stop a pool, just use `wpool:stop_pool/1' or `wpool:stop_sup_pool/1' according to how you +%%% started the pool. +%%% +%%%

Using the Workers

+%%% +%%% Since the workers are `gen_server's, messages can be `call'ed or `cast'ed to them. To do that +%%% you can use `wpool:call' and `wpool:cast' as you would use the equivalent functions on +%%% `gen_server'. +%%% +%%%

Choosing a Strategy

+%%% +%%% Beyond the regular parameters for `gen_server', wpool also provides an extra optional parameter +%%% Strategy The strategy used to pick up the worker to perform the task. If not provided, +%%% the result of `wpool:default_strategy/0' is used. +%%% +%%% The available strategies are defined in the `t:wpool:strategy/0' type. +%%% +%%%

Watching a Pool

+%%% Wpool provides a way to get live statistics about a pool. To do that, you can use +%%% `wpool:stats/1'. -module(wpool). +%% @todo remove this line when https://github.com/AdRoll/rebar3_format/issues/356 is fixed +-format ignore. + -behaviour(application). -%% Copied from gen.erl --type debug_flag() :: trace | log | statistics | debug | {logfile, string()}. --type gen_option() :: - {timeout, timeout()} | {debug, [debug_flag()]} | {spawn_opt, [proc_lib:spawn_option()]}. --type gen_options() :: [gen_option()]. +-type overrun_warning() :: infinity | pos_integer(). +%% The number of milliseconds after which a task is considered overrun i.e., delayed. +%% +%% A warning is emitted using {@link overrun_handler()}. +%% +%% The task is monitored until it is finished, +%% thus more than one warning might be emitted for a single task. +%% +%% The rounds of warnings are not equally timed, an exponential backoff algorithm is used instead: +%% after each warning the overrun time is doubled (i.e. with `overrun_warning = 1000' warnings would +%% be emitted after 1000, 2000, 4000, 8000 ...). +%% +%% The default value for this setting is `infinity', i.e., no warnings are emitted. + +-type max_overrun_warnings() :: infinity | pos_integer(). +%% The maximum number of overrun warnings emitted before killing the worker with a delayed task. +%% +%% If this parameter is set to a value other than `infinity' the rounds of warnings become equally +%% timed (i.e. with `overrun_warning = 1000' and `max_overrun_warnings = 5' the task would be killed +%% after 5 seconds of execution). +%% +%% The default value for this setting is `infinity', i.e., delayed tasks are not killed. +%% +%% NOTE: As the worker is being killed it might cause worker's messages to be missing if you +%% are using a worker stategy other than `available_worker' (see worker {@link strategy()} below). + +-type overrun_handler() :: {Module :: module(), Fun :: atom()}. +%% The module and function to call when a task is overrun +%% +%% The default value for this setting is `{error_logger, warning_report}'. The function must be of +%% arity 1, and it will be called as`Module:Fun(Args)' where `Args' is a proplist with the following +%% reported values: +%% + +-type workers() :: pos_integer(). +%% The number of workers in the pool. +%% +%% The default value for this setting is `100' + +-type worker() :: {Module :: module(), InitArg :: term()}. +%% The `gen_server' module and the arguments to pass to the `init' callback. +%% +%% This is the module that each worker will run and the `InitArgs' to use on the corresponding +%% `start_link' call used to initiate it. +%% +%% The default value for this setting is `{wpool_worker, undefined}'. That means that if you don't +%% provide a worker implementation, the pool will be generated with this default one. +%% See {@link wpool_worker} for details. + +-type worker_opt() :: gen_server:start_opt(). +%% Server options that will be passed to each `gen_server' worker. +%% +%% These are the same as described at the `gen_server' documentation. + +-type worker_shutdown() :: worker_shutdown(). +%% The `shutdown' option to be used over the individual workers. +%% +%% Defaults to `5000'. See {@link wpool_process_sup} for more details. + +-type supervisor_strategy() :: supervisor:sup_flags(). +%% Supervision strategy to use over the individual workers. +%% +%% Defaults to `{one_for_one, 5, 60}'. See {@link wpool_process_sup} for more details. + +-type pool_sup_shutdown() :: brutal_kill | timeout(). +%% The `shutdown' option to be used over the supervisor that supervises the workers. +%% +%% Defaults to `brutal_kill'. See {@link wpool_process_sup} for more details. + +-type pool_sup_period() :: non_neg_integer(). +%% The supervision period to use over the supervisor that supervises the workers. +%% +%% Defaults to `60'. See {@link wpool_pool} for more details. + +-type pool_sup_intensity() :: non_neg_integer(). +%% The supervision intensity to use over the supervisor that supervises the workers. +%% +%% Defaults to `5'. See {@link wpool_pool} for more details. + +-type queue_type() :: wpool_queue_manager:queue_type(). +%% Order in which requests will be stored and handled by workers. +%% +%% This option can take values `lifo' or `fifo'. Defaults to `fifo'. + +-type enable_callbacks() :: boolean(). +%% A boolean value determining if `event_manager' should be started for callback modules. +%% +%% Defaults to `false'. + +-type callbacks() :: [module()]. +%% Initial list of callback modules implementing `wpool_process_callbacks' to be +%% called on certain worker events. +%% +%% This options will only work if the {@link enable_callbacks()} is set to true. +%% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and +%% `wpool_pool:remove_callback_module/2'. + -type name() :: atom(). --type supervisor_strategy() :: {supervisor:strategy(), non_neg_integer(), pos_integer()}. +%% Name of the pool + -type option() :: - {overrun_warning, infinity | pos_integer()} | - {max_overrun_warnings, infinity | pos_integer()} | - {overrun_handler, {Module :: atom(), Fun :: atom()}} | - {workers, pos_integer()} | - {worker_opt, gen_options()} | - {worker, {Module :: atom(), InitArg :: term()}} | + {workers, workers()} | + {worker, worker()} | + {worker_opt, [worker_opt()]} | {strategy, supervisor_strategy()} | - {pool_sup_intensity, non_neg_integer()} | - {pool_sup_shutdown, brutal_kill | timeout()} | - {pool_sup_period, non_neg_integer()} | - {queue_type, wpool_queue_manager:queue_type()} | - {enable_callbacks, boolean()} | - {callbacks, [module()]}. + {worker_shutdown, worker_shutdown()} | + {overrun_handler, overrun_handler()} | + {overrun_warning, overrun_warning()} | + {max_overrun_warnings, max_overrun_warnings()} | + {pool_sup_intensity, pool_sup_intensity()} | + {pool_sup_shutdown, pool_sup_shutdown()} | + {pool_sup_period, pool_sup_period()} | + {queue_type, queue_type()} | + {enable_callbacks, enable_callbacks()} | + {callbacks, callbacks()}. +%% Options that can be provided to a new pool. +%% +%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks +%% that take a list of these options as a parameter. + -type custom_strategy() :: fun(([atom()]) -> Atom :: atom()). +%% A callback that gets the pool name and returns a worker's name. + -type strategy() :: best_worker | random_worker | @@ -48,8 +198,51 @@ next_available_worker | {hash_worker, term()} | custom_strategy(). +%% Strategy to use when choosing a worker. +%% +%%

`best_worker'

+%% Picks the worker with the shortest queue of messages. Loosely based on this +%% article: [https://lethain.com/load-balancing-across-erlang-process-groups/]. +%% +%% This strategy is usually useful when your workers always perform the same task, +%% or tasks with expectedly similar runtimes. +%% +%%

`random_worker'

+%% Just picks a random worker. This strategy is the fastest one to select a worker. +%% It's ideal if your workers will perform many short tasks. +%% +%%

`next_worker'

+%% Picks the next worker in a round-robin fashion. This ensures an evenly distribution of tasks. +%% +%%

`available_worker'

+%% Instead of just picking one of the workers in the queue and sending the request to it, this +%% strategy queues the request and waits until a worker is available to perform it. That may render +%% the worker selection part of the process much slower (thus generating the need for an additional +%% parameter: `Worker_Timeout' that controls how many milliseconds the client is willing to spend +%% in that, regardless of the global `Timeout' for the call). +%% +%% This strategy ensures that, if a worker crashes, no messages are lost in its message queue. +%% It also ensures that, if a task takes too long, that doesn't block other tasks since, as soon as +%% other worker is free it can pick up the next task in the list. +%% +%%

`next_available_worker'

+%% In a way, this strategy behaves like `available_worker' in the sense that it will pick the first +%% worker that it can find which is not running any task at the moment, but the difference is that +%% it will fail if all workers are busy. +%% +%%

`{hash_worker, Key}'

+%% This strategy takes a `Key' and selects a worker using `erlang:phash2/2'. This ensures that tasks +%% classified under the same key will be delivered to the same worker, which is useful to classify +%% events by key and work on them sequentially on the worker, distributing different keys across +%% different workers. +%% +%%

{@link custom_strategy()}

+%% A callback that gets the pool name and returns a worker's name. + -type worker_stats() :: [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}]. +%% Statistics about a worker in a pool. + -type stats() :: [{pool, name()} | {supervisor, pid()} | @@ -58,11 +251,12 @@ {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}]. +%% Statistics about a given live pool. -export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]). -export([start/0, start/2, stop/0, stop/1]). --export([start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]). +-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]). -export([stop_pool/1, stop_sup_pool/1]). -export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]). -export([send_request/2, send_request/3, send_request/4]). @@ -111,7 +305,17 @@ start_pool(Name) -> start_pool(Name, Options) -> wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)). -%% @doc Stops the pool +%% @doc Builds a child specification to pass to a supervisor. +-spec child_spec(name(), [option()]) -> supervisor:child_spec(). +child_spec(Name, Options) -> + FullOptions = wpool_utils:add_defaults(Options), + #{id => Name, + start => {wpool, start_pool, [Name, FullOptions]}, + restart => permanent, + shutdown => infinity, + type => supervisor}. + +%% @doc Stops a pool that doesn't belong to `wpool_sup'. -spec stop_pool(name()) -> true. stop_pool(Name) -> case whereis(Name) of @@ -132,7 +336,7 @@ start_sup_pool(Name) -> start_sup_pool(Name, Options) -> wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)). -%% @doc Stops the pool +%% @doc Stops a pool supervised by `wpool_sup' supervision tree. -spec stop_sup_pool(name()) -> ok. stop_sup_pool(Name) -> wpool_sup:stop_pool(Name). @@ -218,12 +422,16 @@ send_request(Sup, Call, Strategy, _Timeout) -> wpool_process:send_request( wpool_pool:Strategy(Sup), Call). -%% @doc Retrieves a snapshot of the pool stats +%% @doc Retrieves a snapshot of statistics for all pools. +%% +%% See `t:stats/0' for details on the return type. -spec stats() -> [stats()]. stats() -> wpool_pool:stats(). -%% @doc Retrieves a snapshot of a given pool stats +%% @doc Retrieves a snapshot of statistics for a a given pool. +%% +%% See `t:stats/0' for details on the return type. -spec stats(name()) -> stats(). stats(Sup) -> wpool_pool:stats(Sup). @@ -235,11 +443,18 @@ get_workers(Sup) -> wpool_pool:get_workers(Sup). %% @doc Casts a message to all the workers within the given pool. +%% +%% NOTE: These messages don't get queued, they go straight to the worker's message queues, so +%% if you're using available_worker strategy to balance the charge and you have some tasks queued up +%% waiting for the next available worker, the broadcast will reach all the workers before the +%% queued up tasks. -spec broadcast(wpool:name(), term()) -> ok. broadcast(Sup, Cast) -> wpool_pool:broadcast(Sup, Cast). -%% @doc Calls all the workers within the given pool async and waits for the responses synchronously +%% @doc Calls all the workers within the given pool async and waits for the responses synchronously. +%% +%% If one worker times out, the entire call is considered timed-out. -spec broadcall(wpool:name(), term(), timeout()) -> {[Replies :: term()], [Errors :: term()]}. broadcall(Sup, Call, Timeout) -> diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index e72bfe6..3314bda 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -11,9 +11,14 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @author Fernando Benavides -%%% @doc A pool of workers. If you want to put it in your supervisor tree, -%%% remember it's a supervisor. +%%% @doc Top supervisor for a `worker_pool'. +%%% +%%% This supervisor supervises `wpool_process_sup' (which is the worker's supervisor) together with +%%% auxiliary servers that help keep the whole pool running and in order. +%%% +%%% The strategy of this supervisor must be `one_for_all' but the intensity and period may be +%%% changed from their defaults by the `t:wpool:pool_sup_intensity()' and +%%% `t:wpool:pool_sup_intensity()' options respectively. -module(wpool_pool). -behaviour(supervisor). @@ -275,7 +280,7 @@ next(Next, #wpool{next = Atomic} = Wpool) -> Wpool. %% @doc Adds a callback module. -%% The module must implement the
wpool_process_callbacks
behaviour. +%% The module must implement the `wpool_process_callbacks' behaviour. -spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}. add_callback_module(Pool, Module) -> EventManager = event_manager_name(Pool), @@ -371,7 +376,10 @@ init({Name, Options}) -> SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5), SupPeriod = proplists:get_value(pool_sup_period, Options, 60), - SupStrategy = {one_for_all, SupIntensity, SupPeriod}, + SupStrategy = + #{strategy => one_for_all, + intensity => SupIntensity, + period => SupPeriod}, {ok, {SupStrategy, Children}}. %% @private diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 4a88ab7..dff3d84 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -11,7 +11,7 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @author Fernando Benavides +%%% @private %%% @doc Decorator over `gen_server' that lets `wpool_pool' %%% control certain aspects of the execution -module(wpool_process). diff --git a/src/wpool_process_sup.erl b/src/wpool_process_sup.erl index f0b2a23..10dc9e6 100644 --- a/src/wpool_process_sup.erl +++ b/src/wpool_process_sup.erl @@ -11,7 +11,7 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @private +%%% @doc This is the supervisor that supervises the `gen_server' workers specifically. -module(wpool_process_sup). -behaviour(supervisor). diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl index 9c4c623..25bd93a 100644 --- a/src/wpool_utils.erl +++ b/src/wpool_utils.erl @@ -11,7 +11,6 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @author Felipe Ripoll %%% @doc Common functions for wpool_process and other modules. -module(wpool_utils). diff --git a/src/wpool_worker.erl b/src/wpool_worker.erl index eb8f909..3bc9176 100644 --- a/src/wpool_worker.erl +++ b/src/wpool_worker.erl @@ -11,8 +11,9 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @author Fernando Benavides %%% @doc Default instance for `wpool_process' +%%% +%%% It is a module that implements a very simple RPC-like interface. -module(wpool_worker). -behaviour(gen_server). diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index 24a0a52..9abfee4 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -27,8 +27,8 @@ -export([init_per_suite/1, end_per_suite/1]). -export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1, kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1, - overrun_handler2/1, default_options/1, complete_coverage/1, broadcall/1, broadcast/1, - send_request/1, worker_killed_stats/1]). + overrun_handler2/1, default_options/1, complete_coverage/1, child_spec/1, broadcall/1, + broadcast/1, send_request/1, worker_killed_stats/1]). -elvis([{elvis_style, no_block_expressions, disable}]). @@ -45,6 +45,7 @@ all() -> default_strategy, default_options, complete_coverage, + child_spec, broadcast, broadcall, send_request, @@ -385,6 +386,14 @@ complete_coverage(_Config) -> {comment, []}. +-spec child_spec(config()) -> {comment, []}. +child_spec(_Config) -> + ct:comment("Verify child_spec is correct"), + ChildSpec = wpool:child_spec(child_spec, []), + ok = supervisor:check_childspecs([ChildSpec]), + + {comment, []}. + -spec broadcall(config()) -> {comment, []}. broadcall(_Config) -> Pool = broadcall,