Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry/rework logs #163

Merged
merged 6 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Throttle process internals

Events related to the internals of the throttle processes. These might expose unstable conditions that you
might want to log or act upon by reconfiguring:
```erlang
event_name: [amoc, throttle, process]
measurements: #{logger:level() => 1}
metadata: #{monotonic_time := integer(),
log_level := logger:level(),
msg := binary(),
rate => non_neg_integer(),
interval => non_neg_integer(),
state => map(),
_ => _}
```

## Coordinator

Indicates when a coordinating event was raised, like a process being added for coordination or a timeout being triggered
Expand All @@ -88,3 +104,26 @@ event_name: [amoc, coordinator, start | stop | add | reset | timeout]
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

## Config

### Internal events
These are related to bad configuration events; they might deserve logging.
```erlang
event_name: [amoc, config, get | verify | env]
measurements: #{logger:level() => 1}
metadata: #{monotonic_time := integer(),
log_level := logger:level(),
setting => atom(),
msg := binary(), _ => _}
```

## Cluster

### Internal events
These are related to clustering events.
```erlang
event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down]
measurements: #{count := non_neg_integer()}
metadata: #{monotonic_time := integer(), nodes := [node()], state := map()}
```
8 changes: 5 additions & 3 deletions src/amoc_config/amoc_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
%% @doc TODO
-module(amoc_config).

-include_lib("kernel/include/logger.hrl").
-include("amoc_config.hrl").

