From e97e4dd2ee2a60a1c961a3755abbe0ad3cfd4e90 Mon Sep 17 00:00:00 2001 From: Luca Succi Date: Wed, 29 May 2024 17:29:55 +0200 Subject: [PATCH] Fix WS disconnection handling (#30) - Demonitor the gun process before shutting it down through gun. - Handle disconnections detected by monitor ('DOWN' monitor message) - Close and re-open connection if the server is not sending WS pings - A new option `ws_ping_timeout` enables to set the time to wait for the ws pings coming from the server. - Avoid crashing on tuples with jsx - Add dedicated test suite module for reconnection tests --- README.md | 5 ++ src/grisp_connect.app.src | 1 + src/grisp_connect_log_server.erl | 8 ++- src/grisp_connect_ws.erl | 46 ++++++++++--- test/grisp_connect_api_SUITE.erl | 6 +- test/grisp_connect_log_SUITE.erl | 3 +- test/grisp_connect_manager.erl | 19 ++++-- test/grisp_connect_reconnect_SUITE.erl | 94 ++++++++++++++++++++++++++ test/grisp_connect_test_client.erl | 21 +++++- 9 files changed, 184 insertions(+), 19 deletions(-) create mode 100644 test/grisp_connect_reconnect_SUITE.erl diff --git a/README.md b/README.md index be1a71a..c74f821 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,11 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth Accepts an integer that represents time in milliseconds, default value is `5_000`. Allows to tweak the timeout of each API request going through the websocket. +### ws_ping_timeout +Accepts an integer that represents time in milliseconds, default value is `60_000`. +Allows to tweak the timeout between expected ping frames from the server. +If the timeout is exceeded, the socket is closed and a new connection is attempted. + ### logs_interval Accepts an integer that represents time in milliseconds, default value is `2_000`. diff --git a/src/grisp_connect.app.src b/src/grisp_connect.app.src index 6198b16..7d3f0aa 100644 --- a/src/grisp_connect.app.src +++ b/src/grisp_connect.app.src @@ -22,6 +22,7 @@ {connect, true}, % keeps a constant connection with grisp.io {ntp, false}, % if set to true, starts the NTP client {ws_requests_timeout, 5_000}, + {ws_ping_timeout, 60_000}, {logs_interval, 2_000}, {logs_batch_size, 100}, {logger, [ diff --git a/src/grisp_connect_log_server.erl b/src/grisp_connect_log_server.erl index 0bf87be..616d3d5 100644 --- a/src/grisp_connect_log_server.erl +++ b/src/grisp_connect_log_server.erl @@ -78,7 +78,7 @@ jsonify(Event) -> jsonify_msg(#{msg := {string, String}} = Event) -> maps:put(msg, unicode:characters_to_binary(String), Event); jsonify_msg(#{msg := {report, Report}} = Event) -> - case jsx:is_term(Report) of + case is_json_compatible(Report) of true -> maps:put(msg, Report, Event); false -> @@ -108,3 +108,9 @@ jsonify_meta(#{meta := Meta} = Event) -> Optional = maps:without(maps:keys(Default), Meta), FilterFun = fun(Key, Value) -> jsx:is_term(#{Key => Value}) end, maps:put(meta, maps:merge(maps:filter(FilterFun, Optional), Default), Event). + +is_json_compatible(Term) -> + try jsx:is_term(Term) + catch error:_ -> + false + end. diff --git a/src/grisp_connect_ws.erl b/src/grisp_connect_ws.erl index 792f240..0243052 100644 --- a/src/grisp_connect_ws.erl +++ b/src/grisp_connect_ws.erl @@ -18,11 +18,13 @@ gun_pid, gun_ref, ws_stream, - ws_up = false + ws_up = false, + ping_timer }). -define(disconnected_state, - #state{gun_pid = undefined, gun_ref = undefine, ws_up = false}). + #state{gun_pid = undefined, gun_ref = undefine, + ws_up = false, ping_timer = undefined}). -include_lib("kernel/include/logger.hrl"). @@ -72,31 +74,59 @@ handle_cast({send, Payload}, #state{gun_pid = Pid, ws_stream = Stream} = S) -> handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) -> ?LOG_INFO(#{event => connection_enstablished}), GunRef = monitor(process, GunPid), - WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws"), + WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws",[], + #{silence_pings => false}), NewState = S#state{gun_pid = GunPid, gun_ref = GunRef, ws_stream = WsStream}, {noreply, NewState}; +handle_info({gun_up, Pid, http}, #state{gun_pid = GunPid} = S) -> + ?LOG_WARNING("Ignoring unexpected gun_up http message" + " from pid ~p, current pid is ~p", [Pid, GunPid]), + {noreply, S}; handle_info({gun_upgrade, Pid, Stream, [<<"websocket">>], _}, #state{gun_pid = Pid, ws_stream = Stream} = S) -> ?LOG_INFO(#{event => ws_upgrade}), - {noreply, S#state{ws_up = true}}; + {noreply, S#state{ws_up = true, ping_timer = start_ping_timer()}}; handle_info({gun_response, Pid, Stream, _, Status, _Headers}, #state{gun_pid = Pid, ws_stream = Stream} = S) -> ?LOG_ERROR(#{event => ws_upgrade_failure, status => Status}), {noreply, shutdown_gun(S)}; -handle_info({gun_ws, Conn, Stream, {text, Text}}, - #state{gun_pid = Conn, ws_stream = Stream} = S) -> +handle_info({gun_ws, Pid, Stream, ping}, + #state{gun_pid = Pid, ws_stream = Stream, + ping_timer = PingTimer} = S) -> + timer:cancel(PingTimer), + {noreply, S#state{ping_timer = start_ping_timer()}}; +handle_info({gun_ws, Pid, Stream, {text, Text}}, + #state{gun_pid = Pid, ws_stream = Stream} = S) -> grisp_connect_client:handle_message(Text), {noreply, S}; handle_info({gun_down, Pid, ws, closed, [Stream]}, #state{gun_pid = Pid, ws_stream = Stream} = S) -> ?LOG_WARNING(#{event => ws_closed}), grisp_connect_client:disconnected(), {noreply, shutdown_gun(S)}; +handle_info({'DOWN', _, process, Pid, Reason}, #state{gun_pid = Pid, + ping_timer = Tref} = S) -> + ?LOG_WARNING(#{event => gun_crash, reason => Reason}), + timer:cancel(Tref), + grisp_connect_client:disconnected(), + {noreply, S?disconnected_state}; +handle_info(ping_timeout, S) -> + ?LOG_WARNING(#{event => ping_timeout}), + grisp_connect_client:disconnected(), + {noreply, shutdown_gun(S)}; handle_info(M, S) -> - ?LOG_WARNING(#{event => unhandled_info, info => M}), + ?LOG_WARNING(#{event => unhandled_info, info => M, state => S}), {noreply, S}. % internal functions ----------------------------------------------------------- -shutdown_gun(#state{gun_pid = Pid} = State) -> +shutdown_gun(#state{gun_pid = Pid, gun_ref = GunRef, + ping_timer = PingTimer} = State) -> + timer:cancel(PingTimer), + demonitor(GunRef), gun:shutdown(Pid), State?disconnected_state. + +start_ping_timer() -> + {ok, Timeout} = application:get_env(grisp_connect, ws_ping_timeout), + {ok, Tref} = timer:send_after(Timeout, ping_timeout), + Tref. diff --git a/test/grisp_connect_api_SUITE.erl b/test/grisp_connect_api_SUITE.erl index 66550f8..dc5f4be 100644 --- a/test/grisp_connect_api_SUITE.erl +++ b/test/grisp_connect_api_SUITE.erl @@ -7,6 +7,9 @@ -compile([export_all, nowarn_export_all]). -import(grisp_connect_test_client, [wait_connection/0]). +-import(grisp_connect_test_client, [wait_connection/1]). +-import(grisp_connect_test_client, [wait_disconnection/0]). +-import(grisp_connect_test_client, [wait_disconnection/1]). -import(grisp_connect_test_client, [serial_number/0]). -import(grisp_connect_test_client, [cert_dir/0]). @@ -28,7 +31,8 @@ init_per_suite(Config) -> ?assertEqual(ok, file:write_file(PolicyFile, <<>>)), application:set_env(seabac, policy_file, PolicyFile), - Config2 = grisp_connect_manager:start(CertDir, Config), + Config2 = grisp_connect_manager:start(Config), + grisp_connect_manager:kraft_start(CertDir), [{cert_dir, CertDir} | Config2]. end_per_suite(Config) -> diff --git a/test/grisp_connect_log_SUITE.erl b/test/grisp_connect_log_SUITE.erl index 9ff22b7..31d4eb8 100644 --- a/test/grisp_connect_log_SUITE.erl +++ b/test/grisp_connect_log_SUITE.erl @@ -28,7 +28,8 @@ init_per_suite(Config) -> ?assertEqual(ok, file:write_file(PolicyFile, <<>>)), application:set_env(seabac, policy_file, PolicyFile), - Config2 = grisp_connect_manager:start(CertDir, Config), + Config2 = grisp_connect_manager:start(Config), + grisp_connect_manager:kraft_start(CertDir), grisp_connect_manager:link_device(), [{cert_dir, CertDir} | Config2]. diff --git a/test/grisp_connect_manager.erl b/test/grisp_connect_manager.erl index 151fd08..debd1f7 100644 --- a/test/grisp_connect_manager.erl +++ b/test/grisp_connect_manager.erl @@ -5,7 +5,7 @@ -include_lib("common_test/include/ct.hrl"). -start(CertDir, Config) -> +start(Config) -> PrivDir = ?config(priv_dir, Config), application:set_env(mnesia, dir, PrivDir), @@ -18,25 +18,32 @@ start(CertDir, Config) -> application:start(mnesia), {ok, Started2} = application:ensure_all_started(kraft), + + {ok, Started3} = application:ensure_all_started(grisp_manager), + Apps = Started1 ++ Started2 ++ Started3, + [{apps, Apps} | Config]. + +kraft_start(CertDir) -> + kraft_start(CertDir, #{}). + +kraft_start(CertDir, OverrideOpts) -> SslOpts = [ {verify, verify_peer}, {keyfile, filename:join(CertDir, "server.key")}, {certfile, filename:join(CertDir, "server.crt")}, {cacertfile, filename:join(CertDir, "CA.crt")} ], - KraftOpts = #{ + Opts = #{ port => 3030, ssl_opts => SslOpts, app => grisp_manager }, + KraftOpts = mapz:deep_merge(Opts, OverrideOpts), KraftRoutes = [ {"/grisp-connect/ws", {ws, grisp_manager_device_api}, #{}, #{type => json_rpc}} ], - kraft:start(KraftOpts, KraftRoutes), - - {ok, Started3} = application:ensure_all_started(grisp_manager), - [{apps, Started1 ++ Started2 ++ Started3} | Config]. + kraft:start(KraftOpts, KraftRoutes). cleanup_apps(Apps) -> mnesia:delete_table(eresu_user), diff --git a/test/grisp_connect_reconnect_SUITE.erl b/test/grisp_connect_reconnect_SUITE.erl new file mode 100644 index 0000000..429284c --- /dev/null +++ b/test/grisp_connect_reconnect_SUITE.erl @@ -0,0 +1,94 @@ +-module(grisp_connect_reconnect_SUITE). + +-behaviour(ct_suite). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-compile([export_all, nowarn_export_all]). + +-import(grisp_connect_test_client, [wait_connection/0]). +-import(grisp_connect_test_client, [wait_connection/1]). +-import(grisp_connect_test_client, [wait_disconnection/0]). +-import(grisp_connect_test_client, [wait_disconnection/1]). +-import(grisp_connect_test_client, [serial_number/0]). +-import(grisp_connect_test_client, [cert_dir/0]). + +%--- API ----------------------------------------------------------------------- + +all() -> + [ + F + || + {F, 1} <- ?MODULE:module_info(exports), + lists:suffix("_test", atom_to_list(F)) + ]. + +init_per_suite(Config) -> + PrivDir = ?config(priv_dir, Config), + CertDir = cert_dir(), + + PolicyFile = filename:join(PrivDir, "policies.term"), + ?assertEqual(ok, file:write_file(PolicyFile, <<>>)), + application:set_env(seabac, policy_file, PolicyFile), + + Config2 = grisp_connect_manager:start(Config), + [{cert_dir, CertDir} | Config2]. + +end_per_suite(Config) -> + grisp_connect_manager:cleanup_apps(?config(apps, Config)). + +init_per_testcase(_, Config) -> + % the kraf instance links to this process + process_flag(trap_exit, true), + {ok, _} = application:ensure_all_started(kraft), + KraftRef = grisp_connect_manager:kraft_start(?config(cert_dir, Config)), + {ok, _} = application:ensure_all_started(grisp_emulation), + application:set_env(grisp_connect, test_cert_dir, ?config(cert_dir, Config)), + {ok, _} = application:ensure_all_started(grisp_connect), + [{kraft_instance, KraftRef} | Config]. + +end_per_testcase(_, Config) -> + ok = application:stop(grisp_connect), + kraft:stop(?config(kraft_instance, Config)), + ok = application:stop(kraft), + mnesia:activity(transaction, fun() -> + mnesia:delete({grisp_device, serial_number()}) + end), + flush(), + Config. + +%--- Tests --------------------------------------------------------------------- + +reconnect_on_gun_crash_test(_) -> + ?assertMatch(ok, wait_connection(100)), + {state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws), + proc_lib:stop(GunPid), + ?assertMatch(ok, wait_disconnection()), + ?assertMatch(ok, wait_connection()). + +reconnect_on_disconnection_test(Config) -> + ?assertMatch(ok, wait_connection()), + ok = kraft:stop(?config(kraft_instance, Config)), + ?assertMatch(ok, wait_disconnection()), + KraftRef2 = grisp_connect_manager:kraft_start(cert_dir()), + ?assertMatch(ok, wait_connection(100)), + [{kraft_instance, KraftRef2} | proplists:delete(kraft_instance, Config)]. + +reconnect_on_ping_timeout_test(_) -> + ?assertMatch(ok, wait_connection()), + {state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws), + proc_lib:stop(GunPid), + % Now decrease ping timeout so that the WS closes after just 1 second + application:set_env(grisp_connect, ws_ping_timeout, 1000), + ?assertMatch(ok, wait_disconnection()), + ?assertMatch(ok, wait_connection(100)), + ?assertMatch(ok, wait_disconnection()), + ?assertMatch(ok, wait_connection(100)), + ?assertMatch(ok, wait_disconnection()). + +%--- Internal ------------------------------------------------------------------ + +flush() -> + receive Any -> ct:pal("Flushed: ~p", [Any]), flush() + after 0 -> ok + end. diff --git a/test/grisp_connect_test_client.erl b/test/grisp_connect_test_client.erl index 5e3c7ca..587e26d 100644 --- a/test/grisp_connect_test_client.erl +++ b/test/grisp_connect_test_client.erl @@ -6,10 +6,13 @@ -export([cert_dir/0]). -export([serial_number/0]). -export([wait_connection/0]). +-export([wait_connection/1]). +-export([wait_disconnection/0]). +-export([wait_disconnection/1]). %--- API ----------------------------------------------------------------------- -cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs"). +cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs"). serial_number() -> <<"0000">>. @@ -17,7 +20,7 @@ wait_connection() -> wait_connection(20). wait_connection(0) -> - ct:pal("grisp_connect state:~n~p~n", [sys:get_state(grisp_connect_client)]), + ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]), {error, timeout}; wait_connection(N) -> case grisp_connect:is_connected() of @@ -26,3 +29,17 @@ wait_connection(N) -> ct:sleep(100), wait_connection(N - 1) end. + +wait_disconnection() -> + wait_disconnection(20). + +wait_disconnection(0) -> + ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]), + {error, timeout}; +wait_disconnection(N) -> + case grisp_connect:is_connected() of + true -> + ct:sleep(100), + wait_disconnection(N - 1); + false -> ok + end.