Skip to content

Commit

Permalink
Session Kill (#270)
Browse files Browse the repository at this point in the history
* Format

* Kill session after 35min
  • Loading branch information
macpie authored Nov 7, 2023
1 parent ae01061 commit f1fec7b
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 45 deletions.
31 changes: 12 additions & 19 deletions src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
]).

-define(REG_KEY(Gateway), {?MODULE, Gateway}).
-define(SESSION_TIMER, timer:minutes(30)).
-define(SESSION_RESET, session_reset).
-define(SESSION_TIMER, timer:minutes(35)).
-define(SESSION_KILL, session_kill).

-record(handler_state, {
started :: non_neg_integer(),
pubkey_bin :: undefined | binary(),
nonce :: undefined | binary(),
session_key :: undefined | binary(),
session_timer :: undefined | reference()
session_key :: undefined | binary()
}).

-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
init(_Rpc, StreamState) ->
HandlerState = #handler_state{started = erlang:system_time(millisecond)},
ok = schedule_session_kill(),
grpcbox_stream:stream_handler_state(StreamState, HandlerState).

-spec route(hpr_envelope_up:envelope(), grpcbox_stream:t()) ->
Expand Down Expand Up @@ -68,9 +68,9 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) ->
lager:info("give_away registration to ~p", [NewPid]),
gproc:give_away({n, l, ?REG_KEY(PubKeyBin)}, NewPid),
grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState);
handle_info(?SESSION_RESET, StreamState0) ->
{EnvDown, StreamState1} = create_session_offer(StreamState0),
grpcbox_stream:send(false, EnvDown, StreamState1);
handle_info(?SESSION_KILL, StreamState0) ->
lager:debug("received session kill for stream"),
grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState0);
handle_info(_Msg, StreamState) ->
StreamState.

Expand Down Expand Up @@ -185,12 +185,7 @@ handle_session_init(SessionInit, StreamState) ->
hpr_utils:bin_to_hex_string(Nonce),
libp2p_crypto:bin_to_b58(SessionKey)
]),
HandlerState1 = HandlerState0#handler_state{
session_key = SessionKey,
session_timer = schedule_session_reset(
HandlerState0#handler_state.session_timer
)
},
HandlerState1 = HandlerState0#handler_state{session_key = SessionKey},
{ok, grpcbox_stream:stream_handler_state(StreamState, HandlerState1)}
end
end.
Expand All @@ -209,12 +204,10 @@ create_session_offer(StreamState0) ->
]),
{EnvDown, StreamState1}.

-spec schedule_session_reset(OldTimer :: undefined | reference()) -> reference().
schedule_session_reset(OldTimer) when is_reference(OldTimer) ->
_ = erlang:cancel_timer(OldTimer),
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET);
schedule_session_reset(_OldTimer) ->
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET).
-spec schedule_session_kill() -> ok.
schedule_session_kill() ->
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_KILL),
ok.

%% ------------------------------------------------------------------
%% EUnit tests
Expand Down
61 changes: 59 additions & 2 deletions test/hpr_packet_router_service_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
]).

-export([
session_test/1
session_test/1,
session_timeout_test/1
]).

-include_lib("eunit/include/eunit.hrl").
Expand All @@ -24,7 +25,8 @@
%%--------------------------------------------------------------------
all() ->
[
session_test
session_test,
session_timeout_test
].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -137,6 +139,61 @@ session_test(_Config) ->

ok.

session_timeout_test(_Config) ->
RouteID = "8d502f32-4d58-4746-965e-8c7dfdcfc625",
Route = hpr_route:test_new(#{
id => RouteID,
net_id => 0,
oui => 4020,
server => #{
host => "127.0.0.1",
port => 8082,
protocol => {packet_router, #{}}
},
max_copies => 2
}),
EUIPairs = [
hpr_eui_pair:test_new(#{
route_id => RouteID, app_eui => 802041902051071031, dev_eui => 8942655256770396549
})
],
DevAddrRanges = [
hpr_devaddr_range:test_new(#{
route_id => RouteID, start_addr => 16#00000000, end_addr => 16#00000010
})
],

%% Normal test with session reset
{ok, GatewayPid} = hpr_test_gateway:start(#{
forward => self(),
route => Route,
eui_pairs => EUIPairs,
devaddr_ranges => DevAddrRanges,
ignore_session_offer => false
}),

{ok, _} = hpr_test_gateway:receive_session_init(GatewayPid, timer:seconds(1)),
{error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid),

SessionKey = hpr_test_gateway:session_key(GatewayPid),
PubKeyBin = hpr_test_gateway:pubkey_bin(GatewayPid),
{ok, Pid} = hpr_packet_router_service:locate(PubKeyBin),

%% Checking that session keys are the same
?assertEqual(SessionKey, session_key_from_stream(Pid)),
Pid ! session_kill,

ok = hpr_test_gateway:receive_stream_down(GatewayPid),

ok.

%% ===================================================================
%% Helpers
%% ===================================================================

session_key_from_stream(Pid) ->
State = sys:get_state(Pid),
StreamState = erlang:element(2, State),
CallbackState = erlang:element(20, StreamState),
HandlerState = erlang:element(3, CallbackState),
erlang:element(5, HandlerState).
27 changes: 13 additions & 14 deletions test/hpr_protocol_router_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,19 @@ gateway_disconnect_test(_Config) ->
_ConnPid = h2_stream_set:connection(StreamSet),

ok = gen_server:stop(GatewayPid),
ok =
receive
{hpr_test_gateway, GatewayPid,
{terminate, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}}} ->
GatewayConnPid = h2_stream_set:connection(GatewayStreamSet),
ok = test_utils:wait_until(
fun() ->
false == erlang:is_process_alive(GatewayConnPid) andalso
false == erlang:is_process_alive(GatewayStreamPid) andalso
false == erlang:is_process_alive(GatewayPid)
end
)
after timer:seconds(3) -> ct:fail(no_terminate_rcvd)
end,
case hpr_test_gateway:receive_terminate(GatewayPid) of
{error, timeout} ->
ct:fail(no_terminate_rcvd);
{ok, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}} ->
GatewayConnPid = h2_stream_set:connection(GatewayStreamSet),
ok = test_utils:wait_until(
fun() ->
false == erlang:is_process_alive(GatewayConnPid) andalso
false == erlang:is_process_alive(GatewayStreamPid) andalso
false == erlang:is_process_alive(GatewayPid)
end
)
end,

ok = test_utils:wait_until(
fun() ->
Expand Down
67 changes: 57 additions & 10 deletions test/hpr_test_gateway.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
-export([
start/1,
pubkey_bin/1,
session_key/1,
send_packet/2,
receive_send_packet/1,
receive_env_down/1,
receive_register/1
receive_register/1,
receive_session_init/2,
receive_stream_down/1,
receive_terminate/1
]).

%% ------------------------------------------------------------------
Expand All @@ -31,6 +35,8 @@
-define(RCV_TIMEOUT, 100).
-define(SEND_PACKET, send_packet).
-define(REGISTER, register).
-define(SESSION_INIT, session_init).
-define(STREAM_DOWN, stream_down).

-record(state, {
forward :: pid(),
Expand All @@ -57,6 +63,10 @@ start(Args) ->
pubkey_bin(Pid) ->
gen_server:call(Pid, pubkey_bin).

-spec session_key(Pid :: pid()) -> binary().
session_key(Pid) ->
gen_server:call(Pid, session_key).

-spec send_packet(Pid :: pid(), Args :: map()) -> ok.
send_packet(Pid, Args) ->
gen_server:cast(Pid, {?SEND_PACKET, Args}).
Expand All @@ -82,7 +92,7 @@ receive_env_down(GatewayPid) ->
end.

-spec receive_register(GatewayPid :: pid()) ->
{ok, EnvDown :: hpr_envelope_up:envelope()} | {error, timeout}.
{ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}.
receive_register(GatewayPid) ->
receive
{?MODULE, GatewayPid, {?REGISTER, EnvUp}} ->
Expand All @@ -91,6 +101,34 @@ receive_register(GatewayPid) ->
{error, timeout}
end.

-spec receive_session_init(GatewayPid :: pid(), Timeout :: non_neg_integer()) ->
{ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}.
receive_session_init(GatewayPid, Timeout) ->
receive
{?MODULE, GatewayPid, {?SESSION_INIT, EnvUp}} ->
{ok, EnvUp}
after Timeout ->
{error, timeout}
end.

-spec receive_stream_down(GatewayPid :: pid()) -> ok | {error, timeout}.
receive_stream_down(GatewayPid) ->
receive
{?MODULE, GatewayPid, ?STREAM_DOWN} ->
ok
after timer:seconds(2) ->
{error, timeout}
end.

-spec receive_terminate(GatewayPid :: pid()) -> {ok, any()} | {error, timeout}.
receive_terminate(GatewayPid) ->
receive
{?MODULE, GatewayPid, {terminate, Stream}} ->
{ok, Stream}
after timer:seconds(2) ->
{error, timeout}
end.

%% ------------------------------------------------------------------
%%% gen_server Function Definitions
%% ------------------------------------------------------------------
Expand All @@ -115,6 +153,8 @@ init(

handle_call(pubkey_bin, _From, #state{pubkey_bin = PubKeyBin} = State) ->
{reply, PubKeyBin, State};
handle_call(session_key, _From, #state{session_key = {SessionKey, _}} = State) ->
{reply, SessionKey, State};
handle_call(_Msg, _From, State) ->
lager:debug("unknown call ~p", [_Msg]),
{reply, ok, State}.
Expand Down Expand Up @@ -199,7 +239,7 @@ handle_info(?CONNECT, #state{forward = Pid, pubkey_bin = PubKeyBin, sig_fun = Si
EnvUp = hpr_envelope_up:new(SignedReg),
ok = grpcbox_client:send(Stream, EnvUp),
Pid ! {?MODULE, self(), {?REGISTER, EnvUp}},
lager:debug("connected and registered"),
lager:debug("connected and registering"),
{noreply, State#state{stream = Stream}}
end;
%% GRPC stream callbacks
Expand All @@ -209,6 +249,8 @@ handle_info(
) ->
lager:debug("got EnvDown ~p", [EnvDown]),
case hpr_envelope_down:data(EnvDown) of
undefined ->
{noreply, State};
{packet, _Packet} ->
Pid ! {?MODULE, self(), {data, EnvDown}},
{noreply, State};
Expand All @@ -224,25 +266,30 @@ handle_info(
lager:debug("session initialized"),
{noreply, State#state{session_key = {SessionKey, libp2p_crypto:mk_sig_fun(PrivKey)}}}
end;
handle_info(
{'DOWN', Ref, process, Pid, _Reason},
#state{stream = #{stream_pid := Pid, monitor_ref := Ref}} = State
) ->
lager:debug("test gateway stream went down"),
{noreply, State#state{stream = undefined}};
handle_info({headers, _StreamID, _Headers}, State) ->
lager:debug("test gateway got headers ~p for ~w", [_Headers, _StreamID]),
{noreply, State};
handle_info({trailers, _StreamID, _Trailers}, State) ->
lager:debug("test gateway got trailers ~p for ~w", [_Trailers, _StreamID]),
{noreply, State};
handle_info({eos, StreamID}, #state{forward = ForwardPid} = State) ->
lager:debug("test gateway got eos for ~w", [StreamID]),
ForwardPid ! {?MODULE, self(), ?STREAM_DOWN},
{noreply, State#state{stream = undefined}};
handle_info(_Msg, State) ->
lager:debug("unknown info ~p", [_Msg]),
{noreply, State}.

terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = undefined}) ->
ok = grpcbox_channel:stop(PubKeyBin),
lager:debug("terminate ~p", [_Reason]),
Pid ! {?MODULE, self(), {terminate, undefined}},
ok;
terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = Stream}) ->
ok = grpcbox_client:close_send(Stream),
ok = grpcbox_channel:stop(PubKeyBin),
Pid ! {?MODULE, self(), {terminate, Stream}},
lager:debug("terminate ~p", [_Reason]),
Pid ! {?MODULE, self(), {terminate, Stream}},
ok.

%% ------------------------------------------------------------------
Expand Down

0 comments on commit f1fec7b

Please sign in to comment.