Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated logging towards GRiSP.io #18

Merged
merged 22 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,51 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth
Accepts an integer that represents time in milliseconds, default value is `5_000`.
Allows tweaking the timeout of each API request going through the websocket.

### logs_interval

Accepts an integer that represents time in milliseconds, default value is `2_000`.
Sets the interval between each log batch dispatch to grisp.io.
ziopio marked this conversation as resolved.
Show resolved Hide resolved

### logs_batch_size

Accepts an integer that represents the maximum number of logs that can be batched together, default value is `100`.

## API Usage example

ok = grisp_io:connect().
true = grisp_io:is_connected().
    {ok, <<"pong">>} = grisp_io:ping().

## See all logs from boot on GRiSP.io

Once this app is started, it forwards all logs to GRiSP.io without the need of setting up anything. The only logs that we do not catch are the ones generated before `grisp_io` boots.
If you want to see ALL logs, even from applications that boot before `grisp_io`, you need to disable the default logger handler and set the grisp_io handler as the default one. This involves changing the `kernel` and `grisp_io` app configuration settings in your sys.config file.

You can copy paste these settings. Here we both swap the default logger handler with the grisp_io logger handler and also request it to print logs to stdout.
```erlang
% sys.config
[
{kernel, [
% Disable 'default' handler (which buffers all log events in logger).
{logger, [{handler, default, undefined}]}
]},
{grisp_io,[
{logger, [
% Enable the grisp_io handler as default,
% so that it will receive all events from boot
{handler,
default, % name
grisp_io_logger_bin, % module
#{
formatter => {grisp_io_logger_bin, #{
% To see logs printed on the USB serial appoint a logger
% formatter module of your choice and set the stdout
% configuration stdout => {Formatter, FormatterConfig}
stdout => {logger_formatter, #{}}
}}
}
}
]}
]}
].
```
1 change: 1 addition & 0 deletions grisp/default/common/deploy/files/erl_inetrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

% Add hosts
{host, {149,248,205,211}, ["grisp.io"]}.
{host, {37,16,30,91}, ["stage.grisp.io"]}.
{host, {212,25,1,1}, ["0.europe.pool.ntp.org"]}.

% Do not monitor the hosts file
Expand Down
12 changes: 11 additions & 1 deletion src/grisp_io.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@
{port, 7777},
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000}
{ws_requests_timeout, 5_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_io_log_handler,
grisp_io_logger_bin,
#{formatter => {grisp_io_logger_bin, #{}}}}
ziopio marked this conversation as resolved.
Show resolved Hide resolved
]}
]},
{modules, []},
{links, []}
Expand Down
1 change: 1 addition & 0 deletions src/grisp_io_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
%--- Behaviour application Callback Functions ----------------------------------

start(_StartType, _StartArgs) ->
    % Install the logger handlers declared under the 'logger' key of the
    % grisp_io application environment before the supervision tree starts,
    % so log events are captured as early as possible.
    logger:add_handlers(grisp_io),
    grisp_io_sup:start_link().

