Skip to content

Commit

Permalink
WS Refactoring
Browse files Browse the repository at this point in the history
- Make `connect` call a cast so that the caller is not blocked by the connection attempt
- Increase flexibility in GUN handling during WS upgrade.
- Add `is_connected` call to allow user to check the internal state and track WS status with `ws_up` variable.
  • Loading branch information
ziopio committed Jan 25, 2024
1 parent eed42ed commit 884de50
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/grisp_seawater_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ init([]) ->
},
ChildSpecs = [
worker(grisp_seawater_ntp, []),
worker(grisp_seawater_client, [])],
worker(grisp_seawater_ws, [])],
{ok, {SupFlags, ChildSpecs}}.

%% internal functions
Expand Down
117 changes: 65 additions & 52 deletions src/grisp_seawater_client.erl → src/grisp_seawater_ws.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
-module(grisp_seawater_client).
%% @doc Websocket client to connect to grisp.io
-module(grisp_seawater_ws).

-export([start_link/0]).
-export([connect/0]).
-export([connect/2]).
-export([is_connected/0]).
-export([link_device/0]).
-export([link_device/1]).
-export([ping/0]).
Expand All @@ -15,12 +17,17 @@
-export([handle_info/2]).

-record(state, {
http_conn,
gun_pid,
gun_ref,
ws_stream,
ws_up = false,
requests = #{}
}).

