Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated logging towards GRiSP.io #18

Merged
merged 22 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth
Accepts an integer that represents time in milliseconds, default value is `5_000`.
Allows to tweak the timeout of each API request going through the websocket.

### ws_logs_interval

Accepts an integer that represents time in milliseconds, default value is `2_000`.
Sets the intervall between each log batch dispatch to grisp.io.
ziopio marked this conversation as resolved.
Show resolved Hide resolved

### ws_logs_batch_size

Accepts an integer that represents the maximum number of logs that can be batched togheder, default value is `100`.
ziopio marked this conversation as resolved.
Show resolved Hide resolved

## API Usage example

ok = grisp_io:connect().
Expand Down
1 change: 1 addition & 0 deletions grisp/default/common/deploy/files/erl_inetrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

% Add hosts
{host, {149,248,205,211}, ["grisp.io"]}.
{host, {37, 16, 30, 91}, ["stage.grisp.io"]}.
{host, {212,25,1,1}, ["0.europe.pool.ntp.org"]}.

% Do not monitor the hosts file
Expand Down
12 changes: 11 additions & 1 deletion src/grisp_io.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@
{port, 7777},
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000}
{ws_requests_timeout, 5_000},
{ws_logs_interval, 2_000},
{ws_logs_batch_size, 100},
{logger, [
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_io_log_handler,
grisp_io_logger_bin,
#{formatter => {grisp_io_logger_bin, #{}}}}
ziopio marked this conversation as resolved.
Show resolved Hide resolved
]}
]},
{modules, []},
{links, []}
Expand Down
1 change: 1 addition & 0 deletions src/grisp_io_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
%--- Behaviour application Callback Functions ----------------------------------

start(_StartType, _StartArgs) ->
logger:add_handlers(grisp_io),
grisp_io_sup:start_link().

stop(_State) ->
Expand Down
120 changes: 120 additions & 0 deletions src/grisp_io_binlog.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
-module(grisp_io_binlog).
ziopio marked this conversation as resolved.
Show resolved Hide resolved

% API
-export([defaults/1]).
-export([new/0]).
-export([new/1]).
-export([insert/2]).
-export([items/1]).
-export([count/1]).
-export([bytes/1]).
-export([seq/1]).
-export([opts/1]).
-export([peek/2]).
-export([truncate/2]).

-define(DEFAULT_COUNT_MAX, 10_000).
% 10 MiB:
-define(DEFAULT_BYTES_MAX, 10 * 1_024 * 1_024).

-record(binlog, {
queue = queue:new(),
seq = -1,
count = 0,
count_max = ?DEFAULT_COUNT_MAX,
bytes = 0,
bytes_max = ?DEFAULT_BYTES_MAX
}).

%--- API -----------------------------------------------------------------------

defaults(count_max) -> ?DEFAULT_COUNT_MAX;
defaults(bytes_max) -> ?DEFAULT_BYTES_MAX.