stop(_State) ->
Expand Down
126 changes: 126 additions & 0 deletions src/grisp_io_binlog.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
%% @doc Ring buffer for storing logger logs.
%%
%% This module extends a standard queue into a ring buffer bounded both
%% by entry count (`count_max') and by total payload size (`bytes_max').
%% Entries are `{Seq, Bin}' pairs with strictly increasing, non-negative
%% sequence numbers. Once a batch of logs has been sent out and is not
%% needed anymore it can be removed with truncate/2.
%% @end
-module(grisp_io_binlog).

% API
-export([defaults/1]).
-export([new/0]).
-export([new/1]).
-export([insert/2]).
-export([items/1]).
-export([count/1]).
-export([bytes/1]).
-export([seq/1]).
-export([opts/1]).
-export([peek/2]).
-export([truncate/2]).

-define(DEFAULT_COUNT_MAX, 10_000).
% 10 MiB:
-define(DEFAULT_BYTES_MAX, 10 * 1_024 * 1_024).

-record(binlog, {
    queue = queue:new(),      % FIFO of {Seq, Bin} entries
    seq = -1,                 % highest sequence number ever inserted
    count = 0,                % current number of entries
    count_max = ?DEFAULT_COUNT_MAX,
    bytes = 0,                % current sum of byte_size of all entries
    bytes_max = ?DEFAULT_BYTES_MAX
}).

%--- API -----------------------------------------------------------------------

% @doc Returns the default value for the given limit option.
defaults(count_max) -> ?DEFAULT_COUNT_MAX;
defaults(bytes_max) -> ?DEFAULT_BYTES_MAX.

% @doc Creates a new empty log with default limits.
new() -> new(#{}).

% @doc Creates a new empty log; `count_max' and `bytes_max' in Opts
% override the defaults.
new(Opts) ->
    Log = #binlog{},
    Log#binlog{
        count_max = maps:get(count_max, Opts, Log#binlog.count_max),
        bytes_max = maps:get(bytes_max, Opts, Log#binlog.bytes_max)
    }.

% @doc Inserts `{Seq, Bin}' and returns `{NewLog, Dropped}', where
% Dropped is the number of old entries evicted to respect the limits.
% An item that alone reaches `bytes_max' replaces the whole buffer
% content (every previous entry is dropped).
insert({Seq, Bin} = Item, #binlog{bytes_max = BytesMax, count = Count} = L) when
    % is_binary/1 keeps the oversize path consistent with the normal
    % path below: non-binary bitstrings are rejected, not stored.
    is_integer(Seq), Seq >= 0, is_binary(Bin), byte_size(Bin) >= BytesMax
->
    {add(Item, clear(L)), Count};
insert({Seq, Bin} = Item, #binlog{} = L) when
    is_integer(Seq), Seq >= 0, is_binary(Bin)
->
    flush(add(Item, L));
insert(Item, #binlog{}) ->
    error({invalid_item, Item});
insert(_Item, Log) ->
    error({invalid_log, Log}).

% @doc Returns all entries as a list, oldest first.
items(#binlog{queue = Q}) ->
    queue:to_list(Q).

% @doc Returns the number of stored entries.
count(#binlog{count = Count}) -> Count.

% @doc Returns the total payload size in bytes.
bytes(#binlog{bytes = Bytes}) -> Bytes.

% @doc Returns the highest sequence number inserted so far (-1 if none).
seq(#binlog{seq = Seq}) -> Seq.

% @doc Returns the configured limits as a map.
opts(#binlog{count_max = CountMax, bytes_max = BytesMax}) ->
    #{count_max => CountMax, bytes_max => BytesMax}.

% @doc Returns up to Count entries from the head (oldest) of the log
% without removing them.
peek(Count, #binlog{queue = Queue}) when is_integer(Count), Count > 0 ->
    peek(Count, Queue, []);
peek(Count, _L) ->
    error({invalid_count, Count}).

% @doc Removes every entry with a sequence number =< To. Truncating at
% or beyond the latest sequence number clears the whole buffer.
truncate(To, #binlog{seq = Seq} = L) when To >= Seq ->
    clear(L);
truncate(To, #binlog{queue = Q0} = L) ->
    case queue:out(Q0) of
        {{value, {Seq, Bin}}, Q1} when Seq =< To ->
            truncate(To, drop(Q1, Bin, L));
        _ ->
            L
    end.

%--- Internal ------------------------------------------------------------------

% Evicts oldest entries until both limits are respected; returns
% {Log, DroppedCount}.
flush(L) -> flush(L, 0).

flush(#binlog{queue = Q0, count = Count, bytes = Bytes} = L, N) when
    Count > L#binlog.count_max; Bytes > L#binlog.bytes_max
->
    {{value, {_Seq, Bin}}, Q1} = queue:out(Q0),
    flush(drop(Q1, Bin, L), N + 1);
flush(L, N) ->
    {L, N}.

% Replaces the queue with Q1 and adjusts the count/bytes accounting for
% the removed binary.
drop(Q1, Bin, #binlog{count = Count, bytes = Bytes} = L) ->
    L#binlog{
        queue = Q1,
        count = Count - 1,
        bytes = Bytes - byte_size(Bin)
    }.

% Appends an entry; sequence numbers must be strictly increasing.
add({Seq, Bin}, L) when Seq > L#binlog.seq ->
    L#binlog{
        seq = Seq,
        queue = queue:in({Seq, Bin}, L#binlog.queue),
        count = L#binlog.count + 1,
        bytes = L#binlog.bytes + byte_size(Bin)
    };
add({Seq, _Bin}, _L) ->
    error({out_of_sequence, Seq}).

% Collects up to Count entries from the front of the queue.
peek(0, _Q, Acc) ->
    lists:reverse(Acc);
peek(Count, Q0, Acc) ->
    case queue:out(Q0) of
        {{value, Item}, Q1} -> peek(Count - 1, Q1, [Item | Acc]);
        {empty, _} -> peek(0, Q0, Acc)
    end.

% Empties the queue while preserving seq and the configured limits.
clear(L) -> L#binlog{queue = queue:new(), count = 0, bytes = 0}.
4 changes: 3 additions & 1 deletion src/grisp_io_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,17 @@ handle_event(state_timeout, retry, connecting, Data) ->
?LOG_NOTICE(#{event => connected}),
{next_state, connected, Data};
false ->
?LOG_NOTICE(#{event => waiting_ws_connection}),
?LOG_INFO(#{event => waiting_ws_connection}),
{keep_state_and_data, [{state_timeout, ?STD_TIMEOUT, retry}]}
end;

% CONNECTED
handle_event(enter, _OldState, connected, _Data) ->
grisp_io_log_server:start(),
keep_state_and_data;
handle_event(cast, disconnected, connected, Data) ->
?LOG_WARNING(#{event => disconnected}),
grisp_io_log_server:stop(),
{next_state, waiting_ip, Data};

handle_event(E, OldS, NewS, Data) ->
Expand Down
73 changes: 73 additions & 0 deletions src/grisp_io_log_server.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
%% @doc gen_server that sends batched logs to GRiSP.io on a timed interval.
%%
%% This process is controlled by grisp_io_client: start/0 is cast when
%% the websocket connection is established and stop/0 when it drops.
%% While active, a periodic timer triggers one batch dispatch per tick.
%% @end
-module(grisp_io_log_server).

% API
-export([start_link/0]).
-export([start/0]).
-export([stop/0]).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-include_lib("kernel/include/logger.hrl").

-record(state, {
    active = false :: boolean(),           % whether periodic dispatch is running
    timer :: undefined | timer:tref()      % interval timer, set while active
}).

% FixMe:
% Sending over ~30_000 bytes over WS breaks the rtems I/O driver.
% We want to avoid returning chunks that are bigger than that.
-define(MAX_CHUNK_BYTES, 30_000).

% API

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

% @doc Starts the periodic log dispatch. Idempotent: a duplicate start
% while already active is ignored.
start() -> gen_server:cast(?MODULE, ?FUNCTION_NAME).

% @doc Stops the periodic log dispatch and cancels the timer.
stop() -> gen_server:cast(?MODULE, ?FUNCTION_NAME).

% gen_server CALLBACKS ---------------------------------------------------------

init([]) -> {ok, #state{}}.

% No synchronous API is exposed; crash loudly on unexpected calls.
handle_call(_, _, _) ->
    error(?FUNCTION_NAME).

handle_cast(start, #state{active = false, timer = undefined}) ->
    {ok, Interval} = application:get_env(grisp_io, logs_interval),
    {ok, Tref} = timer:send_interval(Interval, send_logs),
    {noreply, #state{active = true, timer = Tref}};
handle_cast(start, #state{active = true} = State) ->
    % Already running: keep the existing timer instead of crashing with
    % a function_clause on a duplicate start.
    {noreply, State};
handle_cast(stop, State) ->
    % cancel/1 may race with an in-flight send_logs tick; the inactive
    % handle_info clause below absorbs any leftover message.
    timer:cancel(State#state.timer),
    {noreply, #state{}}.

handle_info(send_logs, #state{active = true} = State) ->
    {ok, Size} = application:get_env(grisp_io, logs_batch_size),
    % Skip empty batches; anything else is shipped to the server.
    case grisp_io_logger_bin:chunk(Size, ?MAX_CHUNK_BYTES) of
        {[], _Dropped} -> ok;
        Chunk -> send_logs_chunk(Chunk)
    end,
    {noreply, State};
handle_info(send_logs, #state{active = false} = State) ->
    ?LOG_WARNING(#{event => send_logs,
                   msg => "send_logs received when inactive"}),
    {noreply, State};
handle_info(Info, State) ->
    % Drain unexpected messages instead of crashing the server and
    % losing the dispatch timer.
    ?LOG_WARNING(#{event => unexpected_info, data => Info}),
    {noreply, State}.

% Sends one batch over the websocket and, on success, syncs the local
% log buffer with the sequence number acknowledged by the server.
send_logs_chunk({Events, Dropped}) ->
    LogUpdate = #{
        events => [[Seq, E] || {Seq, E} <- Events],
        dropped => Dropped
    },
    case grisp_io_client:request(post, logs, LogUpdate) of
        {ok, #{seq := Seq, dropped := ServerDropped}} ->
            grisp_io_logger_bin:sync(Seq, ServerDropped);
        E ->
            ?LOG_ERROR(#{event => send_logs, data => E})
    end.
Loading
Loading