Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated logging towards GRiSP.io #18

Merged
merged 22 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth
Accepts an integer that represents time in milliseconds, default value is `5_000`.
Allows to tweak the timeout of each API request going through the websocket.

### logs_interval

Accepts an integer that represents time in milliseconds, default value is `2_000`.
Sets the intervall between each log batch dispatch to grisp.io.
ziopio marked this conversation as resolved.
Show resolved Hide resolved

### logs_batch_size

Accepts an integer that represents the maximum number of logs that can be batched together, default value is `100`.

## API Usage example

ok = grisp_io:connect().
Expand Down
1 change: 1 addition & 0 deletions grisp/default/common/deploy/files/erl_inetrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

% Add hosts
{host, {149,248,205,211}, ["grisp.io"]}.
{host, {37, 16, 30, 91}, ["stage.grisp.io"]}.
{host, {212,25,1,1}, ["0.europe.pool.ntp.org"]}.

% Do not monitor the hosts file
Expand Down
12 changes: 11 additions & 1 deletion src/grisp_io.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@
{port, 7777},
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000}
{ws_requests_timeout, 5_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_io_log_handler,
grisp_io_logger_bin,
#{formatter => {grisp_io_logger_bin, #{}}}}
ziopio marked this conversation as resolved.
Show resolved Hide resolved
]}
]},
{modules, []},
{links, []}
Expand Down
1 change: 1 addition & 0 deletions src/grisp_io_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
%--- Behaviour application Callback Functions ----------------------------------

start(_StartType, _StartArgs) ->
logger:add_handlers(grisp_io),
grisp_io_sup:start_link().

stop(_State) ->
Expand Down
126 changes: 126 additions & 0 deletions src/grisp_io_binlog.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
%% @doc Ring buffer for storing logger logs
%%
%% This module extends a standard queue into a ring buffer.
%% It can be truncated once a batch of logs has been sent out
%% and is not needed anymore.
%% @end
-module(grisp_io_binlog).
ziopio marked this conversation as resolved.
Show resolved Hide resolved

% API
-export([defaults/1]).
-export([new/0]).
-export([new/1]).
-export([insert/2]).
-export([items/1]).
-export([count/1]).
-export([bytes/1]).
-export([seq/1]).
-export([opts/1]).
-export([peek/2]).
-export([truncate/2]).

-define(DEFAULT_COUNT_MAX, 10_000).
% 10 MiB:
-define(DEFAULT_BYTES_MAX, 10 * 1_024 * 1_024).

-record(binlog, {
queue = queue:new(),
seq = -1,
count = 0,
count_max = ?DEFAULT_COUNT_MAX,
bytes = 0,
bytes_max = ?DEFAULT_BYTES_MAX
}).

%--- API -----------------------------------------------------------------------

defaults(count_max) -> ?DEFAULT_COUNT_MAX;
defaults(bytes_max) -> ?DEFAULT_BYTES_MAX.

