Skip to content

Commit

Permalink
Optionally queue outgoing data
Browse files Browse the repository at this point in the history
Support queueing outgoing stanzas and stream management elements for up
to a configurable number of milliseconds (with a configurable queue size
limit).  This allows for batching up multiple XML elements into a single
TCP packet in order to reduce the TCP/IP overhead.
  • Loading branch information
weiss committed Jul 6, 2022
1 parent a90c53e commit 530cfe3
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 47 deletions.
7 changes: 7 additions & 0 deletions src/xmpp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
compress/1,
compress/2,
reset_stream/1,
send_elements/2,
send_element/2,
send_header/2,
send_trailer/1,
Expand Down Expand Up @@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream,
SocketData#socket_state{socket = Socket1}
end.

-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}.
send_elements(#socket_state{xml_stream = undefined}, _Els) ->
erlang:error(not_implemented);
send_elements(SocketData, Els) ->
send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])).

-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_element(#socket_state{xml_stream = undefined} = SocketData, El) ->
send_xml(SocketData, {xmlstreamelement, El});
Expand Down
152 changes: 127 additions & 25 deletions src/xmpp_stream_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
%% API
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
send_ws_ping/1]).
get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2,
format_error/1, send_ws_ping/1]).

%% gen_server callbacks
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
Expand Down Expand Up @@ -58,6 +58,9 @@
stream_encrypted => boolean(),
stream_version => {non_neg_integer(), non_neg_integer()},
stream_authenticated => boolean(),
stream_queue := [xmpp_element() | xmlel()],
stream_queue_max := non_neg_integer(),
stream_queue_timeout => {non_neg_integer(), integer()},
ip => {inet:ip_address(), inet:port_number()},
codec_options => [xmpp:decode_option()],
xmlns => binary(),
Expand Down Expand Up @@ -226,7 +229,21 @@ close(Pid, Reason) ->
establish(State) ->
process_stream_established(State).

-spec set_timeout(state(), non_neg_integer() | infinity) -> state().
-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state().
configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay)
when Owner == self() ->
flush_queue(State), % Support reconfiguration.
if MaxSize == 0; MaxDelay == 0 ->
State#{stream_queue_max => 0};
true ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue_max => MaxSize,
stream_queue_timeout => {MaxDelay, CurrentTime}}
end;
configure_queue(_, _, _) ->
erlang:error(badarg).

-spec set_timeout(state(), timeout()) -> state().
set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
case Timeout of
infinity -> State#{stream_timeout => infinity};
Expand Down Expand Up @@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) ->
socket_mod => SockMod,
socket_opts => Opts,
stream_timeout => {Timeout, Time},
stream_state => accepting},
stream_state => accepting,
stream_queue => [],
stream_queue_max => 0},
{ok, State, Timeout}.

-spec handle_cast(term(), state()) -> next_state().
Expand Down Expand Up @@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
noreply(try callback(handle_cdata, Data, State)
catch _:{?MODULE, undef} -> State
end);
handle_info(timeout, #{stream_queue := [_|_]} = State) ->
noreply(flush_queue(State));
handle_info(timeout, #{lang := Lang} = State) ->
Disconnected = is_disconnected(State),
noreply(try callback(handle_timeout, State)
Expand Down Expand Up @@ -522,15 +543,32 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
end.

-spec noreply(state()) -> noreply();
({stop, state()}) -> {stop, normal, state()}.
({stop, state()}) -> {stop, normal, state()};
({stop, normal, state()}) -> {stop, normal, state()}.
noreply({stop, State}) ->
{stop, normal, State};
noreply(#{stream_timeout := infinity} = State) ->
{noreply, State, infinity};
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
noreply({stop, normal, State}) ->
{stop, normal, State};
noreply(State) ->
{noreply, State, get_timeout(State)}.

-spec get_timeout(state()) -> timeout().
get_timeout(State) ->
min(get_stream_timeout(State), get_queue_timeout(State)).

-spec get_stream_timeout(state()) -> timeout().
get_stream_timeout(#{stream_timeout := infinity}) ->
infinity;
get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
Timeout = max(0, MSecs - CurrentTime + StartTime),
{noreply, State, Timeout}.
max(0, MSecs - CurrentTime + StartTime).

-spec get_queue_timeout(state()) -> timeout().
get_queue_timeout(#{stream_queue := []}) ->
infinity;
get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
max(0, MSecs - CurrentTime + StartTime).

-spec is_disconnected(state()) -> boolean().
is_disconnected(#{stream_state := StreamState}) ->
Expand Down Expand Up @@ -1193,21 +1231,29 @@ send_header(State, _) ->

-spec send_pkt(state(), xmpp_element() | xmlel()) -> state().
send_pkt(State, Pkt) ->
Result = socket_send(State, Pkt),
State1 = try callback(handle_send, Pkt, Result, State)
catch _:{?MODULE, undef} -> State
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State1);
ok ->
State1;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have opportunity to process incoming queued messages before
% terminating session.
self() ! {'$gen_event', closed},
State1
case check_queue(State, Pkt) of
flush ->
flush_queue(State, Pkt);
queue ->
queue_pkt(State, Pkt);
noqueue ->
State1 = flush_queue(State),
Result = socket_send(State1, Pkt),
State2 = try callback(handle_send, Pkt, Result, State1)
catch _:{?MODULE, undef} -> State1
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State2);
ok ->
State2;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have the opportunity to process incoming queued
% messages before terminating the session.
self() ! {'$gen_event', closed},
State2
end
end.

-spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state().
Expand Down Expand Up @@ -1258,6 +1304,62 @@ close_socket(#{socket := Socket} = State) ->
close_socket(State) ->
State.

-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue.
check_queue(#{stream_queue_max := 0}, _Pkt) ->
noqueue;
check_queue(#{stream_state := StreamState}, _Pkt)
when StreamState /= established->
noqueue;
check_queue(_State, Pkt)
when not ?is_stanza(Pkt),
not is_record(Pkt, sm_a),
not is_record(Pkt, sm_r) ->
noqueue;
check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt)
when length(Q) >= MaxQueue ->
flush;
check_queue(_State, _Pkt) ->
queue.

-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state().
queue_pkt(#{stream_queue := [],
stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue := [Pkt],
stream_queue_timeout := {MSecs, CurrentTime}};
queue_pkt(#{stream_queue := Q} = State, Pkt) ->
State#{stream_queue := [Pkt|Q]}.

-spec flush_queue(state(), xmpp_element() | xmlel()) -> state().
flush_queue(State, Pkt) ->
flush_queue(queue_pkt(State, Pkt)).

-spec flush_queue(state()) -> state().
flush_queue(#{stream_queue := []} = State) ->
State;
flush_queue(#{stream_queue := Q0,
socket := Sock,
xmlns := NS} = State0) ->
Q = lists:reverse(Q0),
Els = [xmpp:encode(Pkt, NS) || Pkt <- Q],
Result = xmpp_socket:send_elements(Sock, Els),
State1 = State0#{stream_queue := []},
State2 = try lists:foldl(
fun(Pkt, State) ->
callback(handle_send, Pkt, Result, State)
end, State1, Q)
catch _:{?MODULE, undef} -> State1
end,
case Result of
ok ->
State2;
{error, _Why} ->
self() ! {'$gen_event', closed},
State2
end;
flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released.
State#{stream_queue := []}.

-spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang;
select_lang(_, Lang) -> Lang.
Expand Down
Loading

0 comments on commit 530cfe3

Please sign in to comment.