Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmetics changes to improve readability, types, and upgrades #185

Merged
merged 9 commits into from
Jun 17, 2024
9 changes: 5 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
workflow_dispatch:


jobs:
test:
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
strategy:
matrix:
otp_vsn: ['26.2', '25.3', '24.3']
rebar_vsn: ['3.22.0']
otp_vsn: ['27', '26', '25']
rebar_vsn: ['3.23.0']
test-type: ['regular', 'integration']
runs-on: 'ubuntu-22.04'
runs-on: 'ubuntu-24.04'
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand Down
3 changes: 0 additions & 3 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ Amoc supports the following generic configuration parameters:
* default value - empty list (`[]`)
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`

* `api_port` - a port for the amoc REST interfaces:
* default value - 4000
* example: `AMOC_API_PORT="4000"`

* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
for two consecutive users:
Expand Down
109 changes: 61 additions & 48 deletions integration_test/extra_code_paths/path1/dummy_helper.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
-module(dummy_helper).

-required_variable(#{name=>dummy_var, description=>"dummy_var",
default_value=>default_value}).
-include_lib("stdlib/include/assert.hrl").

-required_variable(#{name => dummy_var,
description => "dummy_var",
default_value => default_value}).

-define(comment(U), io_lib:format("Condition failed with last users distribution ~n~p", [U])).

%% amoc_dist testing function
-export([test_amoc_dist/0]).
Expand All @@ -11,65 +16,73 @@ test_amoc_dist() ->
Master = amoc_cluster:master_node(),
Slaves = amoc_cluster:slave_nodes(),
%% check the status of the nodes
disabled = rpc:call(Master, amoc_controller, get_status, []),
[{running, #{scenario := dummy_scenario}} = rpc:call(Node, amoc_controller, get_status, [])
|| Node <- Slaves],
%% check user ids
{N1, Nodes1, Ids1, Max1} = get_users_info(Slaves),
true = N1 > 0,
N1 = Max1,
Ids1 = lists:seq(1, N1),
?assertEqual(disabled, get_status(Master)),
[ ?assertMatch({running, #{scenario := dummy_scenario}}, get_status(Node)) || Node <- Slaves],
%% check user ids, users have all been started at the first two nodes
{N1, Max1, Nodes1, Ids1, Users1} = get_users_info(Slaves),
?assert(N1 > 0),
?assertEqual(N1, Max1, ?comment(Users1)),
?assertEqual(Ids1, lists:seq(1, N1), ?comment(Users1)),
[AddedNode] = Slaves -- Nodes1,
DenysGonchar marked this conversation as resolved.
Show resolved Hide resolved
%% add 20 users
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
timer:sleep(3000),
{N2, Nodes2, Ids2, Max2} = get_users_info(Slaves),
N2 = Max2,
Ids2 = lists:seq(1, N2),
[AddedNode] = Nodes2 -- Nodes1,
N2 = N1 + 20,
add_and_wait(Master, 20),
{N2, Max2, Nodes2, Ids2, Users2} = get_users_info(Slaves),
?assertEqual(N2, Max2, ?comment(Users2)),
?assertEqual(Ids2, lists:seq(1, N2), ?comment(Users2)),
?assertEqual([AddedNode], Nodes2 -- Nodes1, ?comment(Users2)),
?assertEqual(N2, N1 + 20, ?comment(Users2)),
%% remove 10 users
{ok, _} = rpc:call(Master, amoc_dist, remove, [10, true]),
timer:sleep(3000),
{N3, Nodes3, _Ids3, Max3} = get_users_info(Slaves),
Nodes2 = Nodes3,
Max3 = Max2,
N2 = N3 + 10,
remove_and_wait(Master, 10),
{N3, Max3, Nodes3, _Ids3, Users3} = get_users_info(Slaves),
?assertEqual(N2 - 10, N3, ?comment(Users3)),
?assertEqual(Max2, Max3, ?comment(Users3)),
?assertEqual(Nodes2, Nodes3, ?comment(Users3)),
%% try to remove N3 users
{ok, Ret} = rpc:call(Master, amoc_dist, remove, [N3, true]),
Ret = remove_and_wait(Master, N3),
RemovedN = lists:sum([N || {_, N} <- Ret]),
timer:sleep(3000),
{N4, Nodes4, Ids4, _Max4} = get_users_info(Slaves),
Nodes1 = Nodes4,
N3 = N4 + RemovedN,
true = RemovedN < N3,
{N4, _Max4, Nodes4, Ids4, Users4} = get_users_info(Slaves),
?assertEqual(N3 - RemovedN, N4, ?comment(Users4)),
?assertEqual(Nodes1, Nodes4, ?comment(Users4)),
?assert(RemovedN < N3),
%% add 20 users
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
timer:sleep(3000),
{N5, Nodes5, Ids5, Max5} = get_users_info(Slaves),
Nodes2 = Nodes5,
Max5 = Max2 + 20,
N5 = N4 + 20,
true = Ids5 -- Ids4 =:= lists:seq(Max2 + 1, Max5),
add_and_wait(Master, 20),
{N5, Max5, Nodes5, Ids5, Users5} = get_users_info(Slaves),
?assertEqual(Nodes2, Nodes5, ?comment(Users5)),
?assertEqual(Max5, Max2 + 20, ?comment(Users5)),
?assertEqual(N5, N4 + 20, ?comment(Users5)),
?assertEqual(Ids5 -- Ids4, lists:seq(Max2 + 1, Max5), ?comment(Users5)),
%% terminate scenario
{ok,_} = rpc:call(Master, amoc_dist, stop, []),
timer:sleep(3000),
[{finished, dummy_scenario} = rpc:call(Node, amoc_controller, get_status, [])
|| Node <- Slaves],
stop(Master),
[ ?assertEqual({finished, dummy_scenario}, get_status(Node)) || Node <- Slaves],
%% return expected value
amoc_dist_works_as_expected
catch
C:E:S ->
{error, {C, E, S}}
{C, E, S}
end.

get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
Distrib = [ {Node, erpc:call(Node, amoc_users_sup, get_all_children, [])} || Node <- SlaveNodes ],
Ids = lists:usort([Id || {_Node, Users} <- Distrib, {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, Users} <- Distrib, [] =/= Users]),
N = length(Ids),
N = length(Users),
MaxId = lists:max(Ids),
{N, Nodes, Ids, MaxId}.
{N, MaxId, Nodes, Ids, Distrib}.

add_and_wait(Master, Num) ->
{ok, Ret} = erpc:call(Master, amoc_dist, add, [Num]),
timer:sleep(3000),
Ret.

remove_and_wait(Master, Num) ->
{ok, Ret} = erpc:call(Master, amoc_dist, remove, [Num, true]),
timer:sleep(3000),
Ret.

stop(Master) ->
{ok, Ret} = erpc:call(Master, amoc_dist, stop, []),
timer:sleep(3000),
Ret.

get_status(Node) ->
erpc:call(Node, amoc_controller, get_status, []).
2 changes: 1 addition & 1 deletion src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
-spec start_link() -> gen_server:start_ret().
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

Expand Down
2 changes: 1 addition & 1 deletion src/amoc_scenario.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ terminate(Scenario, State) ->
%% if scenario module exports both functions, `Scenario:start/2' is used.
%%
%% Runs on the user process and spans a `[amoc, scenario, user, _]' telemetry event.
-spec start(amoc:scenario(), user_id(), state()) -> any().
-spec start(amoc:scenario(), user_id(), state()) -> term().
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any() highlights that returned value is ignored.

start(Scenario, Id, State) ->
Metadata = #{scenario => Scenario, state => State, user_id => Id},
Span = case {erlang:function_exported(Scenario, start, 2),
Expand Down
18 changes: 9 additions & 9 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(WORKER(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(SUP(I, Type), {I, {I, start_link, []}, permanent, infinity, Type, [I]}).
-define(WORKER(I), {I, {I, start_link, []}, permanent, 5000, worker, [I]}).
-define(SUP(I), {I, {I, start_link, []}, permanent, infinity, supervisor, [I]}).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this was a bit stupid :)


%% ===================================================================
%% API functions
%% ===================================================================

-spec start_link() -> {ok, Pid :: pid()}.
-spec start_link() -> supervisor:startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

Expand All @@ -30,10 +30,10 @@ start_link() ->
init([]) ->
{ok, {#{strategy => one_for_all, intensity => 0},
[
?SUP(amoc_users_sup, supervisor),
?SUP(amoc_throttle_sup, supervisor),
?SUP(amoc_coordinator_sup, supervisor),
?WORKER(amoc_controller, worker),
?WORKER(amoc_cluster, worker),
?WORKER(amoc_code_server, worker)
?SUP(amoc_users_sup),
?SUP(amoc_throttle_sup),
?SUP(amoc_coordinator_sup),
?WORKER(amoc_controller),
?WORKER(amoc_cluster),
?WORKER(amoc_code_server)
]}}.
12 changes: 6 additions & 6 deletions src/dist/amoc_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ get_state() ->
case {amoc_cluster:master_node(), get_param(state)} of
{undefined, undefined} -> idle;
{_, {ok, State}} -> State;
{Node, undefined} -> rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
{Node, undefined} -> erpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
end.

%% ------------------------------------------------------------------
Expand Down Expand Up @@ -172,7 +172,7 @@ setup_slave_node(Node) ->
{ok, _} ->
{ok, Scenario} = get_param(scenario),
{ok, Settings} = get_param(settings),
rpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
erpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
Error -> Error
end.

Expand All @@ -196,7 +196,7 @@ add_users(Result, LastId, Count, [Node | T] = Nodes) ->
0 ->
add_users([{Node, {ok, node_skipped}} | Result], LastId, Count, T);
N ->
Ret = rpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
Ret = erpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
add_users([{Node, Ret} | Result], LastId + N, Count - N, T)
end.

Expand All @@ -213,7 +213,7 @@ remove_users(Result, Count, ForceRemove, [Node | T] = Nodes) ->
0 ->
remove_users([{Node, {ok, node_skipped}} | Result], Count, ForceRemove, T);
N ->
Ret = rpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
Ret = erpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
remove_users([{Node, Ret} | Result], Count - N, ForceRemove, T)
end.

Expand All @@ -225,7 +225,7 @@ update_settings_on_nodes(Settings, Nodes) ->
-spec update_settings_on_node(amoc_config:settings(), node()) ->
ok | {badrpc, any()} | {error, any()}.
update_settings_on_node(Settings, Node) ->
rpc:call(Node, amoc_controller, update_settings, [Settings]).
erpc:call(Node, amoc_controller, update_settings, [Settings]).

-spec stop_cluster() -> {ok, any()} | {error, any()}.
stop_cluster() ->
Expand All @@ -234,7 +234,7 @@ stop_cluster() ->
{_, []} -> {error, no_slave_nodes};
{MasterNode, Slaves} ->
set_state(stopped),
Result = [{Node, rpc:call(Node, amoc_controller, stop_scenario, [])} ||
Result = [{Node, erpc:call(Node, amoc_controller, stop_scenario, [])} ||
Node <- Slaves],
maybe_error(Result);
{_, _} -> {error, not_a_master}
Expand Down
9 changes: 5 additions & 4 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
handle_info/2,
handle_cast/2,
handle_continue/2,
format_status/2]).
format_status/1]).

-define(PG_SCOPE, amoc_throttle).
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute
Expand Down Expand Up @@ -143,12 +143,13 @@ handle_continue(maybe_run_fn, State) ->
NewState = maybe_run_fn(State),
{noreply, NewState, timeout(NewState)}.

-spec format_status(term(), term()) -> term().
format_status(_Opt, [_PDict, State]) ->
-spec format_status(gen_server:format_status()) -> gen_server:format_status().
format_status(#{state := #state{} = State} = FormatStatus) ->
ScheduleLen = length(State#state.schedule),
ScheduleRevLen = length(State#state.schedule_reversed),
State1 = setelement(#state.schedule, State, ScheduleLen),
setelement(#state.schedule_reversed, State1, ScheduleRevLen).
State2 = setelement(#state.schedule_reversed, State1, ScheduleRevLen),
FormatStatus#{state := State2}.

%%------------------------------------------------------------------------------
%% internal functions
Expand Down
5 changes: 2 additions & 3 deletions src/users/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

-spec stop() -> no_return().
-spec stop() -> ok.
stop() ->
stop(self(), false).

-spec stop(pid(), boolean()) -> ok.
stop(Pid, Force) when is_pid(Pid) ->
amoc_users_sup:stop_child(Pid, Force).

-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) ->
no_return().
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
init(Parent, Scenario, Id, State) ->
proc_lib:init_ack(Parent, {ok, self()}),
process_flag(trap_exit, true),
Expand Down
4 changes: 2 additions & 2 deletions src/users/amoc_users_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ terminate_all_children() ->
-spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid().
get_sup_for_user_id(Id) ->
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
Index = Id rem SupCount + 1,
Index = erlang:phash2(Id, SupCount) + 1,
Copy link
Collaborator

@DenysGonchar DenysGonchar Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if phash makes a better distribution than a simple rem.

element(Index, Supervisors).

%% assign which users each worker will be requested to add
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) ->
Index = Id rem SupCount + 1,
Index = erlang:phash2(Id, SupCount) + 1,
ChosenSup = element(Index, Supervisors),
Vs = maps:get(ChosenSup, Acc),
NewAcc = Acc#{ChosenSup := [Id | Vs]},
Expand Down
Loading
Loading