new() -> new(#{}).

new(Opts) ->
Log = #binlog{},
Log#binlog{
count_max = maps:get(count_max, Opts, Log#binlog.count_max),
bytes_max = maps:get(bytes_max, Opts, Log#binlog.bytes_max)
}.

insert({Seq, Bin} = Item, #binlog{bytes_max = BytesMax, count = Count} = L) when
is_integer(Seq), Seq >= 0, byte_size(Bin) >= BytesMax
->
{add(Item, clear(L)), Count};
insert({Seq, Bin} = Item, #binlog{} = L) when
is_integer(Seq), Seq >= 0, is_binary(Bin)
->
flush(add(Item, L));
insert(Item, #binlog{}) ->
error({invalid_item, Item});
insert(_Item, Log) ->
error({invalid_log, Log}).

items(#binlog{queue = Q}) ->
queue:to_list(Q).

count(#binlog{count = Count}) -> Count.

bytes(#binlog{bytes = Bytes}) -> Bytes.

seq(#binlog{seq = Seq}) -> Seq.

opts(#binlog{count_max = CountMax, bytes_max = BytesMax}) ->
#{count_max => CountMax, bytes_max => BytesMax}.

peek(Count, #binlog{queue = Queue}) when is_integer(Count), Count > 0 ->
peek(Count, Queue, []);
peek(Count, _L) ->
error({invalid_count, Count}).

truncate(To, #binlog{seq = Seq} = L) when To >= Seq ->
clear(L);
truncate(To, #binlog{queue = Q0} = L) ->
case queue:out(Q0) of
{{value, {Seq, Bin}}, Q1} when Seq < To ->
truncate(To, drop(Q1, Bin, L));
_ ->
L
end.

%--- Internal ------------------------------------------------------------------

flush(L) -> flush(L, 0).

flush(#binlog{queue = Q0, count = Count, bytes = Bytes} = L, N) when
Count > L#binlog.count_max; Bytes > L#binlog.bytes_max
->
{{value, {_Seq, Bin}}, Q1} = queue:out(Q0),
flush(drop(Q1, Bin, L), N + 1);
flush(L, N) ->
{L, N}.

drop(Q1, Bin, #binlog{count = Count, bytes = Bytes} = L) ->
L#binlog{
queue = Q1,
count = Count - 1,
bytes = Bytes - byte_size(Bin)
}.

add({Seq, Bin}, L) when Seq > L#binlog.seq ->
L#binlog{
seq = Seq,
queue = queue:in({Seq, Bin}, L#binlog.queue),
count = L#binlog.count + 1,
bytes = L#binlog.bytes + byte_size(Bin)
};
add({Seq, _Bin}, _L) ->
error({out_of_sequence, Seq}).

peek(0, _Q, Acc) ->
lists:reverse(Acc);
peek(Count, Q0, Acc) ->
case queue:out(Q0) of
{{value, Item}, Q1} -> peek(Count - 1, Q1, [Item | Acc]);
{empty, _} -> peek(0, Q0, Acc)
end.

clear(L) -> L#binlog{queue = queue:new(), count = 0, bytes = 0}.
4 changes: 3 additions & 1 deletion src/grisp_io_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,17 @@ handle_event(state_timeout, retry, connecting, Data) ->
?LOG_NOTICE(#{event => connected}),
{next_state, connected, Data};
false ->
?LOG_NOTICE(#{event => waiting_ws_connection}),
?LOG_INFO(#{event => waiting_ws_connection}),
{keep_state_and_data, [{state_timeout, ?STD_TIMEOUT, retry}]}
end;

% CONNECTED
handle_event(enter, _OldState, connected, _Data) ->
grisp_io_log_server:start(),
keep_state_and_data;
handle_event(cast, disconnected, connected, Data) ->
?LOG_WARNING(#{event => disconnected}),
grisp_io_log_server:stop(),
{next_state, waiting_ip, Data};

handle_event(E, OldS, NewS, Data) ->
Expand Down
68 changes: 68 additions & 0 deletions src/grisp_io_log_server.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
%% @doc gen_server that sends logs to GRiSP.io on a timed intervall.
%%
%% This process is controlled by grisp_io_client.
%% @end
-module(grisp_io_log_server).

% API
-export([start_link/0]).
-export([start/0]).
-export([stop/0]).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-include_lib("kernel/include/logger.hrl").

-record(state, {
active = false :: boolean(),
timer :: undefined | timer:tref()
}).

% API

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

start() -> gen_server:cast(?MODULE, ?FUNCTION_NAME).
ziopio marked this conversation as resolved.
Show resolved Hide resolved

stop() -> gen_server:cast(?MODULE, ?FUNCTION_NAME).

% gen_server CALLBACKS ---------------------------------------------------------

init([]) -> {ok, #state{}}.

handle_call(_, _, _) ->
error(?FUNCTION_NAME).

handle_cast(start, #state{active = false, timer = undefined}) ->
{ok, Interval} = application:get_env(grisp_io, logs_interval),
{ok, Tref} = timer:send_interval(Interval, send_logs),
{noreply, #state{active = true, timer = Tref}};
handle_cast(stop, State) ->
timer:cancel(State#state.timer),
{noreply, #state{}}.

handle_info(send_logs, #state{active = true} = State) ->
{ok, Size} = application:get_env(grisp_io, logs_batch_size),
case grisp_io_logger_bin:chunk(Size) of
{[], _Dropped} -> ok;
Chunk -> send_logs_chunk(Chunk)
end,
{noreply, State};
handle_info(send_logs, #state{active = false, timer = undefined} = State) ->
?LOG_WARNING(#{event => send_logs,
msg => "send_logs received when inactive"}),
{noreply, State}.

send_logs_chunk({Events, Dropped}) ->
LogUpdate = #{
events => [[Seq, E] || {Seq, E} <- Events],
dropped => Dropped
},
case grisp_io_client:request(post, logs, LogUpdate) of
{ok, #{seq := Seq, dropped := ServerDropped}} ->
grisp_io_logger_bin:sync(Seq, ServerDropped);
E ->
?LOG_ERROR(#{event => send_logs, data => E})
end.
Loading
Loading