Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Adds asynchronous enumeration #4855

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
66 changes: 64 additions & 2 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace Microsoft.Azure.Cosmos.Linq
/// This class provides extension methods for cosmos LINQ code.
/// </summary>
public static class CosmosLinqExtensions
{
{
/// <summary>
/// Returns the integer identifier corresponding to a specific item within a physical partition.
/// This method is to be used in LINQ expressions only and will be evaluated on server.
Expand All @@ -37,7 +37,7 @@ public static class CosmosLinqExtensions
public static int DocumentId(this object obj)
{
throw new NotImplementedException(ClientResources.TypeCheckExtensionFunctionsNotImplemented);
}
}

/// <summary>
/// Returns a Boolean value indicating if the type of the specified expression is an array.
Expand Down Expand Up @@ -303,6 +303,68 @@ public static QueryDefinition ToQueryDefinition<T>(this IQueryable<T> query)
throw new ArgumentException("ToQueryDefinition is only supported on Cosmos LINQ query operations", nameof(query));
}

/// <summary>
/// This extension method returns the current LINQ-derived NoSQL query for items within an Azure Cosmos DB for NoSQL container as an <see cref="IAsyncEnumerable{T}"/>.
///
/// The <see cref="IAsyncEnumerable{T}"/> enables iteration over pages of results with sets of items.
/// </summary>
/// <typeparam name="T">
/// The generic type to deserialize items to.
/// </typeparam>
/// <param name="query">
/// The <see cref="IQueryable{T}"/> to be converted to an asynchornous enumerable.
/// </param>
/// <seealso cref="IAsyncEnumerable{T}"/>
/// <seealso cref="FeedResponse{T}"/>
/// <returns>
/// This method returns an asynchronous enumerable that enables iteration over pages of results and sets of items for each page.
/// </returns>
/// <example>
/// This example shows how to:
///
/// 1. Create a <seealso cref="IQueryable{T}"/> from an existing <seealso cref="Container"/>.
/// 1. Use this extension method to convert the iterator into an <seealso cref="IAsyncEnumerable{T}"/>.
///
/// <code language="c#">
/// <![CDATA[
/// IQueryable<Item> query = container.GetItemLinqQueryable<Item>()
/// .Where(item => item.partitionKey == "example-partition-key")
/// .OrderBy(item => item.value);
///
/// IAsyncEnumerable<FeedResponse<Item>> pages = query.AsAsyncEnumerable();
///
/// List<Item> items = new();
/// double requestCharge = 0.0;
/// await foreach(var page in pages)
/// {
/// requestCharge += page.RequestCharge;
/// foreach (var item in page)
/// {
/// items.Add(item);
/// }
/// }
/// </code>
///
/// <code language="c#">
/// <![CDATA[
/// record Item(
/// string id,
/// string partitionKey,
/// string value
/// );
/// ]]>
/// </code>
/// </example>
public static IAsyncEnumerable<FeedResponse<T>> AsAsyncEnumerable<T>(this IQueryable<T> query)
{
if (query is CosmosLinqQuery<T> asyncEnumerable)
{
return asyncEnumerable;
}

throw new NotSupportedException("AsAsyncEnumerable is only supported for IQueryable<T> instances created by the Azure Cosmos DB LINQ provider.");
}

/// <summary>
/// This extension method gets the FeedIterator from LINQ IQueryable to execute query asynchronously.
/// This will create the fresh new FeedIterator when called.
Expand Down
28 changes: 26 additions & 2 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Cosmos.Linq
/// This is the entry point for LINQ query creation/execution, it generate query provider, implements IOrderedQueryable.
/// </summary>
/// <seealso cref="CosmosLinqQueryProvider"/>
internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<T>
internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<T>, IAsyncEnumerable<FeedResponse<T>>
{
private readonly CosmosLinqQueryProvider queryProvider;
private readonly Guid correlatedActivityId;
Expand Down Expand Up @@ -109,7 +109,7 @@ public IEnumerator<T> GetEnumerator()
" use GetItemQueryIterator to execute asynchronously");
}

