Skip to content

Commit

Permalink
Fix WS disconnection handling (#30)
Browse files Browse the repository at this point in the history
- Demonitor the gun process before shutting it down through gun.
- Handle disconnections detected by monitor ('DOWN' monitor message)
- Close and re-open connection if the server is not sending WS pings
- A new option `ws_ping_timeout` enables to set the time to wait for the ws pings coming from the server.
- Avoid crashing on tuples with jsx
- Add dedicated test suite module for reconnection tests
  • Loading branch information
ziopio authored May 29, 2024
1 parent f9f40f7 commit e97e4dd
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 19 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth
Accepts an integer that represents time in milliseconds, default value is `5_000`.
Allows to tweak the timeout of each API request going through the websocket.

### ws_ping_timeout
Accepts an integer that represents time in milliseconds, default value is `60_000`.
Allows to tweak the timeout between expected ping frames from the server.
If the timeout is exceeded, the socket is closed and a new connection is attempted.

### logs_interval

Accepts an integer that represents time in milliseconds, default value is `2_000`.
Expand Down
1 change: 1 addition & 0 deletions src/grisp_connect.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000},
{ws_ping_timeout, 60_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
Expand Down
8 changes: 7 additions & 1 deletion src/grisp_connect_log_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jsonify(Event) ->
jsonify_msg(#{msg := {string, String}} = Event) ->
maps:put(msg, unicode:characters_to_binary(String), Event);
jsonify_msg(#{msg := {report, Report}} = Event) ->
case jsx:is_term(Report) of
case is_json_compatible(Report) of
true ->
maps:put(msg, Report, Event);
false ->
Expand Down Expand Up @@ -108,3 +108,9 @@ jsonify_meta(#{meta := Meta} = Event) ->
Optional = maps:without(maps:keys(Default), Meta),
FilterFun = fun(Key, Value) -> jsx:is_term(#{Key => Value}) end,
maps:put(meta, maps:merge(maps:filter(FilterFun, Optional), Default), Event).

is_json_compatible(Term) ->
try jsx:is_term(Term)
catch error:_ ->
false
end.
46 changes: 38 additions & 8 deletions src/grisp_connect_ws.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
gun_pid,
gun_ref,
ws_stream,
ws_up = false
ws_up = false,
ping_timer
}).

-define(disconnected_state,
#state{gun_pid = undefined, gun_ref = undefine, ws_up = false}).
#state{gun_pid = undefined, gun_ref = undefine,
ws_up = false, ping_timer = undefined}).

-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -72,31 +74,59 @@ handle_cast({send, Payload}, #state{gun_pid = Pid, ws_stream = Stream} = S) ->
handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) ->
?LOG_INFO(#{event => connection_enstablished}),
GunRef = monitor(process, GunPid),
WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws"),
WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws",[],
#{silence_pings => false}),
NewState = S#state{gun_pid = GunPid, gun_ref = GunRef, ws_stream = WsStream},
{noreply, NewState};
handle_info({gun_up, Pid, http}, #state{gun_pid = GunPid} = S) ->
?LOG_WARNING("Ignoring unexpected gun_up http message"
" from pid ~p, current pid is ~p", [Pid, GunPid]),
{noreply, S};
handle_info({gun_upgrade, Pid, Stream, [<<"websocket">>], _},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_INFO(#{event => ws_upgrade}),
{noreply, S#state{ws_up = true}};
{noreply, S#state{ws_up = true, ping_timer = start_ping_timer()}};
handle_info({gun_response, Pid, Stream, _, Status, _Headers},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_ERROR(#{event => ws_upgrade_failure, status => Status}),
{noreply, shutdown_gun(S)};
handle_info({gun_ws, Conn, Stream, {text, Text}},
#state{gun_pid = Conn, ws_stream = Stream} = S) ->
handle_info({gun_ws, Pid, Stream, ping},
#state{gun_pid = Pid, ws_stream = Stream,
ping_timer = PingTimer} = S) ->
timer:cancel(PingTimer),
{noreply, S#state{ping_timer = start_ping_timer()}};
handle_info({gun_ws, Pid, Stream, {text, Text}},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
grisp_connect_client:handle_message(Text),
{noreply, S};
handle_info({gun_down, Pid, ws, closed, [Stream]}, #state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_WARNING(#{event => ws_closed}),
grisp_connect_client:disconnected(),
{noreply, shutdown_gun(S)};
handle_info({'DOWN', _, process, Pid, Reason}, #state{gun_pid = Pid,
ping_timer = Tref} = S) ->
?LOG_WARNING(#{event => gun_crash, reason => Reason}),
timer:cancel(Tref),
grisp_connect_client:disconnected(),
{noreply, S?disconnected_state};
handle_info(ping_timeout, S) ->
?LOG_WARNING(#{event => ping_timeout}),
grisp_connect_client:disconnected(),
{noreply, shutdown_gun(S)};
handle_info(M, S) ->
?LOG_WARNING(#{event => unhandled_info, info => M}),
?LOG_WARNING(#{event => unhandled_info, info => M, state => S}),
{noreply, S}.

% internal functions -----------------------------------------------------------

shutdown_gun(#state{gun_pid = Pid} = State) ->
shutdown_gun(#state{gun_pid = Pid, gun_ref = GunRef,
ping_timer = PingTimer} = State) ->
timer:cancel(PingTimer),
demonitor(GunRef),
gun:shutdown(Pid),
State?disconnected_state.

start_ping_timer() ->
{ok, Timeout} = application:get_env(grisp_connect, ws_ping_timeout),
{ok, Tref} = timer:send_after(Timeout, ping_timeout),
Tref.
6 changes: 5 additions & 1 deletion test/grisp_connect_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
-compile([export_all, nowarn_export_all]).

-import(grisp_connect_test_client, [wait_connection/0]).
-import(grisp_connect_test_client, [wait_connection/1]).
-import(grisp_connect_test_client, [wait_disconnection/0]).
-import(grisp_connect_test_client, [wait_disconnection/1]).
-import(grisp_connect_test_client, [serial_number/0]).
-import(grisp_connect_test_client, [cert_dir/0]).

Expand All @@ -28,7 +31,8 @@ init_per_suite(Config) ->
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(CertDir, Config),
Config2 = grisp_connect_manager:start(Config),
grisp_connect_manager:kraft_start(CertDir),
[{cert_dir, CertDir} | Config2].

end_per_suite(Config) ->
Expand Down
3 changes: 2 additions & 1 deletion test/grisp_connect_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ init_per_suite(Config) ->
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(CertDir, Config),
Config2 = grisp_connect_manager:start(Config),
grisp_connect_manager:kraft_start(CertDir),
grisp_connect_manager:link_device(),
[{cert_dir, CertDir} | Config2].

Expand Down
19 changes: 13 additions & 6 deletions test/grisp_connect_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-include_lib("common_test/include/ct.hrl").

start(CertDir, Config) ->
start(Config) ->
PrivDir = ?config(priv_dir, Config),
application:set_env(mnesia, dir, PrivDir),

Expand All @@ -18,25 +18,32 @@ start(CertDir, Config) ->
application:start(mnesia),

{ok, Started2} = application:ensure_all_started(kraft),

{ok, Started3} = application:ensure_all_started(grisp_manager),
Apps = Started1 ++ Started2 ++ Started3,
[{apps, Apps} | Config].

kraft_start(CertDir) ->
kraft_start(CertDir, #{}).

kraft_start(CertDir, OverrideOpts) ->
SslOpts = [
{verify, verify_peer},
{keyfile, filename:join(CertDir, "server.key")},
{certfile, filename:join(CertDir, "server.crt")},
{cacertfile, filename:join(CertDir, "CA.crt")}
],
KraftOpts = #{
Opts = #{
port => 3030,
ssl_opts => SslOpts,
app => grisp_manager
},
KraftOpts = mapz:deep_merge(Opts, OverrideOpts),
KraftRoutes = [
{"/grisp-connect/ws",
{ws, grisp_manager_device_api}, #{}, #{type => json_rpc}}
],
kraft:start(KraftOpts, KraftRoutes),

{ok, Started3} = application:ensure_all_started(grisp_manager),
[{apps, Started1 ++ Started2 ++ Started3} | Config].
kraft:start(KraftOpts, KraftRoutes).

cleanup_apps(Apps) ->
mnesia:delete_table(eresu_user),
Expand Down
94 changes: 94 additions & 0 deletions test/grisp_connect_reconnect_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
-module(grisp_connect_reconnect_SUITE).

-behaviour(ct_suite).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

-compile([export_all, nowarn_export_all]).

-import(grisp_connect_test_client, [wait_connection/0]).
-import(grisp_connect_test_client, [wait_connection/1]).
-import(grisp_connect_test_client, [wait_disconnection/0]).
-import(grisp_connect_test_client, [wait_disconnection/1]).
-import(grisp_connect_test_client, [serial_number/0]).
-import(grisp_connect_test_client, [cert_dir/0]).

%--- API -----------------------------------------------------------------------

all() ->
[
F
||
{F, 1} <- ?MODULE:module_info(exports),
lists:suffix("_test", atom_to_list(F))
].

init_per_suite(Config) ->
PrivDir = ?config(priv_dir, Config),
CertDir = cert_dir(),

PolicyFile = filename:join(PrivDir, "policies.term"),
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(Config),
[{cert_dir, CertDir} | Config2].

end_per_suite(Config) ->
grisp_connect_manager:cleanup_apps(?config(apps, Config)).

init_per_testcase(_, Config) ->
% the kraf instance links to this process
process_flag(trap_exit, true),
{ok, _} = application:ensure_all_started(kraft),
KraftRef = grisp_connect_manager:kraft_start(?config(cert_dir, Config)),
{ok, _} = application:ensure_all_started(grisp_emulation),
application:set_env(grisp_connect, test_cert_dir, ?config(cert_dir, Config)),
{ok, _} = application:ensure_all_started(grisp_connect),
[{kraft_instance, KraftRef} | Config].

end_per_testcase(_, Config) ->
ok = application:stop(grisp_connect),
kraft:stop(?config(kraft_instance, Config)),
ok = application:stop(kraft),
mnesia:activity(transaction, fun() ->
mnesia:delete({grisp_device, serial_number()})
end),
flush(),
Config.

%--- Tests ---------------------------------------------------------------------

reconnect_on_gun_crash_test(_) ->
?assertMatch(ok, wait_connection(100)),
{state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws),
proc_lib:stop(GunPid),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection()).

reconnect_on_disconnection_test(Config) ->
?assertMatch(ok, wait_connection()),
ok = kraft:stop(?config(kraft_instance, Config)),
?assertMatch(ok, wait_disconnection()),
KraftRef2 = grisp_connect_manager:kraft_start(cert_dir()),
?assertMatch(ok, wait_connection(100)),
[{kraft_instance, KraftRef2} | proplists:delete(kraft_instance, Config)].

reconnect_on_ping_timeout_test(_) ->
?assertMatch(ok, wait_connection()),
{state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws),
proc_lib:stop(GunPid),
% Now decrease ping timeout so that the WS closes after just 1 second
application:set_env(grisp_connect, ws_ping_timeout, 1000),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(100)),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(100)),
?assertMatch(ok, wait_disconnection()).

%--- Internal ------------------------------------------------------------------

flush() ->
receive Any -> ct:pal("Flushed: ~p", [Any]), flush()
after 0 -> ok
end.
21 changes: 19 additions & 2 deletions test/grisp_connect_test_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
-export([cert_dir/0]).
-export([serial_number/0]).
-export([wait_connection/0]).
-export([wait_connection/1]).
-export([wait_disconnection/0]).
-export([wait_disconnection/1]).

%--- API -----------------------------------------------------------------------

cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs").
cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs").

serial_number() -> <<"0000">>.

wait_connection() ->
wait_connection(20).

wait_connection(0) ->
ct:pal("grisp_connect state:~n~p~n", [sys:get_state(grisp_connect_client)]),
ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]),
{error, timeout};
wait_connection(N) ->
case grisp_connect:is_connected() of
Expand All @@ -26,3 +29,17 @@ wait_connection(N) ->
ct:sleep(100),
wait_connection(N - 1)
end.

wait_disconnection() ->
wait_disconnection(20).

wait_disconnection(0) ->
ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]),
{error, timeout};
wait_disconnection(N) ->
case grisp_connect:is_connected() of
true ->
ct:sleep(100),
wait_disconnection(N - 1);
false -> ok
end.

0 comments on commit e97e4dd

Please sign in to comment.