Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async_stream to create streams via generators #1548

Closed
wants to merge 15 commits into from

Conversation

taiki-e
Copy link
Member

@taiki-e taiki-e commented Apr 24, 2019

This adds a proc-macro crate for experimenting with the async stream API that we may introduce as a feature of language in the future.

The implementation is basically similar to futures-await's #[async] for and #[async_stream]. (re-added some of the files deleted by #1050 and updated it.)
EDIT: After introducing .await syntax, it was not possible to keep the same implementation in stream scope (#[async_stream]) and future scope (async fn), so it was necessary to adjust it.

cc alexcrichton/futures-await#111

Rendered

_phantom: PhantomData<U>,
// We rely on the fact that async/await streams are immovable in order to create
// self-referential borrows in the underlying generator.
_pin: PhantomPinned,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn, that's my bad, I should have removed the explicit !Unpin implementation as part of rust-lang/rust#55704 (or transformed it into an explicit conditional implementation).

_phantom: PhantomData<U>,
// We rely on the fact that async/await streams are immovable in order to create
// self-referential borrows in the underlying generator.
_pin: PhantomPinned,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this, T: ?Unpin so Self: ?Unpin via the structural rules (although because of PhantomData<U> you might be getting the Unpinness of U as well), if the generator transform ever allows non-self-referential static-marked generators to implement Unpin automatically then this could potentially generate impl Stream + Unpin.

@Nemo157
Copy link
Member

Nemo157 commented Apr 24, 2019

One potential future addition to this would be #[async_sink], it seems much more niche than #[async_stream] though.

@Ekleog
Copy link

Ekleog commented Apr 24, 2019

FWIW, here is something that would likely benefit from #[async_sink], which I have rage-quit trying to write as a manual Sink wrapper for now: https://github.com/Ekleog/yuubind/blob/master/smtp-message/src/data/sink.rs#L18 (it's almost with_flat_map except it's also adding some data after the end of the incoming stream)

@Nemo157
Copy link
Member

Nemo157 commented Apr 24, 2019

(somewhat OT: @Ekleog it took a few upgrades to my async_sink_block implementation to workaround type inference issues, and I'm not sure how to deal with errors yet, but I got that example compiling with it: https://gist.github.com/Nemo157/0344ba1aa1d1d89ce2f5ae26ea7b1d46, so maybe it is worth considering adding something like it here in the future)

@taiki-e taiki-e force-pushed the async-stream branch 4 times, most recently from c7288aa to aca5bc0 Compare April 25, 2019 03:23
@yoshuawuyts
Copy link
Member

I would love to see this included; this would allow us to make our tcp-echo and tcp-proxy examples in Runtime so much nicer!


// Returns a stream of i32
#[async_stream]
fn foo(stream: impl Stream<Item = String>) -> i32 {
Copy link
Member Author

@taiki-e taiki-e Apr 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the concerns with current implementations is that there is an Item type at the "return" position. In the futures-await's implementation, this is specified in the form of an attribute argument #[async_stream(item = i32)].

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@taiki-e that approach seems reasonable to me; are there any reasons why we can't / shouldn't follow future-await's approach?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yoshuawuyts There is no particular reason. Also, it is easy to change the implementation to that.

(Because the stream has no final return value, the return position is always empty. The current implementation is just trying to reuse this blank.)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In futures-await we can use ? operator inside stream. If I understand correctly, with new syntax it's impossible. Is it true?

/// Fetch all provided urls one at a time
#[async_stream(item = String)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
    for url in urls {
        let s = await!(fetch(client, url))?;
        stream_yield!(s);
    }
    Ok(())
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kgv yes, because of streams no longer being required to have an error and item type that no longer works.

You can probably do something mostly equivalent with a try block though:

/// Fetch all provided urls one at a time
#[async_stream(item = io::Result<String>)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) {
    let res = try {
        for url in urls {
            yield Ok(fetch(client, url).await?);
        }
    };
    if let Err(err) = res {
         yield Err(err);
    }
}

and it may be possible to add more sugar for this in the future (an async_try_stream maybe?).

Copy link
Member Author

@taiki-e taiki-e May 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nemo157

and it may be possible to add more sugar for this in the future (an async_try_stream maybe?).

I will try it after this PR is merged. And I think it will be something like the following:

#[async_try_stream(ok = String, error = io::Error)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) {
    for url in urls {
         yield fetch(client, url).await?;
    }
}

// desugaring into

fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> impl Stream<Item = Result<String, io::Error>> {
    from_generator(static move || { // impl Generator<Yield = String, Return = Result<(), io::Error>>
        for url in urls {
             yield Poll::Ready(fetch(client, url).await?);
        }
        return Ok(()); // It is the same behavior as try_blocks that this is added automatically.
        loop { yield Poll::Pending }
    })
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also, if we add async_try_stream, it may be preferable to write Item at the position of the attribute argument, not at the position of return, so I will revert it)

@taiki-e taiki-e force-pushed the async-stream branch 2 times, most recently from 3ecd7bc to 7db16a1 Compare April 27, 2019 15:18
@taiki-e taiki-e changed the title [WIP] Add async_stream to create streams via generators Add async_stream to create streams via generators Apr 28, 2019
@taiki-e taiki-e force-pushed the async-stream branch 3 times, most recently from 9e3d63d to 4f59a41 Compare May 4, 2019 01:30
@taiki-e taiki-e force-pushed the async-stream branch 4 times, most recently from 4f12883 to d45caae Compare May 8, 2019 02:23
@yoshuawuyts yoshuawuyts mentioned this pull request May 8, 2019
3 tasks
@taiki-e taiki-e changed the title Add async_stream to create streams via generators [WIP] Add async_stream to create streams via generators May 9, 2019
@JuanPotato
Copy link
Member

I really appreciate this feature and would be happy to help with anything. I haven't been working with futures 0.3 for too long but am very willing to help.

@taiki-e
Copy link
Member Author

taiki-e commented Jul 2, 2019

@cramertj Any thoughts about this pr?

@taiki-e
Copy link
Member Author

taiki-e commented Jul 31, 2019

Published as futures-async-stream -- Thanks everybody for your patience. It may not be what was expected, but I believe it would be better to polish it as a separate project and then decide if it should eventually merge.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants