How to stream an HTTP response with connexion 3 and AsyncApp? #1928

Open
mr-flannery opened this issue May 16, 2024 · 8 comments

@mr-flannery

Description

I want to stream HTTP responses with connexion 3 + AsyncApp.

Expected behaviour

I would expect something like this to work:

import asyncio

from connexion.lifecycle import ConnexionResponse

async def async_gen_numbers():
    for i in range(10):
        await asyncio.sleep(1)
        yield str(i)

async def streaming():
    # option 1
    return async_gen_numbers(), 200, {'Content-Type': 'text/plain'}
    # option 2 (alternative to option 1; tried one at a time)
    return ConnexionResponse(status_code=200, content_type='text/plain', body=async_gen_numbers(), headers={'transfer-encoding': 'chunked'}, is_streamed=True)

Actual behaviour

Neither of the two approaches listed above works. In both cases there's an error like this:

ERROR:    Request ID e5e552a9-7835-406f-a7fe-89501b09f73e - 'async_generator' object has no attribute 'encode'

Also I couldn't find any docs on how to do this with AsyncApp, only with FlaskApp.
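The error message above suggests that somewhere in the response path the body is treated as a string and `.encode()` is called on it. The following is a minimal sketch of that failure mode, not connexion's actual code: `encode_body` and `try_encode` are hypothetical stand-ins for a response layer that assumes a `str` body.

```python
import asyncio


async def async_gen_numbers():
    for i in range(3):
        await asyncio.sleep(0)
        yield str(i)


def encode_body(body):
    # Hypothetical stand-in for a response layer that assumes a str body.
    return body.encode("utf-8")


def try_encode(body):
    # Return the encoded body, or the error text if the body cannot be encoded.
    try:
        return encode_body(body)
    except AttributeError as exc:
        return str(exc)


print(try_encode("0123"))
print(try_encode(async_gen_numbers()))
```

A plain string encodes fine, while the async generator object fails with an `AttributeError` that names the `async_generator` type, which matches the shape of the logged error.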

Steps to reproduce

  1. Create a minimal connexion 3 app with AsyncApp.
  2. Use the code above.

Additional info:

Output of the commands:

  • python --version: 3.11.9
  • pip show connexion | grep "^Version\:": 3.0.6
@Ruwann
Member

Ruwann commented May 16, 2024

Hi @mr-flannery ,
I'll have to look into the is_streamed parameter; it seems that it is not used correctly.

In the meantime, can you try using the underlying Starlette StreamingResponse?

@Ruwann added the bug label May 16, 2024
@mr-flannery
Author

Hi @Ruwann ,

thanks for the quick response! Your suggestion did solve the problem!

from starlette.responses import StreamingResponse

# ...

async def streaming():
    return StreamingResponse(async_gen_numbers(), status_code=200, media_type='text/plain')
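For context, a streamed response at the ASGI level is a start message followed by several body messages, each flagged with `more_body=True` until the last one. The sketch below imitates that message sequence with the standard library only. It is an illustration of the ASGI message shapes, not Starlette's implementation, and `stream_body` and `collect` are hypothetical helper names.

```python
import asyncio


async def async_gen_numbers(n=3, delay=0.01):
    for i in range(n):
        await asyncio.sleep(delay)
        yield str(i)


async def stream_body(chunks, send, status=200, media_type="text/plain"):
    """Send an async iterator of str chunks as ASGI response messages."""
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [(b"content-type", media_type.encode())],
    })
    async for chunk in chunks:
        # Each chunk goes out as soon as the generator produces it.
        await send({"type": "http.response.body", "body": chunk.encode(), "more_body": True})
    # An empty final message tells the server the body is complete.
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def collect():
    sent = []

    async def send(message):
        sent.append(message)

    await stream_body(async_gen_numbers(), send)
    return sent


messages = asyncio.run(collect())
for m in messages:
    print(m["type"], m.get("body"), m.get("more_body"))
```

Anything that sits between the handler and the server and waits for `more_body=False` before forwarding would turn this sequence back into a single delayed response.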

@mr-flannery
Author

Hi @Ruwann ,

unfortunately, my response was a bit premature. It looks like it doesn't actually work as expected. While the code does run, it does not actually stream. From a caller's perspective it's still synchronous.

When calling the endpoint with curl, it still blocks for 10 seconds before anything happens, i.e.:

$ curl -N http://localhost:8080/streaming
# nothing happens for 10 seconds
0123456789
$
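The same check can be scripted instead of watching curl. The sketch below timestamps each chunk as it arrives, using only the standard library. The helper names `timed_chunks`, `read_response_chunks`, and `main` are made up for illustration, and the URL is the one from the curl call above.

```python
import time
import urllib.request
from typing import Iterable, Iterator, Tuple


def timed_chunks(chunks: Iterable[bytes]) -> Iterator[Tuple[float, bytes]]:
    """Yield (seconds since start, chunk) for every chunk as it arrives."""
    start = time.monotonic()
    for chunk in chunks:
        yield time.monotonic() - start, chunk


def read_response_chunks(url: str, size: int = 1024) -> Iterator[bytes]:
    """Yield whatever data is available from the response, without waiting for the full body."""
    with urllib.request.urlopen(url) as resp:
        while True:
            chunk = resp.read1(size)
            if not chunk:
                break
            yield chunk


def main(url: str = "http://localhost:8080/streaming") -> None:
    # Requires the server to be running; not called automatically.
    for offset, chunk in timed_chunks(read_response_chunks(url)):
        print(f"{offset:6.2f}s {chunk!r}")
```

Calling `main()` against a streaming server should print offsets that grow by about one second per chunk. If every chunk shows up at roughly the same offset near the end, the response was buffered somewhere.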

I wrote an Express app to make sure this was actually the server's fault, not curl's:

const express = require('express');
const app = express();

// Yield one number per second.
async function* generateNumbers() {
  for (const i of [1, 2, 3, 4, 5]) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield i;
  }
}

app.get('/streaming', async (req, res) => {
  // Write each value as soon as it is produced, then close the response.
  for await (const i of generateNumbers()) {
    res.write(`data: ${i}\n\n`);
  }
  res.end();
});

app.listen(3456, () => {
  console.log('Server is running on port 3456');
});

When using curl here, this works as expected, i.e. it sends a chunk of data every second.

Therefore, I believe there might actually be a bug with sending StreamingResponses in connexion 3 + AsyncApp.

@Ruwann
Member

Ruwann commented May 22, 2024

I cannot seem to reproduce the issue. When I use StreamingResponse from starlette, the response is streamed to my terminal as expected.

I used the following sample code: https://github.com/Ruwann/connexion-streaming

@mr-flannery
Author

Hi @Ruwann ,

thanks for the repo, I was able to track it down. This is what causes the behavior to change:

app.add_api("openapi.yaml", validate_responses=True)

Once I add validate_responses=True to your code, it exhibits the same problem. Vice versa, if I remove it from my code, streaming works as expected.
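A plausible explanation is that validating a response body needs the complete body, so the validation layer holds back every message until the last chunk arrives. The sketch below simulates that with a buffering wrapper around an ASGI-style `send` callable. It is an assumption about the mechanism, not connexion's code, and `buffering_send` and `run` are hypothetical names.

```python
import asyncio


def buffering_send(send):
    """Hold back all messages until the final body chunk has arrived."""
    pending = []

    async def wrapped(message):
        pending.append(message)
        if message["type"] == "http.response.start" or message.get("more_body", False):
            return  # body not complete yet, keep buffering
        # The whole body is available here; validation would run at this point.
        while pending:
            await send(pending.pop(0))

    return wrapped


async def run(wrap):
    log = []  # ordered record of "produced" and "delivered" events

    async def send(message):
        log.append(("delivered", message.get("body")))

    out = wrap(send)
    await out({"type": "http.response.start", "status": 200, "headers": []})
    for i in range(3):
        body = str(i).encode()
        log.append(("produced", body))
        await out({"type": "http.response.body", "body": body, "more_body": True})
    await out({"type": "http.response.body", "body": b"", "more_body": False})
    return log


passthrough = asyncio.run(run(lambda send: send))
buffered = asyncio.run(run(buffering_send))
print(passthrough)
print(buffered)
```

With the pass-through wrapper, each chunk is delivered right after it is produced. With the buffering wrapper, nothing is delivered until all chunks have been produced, which is the behavior the curl call shows.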

@mr-flannery
Author

Hi @Ruwann ,

just wanted to check in with you and see if there are any plans to fix this?

Best regards!

@nmoreaud

An easy way to handle that would be to make it possible to disable response body validation on a per-route basis.

@nmoreaud

nmoreaud commented Sep 10, 2024

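This seems to work, though it is probably not future-proof.

# Import paths as found in connexion 3.x; adjust if they differ in your version.
import typing as t

from connexion.datastructures import MediaTypeDict
from connexion.validators import AbstractResponseBodyValidator


class StreamResponseBodyValidator(AbstractResponseBodyValidator):

    def wrap_send(self, send):
        """Disable validation, leaving stream untouched"""
        return send

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:  # type: ignore
        return stream

    def _validate(self, body: dict):
        pass


# Defined after the class so the name exists when the map is built.
validator_map = {
    "response": MediaTypeDict(
        {
            'application/jsonlines+json': StreamResponseBodyValidator
        }
    ),
}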
