Skip to content

Commit

Permalink
Accept ranges of users in amoc_coordinator
Browse files Browse the repository at this point in the history
Sometimes we want to add some variance to the test and we want the coordinator
to choose groups of roughly N users instead of uniformly choosing all groups to
be of the exact same size. For this, we could give the coordinator a range of
the type {Min, Max} were every group will be chosen to have N in the interval
[Min, Max]. `rand:uniform/1` can be used to ensure that the distribution will be
pretty uniform, and in a sufficiently large scale, converge to N.

As a good to have, the coordinator could keep statistics of the distribution of
N in the interval.
  • Loading branch information
NelsonVides committed Feb 21, 2024
1 parent 7b1ea3d commit 912562b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
6 changes: 4 additions & 2 deletions guides/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ The coordinator reacts to new users showing up in a system, according to the *Co
The *Coordination Plan* consists of *Coordination Items*, and each of them is defined as one of the following: `{NumberOfUsers, CoordinationActions}`.
- When the `NumberOfUsers` is set to `all`, then only *Coordination Actions* with the arities `/1, /2` are handled.
The *Coordination Items* with `all` are triggered by the `timeout` event type.
- When the `NumberOfUsers` is set to a positive integer, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.
- When the `NumberOfUsers` is set to a positive integer or a range, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.

Note that `NumberOfUsers` can be a range, in which case a new integer within the range will be randomly selected every time the coordinator fills a batch, to ensure a non-equal but uniform distribution of coordination.

The timeout timer is reset by calling the `add` function.
A new batch size is set in the `NumberOfUsers`. Each user in the batch calls the `add` function registering to the coordinator and triggering the *Coordination Plan*.
If more then one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
If more than one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
For example if the *Coordination Plan* is `[{2, Act1}, {3, Act2}]` then on the 6th user calling `add`, `Act1` will be called with 2 users passed and `Act2` will be called with 3 users passed.

*Coordination Actions* may be one of the following:
Expand Down
31 changes: 21 additions & 10 deletions src/coordinator/amoc_coordinator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
-type action() :: amoc_coordinator:coordination_action().
-type data() :: amoc_coordinator:coordination_data().

-record(state, {required_n = all :: pos_integer() | all,
-record(state, {configured = all :: {pos_integer(), pos_integer()} | pos_integer() | all,
required_n = all :: pos_integer() | all,
n = 0 :: non_neg_integer(),
actions = [] :: [action()],
collect_data = true :: boolean(),
accumulator = [] :: [data()]}).
acc = [] :: [data()]}).

-type state() :: #state{}.

Expand Down Expand Up @@ -58,9 +59,11 @@ add(Pid, Data) ->

-spec init(amoc_coordinator:normalized_coordination_item()) -> {ok, state()}.
init({NoOfUsers, Actions}) ->
State = #state{required_n = NoOfUsers, actions = Actions},
{ok, State#state{collect_data = is_acc_required(Actions)}}.

State = #state{configured = NoOfUsers,
required_n = calculate_n(NoOfUsers),
actions = Actions,
collect_data = is_acc_required(Actions)},
{ok, State}.

-spec handle_call({reset, reset | timeout | stop}, term(), state()) ->
{reply, ok, state()} | {stop, normal, ok, state()}.
Expand All @@ -84,12 +87,12 @@ is_acc_required(Actions) ->
end, Actions).

-spec add_data(data(), state()) -> state().
add_data(Data, #state{n = N, accumulator = Acc} = State) ->
add_data(Data, #state{n = N, acc = Acc} = State) ->
NewState = case State#state.collect_data of
false ->
State#state{n = N + 1};
true ->
State#state{n = N + 1, accumulator = [Data | Acc]}
State#state{n = N + 1, acc = [Data | Acc]}
end,
maybe_reset_state(NewState).

Expand All @@ -100,13 +103,15 @@ maybe_reset_state(State) ->
State.

-spec reset_state(event_type(), state()) -> state().
reset_state(Event, #state{actions = Actions,
accumulator = Acc,
reset_state(Event, #state{configured = Config,
actions = Actions,
acc = Acc,
n = N, required_n = ReqN} = State) ->
amoc_telemetry:execute([coordinator, execute], #{count => N},
#{event => Event, configured => ReqN}),
[execute_action(Action, {Event, N}, Acc) || Action <- Actions],
State#state{accumulator = [], n = 0}.
NewN = calculate_n(Config),
State#state{required_n = NewN, n = 0, acc = []}.

-spec execute_action(action(), event(), [data()]) -> any().
execute_action(Action, Event, _) when is_function(Action, 1) ->
Expand All @@ -125,6 +130,12 @@ safe_executions(Fun, Args) ->
_:_ -> ok
end.

-spec calculate_n(amoc_coordinator:coordination_number_of_users()) -> all | pos_integer().
calculate_n({Min, Diff}) ->
Min + rand:uniform(Diff);
calculate_n(Value) ->
Value.

-spec distinct_pairs(fun((data(), data()) -> any()), [data()]) -> any().
distinct_pairs(Fun, []) ->
Fun(undefined, undefined);
Expand Down
20 changes: 20 additions & 0 deletions test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ all() ->
plan_normalises_successfully,
ordering_plan_sets_all_at_the_end,
failing_action_does_not_kill_the_worker,
execute_with_range_without_timeout,
execute_plan_without_timeout,
reset_plan_without_timeout,
execute_plan_with_timeout
Expand Down Expand Up @@ -45,6 +46,25 @@ init_per_testcase(_, Config) ->
end_per_testcase(_Config) ->
ok.

execute_with_range_without_timeout(_Config) ->
N = 20, Name = ?FUNCTION_NAME,

Plan = [ Range = {{5, 10}, mocked_action(range, 1)},
All = {all, mocked_action(all, 1)}],

?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)),
[amoc_coordinator:add(Name, User) || User <- lists:seq(1, N)],

amoc_coordinator:stop(Name),
meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000),

History = meck:history(?MOCK_MOD),
NumOfEvents = length(History),
?assert(3 =< NumOfEvents andalso NumOfEvents =< 6),

nothing_after_tags(History, [all]),
assert_telemetry_events(Name, [start, {N, add}, stop]).

execute_plan_without_timeout(_Config) ->
N = 4, Name = ?FUNCTION_NAME,

Expand Down

0 comments on commit 912562b

Please sign in to comment.