diff --git a/.travis.yml b/.travis.yml index 2441997963..469b1daa93 100644 --- a/.travis.yml +++ b/.travis.yml @@ -97,6 +97,13 @@ matrix: - cargo build --manifest-path futures-channel/Cargo.toml --no-default-features --features alloc - cargo build --manifest-path futures-util/Cargo.toml --no-default-features --features alloc + - name: cargo test (async-stream feature) + rust: nightly + script: + - cargo test --manifest-path futures/Cargo.toml --features async-stream,nightly --test async_stream_tests + - cargo clean --manifest-path futures/testcrate/Cargo.toml + - cargo test --manifest-path futures/testcrate/Cargo.toml + - name: cargo build --target=thumbv6m-none-eabi rust: nightly install: diff --git a/Cargo.toml b/Cargo.toml index 430205d8ca..47e4f81a2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "futures", + "futures-async-macro", "futures-core", "futures-channel", "futures-executor", diff --git a/futures-async-macro/Cargo.toml b/futures-async-macro/Cargo.toml new file mode 100644 index 0000000000..49cfc8844a --- /dev/null +++ b/futures-async-macro/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "futures-async-macro-preview" +edition = "2018" +version = "0.3.0-alpha.17" +authors = ["Alex Crichton "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang-nursery/futures-rs" +homepage = "https://rust-lang-nursery.github.io/futures-rs" +documentation = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.14/futures_async_macro" +description = """ +Definition of the `#[async_stream]` macro for the `futures-rs` crate as well as a few other assorted macros. +""" + +[lib] +name = "futures_async_macro" +proc-macro = true + +[features] +std = [] + +[dependencies] +proc-macro2 = "0.4" +quote = "0.6" +syn = { version = "0.15.34", features = ["full", "fold"] } diff --git a/futures-async-macro/LICENSE-APACHE b/futures-async-macro/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-async-macro/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-async-macro/LICENSE-MIT b/futures-async-macro/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-async-macro/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-async-macro/README.md b/futures-async-macro/README.md new file mode 100644 index 0000000000..49610f96e6 --- /dev/null +++ b/futures-async-macro/README.md @@ -0,0 +1,55 @@ + +**Note that these are experimental APIs.** + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-preview = { version = "=0.3.0-alpha.16", features = ["async-stream", "nightly"] } +``` + +### \#\[for_await\] + +Processes streams using a for loop. + +This is a reimplement of [futures-await]'s `#[async]` for loops for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#for-await-and-processing-streams). + +```rust +#![feature(async_await, stmt_expr_attributes, proc_macro_hygiene)] +use futures::for_await; +use futures::prelude::*; + +#[for_await] +for value in stream::iter(1..=5) { + println!("{}", value); +} +``` + +`value` has the `Item` type of the stream passed in. Note that async for loops can only be used inside of `async` functions, closures, blocks, `#[async_stream]` functions and `async_stream_block!` macros. + +### \#\[async_stream\] + +Creates streams via generators. + +This is a reimplement of [futures-await]'s `#[async_stream]` for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#generators-and-streams). + +```rust +#![feature(async_await, generators)] +use futures::prelude::*; +use futures::async_stream; + +// Returns a stream of i32 +#[async_stream(item = i32)] +fn foo(stream: impl Stream) { + #[for_await] + for x in stream { + yield x.parse().unwrap(); + } +} +``` + +`#[async_stream]` must have an item type specified via `item = some::Path` and the values output from the stream must be yielded via the `yield` expression. + +[futures-await]: https://github.com/alexcrichton/futures-await diff --git a/futures-async-macro/src/elision.rs b/futures-async-macro/src/elision.rs new file mode 100644 index 0000000000..4d4d6c13e6 --- /dev/null +++ b/futures-async-macro/src/elision.rs @@ -0,0 +1,101 @@ +use proc_macro2::Span; +use syn::fold::Fold; +use syn::punctuated::Punctuated; +use syn::token::Comma; +use syn::{ArgSelfRef, FnArg, GenericParam, Lifetime, LifetimeDef, TypeReference}; + +pub(super) fn unelide_lifetimes( + generics: &mut Punctuated, + args: Vec, +) -> Vec { + let mut folder = UnelideLifetimes::new(generics); + args.into_iter().map(|arg| folder.fold_fn_arg(arg)).collect() +} + +struct UnelideLifetimes<'a> { + generics: &'a mut Punctuated, + lifetime_index: usize, + lifetime_name: String, + count: u32, +} + +impl<'a> UnelideLifetimes<'a> { + fn new(generics: &'a mut Punctuated) -> UnelideLifetimes<'a> { + let lifetime_index = lifetime_index(generics); + let lifetime_name = lifetime_name(generics); + UnelideLifetimes { generics, lifetime_index, lifetime_name, count: 0 } + } + + // Constitute a new lifetime + fn new_lifetime(&mut self) -> Lifetime { + let lifetime_name = format!("{}{}", self.lifetime_name, self.count); + let lifetime = Lifetime::new(&lifetime_name, Span::call_site()); + + let idx = self.lifetime_index + self.count as usize; + self.generics.insert(idx, GenericParam::Lifetime(LifetimeDef::new(lifetime.clone()))); + self.count += 1; + + lifetime + } + + // Take an Option and guarantee its an unelided lifetime + fn expand_lifetime(&mut self, lifetime: Option) -> Lifetime { + match lifetime { + Some(l) => self.fold_lifetime(l), + None => self.new_lifetime(), + } + } +} + +impl Fold for UnelideLifetimes<'_> { + // Handling self arguments + fn fold_arg_self_ref(&mut self, arg: ArgSelfRef) -> ArgSelfRef { + let ArgSelfRef { and_token, lifetime, mutability, self_token } = arg; + let lifetime = Some(self.expand_lifetime(lifetime)); + ArgSelfRef { and_token, lifetime, mutability, self_token } + } + + // If the lifetime is `'_`, replace it with a new unelided lifetime + fn fold_lifetime(&mut self, lifetime: Lifetime) -> Lifetime { + if lifetime.ident == "_" { + self.new_lifetime() + } else { + lifetime + } + } + + // If the reference's lifetime is elided, replace it with a new unelided lifetime + fn fold_type_reference(&mut self, ty_ref: TypeReference) -> TypeReference { + let TypeReference { and_token, lifetime, mutability, elem } = ty_ref; + let lifetime = Some(self.expand_lifetime(lifetime)); + let elem = Box::new(self.fold_type(*elem)); + TypeReference { and_token, lifetime, mutability, elem } + } +} + +fn lifetime_index(generics: &Punctuated) -> usize { + generics + .iter() + .take_while(|param| if let GenericParam::Lifetime(_) = param { true } else { false }) + .count() +} + +// Determine the prefix for all lifetime names. Ensure it doesn't +// overlap with any existing lifetime names. +fn lifetime_name(generics: &Punctuated) -> String { + let mut lifetime_name = String::from("'_async"); + let existing_lifetimes: Vec = generics + .iter() + .filter_map(|param| { + if let GenericParam::Lifetime(LifetimeDef { lifetime, .. }) = param { + Some(lifetime.to_string()) + } else { + None + } + }) + .collect(); + while existing_lifetimes.iter().any(|name| name.starts_with(&lifetime_name)) { + lifetime_name.push('_'); + } + lifetime_name +} diff --git a/futures-async-macro/src/error.rs b/futures-async-macro/src/error.rs new file mode 100644 index 0000000000..ffd0417325 --- /dev/null +++ b/futures-async-macro/src/error.rs @@ -0,0 +1,48 @@ +// Make error messages a little more readable than `panic!`. +macro_rules! error { + ($msg:expr) => { + return proc_macro::TokenStream::from( + syn::Error::new(proc_macro2::Span::call_site(), $msg).to_compile_error(), + ) + }; + ($span:expr, $msg:expr) => { + return proc_macro::TokenStream::from( + syn::Error::new_spanned($span, $msg).to_compile_error(), + ) + }; +} + +// TODO: Should we give another name? +// `assert!` that call `error!` instead of `panic!`. +macro_rules! assert_ { + ($e:expr, $msg:expr) => { + if !$e { + error!($msg) + } + }; +} + +pub(super) fn expr_compile_error(e: &syn::Error) -> syn::Expr { + syn::parse2(e.to_compile_error()).unwrap() +} + +// Long error messages and error messages that are called multiple times + +macro_rules! args_is_not_empty { + ($name:expr) => { + concat!("attribute must be of the form `#[", $name, "]`") + }; +} + +macro_rules! outside_of_async_error { + ($tokens:expr, $name:expr) => { + $crate::error::expr_compile_error(&syn::Error::new_spanned( + $tokens, + concat!( + $name, + " cannot be allowed outside of \ + async closures, blocks, functions, async_stream blocks, and functions." + ), + )) + }; +} diff --git a/futures-async-macro/src/lib.rs b/futures-async-macro/src/lib.rs new file mode 100644 index 0000000000..9ad6874e5d --- /dev/null +++ b/futures-async-macro/src/lib.rs @@ -0,0 +1,423 @@ +//! Procedural macro for the `#[async_stream]` attribute. + +#![recursion_limit = "128"] +#![warn(rust_2018_idioms, unreachable_pub)] +// It cannot be included in the published code because this lints have false positives in the minimum required version. +#![cfg_attr(test, warn(single_use_lifetimes))] +#![warn(clippy::all)] + +extern crate proc_macro; + +use proc_macro::{Delimiter, Group, TokenStream, TokenTree}; +use proc_macro2::{Span, TokenStream as TokenStream2, TokenTree as TokenTree2}; +use quote::{quote, ToTokens}; +use syn::{ + fold::{self, Fold}, + parse::{Parse, ParseStream}, + token, ArgCaptured, Error, Expr, ExprCall, ExprField, ExprForLoop, ExprMacro, ExprYield, FnArg, + FnDecl, Ident, Item, ItemFn, Member, Pat, PatIdent, ReturnType, Token, Type, TypeTuple, +}; + +#[macro_use] +mod error; + +mod elision; + +/// Processes streams using a for loop. +#[proc_macro_attribute] +pub fn for_await(args: TokenStream, input: TokenStream) -> TokenStream { + assert_!(args.is_empty(), args_is_not_empty!("for_await")); + + let mut expr: ExprForLoop = syn::parse_macro_input!(input); + expr.attrs.push(syn::parse_quote!(#[for_await])); + Expand(Future).fold_expr(Expr::ForLoop(expr)).into_token_stream().into() +} + +/// Creates streams via generators. +#[proc_macro_attribute] +pub fn async_stream(args: TokenStream, input: TokenStream) -> TokenStream { + let arg: Arg = syn::parse_macro_input!(args); + let function: ItemFn = syn::parse_macro_input!(input); + expand_async_stream_fn(function, &arg.0) +} + +fn expand_async_stream_fn(function: ItemFn, item_ty: &Type) -> TokenStream { + // Parse our item, expecting a function. This function may be an actual + // top-level function or it could be a method (typically dictated by the + // arguments). We then extract everything we'd like to use. + let ItemFn { ident, vis, constness, unsafety, abi, block, decl, attrs, .. } = function; + let FnDecl { inputs, output, variadic, mut generics, fn_token, .. } = *decl; + let where_clause = &generics.where_clause; + assert_!(variadic.is_none(), "variadic functions cannot be async"); + if let ReturnType::Type(_, t) = output { + match &*t { + Type::Tuple(TypeTuple { elems, .. }) if elems.is_empty() => {} + _ => error!(t, "async stream functions must return the unit type"), + } + } + + // We've got to get a bit creative with our handling of arguments. For a + // number of reasons we translate this: + // + // fn foo(ref a: u32) -> u32 { + // // ... + // } + // + // into roughly: + // + // fn foo(__arg_0: u32) -> impl Stream { + // from_generator(static move || { + // let ref a = __arg_0; + // + // // ... + // }) + // } + // + // The intention here is to ensure that all local function variables get + // moved into the generator we're creating, and they're also all then bound + // appropriately according to their patterns and whatnot. + // + // We notably skip everything related to `self` which typically doesn't have + // many patterns with it and just gets captured naturally. + let mut inputs_no_patterns = Vec::new(); + let mut patterns = Vec::new(); + let mut temp_bindings = Vec::new(); + for (i, input) in inputs.into_iter().enumerate() { + // `self: Box` will get captured naturally + let mut is_input_no_pattern = false; + if let FnArg::Captured(ArgCaptured { pat: Pat::Ident(pat), .. }) = &input { + if pat.ident == "self" { + is_input_no_pattern = true; + } + } + if is_input_no_pattern { + inputs_no_patterns.push(input); + continue; + } + + if let FnArg::Captured(ArgCaptured { + pat: pat @ Pat::Ident(PatIdent { by_ref: Some(_), .. }), + ty, + colon_token, + }) = input + { + // `ref a: B` (or some similar pattern) + patterns.push(pat); + let ident = Ident::new(&format!("__arg_{}", i), Span::call_site()); + temp_bindings.push(ident.clone()); + let pat = PatIdent { by_ref: None, mutability: None, ident, subpat: None }.into(); + inputs_no_patterns.push(ArgCaptured { pat, ty, colon_token }.into()); + } else { + // Other arguments get captured naturally + inputs_no_patterns.push(input); + } + } + + // This is the point where we handle + // + // #[for_await] + // for x in y { + // } + // + // Basically just take all those expression and expand them. + // + // Also, in some items, it needs to adjust the type to be generated depending on whether it is + // called in the scope of async or the scope of async-stream, it is processed here. + let block = Expand(Stream).fold_block(*block); + + let block_inner = quote! { + #( let #patterns = #temp_bindings; )* + #block + }; + let mut result = TokenStream2::new(); + block.brace_token.surround(&mut result, |tokens| { + block_inner.to_tokens(tokens); + }); + token::Semi([block.brace_token.span]).to_tokens(&mut result); + + let gen_body_inner = quote! { + let (): () = #result + + // Ensure that this closure is a generator, even if it doesn't + // have any `yield` statements. + #[allow(unreachable_code)] + { + return; + loop { yield ::futures::core_reexport::task::Poll::Pending } + } + }; + let mut gen_body = TokenStream2::new(); + block.brace_token.surround(&mut gen_body, |tokens| { + gen_body_inner.to_tokens(tokens); + }); + + // Give the invocation of the `from_generator` function the same span as the `item_ty` + // as currently errors related to it being a result are targeted here. Not + // sure if more errors will highlight this function call... + let output_span = first_last(item_ty); + let gen_function = quote! { ::futures::async_stream::from_generator }; + let gen_function = respan(gen_function, output_span); + let body_inner = quote! { + #gen_function (static move || -> () #gen_body) + }; + let mut body = TokenStream2::new(); + block.brace_token.surround(&mut body, |tokens| { + body_inner.to_tokens(tokens); + }); + + let inputs_no_patterns = elision::unelide_lifetimes(&mut generics.params, inputs_no_patterns); + let lifetimes: Vec<_> = generics.lifetimes().map(|l| &l.lifetime).collect(); + + // Raw `impl` breaks syntax highlighting in some editors. + let impl_token = token::Impl::default(); + let return_ty = quote! { + #impl_token ::futures::stream::Stream + #(#lifetimes +)* + }; + let return_ty = respan(return_ty, output_span); + TokenStream::from(quote! { + #(#attrs)* + #vis #unsafety #abi #constness + #fn_token #ident #generics (#(#inputs_no_patterns),*) + -> #return_ty + #where_clause + #body + }) +} + +/// Creates streams via generators. +#[proc_macro] +pub fn async_stream_block(input: TokenStream) -> TokenStream { + let input = TokenStream::from(TokenTree::Group(Group::new(Delimiter::Brace, input))); + let expr = syn::parse_macro_input!(input); + let expr = Expand(Stream).fold_expr(expr); + + let mut tokens = quote! { ::futures::async_stream::from_generator }; + + // Use some manual token construction here instead of `quote!` to ensure + // that we get the `call_site` span instead of the default span. + let span = Span::call_site(); + token::Paren(span).surround(&mut tokens, |tokens| { + token::Static(span).to_tokens(tokens); + token::Move(span).to_tokens(tokens); + token::OrOr([span, span]).to_tokens(tokens); + token::Brace(span).surround(tokens, |tokens| { + (quote! { + if false { yield ::futures::core_reexport::task::Poll::Pending } + }) + .to_tokens(tokens); + expr.to_tokens(tokens); + }); + }); + + tokens.into() +} + +struct Arg(Type); + +mod kw { + syn::custom_keyword!(item); +} + +impl Parse for Arg { + fn parse(input: ParseStream<'_>) -> syn::Result { + let _: kw::item = input.parse()?; + let _: Token![=] = input.parse()?; + input.parse().map(Self) + } +} + +/// The scope in which `#[for_await]`, `.await` was called. +/// +/// The type of generator depends on which scope is called. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Scope { + /// `async fn`, `async {}`, `async ||` + Future, + /// `#[async_stream]`, `async_stream_block!` + Stream, + /// `static move ||`, `||` + /// + /// It cannot call `#[for_await]`, `.await` in this scope. + Closure, +} + +use Scope::{Closure, Future, Stream}; + +struct Expand(Scope); + +impl Expand { + /// Expands `#[for_await] for pat in expr { .. }`. + fn expand_for_await(&self, mut expr: ExprForLoop) -> Expr { + if !(expr.attrs.len() == 1 && expr.attrs[0].path.is_ident("for_await")) { + return Expr::ForLoop(expr); + } else if !expr.attrs[0].tts.is_empty() { + return error::expr_compile_error(&Error::new_spanned( + expr.attrs.pop(), + args_is_not_empty!("for_await"), + )); + } + + let ExprForLoop { label, pat, expr, body, .. } = &expr; + // It needs to adjust the type yielded by the macro because generators used internally by + // async fn yield `()` type, but generators used internally by `async_stream` yield + // `Poll` type. + match self.0 { + Future => { + // Basically just expand to a `poll` loop + syn::parse_quote! {{ + let mut __pinned = #expr; + let mut __pinned = unsafe { + ::futures::core_reexport::pin::Pin::new_unchecked(&mut __pinned) + }; + #label + loop { + let #pat = { + match ::futures::stream::StreamExt::next(&mut __pinned).await { + ::futures::core_reexport::option::Option::Some(e) => e, + ::futures::core_reexport::option::Option::None => break, + } + }; + + #body + } + }} + } + Stream => { + // Basically just expand to a `poll` loop + syn::parse_quote! {{ + let mut __pinned = #expr; + #label + loop { + let #pat = { + match ::futures::async_stream::poll_next_with_tls_context(unsafe { + ::futures::core_reexport::pin::Pin::new_unchecked(&mut __pinned) + }) + { + ::futures::core_reexport::task::Poll::Ready(e) => { + match e { + ::futures::core_reexport::option::Option::Some(e) => e, + ::futures::core_reexport::option::Option::None => break, + } + } + ::futures::core_reexport::task::Poll::Pending => { + yield ::futures::core_reexport::task::Poll::Pending; + continue + } + } + }; + + #body + } + }} + } + Closure => return outside_of_async_error!(expr, "#[for_await]"), + } + } + + /// Expands `yield expr` in `async_stream` scope. + fn expand_yield(&self, expr: ExprYield) -> ExprYield { + if self.0 != Stream { return expr } + + let ExprYield { attrs, yield_token, expr } = expr; + let expr = expr.map_or_else(|| quote!(()), ToTokens::into_token_stream); + let expr = syn::parse_quote! { + ::futures::core_reexport::task::Poll::Ready(#expr) + }; + ExprYield { attrs, yield_token, expr: Some(Box::new(expr)) } + } + + /// Expands `async_stream_block!` macro. + fn expand_macro(&mut self, mut expr: ExprMacro) -> Expr { + if expr.mac.path.is_ident("async_stream_block") { + let mut e: ExprCall = syn::parse(async_stream_block(expr.mac.tts.into())).unwrap(); + e.attrs.append(&mut expr.attrs); + Expr::Call(e) + } else { + Expr::Macro(expr) + } + } + + /// Expands `expr.await` in `async_stream` scope. + /// + /// It needs to adjust the type yielded by the macro because generators used internally by + /// async fn yield `()` type, but generators used internally by `async_stream` yield + /// `Poll` type. + fn expand_await(&mut self, expr: ExprField) -> Expr { + if self.0 != Stream { return Expr::Field(expr) } + + match &expr.member { + Member::Named(x) if x == "await" => {} + _ => return Expr::Field(expr), + } + let expr = expr.base; + + // Because macro input (`#expr`) is untrusted, use `syn::parse2` + `expr_compile_error` + // instead of `syn::parse_quote!` to generate better error messages (`syn::parse_quote!` + // panics if fail to parse). + syn::parse2(quote! {{ + let mut __pinned = #expr; + loop { + if let ::futures::core_reexport::task::Poll::Ready(x) = + ::futures::async_stream::poll_with_tls_context(unsafe { + ::futures::core_reexport::pin::Pin::new_unchecked(&mut __pinned) + }) + { + break x; + } + + yield ::futures::core_reexport::task::Poll::Pending + } + }}) + .unwrap_or_else(|e| error::expr_compile_error(&e)) + } +} + +impl Fold for Expand { + fn fold_expr(&mut self, expr: Expr) -> Expr { + // Backup current scope and adjust the scope. This must be done before folding expr. + let tmp = self.0; + match &expr { + Expr::Async(_) => self.0 = Future, + Expr::Closure(expr) => self.0 = if expr.asyncness.is_some() { Future } else { Closure }, + Expr::Macro(expr) if expr.mac.path.is_ident("async_stream_block") => self.0 = Stream, + _ => {} + } + + let expr = match fold::fold_expr(self, expr) { + Expr::Yield(expr) => Expr::Yield(self.expand_yield(expr)), + Expr::Field(expr) => self.expand_await(expr), + Expr::ForLoop(expr) => self.expand_for_await(expr), + Expr::Macro(expr) => self.expand_macro(expr), + expr => expr, + }; + + // Restore the backup. + self.0 = tmp; + expr + } + + // Don't recurse into items + fn fold_item(&mut self, item: Item) -> Item { + item + } +} + +fn first_last(tokens: &T) -> (Span, Span) { + let mut spans = TokenStream2::new(); + tokens.to_tokens(&mut spans); + let good_tokens = spans.into_iter().collect::>(); + let first_span = good_tokens.first().map_or_else(Span::call_site, TokenTree2::span); + let last_span = good_tokens.last().map_or_else(|| first_span, TokenTree2::span); + (first_span, last_span) +} + +fn respan(input: TokenStream2, (first_span, last_span): (Span, Span)) -> TokenStream2 { + let mut new_tokens = input.into_iter().collect::>(); + if let Some(token) = new_tokens.first_mut() { + token.set_span(first_span); + } + for token in new_tokens.iter_mut().skip(1) { + token.set_span(last_span); + } + new_tokens.into_iter().collect() +} diff --git a/futures-select-macro/Cargo.toml b/futures-select-macro/Cargo.toml index fd81124a28..ad7ff2907e 100644 --- a/futures-select-macro/Cargo.toml +++ b/futures-select-macro/Cargo.toml @@ -22,4 +22,4 @@ std = [] proc-macro2 = "0.4" proc-macro-hack = "0.5.3" quote = "0.6" -syn = { version = "0.15.25", features = ["full"] } +syn = { version = "0.15.34", features = ["full"] } diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index a28bc5c718..97ea4293f6 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -28,6 +28,7 @@ sink = ["futures-sink-preview"] io = ["std", "futures-io-preview", "memchr"] channel = ["std", "futures-channel-preview"] select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack", "proc-macro-nested", "rand"] +async-stream = ["std", "async-await"] [dependencies] futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.17", default-features = false } diff --git a/futures-util/src/async_stream/mod.rs b/futures-util/src/async_stream/mod.rs new file mode 100644 index 0000000000..1391ab4a4a --- /dev/null +++ b/futures-util/src/async_stream/mod.rs @@ -0,0 +1,72 @@ +//! Await +//! +//! This module contains a number of functions and combinators for working +//! with `async`/`await` code. + +use futures_core::future::Future; +use futures_core::stream::Stream; +use std::future::{self, get_task_context, set_task_context}; +use std::marker::PhantomData; +use std::ops::{Generator, GeneratorState}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Wrap a generator in a stream. +/// +/// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give +/// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`). +pub fn from_generator(x: T) -> impl Stream +where + T: Generator, Return = ()>, +{ + GenStream { gen: x, done: false, _phantom: PhantomData } +} + +/// A wrapper around generators used to implement `Stream` for `async`/`await` code. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct GenStream, Return = ()>> { + gen: T, + // If resuming after Complete, std generators panic. This is natural when using the generators, + // but in the streams, we may want to call `poll_next` after `poll_next` returns `None` + // because it is possible to call `next` after `next` returns `None` in many iterators. + done: bool, + _phantom: PhantomData, +} + +impl, Return = ()>> Stream for GenStream { + type Item = U; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + return Poll::Ready(None); + } + // Safe because we're !Unpin + !Drop mapping to a ?Unpin value + let Self { gen, done, .. } = unsafe { Pin::get_unchecked_mut(self) }; + let gen = unsafe { Pin::new_unchecked(gen) }; + set_task_context(cx, || match gen.resume() { + GeneratorState::Yielded(x) => x.map(Some), + GeneratorState::Complete(()) => { + *done = true; + Poll::Ready(None) + } + }) + } +} + +/// Polls a stream in the current thread-local task waker. +pub fn poll_next_with_tls_context(s: Pin<&mut S>) -> Poll> +where + S: Stream, +{ + get_task_context(|cx| S::poll_next(s, cx)) +} + +// The `await!` called in `async_stream` needs to be adjust to yield `Poll`, +// but for this purpose, we don't want the user to use `#![feature(gen_future)]`. +/// Polls a future in the current thread-local task waker. +#[inline] +pub fn poll_with_tls_context(f: Pin<&mut F>) -> Poll +where + F: Future +{ + future::poll_with_tls_context(f) +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 82d6a44763..766efad0e2 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -3,6 +3,7 @@ #![cfg_attr(feature = "async-await", feature(async_await))] #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] +#![cfg_attr(feature = "async-stream", feature(gen_future, generator_trait, generators))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] @@ -20,6 +21,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `nightly` featu #[cfg(all(feature = "async-await", not(feature = "nightly")))] compile_error!("The `async-await` feature requires the `nightly` feature as an explicit opt-in to unstable features"); +#[cfg(all(feature = "async-stream", not(feature = "nightly")))] +compile_error!("The `async-stream` feature requires the `nightly` feature as an explicit opt-in to unstable features"); + #[cfg(feature = "alloc")] extern crate alloc; @@ -38,6 +42,14 @@ pub mod async_await; #[doc(hidden)] pub use self::async_await::*; +#[cfg(feature = "async-stream")] +#[macro_use] +#[doc(hidden)] +pub mod async_stream; +#[cfg(feature = "async-stream")] +#[doc(hidden)] +pub use self::async_stream::*; + #[cfg(feature = "select-macro")] #[doc(hidden)] pub mod rand_reexport { // used by select! diff --git a/futures/Cargo.toml b/futures/Cargo.toml index b598bfb6fc..47d3b48784 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -28,6 +28,7 @@ futures-executor-preview = { path = "../futures-executor", version = "=0.3.0-alp futures-io-preview = { path = "../futures-io", version = "=0.3.0-alpha.17", default-features = false } futures-sink-preview = { path = "../futures-sink", version = "=0.3.0-alpha.17", default-features = false } futures-util-preview = { path = "../futures-util", version = "=0.3.0-alpha.17", default-features = false, features = ["sink"] } +futures-async-macro-preview = { path = "../futures-async-macro", version = "=0.3.0-alpha.17", optional = true } [dev-dependencies] pin-utils = "0.1.0-alpha.4" @@ -44,6 +45,7 @@ async-await = ["futures-util-preview/async-await", "futures-util-preview/select- compat = ["std", "futures-util-preview/compat"] io-compat = ["compat", "futures-util-preview/io-compat"] cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic", "futures-channel-preview/cfg-target-has-atomic", "futures-util-preview/cfg-target-has-atomic"] +async-stream = ["std", "async-await", "futures-util-preview/async-stream", "futures-async-macro-preview"] [package.metadata.docs.rs] all-features = true diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 675b2656bd..dff675eb84 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -40,6 +40,9 @@ compile_error!("The `async-await` feature requires the `nightly` feature as an e #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "nightly")))] compile_error!("The `cfg-target-has-atomic` feature requires the `nightly` feature as an explicit opt-in to unstable features"); +#[cfg(all(feature = "async-stream", not(feature = "nightly")))] +compile_error!("The `async-stream` feature requires the `nightly` feature as an explicit opt-in to unstable features"); + #[doc(hidden)] pub use futures_core::core_reexport; #[doc(hidden)] pub use futures_core::future::Future; @@ -72,6 +75,13 @@ pub use futures_util::{ // Async-await join, try_join, pending, poll, }; +// Async stream +#[cfg(feature = "async-stream")] +#[doc(hidden)] +pub use futures_util::async_stream; +#[cfg(feature = "async-stream")] +#[doc(hidden)] // https://github.com/rust-lang/rust/issues/58696 +pub use futures_async_macro::*; #[cfg_attr( feature = "cfg-target-has-atomic", diff --git a/futures/testcrate/Cargo.toml b/futures/testcrate/Cargo.toml index 96ed23238c..13a6665dad 100644 --- a/futures/testcrate/Cargo.toml +++ b/futures/testcrate/Cargo.toml @@ -2,16 +2,17 @@ name = "testcrate" version = "0.1.0" authors = ["Alex Crichton "] +edition = "2018" +publish = false [lib] path = "lib.rs" -[dependencies.futures] -features = ["std", "nightly"] -path = ".." +[dependencies] +futures-preview = { path = "..", features = ["async-stream", "nightly"] } [dev-dependencies] -compiletest_rs = "0.3.7" +compiletest = { version = "0.3.21", package = "compiletest_rs", features = ["stable"] } [[test]] name = "ui" diff --git a/futures/testcrate/lib.rs b/futures/testcrate/lib.rs index e69de29bb2..8b13789179 100644 --- a/futures/testcrate/lib.rs +++ b/futures/testcrate/lib.rs @@ -0,0 +1 @@ + diff --git a/futures/testcrate/tests/ui.rs b/futures/testcrate/tests/ui.rs index dc1f49044a..fc4f116121 100644 --- a/futures/testcrate/tests/ui.rs +++ b/futures/testcrate/tests/ui.rs @@ -2,18 +2,24 @@ fn run_mode(mode: &'static str) { use std::env; use std::path::PathBuf; - let mut config = compiletest_rs::Config::default(); + let mut config = compiletest::Config::default(); config.mode = mode.parse().expect("invalid mode"); let mut me = env::current_exe().unwrap(); me.pop(); - config.target_rustcflags = Some(format!("-L {}", me.display())); + config.target_rustcflags = Some(format!( + "--edition=2018 \ + -Z unstable-options \ + --extern futures \ + -L {}", + me.display() + )); let src = PathBuf::from(env!("CARGO_MANIFEST_DIR")); config.src_base = src.join(mode); me.pop(); me.pop(); config.build_base = me.join("tests").join(mode); - compiletest_rs::run_tests(&config); + compiletest::run_tests(&config); } fn main() { diff --git a/futures/testcrate/ui/bad-input.rs b/futures/testcrate/ui/bad-input.rs new file mode 100644 index 0000000000..7479a652c3 --- /dev/null +++ b/futures/testcrate/ui/bad-input.rs @@ -0,0 +1,22 @@ + +#![feature(async_await, generators)] + +use futures::*; + +#[async_stream(item = i32)] +fn foo() { + #[for_await(bar)] + for i in stream::iter(vec![1, 2]) { + yield i; + } +} + +#[async_stream(baz, item = i32)] +fn bar() { + #[for_await] + for i in stream::iter(vec![1, 2]) { + yield i; + } +} + +fn main() {} diff --git a/futures/testcrate/ui/bad-input.stderr b/futures/testcrate/ui/bad-input.stderr new file mode 100644 index 0000000000..1b36885022 --- /dev/null +++ b/futures/testcrate/ui/bad-input.stderr @@ -0,0 +1,14 @@ +error: attribute must be of the form `#[for_await]` + --> $DIR/bad-input.rs:8:5 + | +8 | #[for_await(bar)] + | ^^^^^^^^^^^^^^^^^ + +error: expected `item` + --> $DIR/bad-input.rs:14:16 + | +14 | #[async_stream(baz, item = i32)] + | ^^^ + +error: aborting due to 2 previous errors + diff --git a/futures/testcrate/ui/bad-item-type.rs b/futures/testcrate/ui/bad-item-type.rs new file mode 100644 index 0000000000..eebf91cdfc --- /dev/null +++ b/futures/testcrate/ui/bad-item-type.rs @@ -0,0 +1,24 @@ +#![feature(async_await, generators)] + +use futures::*; + +#[async_stream(item = Option)] +fn foobar() { + let val = Some(42); + if val.is_none() { + yield None; + return; + } + let val = val.unwrap(); + yield val; +} + +#[async_stream(item = (i32, i32))] +fn tuple() { + if false { + yield 3; + } + yield (1, 2) +} + +fn main() {} diff --git a/futures/testcrate/ui/bad-item-type.stderr b/futures/testcrate/ui/bad-item-type.stderr new file mode 100644 index 0000000000..3de92a4dc8 --- /dev/null +++ b/futures/testcrate/ui/bad-item-type.stderr @@ -0,0 +1,131 @@ +error[E0308]: mismatched types + --> $DIR/bad-item-type.rs:13:11 + | +13 | yield val; + | ^^^ + | | + | expected enum `std::option::Option`, found integer + | help: try using a variant of the expected type: `Some(val)` + | + = note: expected type `std::option::Option<_>` + found type `{integer}` + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:7:9 + | +7 | let val = Some(42); + | ^^^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:5:1 + | +5 | #[async_stream(item = Option)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:12:9 + | +12 | let val = val.unwrap(); + | ^^^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:5:1 + | +5 | #[async_stream(item = Option)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:13:11 + | +13 | yield val; + | ^^^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:5:1 + | +5 | #[async_stream(item = Option)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0308]: mismatched types + --> $DIR/bad-item-type.rs:21:11 + | +21 | yield (1, 2) + | ^^^^^^ expected integer, found tuple + | + = note: expected type `{integer}` + found type `({integer}, {integer})` + +error[E0271]: type mismatch resolving `::Item == (i32, i32)` + --> $DIR/bad-item-type.rs:16:23 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^ expected integer, found tuple + | + = note: expected type `{integer}` + found type `(i32, i32)` + = note: the return type of a function must have a statically known size + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:19:15 + | +19 | yield 3; + | ^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:21:12 + | +21 | yield (1, 2) + | ^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:21:15 + | +21 | yield (1, 2) + | ^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error[E0698]: type inside generator must be known in this context + --> $DIR/bad-item-type.rs:21:11 + | +21 | yield (1, 2) + | ^^^^^^ cannot infer type for `{integer}` + | +note: the type is part of the generator because of this `yield` + --> $DIR/bad-item-type.rs:16:1 + | +16 | #[async_stream(item = (i32, i32))] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +error: aborting due to 11 previous errors + +Some errors have detailed explanations: E0271, E0308, E0698. +For more information about an error, try `rustc --explain E0271`. diff --git a/futures/testcrate/ui/bad-return-type.rs b/futures/testcrate/ui/bad-return-type.rs index 0fd2187064..fdf1101b22 100644 --- a/futures/testcrate/ui/bad-return-type.rs +++ b/futures/testcrate/ui/bad-return-type.rs @@ -1,33 +1,11 @@ -#![feature(proc_macro, generators)] +#![feature(async_await, generators)] -#[async] -fn foobar() -> Result, ()> { - let val = Some(42); - if val.is_none() { - return Ok(None) - } - let val = val.unwrap(); - Ok(val) -} +use futures::*; #[async_stream(item = Option)] -fn foobars() -> Result<(), ()> { - let val = Some(42); - if val.is_none() { - stream_yield!(None); - return Ok(()) - } - let val = val.unwrap(); - stream_yield!(val); - Ok(()) -} +fn foo() -> i32 {} // ERROR -#[async] -fn tuple() -> Result<(i32, i32), ()> { - if false { - return Ok(3); - } - Ok((1, 2)) -} +#[async_stream(item = (i32, i32))] +fn tuple() -> () {} // OK fn main() {} diff --git a/futures/testcrate/ui/bad-return-type.stderr b/futures/testcrate/ui/bad-return-type.stderr index 1e71aa7ab8..bd53bbde07 100644 --- a/futures/testcrate/ui/bad-return-type.stderr +++ b/futures/testcrate/ui/bad-return-type.stderr @@ -1,90 +1,8 @@ -error[E0308]: mismatched types - --> $DIR/bad-return-type.rs:14:8 - | -14 | Ok(val) - | ^^^ - | | - | expected enum `std::option::Option`, found integral variable - | help: try using a variant of the expected type: `Some(val)` - | - = note: expected type `std::option::Option` - found type `{integer}` +error: async stream functions must return the unit type + --> $DIR/bad-return-type.rs:6:13 + | +6 | fn foo() -> i32 {} // ERROR + | ^^^ -error[E0308]: mismatched types - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - | | - | expected enum `std::option::Option`, found integral variable - | help: try using a variant of the expected type: `Some(e)` - | - = note: expected type `std::option::Option<_>` - found type `{integer}` - = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) +error: aborting due to previous error -error[E0907]: type inside generator must be known in this context - --> $DIR/bad-return-type.rs:19:9 - | -19 | let val = Some(42); - | ^^^ - | -note: the type is part of the generator because of this `yield` - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) - -error[E0907]: type inside generator must be known in this context - --> $DIR/bad-return-type.rs:24:9 - | -24 | let val = val.unwrap(); - | ^^^ - | -note: the type is part of the generator because of this `yield` - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) - -error[E0907]: type inside generator must be known in this context - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - | -note: the type is part of the generator because of this `yield` - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) - -error[E0907]: type inside generator must be known in this context - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - | -note: the type is part of the generator because of this `yield` - --> $DIR/bad-return-type.rs:25:5 - | -25 | stream_yield!(val); - | ^^^^^^^^^^^^^^^^^^^ - = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) - -error[E0308]: mismatched types - --> $DIR/bad-return-type.rs:32:19 - | -32 | return Ok(3); - | ^ expected tuple, found integral variable - | - = note: expected type `(i32, i32)` - found type `{integer}` - -error: aborting due to 7 previous errors - -Some errors occurred: E0308, E0907. -For more information about an error, try `rustc --explain E0308`. diff --git a/futures/testcrate/ui/forget-ok.rs b/futures/testcrate/ui/forget-ok.rs deleted file mode 100644 index eb86ce7f76..0000000000 --- a/futures/testcrate/ui/forget-ok.rs +++ /dev/null @@ -1,11 +0,0 @@ -#![feature(proc_macro, generators)] - -#[async] -fn foo() -> Result<(), ()> { -} - -#[async_stream(item = i32)] -fn foos() -> Result<(), ()> { -} - -fn main() {} diff --git a/futures/testcrate/ui/forget-ok.stderr b/futures/testcrate/ui/forget-ok.stderr deleted file mode 100644 index 3cb1ad5f34..0000000000 --- a/futures/testcrate/ui/forget-ok.stderr +++ /dev/null @@ -1,25 +0,0 @@ -error[E0308]: mismatched types - --> $DIR/forget-ok.rs:8:28 - | -8 | fn foo() -> Result<(), ()> { - | ____________________________^ -9 | | } - | |_^ expected enum `std::result::Result`, found () - | - = note: expected type `std::result::Result<(), ()>` - found type `()` - -error[E0308]: mismatched types - --> $DIR/forget-ok.rs:12:29 - | -12 | fn foos() -> Result<(), ()> { - | _____________________________^ -13 | | } - | |_^ expected enum `std::result::Result`, found () - | - = note: expected type `std::result::Result<(), ()>` - found type `()` - -error: aborting due to 2 previous errors - -For more information about this error, try `rustc --explain E0308`. diff --git a/futures/testcrate/ui/forget-semicolon.rs b/futures/testcrate/ui/forget-semicolon.rs new file mode 100644 index 0000000000..f671a941f5 --- /dev/null +++ b/futures/testcrate/ui/forget-semicolon.rs @@ -0,0 +1,11 @@ +#![feature(async_await, generators)] + +use futures::*; + +#[async_stream(item = ())] +fn foo() { + yield; + Some(()) +} + +fn main() {} diff --git a/futures/testcrate/ui/forget-semicolon.stderr b/futures/testcrate/ui/forget-semicolon.stderr new file mode 100644 index 0000000000..1061bdc7b9 --- /dev/null +++ b/futures/testcrate/ui/forget-semicolon.stderr @@ -0,0 +1,14 @@ +error[E0308]: mismatched types + --> $DIR/forget-semicolon.rs:8:5 + | +8 | Some(()) + | ^^^^^^^^- help: try adding a semicolon: `;` + | | + | expected (), found enum `std::option::Option` + | + = note: expected type `()` + found type `std::option::Option<()>` + +error: aborting due to previous error + +For more information about this error, try `rustc --explain E0308`. diff --git a/futures/testcrate/ui/missing-item.rs b/futures/testcrate/ui/missing-item.rs index 60972fbe43..426f619481 100644 --- a/futures/testcrate/ui/missing-item.rs +++ b/futures/testcrate/ui/missing-item.rs @@ -1,9 +1,8 @@ -#![allow(warnings)] -#![feature(proc_macro, generators)] +#![feature(async_await, generators)] + +use futures::*; #[async_stream] -fn foos(a: String) -> Result<(), u32> { - Ok(()) -} +fn foo(a: String) {} fn main() {} diff --git a/futures/testcrate/ui/missing-item.stderr b/futures/testcrate/ui/missing-item.stderr index 2a446ba00b..d0eb3bb434 100644 --- a/futures/testcrate/ui/missing-item.stderr +++ b/futures/testcrate/ui/missing-item.stderr @@ -1,10 +1,8 @@ -error: custom attribute panicked - --> $DIR/missing-item.rs:8:1 +error: unexpected end of input, expected `item` + --> $DIR/missing-item.rs:5:1 | -8 | #[async_stream] +5 | #[async_stream] | ^^^^^^^^^^^^^^^ - | - = help: message: #[async_stream] requires item type to be specified error: aborting due to previous error diff --git a/futures/testcrate/ui/move-captured-variable.rs b/futures/testcrate/ui/move-captured-variable.rs index cf14818633..a0c3106872 100644 --- a/futures/testcrate/ui/move-captured-variable.rs +++ b/futures/testcrate/ui/move-captured-variable.rs @@ -1,12 +1,14 @@ -#![feature(proc_macro, generators)] +#![feature(async_await, generators, proc_macro_hygiene)] + +use futures::*; fn foo(_f: F) {} fn main() { let a = String::new(); foo(|| { - async_block! { - Ok::(a) + async_stream_block! { + yield a }; }); } diff --git a/futures/testcrate/ui/move-captured-variable.stderr b/futures/testcrate/ui/move-captured-variable.stderr index 30828ab534..956dba9012 100644 --- a/futures/testcrate/ui/move-captured-variable.stderr +++ b/futures/testcrate/ui/move-captured-variable.stderr @@ -1,13 +1,17 @@ -error[E0507]: cannot move out of captured outer variable in an `FnMut` closure - --> $DIR/move-captured-variable.rs:12:9 +error[E0507]: cannot move out of `a`, a captured variable in an `FnMut` closure + --> $DIR/move-captured-variable.rs:10:9 | -10 | let a = String::new(); +8 | let a = String::new(); | - captured outer variable -11 | foo(|| { -12 | / async_block! { -13 | | Ok::(a) -14 | | }; - | |__________^ cannot move out of captured outer variable in an `FnMut` closure +9 | foo(|| { +10 | / async_stream_block! { +11 | | yield a + | | - + | | | + | | move occurs because `a` has type `std::string::String`, which does not implement the `Copy` trait + | | move occurs due to use in generator +12 | | }; + | |__________^ move out of `a` occurs here error: aborting due to previous error diff --git a/futures/testcrate/ui/nested.rs b/futures/testcrate/ui/nested.rs new file mode 100644 index 0000000000..e2fa2039a1 --- /dev/null +++ b/futures/testcrate/ui/nested.rs @@ -0,0 +1,15 @@ +#![feature(async_await, generators)] + +use futures::*; + +#[async_stream(item = i32)] +fn _stream1() { + let _ = async { + #[for_await] + for i in stream::iter(vec![1, 2]) { + yield i * i; + } + }; +} + +fn main() {} diff --git a/futures/testcrate/ui/nested.stderr b/futures/testcrate/ui/nested.stderr new file mode 100644 index 0000000000..444c03e5d1 --- /dev/null +++ b/futures/testcrate/ui/nested.stderr @@ -0,0 +1,8 @@ +error[E0727]: `async` generators are not yet supported + --> $DIR/nested.rs:10:13 + | +10 | yield i * i; + | ^^^^^^^^^^^ + +error: aborting due to previous error + diff --git a/futures/testcrate/ui/not-a-result.rs b/futures/testcrate/ui/not-a-result.rs deleted file mode 100644 index 8bbc52bf64..0000000000 --- a/futures/testcrate/ui/not-a-result.rs +++ /dev/null @@ -1,23 +0,0 @@ -#![feature(proc_macro, generators)] - -#[async] -fn foo() -> u32 { - 3 -} - -#[async(boxed)] -fn bar() -> u32 { - 3 -} - -#[async_stream(item = u32)] -fn foos() -> u32 { - 3 -} - -#[async_stream(boxed, item = u32)] -fn bars() -> u32 { - 3 -} - -fn main() {} diff --git a/futures/testcrate/ui/not-a-result.stderr b/futures/testcrate/ui/not-a-result.stderr deleted file mode 100644 index 7f95faff36..0000000000 --- a/futures/testcrate/ui/not-a-result.stderr +++ /dev/null @@ -1,61 +0,0 @@ -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:8:13 - | -8 | fn foo() -> u32 { - | ^^^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - = note: required by `futures::__rt::gen_future` - -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:13:17 - | -13 | fn bar() -> u32 { - | _________________^ -14 | | 3 -15 | | } - | |_^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:13:13 - | -13 | fn bar() -> u32 { - | ^^^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - = note: required by `futures::__rt::gen_future` - -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:18:14 - | -18 | fn foos() -> u32 { - | ^^^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - = note: required by `futures::__rt::gen_stream` - -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:23:18 - | -23 | fn bars() -> u32 { - | __________________^ -24 | | 3 -25 | | } - | |_^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - -error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied - --> $DIR/not-a-result.rs:23:14 - | -23 | fn bars() -> u32 { - | ^^^ async functions must return a `Result` or a typedef of `Result` - | - = help: the trait `futures::__rt::IsResult` is not implemented for `u32` - = note: required by `futures::__rt::gen_stream` - -error: aborting due to 6 previous errors - -For more information about this error, try `rustc --explain E0277`. diff --git a/futures/testcrate/ui/type_error.rs b/futures/testcrate/ui/type_error.rs index 419bc4e630..b075c2aa60 100644 --- a/futures/testcrate/ui/type_error.rs +++ b/futures/testcrate/ui/type_error.rs @@ -1,9 +1,11 @@ -#![feature(proc_macro, generators)] +#![feature(async_await, generators)] -#[async] -fn foo() -> Result { +use futures::*; + +#[async_stream(item = i32)] +fn foo() { let a: i32 = "a"; //~ ERROR: mismatched types - Ok(1) + yield 1; } fn main() {} diff --git a/futures/testcrate/ui/type_error.stderr b/futures/testcrate/ui/type_error.stderr index efb4b224f6..4cac5e9608 100644 --- a/futures/testcrate/ui/type_error.stderr +++ b/futures/testcrate/ui/type_error.stderr @@ -1,7 +1,7 @@ error[E0308]: mismatched types - --> $DIR/type_error.rs:9:18 + --> $DIR/type_error.rs:7:18 | -9 | let a: i32 = "a"; //~ ERROR: mismatched types +7 | let a: i32 = "a"; //~ ERROR: mismatched types | ^^^ expected i32, found reference | = note: expected type `i32` diff --git a/futures/testcrate/ui/unresolved-type.rs b/futures/testcrate/ui/unresolved-type.rs index 00317b1f73..b33fcad725 100644 --- a/futures/testcrate/ui/unresolved-type.rs +++ b/futures/testcrate/ui/unresolved-type.rs @@ -1,13 +1,8 @@ -#![feature(proc_macro, generators)] +#![feature(async_await, generators)] -#[async] -fn foo() -> Result { - Err(3) -} +use futures::*; #[async_stream(item = Left)] -fn foos() -> Result<(), u32> { - Err(3) -} +fn foo() {} fn main() {} diff --git a/futures/testcrate/ui/unresolved-type.stderr b/futures/testcrate/ui/unresolved-type.stderr index da37f5757f..af4917f3bf 100644 --- a/futures/testcrate/ui/unresolved-type.stderr +++ b/futures/testcrate/ui/unresolved-type.stderr @@ -1,34 +1,20 @@ error[E0412]: cannot find type `Left` in this scope - --> $DIR/unresolved-type.rs:8:20 + --> $DIR/unresolved-type.rs:5:23 | -8 | fn foo() -> Result { - | ^^^^ not found in this scope +5 | #[async_stream(item = Left)] + | ^^^^ not found in this scope +help: there is an enum variant `core::fmt::Alignment::Left` and 7 others; try using the variant's enum | - = help: there is an enum variant `futures::future::Either::Left`, try using `futures::future::Either`? - = help: there is an enum variant `std::fmt::rt::v1::Alignment::Left`, try using `std::fmt::rt::v1::Alignment`? +5 | #[async_stream(item = core::fmt::Alignment)] + | ^^^^^^^^^^^^^^^^^^^^ +5 | #[async_stream(item = core::fmt::rt::v1::Alignment)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +5 | #[async_stream(item = futures::core_reexport::fmt::Alignment)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +5 | #[async_stream(item = futures::core_reexport::fmt::rt::v1::Alignment)] + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +and 3 other candidates -error[E0412]: cannot find type `Left` in this scope - --> $DIR/unresolved-type.rs:12:23 - | -12 | #[async_stream(item = Left)] - | ^^^^ not found in this scope - | - = help: there is an enum variant `futures::future::Either::Left`, try using `futures::future::Either`? - = help: there is an enum variant `std::fmt::rt::v1::Alignment::Left`, try using `std::fmt::rt::v1::Alignment`? - -error[E0907]: type inside generator must be known in this context - --> $DIR/unresolved-type.rs:12:1 - | -12 | #[async_stream(item = Left)] - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - | -note: the type is part of the generator because of this `yield` - --> $DIR/unresolved-type.rs:12:1 - | -12 | #[async_stream(item = Left)] - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -error: aborting due to 3 previous errors +error: aborting due to previous error -Some errors occurred: E0412, E0907. -For more information about an error, try `rustc --explain E0412`. +For more information about this error, try `rustc --explain E0412`. diff --git a/futures/testcrate/ui/update-all-references.sh b/futures/testcrate/ui/update-all-references.sh index 7b757a4790..207a562c7e 100755 --- a/futures/testcrate/ui/update-all-references.sh +++ b/futures/testcrate/ui/update-all-references.sh @@ -21,3 +21,4 @@ MY_DIR=$(dirname $0) cd $MY_DIR find . -name '*.rs' | xargs ./update-references.sh +cd - diff --git a/futures/testcrate/ui/update-references.sh b/futures/testcrate/ui/update-references.sh index 13c35dc044..fa83607914 100755 --- a/futures/testcrate/ui/update-references.sh +++ b/futures/testcrate/ui/update-references.sh @@ -21,7 +21,7 @@ MYDIR=$(dirname $0) -BUILD_DIR="../../target/tests/ui" +BUILD_DIR="../target/tests/ui" while [[ "$1" != "" ]]; do STDERR_NAME="${1/%.rs/.stderr}" @@ -38,5 +38,3 @@ while [[ "$1" != "" ]]; do cp $BUILD_DIR/$STDERR_NAME $MYDIR/$STDERR_NAME fi done - - diff --git a/futures/tests/async_stream/elisions.rs b/futures/tests/async_stream/elisions.rs new file mode 100644 index 0000000000..855c9f4623 --- /dev/null +++ b/futures/tests/async_stream/elisions.rs @@ -0,0 +1,66 @@ +use futures::executor::block_on; +use futures::*; + +struct Ref<'a, T>(&'a T); + +#[async_stream(item = i32)] +fn references(x: &i32) { + yield *x; +} + +#[async_stream(item = i32)] +fn new_types(x: Ref<'_, i32>) { + yield *x.0; +} + +struct Foo(i32); + +impl Foo { + #[async_stream(item = &i32)] + fn foo(&self) { + yield &self.0 + } +} + +#[async_stream(item = &i32)] +fn single_ref(x: &i32) { + yield x +} + +#[async_stream(item = ())] +fn check_for_name_colision<'_async0, T>(_x: &T, _y: &'_async0 i32) { + yield +} + +#[test] +fn main() { + block_on(async { + let x = 0; + let foo = Foo(x); + + #[for_await] + for y in references(&x) { + assert_eq!(y, x); + } + + #[for_await] + for y in new_types(Ref(&x)) { + assert_eq!(y, x); + } + + #[for_await] + for y in single_ref(&x) { + assert_eq!(y, &x); + } + + #[for_await] + for y in foo.foo() { + assert_eq!(y, &x); + } + + #[for_await] + for y in check_for_name_colision(&x, &x) { + assert_eq!(y, ()); + } + }); +} diff --git a/futures/tests_disabled/async_await/mod.rs b/futures/tests/async_stream/mod.rs similarity index 75% rename from futures/tests_disabled/async_await/mod.rs rename to futures/tests/async_stream/mod.rs index e7c8866eab..2791fdc408 100644 --- a/futures/tests_disabled/async_await/mod.rs +++ b/futures/tests/async_stream/mod.rs @@ -1,3 +1,4 @@ mod elisions; +mod nested; mod pinned; mod smoke; diff --git a/futures/tests/async_stream/nested.rs b/futures/tests/async_stream/nested.rs new file mode 100644 index 0000000000..a1c304676d --- /dev/null +++ b/futures/tests/async_stream/nested.rs @@ -0,0 +1,13 @@ + +use futures::*; + +#[async_stream(item = i32)] // impl Generator, Return = ()> +fn _stream1() { + let _ = async { // impl Generator + #[for_await] + for i in stream::iter(vec![1, 2]) { + future::lazy(|_| i * i).await; + } + }; + future::lazy(|_| ()).await; +} diff --git a/futures/tests/async_stream/pinned.rs b/futures/tests/async_stream/pinned.rs new file mode 100644 index 0000000000..58548bc7a0 --- /dev/null +++ b/futures/tests/async_stream/pinned.rs @@ -0,0 +1,45 @@ +use futures::executor::block_on; +use futures::*; + +#[async_stream(item = u64)] +fn stream1() { + fn integer() -> u64 { + 1 + } + let x = &integer(); + yield 0; + yield *x; +} + +fn stream1_block() -> impl Stream { + async_stream_block! { + #[for_await] + for item in stream1() { + yield item + } + } +} + +async fn uses_async_for() -> Vec { + let mut v = vec![]; + #[for_await] + for i in stream1() { + v.push(i); + } + v +} + +async fn uses_async_for_block() -> Vec { + let mut v = vec![]; + #[for_await] + for i in stream1_block() { + v.push(i); + } + v +} + +#[test] +fn main() { + assert_eq!(block_on(uses_async_for()), vec![0, 1]); + assert_eq!(block_on(uses_async_for_block()), vec![0, 1]); +} diff --git a/futures/tests/async_stream/smoke.rs b/futures/tests/async_stream/smoke.rs new file mode 100644 index 0000000000..e55aadcfab --- /dev/null +++ b/futures/tests/async_stream/smoke.rs @@ -0,0 +1,133 @@ +//! A bunch of ways to use async/await syntax. +//! +//! This is mostly a test f r this repository itself, not necessarily serving +//! much more purpose than that. + +use futures::executor::block_on; +use futures::*; + +async fn future1() -> i32 { + let mut cnt = 0; + #[for_await] + for x in stream::iter(vec![1, 2, 3, 4]) { + cnt += x; + } + cnt +} + +#[async_stream(item = u64)] +fn stream1() { + yield 0; + yield 1; +} + +#[async_stream(item = T)] +fn stream2(t: T) { + yield t.clone(); + yield t.clone(); +} + +#[async_stream(item = i32)] +fn stream3() { + let mut cnt = 0; + #[for_await] + for x in stream::iter(vec![1, 2, 3, 4]) { + cnt += x; + yield x; + } + yield cnt; +} + +mod foo { + pub struct _Foo(pub i32); +} + +#[async_stream(item = foo::_Foo)] +fn _stream5() { + yield foo::_Foo(0); + yield foo::_Foo(1); +} + +#[async_stream(item = i32)] +fn _stream6() { + #[for_await] + for foo::_Foo(i) in _stream5() { + yield i * i; + } +} + +#[async_stream(item = ())] +fn _stream7() { + yield (); +} + +#[async_stream(item = [u32; 4])] +fn _stream8() { + yield [1, 2, 3, 4]; +} + +struct A(i32); + +impl A { + #[async_stream(item = i32)] + fn a_foo(self) { + yield self.0 + } + + #[async_stream(item = i32)] + fn _a_foo2(self: Box) { + yield self.0 + } +} + +async fn loop_in_loop() -> bool { + let mut cnt = 0; + let vec = vec![1, 2, 3, 4]; + #[for_await] + for x in stream::iter(vec.clone()) { + #[for_await] + for y in stream::iter(vec.clone()) { + cnt += x * y; + } + } + + let sum = (1..5).map(|x| (1..5).map(|y| x * y).sum::()).sum::(); + cnt == sum +} + +#[test] +fn main() { + // https://github.com/alexcrichton/futures-await/issues/45 + #[async_stream(item = ())] + fn _stream10() { + yield; + } + + block_on(async { + let mut v = 0..=1; + #[for_await] + for x in stream1() { + assert_eq!(x, v.next().unwrap()); + } + + let mut v = [(), ()].iter(); + #[for_await] + for x in stream2(()) { + assert_eq!(x, *v.next().unwrap()); + } + + let mut v = [1, 2, 3, 4, 10].iter(); + #[for_await] + for x in stream3() { + assert_eq!(x, *v.next().unwrap()); + } + + #[for_await] + for x in A(11).a_foo() { + assert_eq!(x, 11); + } + }); + + assert_eq!(block_on(future1()), 10); + assert_eq!(block_on(loop_in_loop()), true); +} diff --git a/futures/tests/async_stream_tests.rs b/futures/tests/async_stream_tests.rs new file mode 100644 index 0000000000..9d68393268 --- /dev/null +++ b/futures/tests/async_stream_tests.rs @@ -0,0 +1,5 @@ +#![feature(async_await)] +#![cfg_attr(all(feature = "async-stream", feature = "nightly"), feature(generators, stmt_expr_attributes, proc_macro_hygiene))] + +#[cfg(all(feature = "async-stream", feature = "nightly"))] +mod async_stream; diff --git a/futures/tests_disabled/async_await/elisions.rs b/futures/tests_disabled/async_await/elisions.rs deleted file mode 100644 index 15c3a88ca0..0000000000 --- a/futures/tests_disabled/async_await/elisions.rs +++ /dev/null @@ -1,49 +0,0 @@ -use futures::stable::block_on_stable; - -struct Ref<'a, T: 'a>(&'a T); - -#[async] -fn references(x: &i32) -> Result { - Ok(*x) -} - -#[async] -fn new_types(x: Ref<'_, i32>) -> Result { - Ok(*x.0) -} - -#[async_stream(item = i32)] -fn _streams(x: &i32) -> Result<(), i32> { - stream_yield!(*x); - Ok(()) -} - -struct Foo(i32); - -impl Foo { - #[async] - fn foo(&self) -> Result<&i32, i32> { - Ok(&self.0) - } -} - -#[async] -fn single_ref(x: &i32) -> Result<&i32, i32> { - Ok(x) -} - -#[async] -fn check_for_name_colision<'_async0, T>(_x: &T, _y: &'_async0 i32) -> Result<(), ()> { - Ok(()) -} - -#[test] -fn main() { - let x = 0; - let foo = Foo(x); - assert_eq!(block_on_stable(references(&x)), Ok(x)); - assert_eq!(block_on_stable(new_types(Ref(&x))), Ok(x)); - assert_eq!(block_on_stable(single_ref(&x)), Ok(&x)); - assert_eq!(block_on_stable(foo.foo()), Ok(&x)); - assert_eq!(block_on_stable(check_for_name_colision(&x, &x)), Ok(())); -} diff --git a/futures/tests_disabled/async_await/pinned.rs b/futures/tests_disabled/async_await/pinned.rs deleted file mode 100644 index 39357c4274..0000000000 --- a/futures/tests_disabled/async_await/pinned.rs +++ /dev/null @@ -1,116 +0,0 @@ -use futures::stable::block_on_stable; -use futures::executor::{block_on, ThreadPool}; - -#[async] -fn foo() -> Result { - Ok(1) -} - -#[async] -fn bar(x: &i32) -> Result { - Ok(*x) -} - -#[async] -fn baz(x: i32) -> Result { - bar(&x).await -} - -#[async(boxed)] -fn boxed(x: i32) -> Result { - Ok(x) -} - -#[async(boxed)] -fn boxed_borrow(x: &i32) -> Result { - Ok(*x) -} - -#[async(boxed, send)] -fn boxed_send(x: i32) -> Result { - Ok(x) -} - -#[async(boxed, send)] -fn boxed_send_borrow(x: &i32) -> Result { - Ok(*x) -} - -#[async(boxed, send)] -fn spawnable() -> Result<(), Never> { - Ok(()) -} - -fn baz_block(x: i32) -> impl StableFuture { - async_block! { - bar(&x).await - } -} - -#[async_stream(item = u64)] -fn stream1() -> Result<(), i32> { - fn integer() -> u64 { 1 } - let x = &integer(); - stream_yield!(0); - stream_yield!(*x); - Ok(()) -} - -fn stream1_block() -> impl StableStream { - async_stream_block! { - #[async] - for item in stream1() { - stream_yield!(item) - } - Ok(()) - } -} - -#[async_stream(boxed, item = u64)] -fn _stream_boxed() -> Result<(), i32> { - fn integer() -> u64 { 1 } - let x = &integer(); - stream_yield!(0); - stream_yield!(*x); - Ok(()) -} - -#[async] -pub fn uses_async_for() -> Result, i32> { - let mut v = vec![]; - #[async] - for i in stream1() { - v.push(i); - } - Ok(v) -} - -#[async] -pub fn uses_async_for_block() -> Result, i32> { - let mut v = vec![]; - #[async] - for i in stream1_block() { - v.push(i); - } - Ok(v) -} - -#[test] -fn main() { - assert_eq!(block_on_stable(foo()), Ok(1)); - assert_eq!(block_on_stable(bar(&1)), Ok(1)); - assert_eq!(block_on_stable(baz(17)), Ok(17)); - assert_eq!(block_on(boxed(17)), Ok(17)); - assert_eq!(block_on(boxed_send(17)), Ok(17)); - assert_eq!(block_on(boxed_borrow(&17)), Ok(17)); - assert_eq!(block_on(boxed_send_borrow(&17)), Ok(17)); - assert_eq!(block_on_stable(baz_block(18)), Ok(18)); - assert_eq!(block_on_stable(uses_async_for()), Ok(vec![0, 1])); - assert_eq!(block_on_stable(uses_async_for_block()), Ok(vec![0, 1])); -} - -#[test] -fn run_pinned_future_in_thread_pool() { - let mut pool = ThreadPool::new().unwrap(); - pool.spawn_pinned(spawnable()).unwrap(); -} diff --git a/futures/tests_disabled/async_await/smoke.rs b/futures/tests_disabled/async_await/smoke.rs deleted file mode 100644 index 86566eb7f6..0000000000 --- a/futures/tests_disabled/async_await/smoke.rs +++ /dev/null @@ -1,223 +0,0 @@ -//! A bunch of ways to use async/await syntax. -//! -//! This is mostly a test f r this repository itself, not necessarily serving -//! much more purpose than that. - -use futures; -use futures::executor; -use futures::stable::block_on_stable; - -use std::io; - -use futures::Never; -use futures::future::poll_fn; - -#[async] -fn foo() -> Result { - Ok(1) -} - -#[async] -extern fn _foo1() -> Result { - Ok(1) -} - -#[async] -unsafe fn _foo2() -> io::Result { - Ok(1) -} - -#[async] -unsafe extern fn _foo3() -> io::Result { - Ok(1) -} - -#[async] -pub fn _foo4() -> io::Result { - Ok(1) -} - -#[async] -fn _foo5(t: T) -> Result { - Ok(t.clone()) -} - -#[async] -fn _foo6(ref a: i32) -> Result { - Err(*a) -} - -#[async] -fn _foo7(t: T) -> Result - where T: Clone + 'static, -{ - Ok(t.clone()) -} - -#[async(boxed)] -fn _foo8(a: i32, b: i32) -> Result { - return Ok(a + b) -} - -#[async(boxed, send)] -fn _foo9() -> Result<(), Never> { - Ok(()) -} - -#[async] -fn _bar() -> Result { - foo().await -} - -#[async] -fn _bar2() -> Result { - let a = foo().await?; - let b = foo().await?; - Ok(a + b) -} - -#[async] -fn _bar4() -> Result { - let mut cnt = 0; - #[async] - for x in futures::stream::iter_ok::<_, i32>(vec![1, 2, 3, 4]) { - cnt += x; - } - Ok(cnt) -} - -#[async_stream(item = u64)] -fn _stream1() -> Result<(), i32> { - stream_yield!(0); - stream_yield!(1); - Ok(()) -} - -#[async_stream(item = T)] -fn _stream2(t: T) -> Result<(), i32> { - stream_yield!(t.clone()); - stream_yield!(t.clone()); - Ok(()) -} - -#[async_stream(item = i32)] -fn _stream3() -> Result<(), i32> { - let mut cnt = 0; - #[async] - for x in futures::stream::iter_ok::<_, i32>(vec![1, 2, 3, 4]) { - cnt += x; - stream_yield!(x); - } - Err(cnt) -} - -#[async_stream(boxed, item = u64)] -fn _stream4() -> Result<(), i32> { - stream_yield!(0); - stream_yield!(1); - Ok(()) -} - -#[allow(dead_code)] -mod foo { pub struct Foo(pub i32); } - -#[allow(dead_code)] -#[async_stream(boxed, item = foo::Foo)] -pub fn stream5() -> Result<(), i32> { - stream_yield!(foo::Foo(0)); - stream_yield!(foo::Foo(1)); - Ok(()) -} - -#[async_stream(boxed, item = i32)] -pub fn _stream6() -> Result<(), i32> { - #[async] - for foo::Foo(i) in stream5() { - stream_yield!(i * i); - } - Ok(()) -} - -#[async_stream(item = ())] -pub fn _stream7() -> Result<(), i32> { - stream_yield!(()); - Ok(()) -} - -#[async_stream(item = [u32; 4])] -pub fn _stream8() -> Result<(), i32> { - stream_yield!([1, 2, 3, 4]); - Ok(()) -} - -struct A(i32); - -impl A { - #[async] - fn a_foo(self) -> Result { - Ok(self.0) - } - - #[async] - fn _a_foo2(self: Box) -> Result { - Ok(self.0) - } -} - -fn await_item_stream() -> impl Stream + Send { - ::futures::stream::iter_ok(vec![0, 1]) -} - -#[async] -fn test_await_item() -> Result<(), Never> { - let mut stream = await_item_stream(); - - assert_eq!(await_item!(stream), Ok(Some(0))); - assert_eq!(await_item!(stream), Ok(Some(1))); - assert_eq!(await_item!(stream), Ok(None)); - - Ok(()) -} - -#[test] -fn main() { - assert_eq!(block_on_stable(foo()), Ok(1)); - assert_eq!(block_on_stable(foo()), Ok(1)); - assert_eq!(block_on_stable(_bar()), Ok(1)); - assert_eq!(block_on_stable(_bar2()), Ok(2)); - assert_eq!(block_on_stable(_bar4()), Ok(10)); - assert_eq!(block_on_stable(_foo6(8)), Err(8)); - assert_eq!(block_on_stable(A(11).a_foo()), Ok(11)); - assert_eq!(block_on_stable(loop_in_loop()), Ok(true)); - assert_eq!(block_on_stable(test_await_item()), Ok(())); -} - -#[async] -fn loop_in_loop() -> Result { - let mut cnt = 0; - let vec = vec![1, 2, 3, 4]; - #[async] - for x in futures::stream::iter_ok::<_, i32>(vec.clone()) { - #[async] - for y in futures::stream::iter_ok::<_, i32>(vec.clone()) { - cnt += x * y; - } - } - - let sum = (1..5).map(|x| (1..5).map(|y| x * y).sum::()).sum::(); - Ok(cnt == sum) -} - -#[async_stream(item = i32)] -fn poll_stream_after_error_stream() -> Result<(), ()> { - stream_yield!(5); - Err(()) -} - -#[test] -fn poll_stream_after_error() { - let mut s = poll_stream_after_error_stream().pin(); - assert_eq!(executor::block_on(poll_fn(|ctx| s.poll_next(ctx))), Ok(Some(5))); - assert_eq!(executor::block_on(poll_fn(|ctx| s.poll_next(ctx))), Err(())); - assert_eq!(executor::block_on(poll_fn(|ctx| s.poll_next(ctx))), Ok(None)); -} diff --git a/futures/tests_disabled/async_await_tests.rs b/futures/tests_disabled/async_await_tests.rs deleted file mode 100644 index b3d59f5849..0000000000 --- a/futures/tests_disabled/async_await_tests.rs +++ /dev/null @@ -1,4 +0,0 @@ -#![cfg_attr(feature = "nightly", feature(proc_macro, generators))] - -#[cfg(feature = "nightly")] -mod async_await;