Skip to content

Commit

Permalink
Merge pull request #185 from esl/cosmetics
Browse files Browse the repository at this point in the history
Cosmetics changes to improve readability, types, and upgrades
  • Loading branch information
DenysGonchar authored Jun 17, 2024
2 parents 887e312 + 7d9299b commit 360c3a3
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 112 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
workflow_dispatch:


jobs:
test:
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
strategy:
matrix:
otp_vsn: ['26.2', '25.3', '24.3']
rebar_vsn: ['3.22.0']
otp_vsn: ['27', '26', '25']
rebar_vsn: ['3.23.0']
test-type: ['regular', 'integration']
runs-on: 'ubuntu-22.04'
runs-on: 'ubuntu-24.04'
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand Down
3 changes: 0 additions & 3 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ Amoc supports the following generic configuration parameters:
* default value - empty list (`[]`)
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`

* `api_port` - a port for the amoc REST interfaces:
* default value - 4000
* example: `AMOC_API_PORT="4000"`

* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
for two consecutive users:
Expand Down
109 changes: 61 additions & 48 deletions integration_test/extra_code_paths/path1/dummy_helper.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
-module(dummy_helper).

-required_variable(#{name=>dummy_var, description=>"dummy_var",
default_value=>default_value}).
-include_lib("stdlib/include/assert.hrl").

-required_variable(#{name => dummy_var,
description => "dummy_var",
default_value => default_value}).

-define(comment(U), io_lib:format("Condition failed with last users distribution ~n~p", [U])).

%% amoc_dist testing function
-export([test_amoc_dist/0]).
Expand All @@ -11,65 +16,73 @@ test_amoc_dist() ->
Master = amoc_cluster:master_node(),
Slaves = amoc_cluster:slave_nodes(),
%% check the status of the nodes
disabled = rpc:call(Master, amoc_controller, get_status, []),
[{running, #{scenario := dummy_scenario}} = rpc:call(Node, amoc_controller, get_status, [])
|| Node <- Slaves],
%% check user ids
{N1, Nodes1, Ids1, Max1} = get_users_info(Slaves),
true = N1 > 0,
N1 = Max1,
Ids1 = lists:seq(1, N1),
?assertEqual(disabled, get_status(Master)),
[ ?assertMatch({running, #{scenario := dummy_scenario}}, get_status(Node)) || Node <- Slaves],
%% check user ids, users have all been started at the first two nodes
{N1, Max1, Nodes1, Ids1, Users1} = get_users_info(Slaves),
?assert(N1 > 0),
?assertEqual(N1, Max1, ?comment(Users1)),
?assertEqual(Ids1, lists:seq(1, N1), ?comment(Users1)),
[AddedNode] = Slaves -- Nodes1,
%% add 20 users
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
timer:sleep(3000),
{N2, Nodes2, Ids2, Max2} = get_users_info(Slaves),
N2 = Max2,
Ids2 = lists:seq(1, N2),
[AddedNode] = Nodes2 -- Nodes1,
N2 = N1 + 20,
add_and_wait(Master, 20),
{N2, Max2, Nodes2, Ids2, Users2} = get_users_info(Slaves),
?assertEqual(N2, Max2, ?comment(Users2)),
?assertEqual(Ids2, lists:seq(1, N2), ?comment(Users2)),
?assertEqual([AddedNode], Nodes2 -- Nodes1, ?comment(Users2)),
?assertEqual(N2, N1 + 20, ?comment(Users2)),
%% remove 10 users
{ok, _} = rpc:call(Master, amoc_dist, remove, [10, true]),
timer:sleep(3000),
{N3, Nodes3, _Ids3, Max3} = get_users_info(Slaves),
Nodes2 = Nodes3,
Max3 = Max2,
N2 = N3 + 10,
remove_and_wait(Master, 10),
{N3, Max3, Nodes3, _Ids3, Users3} = get_users_info(Slaves),
?assertEqual(N2 - 10, N3, ?comment(Users3)),
?assertEqual(Max2, Max3, ?comment(Users3)),
?assertEqual(Nodes2, Nodes3, ?comment(Users3)),
%% try to remove N3 users
{ok, Ret} = rpc:call(Master, amoc_dist, remove, [N3, true]),
Ret = remove_and_wait(Master, N3),
RemovedN = lists:sum([N || {_, N} <- Ret]),
timer:sleep(3000),
{N4, Nodes4, Ids4, _Max4} = get_users_info(Slaves),
Nodes1 = Nodes4,
N3 = N4 + RemovedN,
true = RemovedN < N3,
{N4, _Max4, Nodes4, Ids4, Users4} = get_users_info(Slaves),
?assertEqual(N3 - RemovedN, N4, ?comment(Users4)),
?assertEqual(Nodes1, Nodes4, ?comment(Users4)),
?assert(RemovedN < N3),
%% add 20 users
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
timer:sleep(3000),
{N5, Nodes5, Ids5, Max5} = get_users_info(Slaves),
Nodes2 = Nodes5,
Max5 = Max2 + 20,
N5 = N4 + 20,
true = Ids5 -- Ids4 =:= lists:seq(Max2 + 1, Max5),
add_and_wait(Master, 20),
{N5, Max5, Nodes5, Ids5, Users5} = get_users_info(Slaves),
?assertEqual(Nodes2, Nodes5, ?comment(Users5)),
?assertEqual(Max5, Max2 + 20, ?comment(Users5)),
?assertEqual(N5, N4 + 20, ?comment(Users5)),
?assertEqual(Ids5 -- Ids4, lists:seq(Max2 + 1, Max5), ?comment(Users5)),
%% terminate scenario
{ok,_} = rpc:call(Master, amoc_dist, stop, []),
timer:sleep(3000),
[{finished, dummy_scenario} = rpc:call(Node, amoc_controller, get_status, [])
|| Node <- Slaves],
stop(Master),
[ ?assertEqual({finished, dummy_scenario}, get_status(Node)) || Node <- Slaves],
%% return expected value
amoc_dist_works_as_expected
catch
C:E:S ->
{error, {C, E, S}}
{C, E, S}
end.

get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
Distrib = [ {Node, erpc:call(Node, amoc_users_sup, get_all_children, [])} || Node <- SlaveNodes ],
Ids = lists:usort([Id || {_Node, Users} <- Distrib, {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, Users} <- Distrib, [] =/= Users]),
N = length(Ids),
N = length(Users),
MaxId = lists:max(Ids),
{N, Nodes, Ids, MaxId}.
{N, MaxId, Nodes, Ids, Distrib}.

add_and_wait(Master, Num) ->
{ok, Ret} = erpc:call(Master, amoc_dist, add, [Num]),
timer:sleep(3000),
Ret.

remove_and_wait(Master, Num) ->
{ok, Ret} = erpc:call(Master, amoc_dist, remove, [Num, true]),
timer:sleep(3000),
Ret.

stop(Master) ->
{ok, Ret} = erpc:call(Master, amoc_dist, stop, []),
timer:sleep(3000),
Ret.

get_status(Node) ->
erpc:call(Node, amoc_controller, get_status, []).
2 changes: 1 addition & 1 deletion src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
-spec start_link() -> gen_server:start_ret().
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

Expand Down
2 changes: 1 addition & 1 deletion src/amoc_scenario.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ terminate(Scenario, State) ->
%% if scenario module exports both functions, `Scenario:start/2' is used.
%%
%% Runs on the user process and spans a `[amoc, scenario, user, _]' telemetry event.
-spec start(amoc:scenario(), user_id(), state()) -> any().
-spec start(amoc:scenario(), user_id(), state()) -> term().
start(Scenario, Id, State) ->
Metadata = #{scenario => Scenario, state => State, user_id => Id},
Span = case {erlang:function_exported(Scenario, start, 2),
Expand Down
18 changes: 9 additions & 9 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(WORKER(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(SUP(I, Type), {I, {I, start_link, []}, permanent, infinity, Type, [I]}).
-define(WORKER(I), {I, {I, start_link, []}, permanent, 5000, worker, [I]}).
-define(SUP(I), {I, {I, start_link, []}, permanent, infinity, supervisor, [I]}).

%% ===================================================================
%% API functions
%% ===================================================================

-spec start_link() -> {ok, Pid :: pid()}.
-spec start_link() -> supervisor:startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

Expand All @@ -30,10 +30,10 @@ start_link() ->
init([]) ->
{ok, {#{strategy => one_for_all, intensity => 0},
[
?SUP(amoc_users_sup, supervisor),
?SUP(amoc_throttle_sup, supervisor),
?SUP(amoc_coordinator_sup, supervisor),
?WORKER(amoc_controller, worker),
?WORKER(amoc_cluster, worker),
?WORKER(amoc_code_server, worker)
?SUP(amoc_users_sup),
?SUP(amoc_throttle_sup),
?SUP(amoc_coordinator_sup),
?WORKER(amoc_controller),
?WORKER(amoc_cluster),
?WORKER(amoc_code_server)
]}}.
12 changes: 6 additions & 6 deletions src/dist/amoc_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ get_state() ->
case {amoc_cluster:master_node(), get_param(state)} of
{undefined, undefined} -> idle;
{_, {ok, State}} -> State;
{Node, undefined} -> rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
{Node, undefined} -> erpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
end.

%% ------------------------------------------------------------------
Expand Down Expand Up @@ -172,7 +172,7 @@ setup_slave_node(Node) ->
{ok, _} ->
{ok, Scenario} = get_param(scenario),
{ok, Settings} = get_param(settings),
rpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
erpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
Error -> Error
end.

Expand All @@ -196,7 +196,7 @@ add_users(Result, LastId, Count, [Node | T] = Nodes) ->
0 ->
add_users([{Node, {ok, node_skipped}} | Result], LastId, Count, T);
N ->
Ret = rpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
Ret = erpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
add_users([{Node, Ret} | Result], LastId + N, Count - N, T)
end.

Expand All @@ -213,7 +213,7 @@ remove_users(Result, Count, ForceRemove, [Node | T] = Nodes) ->
0 ->
remove_users([{Node, {ok, node_skipped}} | Result], Count, ForceRemove, T);
N ->
Ret = rpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
Ret = erpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
remove_users([{Node, Ret} | Result], Count - N, ForceRemove, T)
end.

Expand All @@ -225,7 +225,7 @@ update_settings_on_nodes(Settings, Nodes) ->
-spec update_settings_on_node(amoc_config:settings(), node()) ->
ok | {badrpc, any()} | {error, any()}.
update_settings_on_node(Settings, Node) ->
rpc:call(Node, amoc_controller, update_settings, [Settings]).
erpc:call(Node, amoc_controller, update_settings, [Settings]).

-spec stop_cluster() -> {ok, any()} | {error, any()}.
stop_cluster() ->
Expand All @@ -234,7 +234,7 @@ stop_cluster() ->
{_, []} -> {error, no_slave_nodes};
{MasterNode, Slaves} ->
set_state(stopped),
Result = [{Node, rpc:call(Node, amoc_controller, stop_scenario, [])} ||
Result = [{Node, erpc:call(Node, amoc_controller, stop_scenario, [])} ||
Node <- Slaves],
maybe_error(Result);
{_, _} -> {error, not_a_master}
Expand Down
9 changes: 5 additions & 4 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
handle_info/2,
handle_cast/2,
handle_continue/2,
format_status/2]).
format_status/1]).

-define(PG_SCOPE, amoc_throttle).
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute
Expand Down Expand Up @@ -143,12 +143,13 @@ handle_continue(maybe_run_fn, State) ->
NewState = maybe_run_fn(State),
{noreply, NewState, timeout(NewState)}.

-spec format_status(term(), term()) -> term().
format_status(_Opt, [_PDict, State]) ->
-spec format_status(gen_server:format_status()) -> gen_server:format_status().
format_status(#{state := #state{} = State} = FormatStatus) ->
ScheduleLen = length(State#state.schedule),
ScheduleRevLen = length(State#state.schedule_reversed),
State1 = setelement(#state.schedule, State, ScheduleLen),
setelement(#state.schedule_reversed, State1, ScheduleRevLen).
State2 = setelement(#state.schedule_reversed, State1, ScheduleRevLen),
FormatStatus#{state := State2}.

%%------------------------------------------------------------------------------
%% internal functions
Expand Down
5 changes: 2 additions & 3 deletions src/users/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

-spec stop() -> no_return().
-spec stop() -> ok.
stop() ->
stop(self(), false).

-spec stop(pid(), boolean()) -> ok.
stop(Pid, Force) when is_pid(Pid) ->
amoc_users_sup:stop_child(Pid, Force).

-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) ->
no_return().
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
init(Parent, Scenario, Id, State) ->
proc_lib:init_ack(Parent, {ok, self()}),
process_flag(trap_exit, true),
Expand Down
4 changes: 2 additions & 2 deletions src/users/amoc_users_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ terminate_all_children() ->
-spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid().
get_sup_for_user_id(Id) ->
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
Index = Id rem SupCount + 1,
Index = erlang:phash2(Id, SupCount) + 1,
element(Index, Supervisors).

%% assign which users each worker will be requested to add
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) ->
Index = Id rem SupCount + 1,
Index = erlang:phash2(Id, SupCount) + 1,
ChosenSup = element(Index, Supervisors),
Vs = maps:get(ChosenSup, Acc),
NewAcc = Acc#{ChosenSup := [Id | Vs]},
Expand Down
Loading

0 comments on commit 360c3a3

Please sign in to comment.