Skip to content

Commit

Permalink
Wait for input to finish even when streaming, if necessary.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 23, 2024
1 parent 87a2a72 commit 6d6fb1a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
8 changes: 6 additions & 2 deletions lib/async/http/protocol/http1/finishable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ def close(error = nil)
super
end

def wait
def wait(persistent = true)
if @reading
@closed.wait
else
elsif persistent
# If the connection can be reused, let's gracefully discard the body:
self.discard
else
# Else, we don't care about the body, so we can close it immediately:
self.close
end
end

Expand Down
15 changes: 9 additions & 6 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def each(task: Task.current)
request = nil
response = nil

# We must return here as no further request processing can be done:
return body.call(stream)
# In the case of streaming, `finishable` should wrap a `Remainder` body, which we can safely discard later on.
body.call(stream)
elsif response.status == 101
# This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header.
write_response(@version, response.status, response.headers)
Expand All @@ -108,8 +108,7 @@ def each(task: Task.current)
request = nil
response = nil

# We must return here as no further request processing can be done:
return body&.call(stream)
body&.call(stream)
else
write_response(@version, response.status, response.headers)

Expand Down Expand Up @@ -143,8 +142,12 @@ def each(task: Task.current)
request&.finish
end

# Discard or wait for the input body to be consumed:
finishable&.wait
if finishable
finishable.wait(@persistent)
else
# Do not remove this line or you will unleash the gods of concurrency hell.
task.yield
end
rescue => error
raise
ensure
Expand Down

0 comments on commit 6d6fb1a

Please sign in to comment.