FeedIterator<T> localFeedIterator = this.CreateFeedIterator(false, out ScalarOperationKind scalarOperationKind);
using FeedIterator<T> localFeedIterator = this.CreateFeedIterator(false, out ScalarOperationKind scalarOperationKind);
Debug.Assert(
scalarOperationKind == ScalarOperationKind.None,
"CosmosLinqQuery Assert!",
Expand All @@ -128,6 +128,30 @@ public IEnumerator<T> GetEnumerator()
}
}

/// <summary>
/// This method retrieves an <see cref="IAsyncEnumerator{T}"/> that enables iteration, asynchronously, over pages of results with sets of items.
/// </summary>
/// <param name="cancellationToken">
/// (optional) The cancellation token that could be used to cancel the operation.
/// </param>
/// <returns>
/// This method returns an asynchronous enumerator that enables iteration over pages of results and sets of items for each page.
/// </returns>
/// <remarks>
/// > [!IMPORTANT]
/// > This method triggers an asynchronous multi-page load.
/// </remarks>
public IAsyncEnumerator<FeedResponse<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
using FeedIteratorInlineCore<T> localFeedIterator = this.CreateFeedIterator(isContinuationExpected: false, out ScalarOperationKind scalarOperationKind);
Debug.Assert(
scalarOperationKind == ScalarOperationKind.None,
"CosmosLinqQuery Assert!",
$"Unexpected client operation. Expected 'None', Received '{scalarOperationKind}'");

return localFeedIterator.BuildFeedAsyncEnumerator<T>(cancellationToken);
}

/// <summary>
/// Synchronous Multi-Page load
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// This class encapsulates the asynchronous enumerator for an instance of <see cref="FeedIterator{T}"/>.
/// </summary>
/// <typeparam name="T">The generic type to deserialize items to.</typeparam>
public sealed class FeedAsyncEnumerator<T> : IAsyncEnumerable<FeedResponse<T>>, IDisposable
{
private readonly FeedIterator<T> feedIterator;

/// <summary>
/// This constructor creates an instance of <see cref="FeedAsyncEnumerator{T}"/>.
/// </summary>
/// <param name="feedIterator">
/// The target feed iterator instance.
/// </param>
public FeedAsyncEnumerator(FeedIterator<T> feedIterator)
{
this.feedIterator = feedIterator;
}

/// <summary>
/// This method is used to get the asynchronous enumerator for an instance of <see cref="FeedIterator{T}"/>.
/// </summary>
/// <param name="cancellationToken">
/// (optional) The cancellation token that could be used to cancel the operation.
/// </param>
/// <returns>
/// The asynchronous enumerator for an instance of <see cref="FeedIterator{T}"/>.
/// </returns>
public IAsyncEnumerator<FeedResponse<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return this.feedIterator.BuildFeedAsyncEnumerator<T>(cancellationToken);
}

