Skip to content

Commit

Permalink
Fix intermittent chunked response hang
Browse files Browse the repository at this point in the history
When streaming a chunked response, it is possible to cause a TCP receive
hang under particular circumstances. If at one point the parser buffer
doesn't have the whole chunk, at a later point the buffer ends up empty
`<<>>`, and subsequently `hackney_response:stream_body/1` is called,
`hackney_response:recv/2` will hang if the expected remaining size
exceeds the remainder of the response. That expected size is actually
stale, from the earlier point when the parser did not have the whole
chunk. This issue slipped in with benoitc#710.

This was identified when using https://github.com/benoitc/couchbeam and
sending several chunked requests. If the last non-terminating chunk
completed the response JSON object, `hackney_response:skip_body/1` is
called to discard the remaining body, but is told to receive a number of
bytes equal to the expected remaining size which will frequently exceed
the small terminating chunk and trailers. As a result, the recv
operation hangs waiting for bytes that will never arrive.

Now, the transfer state (`BufSize`/`ExpectedSize`) is reset after each
successful chunk. The speed benefit of benoitc#710 is retained
(tested with the same approach as in that PR).

- correct some related typespecs
  • Loading branch information
danielfinke committed Sep 6, 2024
1 parent eca5fbb commit 1a13c8f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 19 deletions.
38 changes: 20 additions & 18 deletions src/hackney_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -400,33 +400,27 @@ parse_body(St) ->


