diff --git a/src/grisp_io.app.src b/src/grisp_io.app.src index da34116..e3356da 100644 --- a/src/grisp_io.app.src +++ b/src/grisp_io.app.src @@ -22,11 +22,13 @@ {connect, true}, % keeps a constant connection with grisp.io {ntp, false}, % if set to true, starts the NTP client {ws_requests_timeout, 5_000}, + {ws_logs_interval, 2_000}, + {ws_logs_batch_size, 100}, {logger, [ % Enable our own default handler, % which will receive all events from boot {handler, - grisp_io, + grisp_io_log_handler, grisp_io_logger_bin, #{formatter => {grisp_io_logger_bin, #{}}}} ]} diff --git a/src/grisp_io_logtest.erl b/src/grisp_io_logtest.erl new file mode 100644 index 0000000..5c59a80 --- /dev/null +++ b/src/grisp_io_logtest.erl @@ -0,0 +1,63 @@ +% @doc temporary module to test logging +-module(grisp_io_logtest). + +-export([start_link/0]). + +-behaviour(gen_server). + +-export([init/1]). +-export([handle_call/3]). +-export([handle_cast/2]). +-export([handle_info/2]). + +-record(state, { + timer +}). + +% -include_lib("kernel/include/logger.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +% Callbacks + + +init(_) -> + {ok, Interval} = application:get_env(grisp_io, ws_logs_interval), + Timer = erlang:start_timer(Interval, self(), logs), + {ok, #state{timer = Timer}}. + +handle_call(_Request, _From, _State) -> + error(unknown_request). + +handle_cast(_Msg, _State) -> + error(unknown_cast). + +handle_info({timeout, OldRef, logs}, #state{timer = OldRef} = S)-> + {ok, Size} = application:get_env(grisp_io, ws_logs_batch_size), + {ok, Interval} = application:get_env(grisp_io, ws_logs_interval), + {Events, Dropped} = Chunk = grisp_io_logger_bin:chunk(Size), + io:format("Sending logs... events = ~p, dropped = ~p\n", + [length(Events), Dropped]), + case send_logs(Chunk) of + ok -> ok; + {ok, #{<<"seq">> := Seq, <<"dropped">> := ServerDropped}} -> + dab_logger_bin:sync(Seq, ServerDropped); + E -> + io:format("Error sending logs = ~p\n",[E]) + end, + erlang:cancel_timer(OldRef), + NewTimer = erlang:start_timer(Interval, self(), logs), + {noreply, S#state{timer = NewTimer}}. + +%--- Internals ----------------------------------------------------------------- + +send_logs({[], _Dropped}) -> + ok; +send_logs({Events, Dropped}) -> + LogUpdate = #{ + events => [[Seq, E] || {Seq, E} <- Events], + dropped => Dropped + }, + grisp_io_ws:request(post, logs, LogUpdate). diff --git a/src/grisp_io_sup.erl b/src/grisp_io_sup.erl index e99c4af..231b318 100644 --- a/src/grisp_io_sup.erl +++ b/src/grisp_io_sup.erl @@ -40,7 +40,8 @@ init([]) -> end, ChildSpecs = NTP ++ [ worker(grisp_io_ws, []), - worker(grisp_io_connection, []) + worker(grisp_io_connection, []), + worker(grisp_io_logtest, []) ], {ok, {SupFlags, ChildSpecs}}. diff --git a/src/grisp_io_ws.erl b/src/grisp_io_ws.erl index 5498f41..f6038bd 100644 --- a/src/grisp_io_ws.erl +++ b/src/grisp_io_ws.erl @@ -49,7 +49,7 @@ request(Method, Type, Params) -> {?FUNCTION_NAME, Method, Type, Params}, ?call_timeout). -% gen_server callbacks --------------------------------------------------------- +% gen_server callbacks ----------------------------- init([]) -> {ok, #state{}}. @@ -75,7 +75,9 @@ handle_cast({connect, Server, Port}, #state{gun_pid = undefined} = S) -> {noreply, S} end; handle_cast({connect, _Server, _Port}, S) -> - {noreply, S}. + {noreply, S}; +handle_cast(_Msg, _S) -> + error(unhandled_cast). handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) -> ?LOG_INFO(#{event => connection_enstablished}),