/// <summary>
/// Releases the unmanaged resources used by the <see cref="FeedIterator{T}"/> and optionally releases the managed resources.
/// </summary>
public void Dispose()
{
this.feedIterator.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

/// <summary>
/// This class provides extension methods for Azure Cosmos DB for NoSQL's asynchronous feed iterator.
/// </summary>
public static class FeedAsyncEnumeratorExtensions
{
/// <summary>
/// This extension method returns the current feed iterator for items within an Azure Cosmos DB for NoSQL container as an <see cref="IAsyncEnumerable{T}"/>.
///
/// The <see cref="IAsyncEnumerable{T}"/> enables iteration over pages of results with sets of items.
/// </summary>
/// <typeparam name="T">
/// The generic type to deserialize items to.
/// </typeparam>
/// <param name="feedIterator">
/// The feed iterator to be converted to an asynchronous enumerable.
/// </param>
/// <seealso cref="IAsyncEnumerable{T}"/>
/// <seealso cref="FeedResponse{T}"/>
/// <returns>
/// This method returns an asynchronous enumerable that enables iteration over pages of results and sets of items for each page.
/// </returns>
/// <example>
/// This example shows how to:
///
/// 1. Create a <seealso cref="FeedIterator{T}"/> from an existing <seealso cref="Container"/>.
/// 1. Use this extension method to convert the iterator into an <seealso cref="IAsyncEnumerable{T}"/>.
///
/// <code language="c#">
/// <![CDATA[
/// FeedIterator<Item> feedIterator = container.GetItemQueryIterator<Item>(
/// queryText: "SELECT * FROM items"
/// );
///
/// IAsyncEnumerable<FeedResponse<Item>> pages = query.AsAsyncEnumerable();
///
/// List<Item> items = new();
/// double requestCharge = 0.0;
/// await foreach(var page in pages)
/// {
/// requestCharge += page.RequestCharge;
/// foreach (var item in page)
/// {
/// items.Add(item);
/// }
/// }
/// </code>
///
/// <code language="c#">
/// <![CDATA[
/// record Item(
/// string id,
/// string partitionKey,
/// string value
/// );
/// ]]>
/// </code>
/// </example>
public static IAsyncEnumerable<FeedResponse<T>> AsAsyncEnumerable<T>(
this FeedIterator<T> feedIterator)
{
return new FeedAsyncEnumerator<T>(feedIterator);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// This class provides extension methods for Azure Cosmos DB for NoSQL's feed iterator.
/// </summary>
public static class FeedIteratorExtensions
{
/// <summary>
/// This extension method takes an existing <see cref="FeedIterator{T}"/> and builds an <see cref="IAsyncEnumerator{T}"/> that enables iteration over pages of results with sets of items.
/// </summary>
/// <typeparam name="T">The generic type to deserialize items to.</typeparam>
/// <param name="feedIterator">
/// The target feed iterator instance.
/// </param>
/// <param name="cancellationToken">
/// (optional) The cancellation token that could be used to cancel the operation.
/// </param>
/// <returns>
/// This extension method returns an asynchronous enumerator that enables iteration over pages of results and sets of items for each page.
/// </returns>
/// <seealso cref="FeedIterator{T}"/>
/// <seealso cref="IAsyncEnumerable{T}"/>
/// <seealso href="https://learn.microsoft.com/dotnet/csharp/iterators#iterating-with-foreach"/>
/// <example>
/// This example shows how to:
///
/// 1. Create a <seealso cref="FeedIterator{T}"/> from an existing <seealso cref="Container"/>.
/// 1. Use this extension method to convert the iterator into an <seealso cref="IAsyncEnumerator{T}"/>.
///
/// Finally, use the `await foreach` statement to iterate over the pages and items.
/// <code language="c#">
/// <![CDATA[
/// FeedIterator<Item> feedIterator = container.GetItemQueryIterator<Item>(
/// queryText: "SELECT * FROM items"
/// );
///
/// IAsyncEnumerator<FeedResponse<Item>> pages = feedIterator.BuildFeedAsyncEnumerator<Item>();
///
/// List<Items> items = new();
/// double requestCharge = 0.0;
/// await foreach(var page in pages)
/// {
/// requestCharge += page.RequestCharge;
/// foreach (var item in page)
/// {
/// items.Add(item);
/// }
/// }
/// ]]>
/// </code>
///
/// <code language="c#">
/// <![CDATA[
/// record Item(
/// string id,
/// string partitionKey,
/// string value
/// );
/// ]]>
/// </code>
/// </example>
public static async IAsyncEnumerator<FeedResponse<T>> BuildFeedAsyncEnumerator<T>(
this FeedIterator<T> feedIterator,
CancellationToken cancellationToken = default)
{
while (feedIterator.HasMoreResults)
{
FeedResponse<T> response = await feedIterator.ReadNextAsync(cancellationToken);

yield return response;
}
}
}
}
Loading
Loading