-spec transfer_decode(binary(), #hparser{})
-> {ok, binary(), #hparser{}} | {done, binary()} | {error, atom()}.
-> body_result() | {more, binary()} | {error, atom()}.
transfer_decode(Data, St=#hparser{
body_state={stream, TransferDecode,
TransferState, ContentDecode},
TransferState, ContentDecode}=BodyState,
buffer=Buf}) ->
case TransferDecode(Data, TransferState) of
{ok, Data2, TransferState2} ->
content_decode(ContentDecode, Data2,
St#hparser{body_state= {stream,
TransferDecode,
TransferState2,
ContentDecode}});
St#hparser{body_state=set_transfer_state(TransferState2, BodyState)});
{ok, Data2, Rest, TransferState2} ->
content_decode(ContentDecode, Data2,
St#hparser{buffer=Rest,
body_state={stream,
TransferDecode,
TransferState2,
ContentDecode}});
body_state=set_transfer_state(TransferState2, BodyState)});
{chunk_done, Rest} ->
parse_trailers(St#hparser{buffer=Rest, state=on_trailers, body_state=done});
{chunk_ok, Chunk, Rest} ->
{ok, Chunk, St#hparser{buffer=Rest}};
{ok, Chunk, St#hparser{buffer=Rest, body_state=reset_chunked_transfer_state(BodyState)}};
more ->
{more, St#hparser{buffer=Data}, Buf};
{more, St#hparser{buffer=Data, body_state=reset_chunked_transfer_state(BodyState)}, Buf};
{more, TransferState2} ->
{more, St#hparser{buffer=Data, body_state={stream, TransferDecode, TransferState2, ContentDecode}}, Buf};
{more, St#hparser{buffer=Data, body_state=set_transfer_state(TransferState2, BodyState)}, Buf};
{done, Rest} ->
{done, Rest};
{done, Data2, _Rest} ->
Expand All @@ -450,14 +444,22 @@ content_decode(ContentDecode, Data, St) ->
{error, Reason} -> {error, Reason}
end.

%% Swap in a new transfer state (the third element) of a `{stream, ...}' body
%% state; the transfer decoder and content decoder elements are kept as they are.
set_transfer_state(NewTransferState, {stream, _, _, _} = BodyState) ->
    setelement(3, BodyState, NewTransferState).

%% @doc Put a chunked body state back to its initial transfer state `{0, 0}'
%% (BufSize, ExpectedSize). Do this once a chunk has been read successfully:
%% sizes left over from an earlier, partially buffered chunk can otherwise lead
%% to a recv asking for more bytes than the rest of the response will deliver.
reset_chunked_transfer_state({stream, _, _, _} = StreamState) ->
    InitialTransferState = {0, 0},
    set_transfer_state(InitialTransferState, StreamState).

%% @doc Decode a stream of chunks.
-spec te_chunked(binary(), any())
-> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}}
| {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}}
| {done, non_neg_integer(), binary()} | {error, badarg}.
te_chunked(<<>>, _) ->
done;
-> more
| {more, {non_neg_integer(), non_neg_integer()}}
| {chunk_ok, binary(), binary()}
| {chunk_done, binary()}.
te_chunked(Data, _) ->
case read_size(Data) of
{ok, 0, Rest} ->
Expand Down
2 changes: 1 addition & 1 deletion src/hackney_response.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ body(Client) ->
%% Read the body for Client by delegating to read_body/3, passing MaxLength
%% through unchanged and an empty binary as the third argument.
body(MaxLength, Client) ->
    EmptyBin = <<>>,
    read_body(MaxLength, Client, EmptyBin).

-spec skip_body(#client{}) -> {ok, #client{}} | {skip, #client{}} | {error, atom()}.
-spec skip_body(#client{}) -> {skip, #client{}} | {error, atom()}.
skip_body(Client) ->
case stream_body(Client) of
{ok, _, Client2} -> skip_body(Client2);
Expand Down
32 changes: 32 additions & 0 deletions test/hackney_http_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,35 @@ parse_chunked_response_trailers_test() ->
{_, P3} = hackney_http:execute(P2, <<"\r\n">>),
{more, P4} = hackney_http:execute(P3, <<"0\r\nFoo: ">>),
?assertEqual({done, <<>>}, hackney_http:execute(P4, <<"Bar\r\n\r\n">>)).


%% Regression test: the transfer state of a chunked body state must be reset
%% once a chunk has been read in full. Covers the edge case that shows up when
%% `hackney_response:stream_body/1' is called after receiving the last
%% non-terminating chunk with a specific buffer alignment.
reset_chunked_transfer_state_test() ->
    P0 = hackney_http:parser([response]),
    {_, _, _, _, P1} = hackney_http:execute(P0, <<"HTTP/1.1 200 OK\r\n">>),
    {_, _, P2} = hackney_http:execute(P1, <<"Transfer-Encoding: chunked\r\n">>),
    {_, P3} = hackney_http:execute(P2, <<"\r\n">>),

    %% Buffer doesn't have whole chunk, transfer state is set to {2, 16}
    {more, P4, <<>>} = hackney_http:execute(P3, <<"10\r\naa">>),

    %% Chunk is read, transfer state should be reset to {0, 0}
    {ok, <<"aaaaaaaaaaaaaaaa">>, P5} = hackney_http:execute(P4, <<"aaaaaaaaaaaaaa\r\n">>),

    %% Simulate what would happen if `hackney_response:stream_body/1' was called
    %% (e.g. from `skip_body/1')
    {more, #hparser{buffer = Buffer,
                    body_state = {stream, _, TransferState, _}
                   }, <<>>} = hackney_http:execute(P5),

    %% This edge case only cropped up when the buffer was empty at this stage.
    %% EUnit's ?assertEqual takes the expected value first, so a failure
    %% reports "expected <<>>" rather than the other way around.
    ?assertEqual(<<>>, Buffer),

    %% If not {0, 0}, the subsequent `Transport:recv/3' call from within
    %% `hackney_response:recv/2' would attempt to receive additional bytes that
    %% may not arrive and will hang until timeout. For this example, this is
    %% because we are at the end of the response (aside from the terminating
    %% chunk and an empty trailer).
    ?assertEqual({0, 0}, TransferState).

0 comments on commit 1a13c8f

Please sign in to comment.