Skip to content

Commit

Permalink
Merge pull request #174 from esl/coordinator/ranges
Browse files Browse the repository at this point in the history
Accept ranges of users in `amoc_coordinator`
  • Loading branch information
arcusfelis authored Mar 5, 2024
2 parents f1d9152 + 67a0941 commit d04348c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
6 changes: 4 additions & 2 deletions guides/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ The coordinator reacts to new users showing up in a system, according to the *Co
The *Coordination Plan* consists of *Coordination Items*, and each of them is defined as one of the following: `{NumberOfUsers, CoordinationActions}`.
- When the `NumberOfUsers` is set to `all`, then only *Coordination Actions* with the arities `/1, /2` are handled.
The *Coordination Items* with `all` are triggered by the `timeout` event type.
- When the `NumberOfUsers` is set to a positive integer, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.
- When the `NumberOfUsers` is set to a positive integer or a range, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.

Note that `NumberOfUsers` can be a range, in which case a new integer within the range will be randomly selected every time the coordinator fills a batch, to ensure a non-equal but uniform distribution of coordination.

The timeout timer is reset by calling the `add` function.
A new batch size is set in the `NumberOfUsers`. Each user in the batch calls the `add` function registering to the coordinator and triggering the *Coordination Plan*.
If more then one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
If more than one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
For example if the *Coordination Plan* is `[{2, Act1}, {3, Act2}]` then on the 6th user calling `add`, `Act1` will be called with 2 users passed and `Act2` will be called with 3 users passed.

*Coordination Actions* may be one of the following:
Expand Down
39 changes: 25 additions & 14 deletions src/coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@

-type name() :: atom().

-type coordination_data() :: {pid(), Data :: any()}.
-type data() :: {pid(), Data :: any()}.

-type maybe_coordination_data() :: coordination_data() | undefined.
-type maybe_coordination_data() :: data() | undefined.

-type event() :: coordinator_timeout | reset_coordinator | {coordinate, {pid(), term()}}.
-type coordination_event_type() :: coordinate | timeout | stop | reset.
-type event_type() :: coordinate | timeout | stop | reset.

-type coordination_event() :: {coordination_event_type(), non_neg_integer()}.
-type coordination_event() :: {event_type(), non_neg_integer()}.

-type coordination_action() ::
fun((coordination_event(), [coordination_data()]) -> any()) |
-type action() ::
fun((coordination_event(), [data()]) -> any()) |
fun((coordination_event(), maybe_coordination_data(), maybe_coordination_data()) -> any()) |
fun((coordination_event()) -> any()).

-type coordination_actions() :: [coordination_action()] | coordination_action().
-type coordination_actions() :: [action()] | action().

-type coordination_item() :: {NoOfUsers :: pos_integer() | all,
coordination_actions()}.
-type num_of_users() :: pos_integer() | {Min :: pos_integer(), Max :: pos_integer()} | all.

-type coordination_item() :: {num_of_users(), coordination_actions()}.

-type normalized_coordination_item() :: {NoOfUsers :: pos_integer() | all,
[coordination_action()]}.
[action()]}.

-type plan() :: [coordination_item()] | coordination_item().

Expand All @@ -51,10 +52,11 @@
-export_type([name/0,
event/0,
plan/0,
coordination_event_type/0,
event_type/0,
action/0,
data/0,
num_of_users/0,
coordination_event/0,
coordination_action/0,
coordination_data/0,
normalized_coordination_item/0]).

%%%===================================================================
Expand Down Expand Up @@ -162,7 +164,11 @@ normalize_coordination_item({NoOfUsers, Action}) when is_function(Action) ->
normalize_coordination_item({NoOfUsers, Actions}) when ?IS_N_OF_USERS(NoOfUsers),
is_list(Actions) ->
[assert_action(NoOfUsers, A) || A <- Actions],
{NoOfUsers, Actions}.
{NoOfUsers, Actions};
normalize_coordination_item({{Min, Max}, Actions}) when ?IS_POS_INT(Min), ?IS_POS_INT(Max),
Max > Min, is_list(Actions) ->
[assert_action({Min, Max}, A) || A <- Actions],
{{Min, Max}, Actions}.

assert_action(all, Action) when is_function(Action, 1);
is_function(Action, 2) ->
Expand All @@ -171,4 +177,9 @@ assert_action(N, Action) when is_integer(N),
(is_function(Action, 1) orelse
is_function(Action, 2) orelse
is_function(Action, 3)) ->
ok;
assert_action({Min, Max}, Action) when is_integer(Min), is_integer(Max),
(is_function(Action, 1) orelse
is_function(Action, 2) orelse
is_function(Action, 3)) ->
ok.
37 changes: 24 additions & 13 deletions src/coordinator/amoc_coordinator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
handle_call/3,
handle_cast/2]).

