Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async_stream to create streams via generators #1548

Closed
wants to merge 15 commits into from
Closed
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ matrix:
- cargo build --manifest-path futures-channel/Cargo.toml --no-default-features --features alloc
- cargo build --manifest-path futures-util/Cargo.toml --no-default-features --features alloc

- name: cargo test (async-stream feature)
rust: nightly
script:
- cargo test --manifest-path futures/Cargo.toml --features async-stream,nightly --test async_stream_tests
- cargo clean --manifest-path futures/testcrate/Cargo.toml
- cargo test --manifest-path futures/testcrate/Cargo.toml

- name: cargo build --target=thumbv6m-none-eabi
rust: nightly
install:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"futures",
"futures-async-macro",
"futures-core",
"futures-channel",
"futures-executor",
Expand Down
24 changes: 24 additions & 0 deletions futures-async-macro/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "futures-async-macro-preview"
edition = "2018"
version = "0.3.0-alpha.17"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang-nursery/futures-rs"
homepage = "https://rust-lang-nursery.github.io/futures-rs"
documentation = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.14/futures_async_macro"
description = """
Definition of the `#[async_stream]` macro for the `futures-rs` crate as well as a few other assorted macros.
"""

[lib]
name = "futures_async_macro"
proc-macro = true

[features]
std = []

[dependencies]
proc-macro2 = "0.4"
quote = "0.6"
syn = { version = "0.15.34", features = ["full", "fold"] }
1 change: 1 addition & 0 deletions futures-async-macro/LICENSE-APACHE
1 change: 1 addition & 0 deletions futures-async-macro/LICENSE-MIT
55 changes: 55 additions & 0 deletions futures-async-macro/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

**Note that these are experimental APIs.**

## Usage

Add this to your `Cargo.toml`:

```toml
[dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-stream", "nightly"] }
```

### \#\[for_await\]

Processes streams using a for loop.

This is a reimplement of [futures-await]'s `#[async]` for loops for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#for-await-and-processing-streams).

```rust
#![feature(async_await, stmt_expr_attributes, proc_macro_hygiene)]
use futures::for_await;
use futures::prelude::*;

#[for_await]
for value in stream::iter(1..=5) {
println!("{}", value);
}
```

`value` has the `Item` type of the stream passed in. Note that async for loops can only be used inside of `async` functions, closures, blocks, `#[async_stream]` functions and `async_stream_block!` macros.

### \#\[async_stream\]

Creates streams via generators.

This is a reimplement of [futures-await]'s `#[async_stream]` for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#generators-and-streams).

```rust
#![feature(async_await, generators)]
use futures::prelude::*;
use futures::async_stream;

// Returns a stream of i32
#[async_stream(item = i32)]
fn foo(stream: impl Stream<Item = String>) {
#[for_await]
for x in stream {
yield x.parse().unwrap();
}
}
```

`#[async_stream]` must have an item type specified via `item = some::Path` and the values output from the stream must be yielded via the `yield` expression.

[futures-await]: https://github.com/alexcrichton/futures-await
101 changes: 101 additions & 0 deletions futures-async-macro/src/elision.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use proc_macro2::Span;
use syn::fold::Fold;
use syn::punctuated::Punctuated;
use syn::token::Comma;
use syn::{ArgSelfRef, FnArg, GenericParam, Lifetime, LifetimeDef, TypeReference};

pub(super) fn unelide_lifetimes(
generics: &mut Punctuated<GenericParam, Comma>,
args: Vec<FnArg>,
) -> Vec<FnArg> {
let mut folder = UnelideLifetimes::new(generics);
args.into_iter().map(|arg| folder.fold_fn_arg(arg)).collect()
}

struct UnelideLifetimes<'a> {
generics: &'a mut Punctuated<GenericParam, Comma>,
lifetime_index: usize,
lifetime_name: String,
count: u32,
}

impl<'a> UnelideLifetimes<'a> {
fn new(generics: &'a mut Punctuated<GenericParam, Comma>) -> UnelideLifetimes<'a> {
let lifetime_index = lifetime_index(generics);
let lifetime_name = lifetime_name(generics);
UnelideLifetimes { generics, lifetime_index, lifetime_name, count: 0 }
}

// Constitute a new lifetime
fn new_lifetime(&mut self) -> Lifetime {
let lifetime_name = format!("{}{}", self.lifetime_name, self.count);
let lifetime = Lifetime::new(&lifetime_name, Span::call_site());

let idx = self.lifetime_index + self.count as usize;
self.generics.insert(idx, GenericParam::Lifetime(LifetimeDef::new(lifetime.clone())));
self.count += 1;

lifetime
}

// Take an Option<Lifetime> and guarantee its an unelided lifetime
fn expand_lifetime(&mut self, lifetime: Option<Lifetime>) -> Lifetime {
match lifetime {
Some(l) => self.fold_lifetime(l),
None => self.new_lifetime(),
}
}
}

impl Fold for UnelideLifetimes<'_> {
// Handling self arguments
fn fold_arg_self_ref(&mut self, arg: ArgSelfRef) -> ArgSelfRef {
let ArgSelfRef { and_token, lifetime, mutability, self_token } = arg;
let lifetime = Some(self.expand_lifetime(lifetime));
ArgSelfRef { and_token, lifetime, mutability, self_token }
}

// If the lifetime is `'_`, replace it with a new unelided lifetime
fn fold_lifetime(&mut self, lifetime: Lifetime) -> Lifetime {
if lifetime.ident == "_" {
self.new_lifetime()
} else {
lifetime
}
}

// If the reference's lifetime is elided, replace it with a new unelided lifetime
fn fold_type_reference(&mut self, ty_ref: TypeReference) -> TypeReference {
let TypeReference { and_token, lifetime, mutability, elem } = ty_ref;
let lifetime = Some(self.expand_lifetime(lifetime));
let elem = Box::new(self.fold_type(*elem));
TypeReference { and_token, lifetime, mutability, elem }
}
}

fn lifetime_index(generics: &Punctuated<GenericParam, Comma>) -> usize {
generics
.iter()
.take_while(|param| if let GenericParam::Lifetime(_) = param { true } else { false })
.count()
}

// Determine the prefix for all lifetime names. Ensure it doesn't
// overlap with any existing lifetime names.
fn lifetime_name(generics: &Punctuated<GenericParam, Comma>) -> String {
let mut lifetime_name = String::from("'_async");
let existing_lifetimes: Vec<String> = generics
.iter()
.filter_map(|param| {
if let GenericParam::Lifetime(LifetimeDef { lifetime, .. }) = param {
Some(lifetime.to_string())
} else {
None
}
})
.collect();
while existing_lifetimes.iter().any(|name| name.starts_with(&lifetime_name)) {
lifetime_name.push('_');
}
lifetime_name
}
48 changes: 48 additions & 0 deletions futures-async-macro/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Make error messages a little more readable than `panic!`.
macro_rules! error {
($msg:expr) => {
return proc_macro::TokenStream::from(
syn::Error::new(proc_macro2::Span::call_site(), $msg).to_compile_error(),
)
};
($span:expr, $msg:expr) => {
return proc_macro::TokenStream::from(
syn::Error::new_spanned($span, $msg).to_compile_error(),
)
};
}

// TODO: Should we give another name?
// `assert!` that call `error!` instead of `panic!`.
macro_rules! assert_ {
($e:expr, $msg:expr) => {
if !$e {
error!($msg)
}
};
}

pub(super) fn expr_compile_error(e: &syn::Error) -> syn::Expr {
syn::parse2(e.to_compile_error()).unwrap()
}

// Long error messages and error messages that are called multiple times

macro_rules! args_is_not_empty {
($name:expr) => {
concat!("attribute must be of the form `#[", $name, "]`")
};
}

macro_rules! outside_of_async_error {
($tokens:expr, $name:expr) => {
$crate::error::expr_compile_error(&syn::Error::new_spanned(
$tokens,
concat!(
$name,
" cannot be allowed outside of \
async closures, blocks, functions, async_stream blocks, and functions."
),
))
};
}
Loading