Skip to content

Commit

Permalink
Merge branch 'maint'
Browse files Browse the repository at this point in the history
* maint:
  Improve range calculation to avoid signed integer overflow
  Don't clear send operation caller from recv code
  Test TCP send block/unblock with interfering recv
  • Loading branch information
RaimoNiskanen committed Oct 7, 2024
2 parents 9c77440 + 0e8510c commit 4a5ac69
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 8 deletions.
7 changes: 2 additions & 5 deletions erts/emulator/drivers/common/inet_drv.c
Original file line number Diff line number Diff line change
Expand Up @@ -3123,7 +3123,6 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i == 15);
/* desc->caller = am_undefined; XXX */
return erl_drv_send_term(desc->dport, caller, spec, i);
}
else {
Expand All @@ -3137,7 +3136,6 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 20);
/* desc->caller = am_undefined; XXX */
code = erl_drv_send_term(desc->dport, caller, spec, i);
return code;
}
Expand Down Expand Up @@ -3897,8 +3895,6 @@ inet_async_binary_data
i = LOAD_TUPLE(spec, i, 4);

ASSERT(i <= PACKET_ERL_DRV_TERM_DATA_LEN);
desc->caller = am_undefined;
end_caller_ref(&desc->caller_ref);
return erl_drv_send_term(desc->dport, caller, spec, i);
}

Expand Down Expand Up @@ -12638,7 +12634,8 @@ static int tcp_deliver(tcp_descriptor* desc, int len)
inet_input_count(INETP(desc), len);

/* deliver binary? */
if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
if (len >= /* >= 75% of buffer */
(desc->i_buf->orig_size - (desc->i_buf->orig_size >> 2))) {
code = tcp_reply_binary_data(desc, desc->i_buf,
(desc->i_ptr_start -
desc->i_buf->orig_bytes),
Expand Down
121 changes: 118 additions & 3 deletions lib/kernel/test/gen_tcp_misc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
%% %CopyrightEnd%
%%

%% Run the entire test suite:
%% Run the entire test suite:
%% ts:run(kernel, gen_tcp_misc_SUITE, [batch]).
%%
%% Run a specific group:
Expand Down Expand Up @@ -107,7 +107,8 @@
otp_17492/1,
otp_18357/1,
otp_18883/1,
otp_18707/1
otp_18707/1,
send_block_unblock/1
]).

%% Internal exports.
Expand Down Expand Up @@ -235,7 +236,8 @@ all_std_cases() ->
bidirectional_traffic,
{group, socket_monitor},
otp_17492,
otp_18707
otp_18707,
send_block_unblock
].

ticket_cases() ->
Expand Down Expand Up @@ -9382,6 +9384,119 @@ do_otp_18707(_Config) ->

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

send_block_unblock(Config) when is_list(Config) ->
Size = 2048,
Timeout = 500,
Payload = payload(Size),
BufSize = Size + (Size bsr 3), % + 12.5% - need to fill buffer > 75%
io:format(
"[~w] Payload size ~w, send timeout ~w~n", [self(), Size, Timeout]),
Opts =
[binary,
{active, false},
{exit_on_close, false},
{show_econnreset, true},
{high_watermark, Size * 6},
{low_watermark, Size * 2},
{sndbuf, BufSize},
{recbuf, BufSize},
{buffer, BufSize},
{packet, 4}],
{ok, L} = gen_tcp:listen(0, Opts),
{ok, P} = inet:port(L),
{ok, Sa} = gen_tcp:connect({127,0,0,1}, P, Opts),
{ok, Sb} = gen_tcp:accept(L),
ok = gen_tcp:close(L),
{ok, Pa} = inet:port(Sa),
io:format("[~w] Listen port ~w, connect port ~w~n", [self(), P, Pa]),
io:format("[~w] Accept socket ~w, connect socket ~w~n", [self(), Sb, Sa]),
inet:i(),
%% ok = inet:setopts(Sa, [{debug, true}]),
fill_send_buffers(Payload, Sa, Sb, Timeout, 0).

fill_send_buffers(Payload, Sa, Sb, Timeout, N) ->
N_1 = N + 1,
case send_maybe_block(Sa, <<N:16, Payload/binary>>, Timeout) of
ok ->
io:format("[~w] Sent ~w~n", [self(), N]),
fill_send_buffers(Payload, Sa, Sb, Timeout, N_1);
Ref when is_reference(Ref) ->
io:format("[~w] Send ~w pending ~w~n", [self(), N, Ref]),
case send_maybe_block(Sa, <<N_1:16, Payload/binary>>, Timeout) of
Ref2 when is_reference(Ref2) ->
io:format("[~w] Send ~w pending ~w~n", [self(), N_1, Ref2]),
_ =
spawn_link(
fun () ->
receive after Timeout -> ok end,
loop_data(Sb)
end),
recv_data(Payload, Sa, Ref, Ref2, N_1, 0)
end
end.

recv_data(_Payload, Sa, Ref, Ref2, M, N) when M < N ->
receive Msg ->
{send_result, Ref, ok} = Msg,
io:format("[~w] Send finished ~w~n", [self(), Ref]),
receive Msg2 ->
{send_result, Ref2, ok} = Msg2,
io:format("[~w] Send finished ~w~n", [self(), Ref2]),
gen_tcp:shutdown(Sa, write),
{error, closed} = gen_tcp:recv(Sa, 0),
io:format("[~w] Recv closed~n", [self()]),
ok = gen_tcp:close(Sa),
receive Unexpected ->
ct:fail({unexpected, Unexpected})
after 0 ->
inet:i()
end
end
end;
recv_data(Payload, Sa, Ref, Ref2, M, N) ->
Data = <<N:16, Payload/binary>>,
{ok, Data} = gen_tcp:recv(Sa, 0),
io:format("[~w] Recv ~w~n", [self(), N]),
recv_data(Payload, Sa, Ref, Ref2, M, N + 1).

send_maybe_block(S, Data, Timeout) ->
Parent = self(),
Ref = make_ref(),
_ =
spawn_link(
fun () ->
Parent ! {send_result, Ref, gen_tcp:send(S, Data)}
end),
receive
{send_result, Ref, Result} ->
false = is_reference(Result), %% Just to make it impossible
Result
after Timeout ->
Ref
end.

loop_data(S) ->
case gen_tcp:recv(S, 0) of
{ok, Data} ->
io:format("[~w] Echo recv~n", [self()]),
ok = gen_tcp:send(S, Data),
io:format("[~w] Echo sent~n", [self()]),
loop_data(S);
{error, closed} ->
io:format("[~w] Echo recv closed~n", [self()]),
ok = gen_tcp:close(S)
end.

payload(N) when is_integer(N), 0 =< N ->
payload(N, <<>>).
%%
payload(0, Bin) -> Bin;
payload(N, Bin) ->
C = rand:uniform($z - $0 + 1) + $0,
payload(N - 1, <<Bin/binary, C>>).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

is_windows() ->
case os:type() of
{win32, nt} ->
Expand Down

0 comments on commit 4a5ac69

Please sign in to comment.