-export([get/1, get/2]).
Expand All @@ -20,13 +19,16 @@ get(Name) ->
get(Name, Default) when is_atom(Name) ->
case ets:lookup(amoc_config, Name) of
[] ->
?LOG_ERROR("no scenario setting ~p", [Name]),
amoc_telemetry:execute_log(
error, [config, get], #{setting => Name}, <<"no scenario setting">>),
throw({invalid_setting, Name});
[#module_parameter{name = Name, value = undefined}] ->
Default;
[#module_parameter{name = Name, value = Value}] ->
Value;
InvalidLookupRet ->
?LOG_ERROR("invalid lookup return value ~p ~p", [Name, InvalidLookupRet]),
amoc_telemetry:execute_log(
error, [config, get], #{setting => Name, return => InvalidLookupRet},
<<"invalid lookup return value">>),
throw({invalid_lookup_ret_value, InvalidLookupRet})
end.
13 changes: 5 additions & 8 deletions src/amoc_config/amoc_config_env.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

-export([get/1, get/2]).

-include_lib("kernel/include/logger.hrl").

-define(DEFAULT_PARSER_MODULE, amoc_config_parser).

-callback(parse_value(string()) -> {ok, amoc_config:value()} | {error, any()}).
Expand All @@ -41,12 +39,11 @@ get_os_env(Name, Default) ->
case parse_value(Value, Default) of
{ok, Term} -> Term;
{error, Error} ->
?LOG_ERROR("cannot parse environment variable, using default value.~n"
" parsing error: '~p'~n"
" variable name: '$~s'~n"
" variable value: '~s'~n"
" default value: '~p'~n",
[Error, EnvName, Value, Default]),
amoc_telemetry:execute_log(
error, [config, env],
#{error => Error, variable_name => EnvName,
variable_value => Value, default_value => Default},
<<"cannot parse environment variable, using default value">>),
Default
end.

Expand Down
14 changes: 9 additions & 5 deletions src/amoc_config/amoc_config_verification.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
%% API
-export([process_scenario_config/2]).

-include_lib("kernel/include/logger.hrl").
-include("amoc_config.hrl").

%% @doc Applies the processing as provided by the `required_variable' list to the provided scenario config
Expand Down Expand Up @@ -40,12 +39,17 @@ verify(Fun, Value) ->
{true, NewValue} -> {true, NewValue};
{false, Reason} -> {false, {verification_failed, Reason}};
Ret ->
?LOG_ERROR("invalid verification method ~p(~p), return value : ~p",
[Fun, Value, Ret]),
amoc_telemetry:execute_log(
error, [config, verify],
#{verification_method => Fun, verification_arg => Value, verification_return => Ret},
<<"invalid verification method">>),
{false, {invalid_verification_return_value, Ret}}
catch
C:E:S ->
?LOG_ERROR("invalid verification method ~p(~p), exception: ~p ~p ~p",
[Fun, Value, C, E, S]),
amoc_telemetry:execute_log(
error, [config, verify],
#{verification_method => Fun, verification_arg => Value,
kind => C, reason => E, stacktrace => S},
<<"invalid verification method">>),
{false, {exception_during_verification, {C, E, S}}}
end.
13 changes: 8 additions & 5 deletions src/amoc_distribution/amoc_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
-behaviour(gen_server).
-define(SERVER, ?MODULE).

-include_lib("kernel/include/logger.hrl").

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
Expand Down Expand Up @@ -134,7 +132,7 @@ handle_call(_Request, _From, State) ->

-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast({connect_nodes, Nodes}, State) ->
?LOG_INFO("{connect_nodes, ~p}, state: ~p", [Nodes, state_to_map(State)]),
raise_nodes_event(connect_nodes, Nodes, state_to_map(State)),
NewState = handle_connect_nodes(Nodes, State),
schedule_timer(NewState),
{noreply, NewState};
Expand All @@ -149,11 +147,11 @@ handle_info(timeout, State) ->
schedule_timer(NewState),
{noreply, NewState};
handle_info({nodedown, Node}, #state{master = Node} = State) ->
?LOG_ERROR("Master node ~p is down. Halting.", [Node]),
raise_nodes_event(master_node_down, [Node], state_to_map(State)),
erlang:halt(),
{noreply, State};
handle_info({nodedown, Node}, State) ->
?LOG_ERROR("node ~p is down.", [Node]),
raise_nodes_event(nodedown, [Node], state_to_map(State)),
{noreply, merge(connection_lost, [Node], State)};
handle_info(_Info, State) ->
{noreply, State}.
Expand Down Expand Up @@ -282,3 +280,8 @@ maybe_set_master(Node, #state{new_connection_action = Action}) ->
%% to avoid a possibility of the amoc_cluster deadlock while
%% running the Action call set_master_node/2 asynchronously
spawn(fun() -> set_master_node(Node, Action) end).

%% Emits a `[cluster, Name]' event through amoc_telemetry:execute/3.
%% The single measurement is the number of nodes involved; the metadata carries
%% the node list itself together with the state map supplied by the caller.
-spec raise_nodes_event(atom(), [node()], #{any() => any()}) -> ok.
raise_nodes_event(Name, Nodes, State) ->
    Measurements = #{count => length(Nodes)},
    Metadata = #{nodes => Nodes, state => State},
    amoc_telemetry:execute([cluster, Name], Measurements, Metadata).
15 changes: 12 additions & 3 deletions src/amoc_telemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_telemetry).

-export([execute/3]).
-export([execute/3, execute_log/4]).

-spec execute(EventName, Measurements, Metadata) -> ok when
EventName :: telemetry:event_name(),
Measurements :: telemetry:event_measurements(),
Metadata :: telemetry:event_metadata().
execute(Name, Measurements, Metadata) ->
TimeStamp = erlang:monotonic_time(),
NameWithAmocPrefix = [amoc | Name],
PrefixedName = [amoc | Name],
MetadataWithTS = Metadata#{monotonic_time => TimeStamp},
telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS).
telemetry:execute(PrefixedName, Measurements, MetadataWithTS).

%% Emits a log-like telemetry event via execute/3.
%% The measurement map has the log level as its only key (with value 1), and the
%% given metadata is extended with the `log_level' and `msg' keys.
-spec execute_log(Level, EventName, Metadata, Msg) -> ok when
    Level :: logger:level(),
    EventName :: telemetry:event_name(),
    Metadata :: telemetry:event_metadata(),
    Msg :: binary().
execute_log(Level, Name, Metadata, Message) ->
    Measurements = #{Level => 1},
    LogMetadata = Metadata#{msg => Message, log_level => Level},
    execute(Name, Measurements, LogMetadata).
61 changes: 37 additions & 24 deletions src/amoc_throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
handle_continue/2,
format_status/2]).

-include_lib("kernel/include/logger.hrl").

-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute

-record(state, {can_run_fn = true :: boolean(),
Expand Down Expand Up @@ -76,7 +74,7 @@ get_state(Pid) ->

-spec init(list()) -> {ok, state(), timeout()}.
init([Name, Interval, Rate]) ->
InitialState = initial_state(Interval, Rate),
InitialState = initial_state(Name, Interval, Rate),
StateWithTimer = maybe_start_timer(InitialState),
{ok, StateWithTimer#state{name = Name}, timeout(InitialState)}.

Expand All @@ -86,7 +84,7 @@ handle_info({'DOWN', _, process, _, _}, State) ->
handle_info(delay_between_executions, State) ->
{noreply, State#state{can_run_fn = true}, {continue, maybe_run_fn}};
handle_info(timeout, State) ->
log_state("is inactive", State),
internal_event(<<"is inactive">>, State),
{noreply, State, {continue, maybe_run_fn}}.

-spec handle_cast(term(), state()) ->
Expand All @@ -100,9 +98,9 @@ handle_cast(resume_process, State) ->
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
amoc_throttle_controller:telemetry_event(Name, request),
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
handle_cast({update, Interval, Rate}, State) ->
NewState = merge_state(initial_state(Interval, Rate), State),
log_state("state update", NewState),
handle_cast({update, Interval, Rate}, #state{name = Name} = State) ->
NewState = merge_state(initial_state(Name, Interval, Rate), State),
internal_event(<<"state update">>, NewState),
{noreply, NewState, {continue, maybe_run_fn}}.

-spec handle_call(term(), term(), state()) ->
Expand All @@ -128,23 +126,29 @@ format_status(_Opt, [_PDict, State]) ->
%% internal functions
%%------------------------------------------------------------------------------

initial_state(Interval, 0) ->
?LOG_ERROR("invalid rate, must be higher than zero"),
initial_state(Interval, 1);
initial_state(Interval, Rate) when Rate > 0 ->
case Rate < 5 of
true -> ?LOG_ERROR("too low rate, please reduce NoOfProcesses");
false -> ok
end,
Delay = case {Interval, Interval div Rate, Interval rem Rate} of
initial_state(Name, Interval, Rate) when Rate >= 0 ->
NewRate = case {Rate =:= 0, Rate < 5} of
{true, _} ->
Msg = <<"invalid rate, must be higher than zero">>,
internal_error(Msg, Name, Rate, Interval),
1;
{_, true} ->
Msg = <<"too low rate, please reduce NoOfProcesses">>,
internal_error(Msg, Name, Rate, Interval),
Rate;
{_, false} ->
Rate
end,
Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of
{0, _, _} -> 0; %% limit only No of simultaneous executions
{_, I, _} when I < 10 ->
?LOG_ERROR("too high rate, please increase NoOfProcesses"),
Message = <<"too high rate, please increase NoOfProcesses">>,
internal_error(Message, Name, Rate, Interval),
10;
{_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
{_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
end,
#state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}.
#state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}.

merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN},
#state{n = OldN, max_n = OldMaxN} = OldState) ->
Expand Down Expand Up @@ -201,25 +205,34 @@ async_runner(Fun) ->
timeout(State) ->
State#state.interval + ?DEFAULT_MSG_TIMEOUT.

inc_n(#state{n = N, max_n = MaxN} = State) ->
inc_n(#state{name = Name, n = N, max_n = MaxN} = State) ->
NewN = N + 1,
case MaxN < NewN of
true ->
PrintableState = printable_state(State),
?LOG_ERROR("~nthrottle process ~p: invalid N (~p)~n", [self(), PrintableState]),
Msg = <<"throttle process has invalid N">>,
amoc_telemetry:execute_log(
error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg),
State#state{n = MaxN};
false ->
State#state{n = NewN}
end.

log_state(Msg, State) ->
-spec internal_event(binary(), state()) -> any().
internal_event(Msg, #state{name = Name} = State) ->
PrintableState = printable_state(State),
?LOG_DEBUG("~nthrottle process ~p: ~s (~p)~n", [self(), Msg, PrintableState]).
amoc_telemetry:execute_log(
debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg).

%% Reports an error-level `[throttle, process]' event carrying the throttle name
%% and the rate/interval values it was called with, plus the given message.
-spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any().
internal_error(Msg, Name, Rate, Interval) ->
    Metadata = #{name => Name, rate => Rate, interval => Interval},
    amoc_telemetry:execute_log(error, [throttle, process], Metadata, Msg).

printable_state(#state{} = State) ->
Fields = record_info(fields, state),
[_ | Values] = tuple_to_list(State#state{schedule = [], schedule_reversed = []}),
StateMap = maps:from_list(lists:zip(Fields, Values)),
StateMap#{
schedule:=length(State#state.schedule),
schedule_reversed:=length(State#state.schedule_reversed)}.
schedule := length(State#state.schedule),
schedule_reversed := length(State#state.schedule_reversed)}.