-define(request_timeout, 5_000).
-define(call_timeout, ?request_timeout + 1000).
-define(disconnected_state,
#state{gun_pid = undefined, gun_ref = undefine, ws_up = false}).

-include_lib("kernel/include/logger.hrl").

Expand All @@ -35,7 +42,10 @@ connect() ->
connect(Domain, Port).

connect(Server, Port) ->
gen_server:call(?MODULE, {?FUNCTION_NAME, Server, Port}, 60_000).
gen_server:cast(?MODULE, {?FUNCTION_NAME, Server, Port}).

is_connected() ->
gen_server:call(?MODULE, ?FUNCTION_NAME).

link_device() ->
case application:get_env(grisp_seawater, device_linking_token) of
Expand All @@ -44,33 +54,18 @@ link_device() ->
end.

link_device(Token) ->
gen_server:call(?MODULE, {?FUNCTION_NAME, Token}).
gen_server:call(?MODULE, {?FUNCTION_NAME, Token}, ?call_timeout).

ping() ->
gen_server:call(?MODULE, ?FUNCTION_NAME).
gen_server:call(?MODULE, ?FUNCTION_NAME, ?call_timeout).

% gen_server callbacks -----------------------------

init([]) ->
{ok, #state{}}.

handle_call({connect, Server, Port}, _, #state{http_conn = undefined} = S) ->
case grisp_seawater_http:open(Server, Port) of
{ok, Conn} ->
case upgrade(Conn) of
{ok, WsStream} ->
{reply, ok, #state{http_conn = Conn, ws_stream = WsStream}};
Error ->
ok = grisp_seawater_http:close(Conn),
{reply, Error, S}
end;
Error ->
{reply, Error, S}
end;
handle_call({connect, _Server, _Port}, _, S) ->
{reply, already_connected, S};
handle_call(_, _, #state{http_conn = Conn, ws_stream = Stream} = S)
when Conn == undefined orelse Stream == undefined ->
% gen_server callbacks ---------------------------------------------------------

init([]) -> {ok, #state{}}.

handle_call(is_connected, _, #state{ws_up = Up} = S) ->
{reply, Up, S};
handle_call(_, _, #state{ws_up = false} = S) ->
{reply, {error, disconnected}, S};
handle_call(ping, From, S) ->
{ok, NewS} = make_request(From, post, ping, #{}, S),
Expand All @@ -80,17 +75,40 @@ handle_call({link_device, Token}, From, S) ->
#{token => Token}, S),
{noreply, NewS}.

handle_cast(connect, S) ->
handle_cast({connect, Server, Port}, #state{gun_pid = undefined} = S) ->
case grisp_seawater_tls:connect(Server, Port) of
{ok, GunPid} ->
{noreply, #state{gun_pid = GunPid}};
Error ->
?LOG_ERROR("Failed opening TLS connection with reason ~p",[Error]),
{noreply, S}
end;
handle_cast({connect, _Server, _Port}, S) ->
{noreply, S}.

handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) ->
?LOG_INFO("HTTP connection enstablished, upgrading to WS..."),
GunRef = monitor(process, GunPid),
WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws"),
NewState = S#state{gun_pid = GunPid, gun_ref = GunRef, ws_stream = WsStream},
{noreply, NewState};
handle_info({gun_upgrade, Pid, Stream, [<<"websocket">>], _},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_INFO("WS Upgraded"),
{noreply, S#state{ws_up = true}};
handle_info({gun_response, Pid, Stream, _, Status, Headers},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_ERROR("WS Upgrade fail with status ~p", [Status]),
grisp_io_client ! {connection_fail, upgrade},
{noreply, shutdown_gun(S)};
handle_info({gun_ws, Conn, Stream, {text, JSON}},
#state{http_conn = Conn, ws_stream = Stream}= S) ->
#state{gun_pid = Conn, ws_stream = Stream}= S) ->
JSON_RPC = grisp_seawater_jsonrpc:decode(JSON),
case handle_jsonrpc(JSON_RPC, S) of
{[], NewS} -> {noreply, NewS};
{Msgs, NewS} ->
Text = grisp_seawater_jsonrpc:encode(Msgs),
gun:ws_send(S#state.http_conn, S#state.ws_stream, {text, Text}),
gun:ws_send(S#state.gun_pid, S#state.ws_stream, {text, Text}),
{noreply, NewS}
end;
handle_info({timeout, TRef, ID}, #state{requests = Reqs} = State) ->
Expand All @@ -101,33 +119,20 @@ handle_info({timeout, TRef, ID}, #state{requests = Reqs} = State) ->
{noreply, NewS};
_ -> error(unexpected_timeout)
end;
handle_info({gun_down, C, ws, closed, [Stream]},
#state{http_conn = C, ws_stream = Stream, requests = Requests}) ->
grisp_seawater_http:close(C),
handle_info({gun_down, Pid, ws, closed, [Stream]},
#state{gun_pid = Pid, ws_stream = Stream, requests = Requests} = S) ->
?LOG_WARNING("Websocket down!"),
[begin
erlang:cancel_timer(Tref),
gen_server:reply(Caller, {error, ws_closed})
end || {Caller, Tref} <- maps:values(Requests)],
{noreply, #state{}};
{noreply, shutdown_gun(S#state{requests = #{}})};
handle_info(M, S) ->
?LOG_WARNING("Unandled WS message: ~p", [M]),
{noreply, S}.

% internal functions -----------------------------

upgrade(Conn) ->
StreamRef = gun:ws_upgrade(Conn, "/grisp-connect/ws"),
receive
{gun_upgrade, Conn, StreamRef, [<<"websocket">>], _Headers} ->
{ok, StreamRef};
{gun_response, Conn, _, _, Status, Headers} ->
{ws_upgrade_failed, Status, Headers};
{gun_error, Conn, StreamRef, Reason} ->
{ws_upgrade_failed, Reason}
after 1000 ->
timeout
end.

handle_jsonrpc({batch, Batch}, S) ->
handle_rpc_messages(Batch, [], S);
handle_jsonrpc({single, Rpc}, S) ->
Expand All @@ -141,7 +146,7 @@ handle_rpc_messages([{result, _, _} = Res| Batch], Replies, S) ->
handle_rpc_messages(Batch, Replies, handle_response(Res, S));
handle_rpc_messages([{error, _Code, _Msg, _Data, _ID} = E | Batch],
Replies, S) ->
?LOG_DEBUG("Received JsonRPC error: ~p",[E]),
?LOG_INFO("Received JsonRPC error: ~p",[E]),
handle_rpc_messages(Batch, Replies, handle_response(E, S));
handle_rpc_messages([{internal_error, _, _} = E | Batch], Replies, S) ->
?LOG_ERROR("JsonRPC: ~p",[E]),
Expand All @@ -156,10 +161,10 @@ handle_request(_, _, ID) ->
grisp_seawater_jsonrpc:format_error({internal_error, method_not_found, ID}).

make_request(Caller, Method, Type, Params, #state{requests = Reqs} = State) ->
ID = list_to_binary(integer_to_list(erlang:unique_integer())),
ID = id(),
Rpc = {request, Method, maps:put(type, Type, Params), ID},
Msg = grisp_seawater_jsonrpc:encode(Rpc),
gun:ws_send(State#state.http_conn, State#state.ws_stream, {text, Msg}),
gun:ws_send(State#state.gun_pid, State#state.ws_stream, {text, Msg}),
TRef = erlang:start_timer(?request_timeout, self(), ID),
Request = {Caller, TRef},
{ok, State#state{requests = Reqs#{ID => Request}}}.
Expand All @@ -170,11 +175,11 @@ handle_response(Response, #state{requests = Requests} = S) ->
{error, Code, Message, Data, ID0} ->
{{error, error_atom(Code), Message, Data}, ID0}
end,
case maps:get(ID, Requests) of
case maps:get(ID, Requests, undefined) of
{Caller, Tref} ->
erlang:cancel_timer(Tref),
gen_server:reply(Caller, Reply);
_ ->
undefined ->
?LOG_ERROR("Unexpected jsonrpc ~p",[Response])
end,
S#state{requests = maps:remove(ID, Requests)}.
Expand All @@ -193,3 +198,11 @@ error_atom(-2) -> token_expired;
error_atom(-3) -> device_already_linked;
error_atom(-4) -> invalid_token;
error_atom(_) -> jsonrpc_error.

id() ->
list_to_binary(integer_to_list(erlang:unique_integer())).

shutdown_gun(#state{gun_pid = Pid} = State) ->
gun:shutdown(Pid),
grisp_io_client ! disconnected,
State?disconnected_state.

0 comments on commit 884de50

Please sign in to comment.