-type event_type() :: amoc_coordinator:coordination_event_type().
-type event_type() :: amoc_coordinator:event_type().
-type event() :: amoc_coordinator:coordination_event().
-type action() :: amoc_coordinator:coordination_action().
-type data() :: amoc_coordinator:coordination_data().
-type action() :: amoc_coordinator:action().
-type data() :: amoc_coordinator:data().

-record(state, {required_n = all :: pos_integer() | all,
-record(state, {configured = all :: amoc_coordinator:num_of_users(),
required_n = all :: pos_integer() | all,
n = 0 :: non_neg_integer(),
actions = [] :: [action()],
collect_data = true :: boolean(),
accumulator = [] :: [data()]}).
acc = [] :: [data()]}).

-type state() :: #state{}.

Expand Down Expand Up @@ -58,9 +59,11 @@ add(Pid, Data) ->

-spec init(amoc_coordinator:normalized_coordination_item()) -> {ok, state()}.
init({NoOfUsers, Actions}) ->
State = #state{required_n = NoOfUsers, actions = Actions},
{ok, State#state{collect_data = is_acc_required(Actions)}}.

State = #state{configured = NoOfUsers,
required_n = calculate_n(NoOfUsers),
actions = Actions,
collect_data = is_acc_required(Actions)},
{ok, State}.

-spec handle_call({reset, reset | timeout | stop}, term(), state()) ->
{reply, ok, state()} | {stop, normal, ok, state()}.
Expand All @@ -84,12 +87,12 @@ is_acc_required(Actions) ->
end, Actions).

-spec add_data(data(), state()) -> state().
add_data(Data, #state{n = N, accumulator = Acc} = State) ->
add_data(Data, #state{n = N, acc = Acc} = State) ->
NewState = case State#state.collect_data of
false ->
State#state{n = N + 1};
true ->
State#state{n = N + 1, accumulator = [Data | Acc]}
State#state{n = N + 1, acc = [Data | Acc]}
end,
maybe_reset_state(NewState).

Expand All @@ -100,13 +103,15 @@ maybe_reset_state(State) ->
State.

-spec reset_state(event_type(), state()) -> state().
reset_state(Event, #state{actions = Actions,
accumulator = Acc,
reset_state(Event, #state{configured = Config,
actions = Actions,
acc = Acc,
n = N, required_n = ReqN} = State) ->
amoc_telemetry:execute([coordinator, execute], #{count => N},
#{event => Event, configured => ReqN}),
[execute_action(Action, {Event, N}, Acc) || Action <- Actions],
State#state{accumulator = [], n = 0}.
NewN = calculate_n(Config),
State#state{required_n = NewN, n = 0, acc = []}.

-spec execute_action(action(), event(), [data()]) -> any().
execute_action(Action, Event, _) when is_function(Action, 1) ->
Expand All @@ -125,6 +130,12 @@ safe_executions(Fun, Args) ->
_:_ -> ok
end.

-spec calculate_n(amoc_coordinator:num_of_users()) -> all | pos_integer().
calculate_n({Min, Max}) ->
Min - 1 + rand:uniform(Max - Min);
calculate_n(Value) ->
Value.

-spec distinct_pairs(fun((data(), data()) -> any()), [data()]) -> any().
distinct_pairs(Fun, []) ->
Fun(undefined, undefined);
Expand Down
20 changes: 20 additions & 0 deletions test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ all() ->
plan_normalises_successfully,
ordering_plan_sets_all_at_the_end,
failing_action_does_not_kill_the_worker,
execute_with_range_without_timeout,
execute_plan_without_timeout,
reset_plan_without_timeout,
execute_plan_with_timeout
Expand Down Expand Up @@ -45,6 +46,25 @@ init_per_testcase(_, Config) ->
end_per_testcase(_Config) ->
ok.

execute_with_range_without_timeout(_Config) ->
N = 20, Name = ?FUNCTION_NAME,

Plan = [ Range = {{5, 10}, mocked_action(range, 1)},
All = {all, mocked_action(all, 1)}],

?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)),
[amoc_coordinator:add(Name, User) || User <- lists:seq(1, N)],

amoc_coordinator:stop(Name),
meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000),

History = meck:history(?MOCK_MOD),
NumOfEvents = length(History),
?assert(3 =< NumOfEvents andalso NumOfEvents =< 5),

nothing_after_tags(History, [all]),
assert_telemetry_events(Name, [start, {N, add}, stop]).

execute_plan_without_timeout(_Config) ->
N = 4, Name = ?FUNCTION_NAME,

Expand Down

0 comments on commit d04348c

Please sign in to comment.