new() -> new(#{}).

new(Opts) ->
Log = #binlog{},
Log#binlog{
count_max = maps:get(count_max, Opts, Log#binlog.count_max),
bytes_max = maps:get(bytes_max, Opts, Log#binlog.bytes_max)
}.

insert({Seq, Bin} = Item, #binlog{bytes_max = BytesMax, count = Count} = L) when
is_integer(Seq), Seq >= 0, byte_size(Bin) >= BytesMax
->
{add(Item, clear(L)), Count};
insert({Seq, Bin} = Item, #binlog{} = L) when
is_integer(Seq), Seq >= 0, is_binary(Bin)
->
flush(add(Item, L));
insert(Item, #binlog{}) ->
error({invalid_item, Item});
insert(_Item, Log) ->
error({invalid_log, Log}).

items(#binlog{queue = Q}) ->
queue:to_list(Q).

count(#binlog{count = Count}) -> Count.

bytes(#binlog{bytes = Bytes}) -> Bytes.

seq(#binlog{seq = Seq}) -> Seq.

opts(#binlog{count_max = CountMax, bytes_max = BytesMax}) ->
#{count_max => CountMax, bytes_max => BytesMax}.

peek(Count, #binlog{queue = Queue}) when is_integer(Count), Count > 0 ->
peek(Count, Queue, []);
peek(Count, _L) ->
error({invalid_count, Count}).

truncate(To, #binlog{seq = Seq} = L) when To >= Seq ->
clear(L);
truncate(To, #binlog{queue = Q0} = L) ->
case queue:out(Q0) of
{{value, {Seq, Bin}}, Q1} when Seq < To ->
truncate(To, drop(Q1, Bin, L));
_ ->
L
end.

%--- Internal ------------------------------------------------------------------

flush(L) -> flush(L, 0).

flush(#binlog{queue = Q0, count = Count, bytes = Bytes} = L, N) when
Count > L#binlog.count_max; Bytes > L#binlog.bytes_max
->
{{value, {_Seq, Bin}}, Q1} = queue:out(Q0),
flush(drop(Q1, Bin, L), N + 1);
flush(L, N) ->
{L, N}.

drop(Q1, Bin, #binlog{count = Count, bytes = Bytes} = L) ->
L#binlog{
queue = Q1,
count = Count - 1,
bytes = Bytes - byte_size(Bin)
}.

add({Seq, Bin}, L) when Seq > L#binlog.seq ->
L#binlog{
seq = Seq,
queue = queue:in({Seq, Bin}, L#binlog.queue),
count = L#binlog.count + 1,
bytes = L#binlog.bytes + byte_size(Bin)
};
add({Seq, _Bin}, _L) ->
error({out_of_sequence, Seq}).

peek(0, _Q, Acc) ->
lists:reverse(Acc);
peek(Count, Q0, Acc) ->
case queue:out(Q0) of
{{value, Item}, Q1} -> peek(Count - 1, Q1, [Item | Acc]);
{empty, _} -> peek(0, Q0, Acc)
end.

clear(L) -> L#binlog{queue = queue:new(), count = 0, bytes = 0}.
37 changes: 36 additions & 1 deletion src/grisp_io_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
% Internal API
-export([disconnected/0]).
-export([handle_message/1]).
-export([trigger_logs/0]).

-behaviour(gen_statem).
-export([init/1, terminate/3, code_change/4, callback_mode/0, handle_event/4]).
Expand Down Expand Up @@ -46,6 +47,9 @@ disconnected() ->
handle_message(Payload) ->
gen_statem:cast(?MODULE, {?FUNCTION_NAME, Payload}).

trigger_logs() ->
gen_statem:cast(?MODULE, ?FUNCTION_NAME).

% gen_statem CALLBACKS ---------------------------------------------------------

init([]) ->
Expand Down Expand Up @@ -150,10 +154,19 @@ handle_event(state_timeout, retry, connecting, Data) ->

% CONNECTED
handle_event(enter, _OldState, connected, _Data) ->
trigger_logs(),
keep_state_and_data;
handle_event(cast, trigger_logs, connected, _Data) ->
{ok, Interval} = application:get_env(grisp_io, ws_logs_interval),
TriggerLogs = {state_timeout, Interval, send_logs},
{keep_state_and_data, [TriggerLogs]};
handle_event(cast, disconnected, connected, Data) ->
?LOG_WARNING(#{event => disconnected}),
{next_state, waiting_ip, Data};
StopLogs = {state_timeout, infinity, send_logs},
{next_state, waiting_ip, Data, [StopLogs]};
handle_event(state_timeout, send_logs, connected, _) ->
async_send_logs(),
keep_state_and_data;

handle_event(E, OldS, NewS, Data) ->
?LOG_ERROR(#{event => unhandled_gen_statem_event,
Expand All @@ -179,6 +192,28 @@ request_timeout() ->
{ok, V} = application:get_env(grisp_io, ws_requests_timeout),
V.

async_send_logs() ->
spawn(fun () ->
ziopio marked this conversation as resolved.
Show resolved Hide resolved
{ok, Size} = application:get_env(grisp_io, ws_logs_batch_size),
case grisp_io_logger_bin:chunk(Size) of
{[], _Dropped} -> ok;
Chunk -> send_logs_chunk(Chunk)
end,
trigger_logs()
ziopio marked this conversation as resolved.
Show resolved Hide resolved
end).

send_logs_chunk({Events, Dropped}) ->
LogUpdate = #{
events => [[Seq, E] || {Seq, E} <- Events],
dropped => Dropped
},
case request(post, logs, LogUpdate) of
{ok, #{seq := Seq, dropped := ServerDropped}} ->
grisp_io_logger_bin:sync(Seq, ServerDropped);
E ->
io:format("Error sending logs = ~p\n",[E])
ziopio marked this conversation as resolved.
Show resolved Hide resolved
end.

% IP check functions

check_inet_ipv4() ->
Expand Down
Loading
Loading