From 884de5005175a5fb96cd86c89d62772f9c3cffc8 Mon Sep 17 00:00:00 2001 From: Luca Succi Date: Tue, 23 Jan 2024 11:21:53 +0100 Subject: [PATCH] WS Refactoring - Make `connect` call a cast so that the caller is not blocked by the connection attempt - Increase flexibility in GUN handling during WS upgrade. - Add `is_connected` call to allow user to check the internal state and track WS status with `ws_up` variable. --- src/grisp_seawater_sup.erl | 2 +- ...water_client.erl => grisp_seawater_ws.erl} | 117 ++++++++++-------- 2 files changed, 66 insertions(+), 53 deletions(-) rename src/{grisp_seawater_client.erl => grisp_seawater_ws.erl} (64%) diff --git a/src/grisp_seawater_sup.erl b/src/grisp_seawater_sup.erl index 70223f6..66b713f 100644 --- a/src/grisp_seawater_sup.erl +++ b/src/grisp_seawater_sup.erl @@ -36,7 +36,7 @@ init([]) -> }, ChildSpecs = [ worker(grisp_seawater_ntp, []), - worker(grisp_seawater_client, [])], + worker(grisp_seawater_ws, [])], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/src/grisp_seawater_client.erl b/src/grisp_seawater_ws.erl similarity index 64% rename from src/grisp_seawater_client.erl rename to src/grisp_seawater_ws.erl index 0417baa..eaa04d2 100644 --- a/src/grisp_seawater_client.erl +++ b/src/grisp_seawater_ws.erl @@ -1,8 +1,10 @@ --module(grisp_seawater_client). +%% @doc Websocket client to connect to grisp.io +-module(grisp_seawater_ws). -export([start_link/0]). -export([connect/0]). -export([connect/2]). +-export([is_connected/0]). -export([link_device/0]). -export([link_device/1]). -export([ping/0]). @@ -15,12 +17,17 @@ -export([handle_info/2]). -record(state, { - http_conn, + gun_pid, + gun_ref, ws_stream, + ws_up = false, requests = #{} }). -define(request_timeout, 5_000). +-define(call_timeout, ?request_timeout + 1000). +-define(disconnected_state, + #state{gun_pid = undefined, gun_ref = undefine, ws_up = false}). -include_lib("kernel/include/logger.hrl"). @@ -35,7 +42,10 @@ connect() -> connect(Domain, Port). connect(Server, Port) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Server, Port}, 60_000). + gen_server:cast(?MODULE, {?FUNCTION_NAME, Server, Port}). + +is_connected() -> + gen_server:call(?MODULE, ?FUNCTION_NAME). link_device() -> case application:get_env(grisp_seawater, device_linking_token) of @@ -44,33 +54,18 @@ link_device() -> end. link_device(Token) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Token}). + gen_server:call(?MODULE, {?FUNCTION_NAME, Token}, ?call_timeout). ping() -> - gen_server:call(?MODULE, ?FUNCTION_NAME). + gen_server:call(?MODULE, ?FUNCTION_NAME, ?call_timeout). -% gen_server callbacks ----------------------------- - -init([]) -> - {ok, #state{}}. - -handle_call({connect, Server, Port}, _, #state{http_conn = undefined} = S) -> - case grisp_seawater_http:open(Server, Port) of - {ok, Conn} -> - case upgrade(Conn) of - {ok, WsStream} -> - {reply, ok, #state{http_conn = Conn, ws_stream = WsStream}}; - Error -> - ok = grisp_seawater_http:close(Conn), - {reply, Error, S} - end; - Error -> - {reply, Error, S} - end; -handle_call({connect, _Server, _Port}, _, S) -> - {reply, already_connected, S}; -handle_call(_, _, #state{http_conn = Conn, ws_stream = Stream} = S) -when Conn == undefined orelse Stream == undefined -> +% gen_server callbacks --------------------------------------------------------- + +init([]) -> {ok, #state{}}. + +handle_call(is_connected, _, #state{ws_up = Up} = S) -> + {reply, Up, S}; +handle_call(_, _, #state{ws_up = false} = S) -> {reply, {error, disconnected}, S}; handle_call(ping, From, S) -> {ok, NewS} = make_request(From, post, ping, #{}, S), @@ -80,17 +75,40 @@ handle_call({link_device, Token}, From, S) -> #{token => Token}, S), {noreply, NewS}. -handle_cast(connect, S) -> +handle_cast({connect, Server, Port}, #state{gun_pid = undefined} = S) -> + case grisp_seawater_tls:connect(Server, Port) of + {ok, GunPid} -> + {noreply, #state{gun_pid = GunPid}}; + Error -> + ?LOG_ERROR("Failed opening TLS connection with reason ~p",[Error]), + {noreply, S} + end; +handle_cast({connect, _Server, _Port}, S) -> {noreply, S}. +handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) -> + ?LOG_INFO("HTTP connection enstablished, upgrading to WS..."), + GunRef = monitor(process, GunPid), + WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws"), + NewState = S#state{gun_pid = GunPid, gun_ref = GunRef, ws_stream = WsStream}, + {noreply, NewState}; +handle_info({gun_upgrade, Pid, Stream, [<<"websocket">>], _}, + #state{gun_pid = Pid, ws_stream = Stream} = S) -> + ?LOG_INFO("WS Upgraded"), + {noreply, S#state{ws_up = true}}; +handle_info({gun_response, Pid, Stream, _, Status, Headers}, + #state{gun_pid = Pid, ws_stream = Stream} = S) -> + ?LOG_ERROR("WS Upgrade fail with status ~p", [Status]), + grisp_io_client ! {connection_fail, upgrade}, + {noreply, shutdown_gun(S)}; handle_info({gun_ws, Conn, Stream, {text, JSON}}, - #state{http_conn = Conn, ws_stream = Stream}= S) -> + #state{gun_pid = Conn, ws_stream = Stream}= S) -> JSON_RPC = grisp_seawater_jsonrpc:decode(JSON), case handle_jsonrpc(JSON_RPC, S) of {[], NewS} -> {noreply, NewS}; {Msgs, NewS} -> Text = grisp_seawater_jsonrpc:encode(Msgs), - gun:ws_send(S#state.http_conn, S#state.ws_stream, {text, Text}), + gun:ws_send(S#state.gun_pid, S#state.ws_stream, {text, Text}), {noreply, NewS} end; handle_info({timeout, TRef, ID}, #state{requests = Reqs} = State) -> @@ -101,33 +119,20 @@ handle_info({timeout, TRef, ID}, #state{requests = Reqs} = State) -> {noreply, NewS}; _ -> error(unexpected_timeout) end; -handle_info({gun_down, C, ws, closed, [Stream]}, - #state{http_conn = C, ws_stream = Stream, requests = Requests}) -> - grisp_seawater_http:close(C), +handle_info({gun_down, Pid, ws, closed, [Stream]}, + #state{gun_pid = Pid, ws_stream = Stream, requests = Requests} = S) -> + ?LOG_WARNING("Websocket down!"), [begin erlang:cancel_timer(Tref), gen_server:reply(Caller, {error, ws_closed}) end || {Caller, Tref} <- maps:values(Requests)], - {noreply, #state{}}; + {noreply, shutdown_gun(S#state{requests = #{}})}; handle_info(M, S) -> ?LOG_WARNING("Unandled WS message: ~p", [M]), {noreply, S}. % internal functions ----------------------------- -upgrade(Conn) -> - StreamRef = gun:ws_upgrade(Conn, "/grisp-connect/ws"), - receive - {gun_upgrade, Conn, StreamRef, [<<"websocket">>], _Headers} -> - {ok, StreamRef}; - {gun_response, Conn, _, _, Status, Headers} -> - {ws_upgrade_failed, Status, Headers}; - {gun_error, Conn, StreamRef, Reason} -> - {ws_upgrade_failed, Reason} - after 1000 -> - timeout - end. - handle_jsonrpc({batch, Batch}, S) -> handle_rpc_messages(Batch, [], S); handle_jsonrpc({single, Rpc}, S) -> @@ -141,7 +146,7 @@ handle_rpc_messages([{result, _, _} = Res| Batch], Replies, S) -> handle_rpc_messages(Batch, Replies, handle_response(Res, S)); handle_rpc_messages([{error, _Code, _Msg, _Data, _ID} = E | Batch], Replies, S) -> - ?LOG_DEBUG("Received JsonRPC error: ~p",[E]), + ?LOG_INFO("Received JsonRPC error: ~p",[E]), handle_rpc_messages(Batch, Replies, handle_response(E, S)); handle_rpc_messages([{internal_error, _, _} = E | Batch], Replies, S) -> ?LOG_ERROR("JsonRPC: ~p",[E]), @@ -156,10 +161,10 @@ handle_request(_, _, ID) -> grisp_seawater_jsonrpc:format_error({internal_error, method_not_found, ID}). make_request(Caller, Method, Type, Params, #state{requests = Reqs} = State) -> - ID = list_to_binary(integer_to_list(erlang:unique_integer())), + ID = id(), Rpc = {request, Method, maps:put(type, Type, Params), ID}, Msg = grisp_seawater_jsonrpc:encode(Rpc), - gun:ws_send(State#state.http_conn, State#state.ws_stream, {text, Msg}), + gun:ws_send(State#state.gun_pid, State#state.ws_stream, {text, Msg}), TRef = erlang:start_timer(?request_timeout, self(), ID), Request = {Caller, TRef}, {ok, State#state{requests = Reqs#{ID => Request}}}. @@ -170,11 +175,11 @@ handle_response(Response, #state{requests = Requests} = S) -> {error, Code, Message, Data, ID0} -> {{error, error_atom(Code), Message, Data}, ID0} end, - case maps:get(ID, Requests) of + case maps:get(ID, Requests, undefined) of {Caller, Tref} -> erlang:cancel_timer(Tref), gen_server:reply(Caller, Reply); - _ -> + undefined -> ?LOG_ERROR("Unexpected jsonrpc ~p",[Response]) end, S#state{requests = maps:remove(ID, Requests)}. @@ -193,3 +198,11 @@ error_atom(-2) -> token_expired; error_atom(-3) -> device_already_linked; error_atom(-4) -> invalid_token; error_atom(_) -> jsonrpc_error. + +id() -> + list_to_binary(integer_to_list(erlang:unique_integer())). + +shutdown_gun(#state{gun_pid = Pid} = State) -> + gun:shutdown(Pid), + grisp_io_client ! disconnected, + State?disconnected_state.