Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infinite (SSE) streams always raises hyper::Error(IncompleteMessage) / connection closed before message completed #3759

Open
peku33 opened this issue Oct 3, 2024 · 2 comments
Labels
C-bug Category: bug. Something is wrong. This is bad!

Comments

@peku33
Copy link

peku33 commented Oct 3, 2024

Version
1.4.1 full

Platform
windows 11 64b

Description
For (possibly infinite) SSE streams, when client disconnects (ex. closes connection), hyper connection always returns error of type hyper::Error(IncompleteMessage).

I tried this code:
I used server example from official website + tokio IntervalStream for body:

use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};

use futures::StreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Request, Response};
use hyper_util::rt::TokioIo;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_stream::wrappers::IntervalStream;

async fn hello(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, Infallible> {
    let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
        .enumerate()
        .map(|(count, _)| {
            let data = Bytes::from(format!("data: Hello {}!\n\n", count));
            let frame = Frame::data(data);
            Ok::<_, Infallible>(frame)
        });

    let http_response = Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(BodyExt::boxed(StreamBody::new(stream)))
        .unwrap();

    Ok(http_response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {}", err);
            }
        });
    }

    Ok(())
}

I expected to see this happen: serve_connection shouldn't return Err on client disconnect. Ok should be returned and stream dropped.

Instead, this happened: Error of type hyper::Error(IncompleteMessage) is returned.

@peku33 peku33 added the C-bug Category: bug. Something is wrong. This is bad! label Oct 3, 2024
@seanmonstar
Copy link
Member

Have you been able to take a look at what triggers the error? My guess is that since the response isn't complete (because you can still stream more events to the client), when hyper notices the connection closed and determines it needs to tell you about the potential error condition (that the client didn't see all of the response). hyper has no special handling of server-sent-events, as they aren't actually special in the HTTP spec: they are just a long lived streaming response.

Thus, it's not immediately clear how hyper would know whether this is an error, or should be ignored. Suggestions are weclome. But also, since the application knows better what kind of streaming it is doing, it can choose to ignore these errors.

@peku33
Copy link
Author

peku33 commented Oct 26, 2024

My guess is that since the response isn't complete (because you can still stream more events to the client), when hyper notices the connection closed and determines it needs to tell you about the potential error condition (that the client didn't see all of the response)

IIRC this is exactly the case.

Probably to handle this correctly, Body should be aware if its finite or not. Maybe it should be separate method like is_finite_stream or maybe a special case of size_hint? But this makes interface more complicated, especially for typical case.

Also maybe something a bit less correct, but easier to implement would be to check what poll_frame returned, and if it's Pending (eg. no more body available at the moment) the error could be suppressed?

@hyperium hyperium deleted a comment Oct 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C-bug Category: bug. Something is wrong. This is bad!
Projects
None yet
Development

No branches or pull requests

2 participants