Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/support continuation token limit property #437

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Microsoft.Azure.CosmosRepository/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@
global using Microsoft.Extensions.Options;
global using Newtonsoft.Json;
global using Newtonsoft.Json.Linq;
global using Newtonsoft.Json.Serialization;
global using Newtonsoft.Json.Serialization;
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,47 @@


// ReSharper disable once CheckNamespace

namespace Microsoft.Azure.CosmosRepository;

internal sealed partial class DefaultRepository<TItem>
{
/// <inheritdoc/>
public async ValueTask<IPage<TItem>> PageAsync(
public ValueTask<IPage<TItem>> PageAsync(
Expression<Func<TItem, bool>>? predicate = null,
int pageSize = 25,
string? continuationToken = null,
bool returnTotal = false,
CancellationToken cancellationToken = default)
{
CancellationToken cancellationToken = default) =>
PageAsync(
new QueryRequestOptions { MaxItemCount = pageSize },
predicate,
pageSize,
continuationToken,
returnTotal,
cancellationToken);

/// <inheritdoc/>
public async ValueTask<IPage<TItem>> PageAsync(
QueryRequestOptions requestOptions,
Expression<Func<TItem, bool>>? predicate = null,
int pageSize = 25,
string? continuationToken = null,
bool returnTotal = false,
CancellationToken cancellationToken = default)
{
Container container = await containerProvider.GetContainerAsync()
.ConfigureAwait(false);

QueryRequestOptions options = new()
{
MaxItemCount = pageSize
};
.ConfigureAwait(false);

// make sure that if the user hasn't said the value already we take it from the pageSize parameter
if (requestOptions.MaxItemCount is null)
Copy link
Contributor

@mateuszkumpf mateuszkumpf May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must expect that null could be passed. Initialize requestOptions if the value is null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or throw exception

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the requestOptions isn't nullable, are you implying that we should protect against it anyway?

Copy link
Contributor

@mateuszkumpf mateuszkumpf May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, I don't trust users 😂 But if for you it's ok then for me too 😄

{
requestOptions.MaxItemCount = pageSize;
}

IQueryable<TItem> query = container
.GetItemLinqQueryable<TItem>(
requestOptions: options,
requestOptions: requestOptions,
continuationToken: continuationToken,
linqSerializerOptions: optionsMonitor.CurrentValue.SerializationOptions)
.Where(repositoryExpressionProvider.Build(
Expand All @@ -51,23 +69,40 @@ public async ValueTask<IPage<TItem>> PageAsync(
pageSize,
items.AsReadOnly(),
charge + countResponse?.RequestCharge ?? 0,
resultingContinuationToken);
resultingContinuationToken);
}

/// <inheritdoc/>
public async ValueTask<IPageQueryResult<TItem>> PageAsync(
public ValueTask<IPageQueryResult<TItem>> PageAsync(
Expression<Func<TItem, bool>>? predicate = null,
int pageNumber = 1,
int pageSize = 25,
bool returnTotal = false,
CancellationToken cancellationToken = default)
{
CancellationToken cancellationToken = default) =>
PageAsync(
requestOptions: new QueryRequestOptions { MaxItemCount = pageSize },
predicate: predicate,
pageNumber: pageNumber,
pageSize: pageSize,
returnTotal: returnTotal,
cancellationToken: cancellationToken);

/// <inheritdoc/>
public async ValueTask<IPageQueryResult<TItem>> PageAsync(
QueryRequestOptions requestOptions,
Expression<Func<TItem, bool>>? predicate = null,
int pageNumber = 1,
int pageSize = 25,
bool returnTotal = false,
CancellationToken cancellationToken = default)
{
Container container = await containerProvider.GetContainerAsync()
.ConfigureAwait(false);

IQueryable<TItem> query = container
.GetItemLinqQueryable<TItem>(
linqSerializerOptions: optionsMonitor.CurrentValue.SerializationOptions)
linqSerializerOptions: optionsMonitor.CurrentValue.SerializationOptions,
requestOptions: requestOptions)
.Where(repositoryExpressionProvider
.Build(predicate ?? repositoryExpressionProvider.Default<TItem>()));

Expand All @@ -94,6 +129,6 @@ public async ValueTask<IPageQueryResult<TItem>> PageAsync(
pageSize,
items.AsReadOnly(),
charge + countResponse?.RequestCharge ?? 0,
resultingContinuationToken /* This was missing, is this correct? */);
resultingContinuationToken /* This was missing, is this correct? */);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,30 @@ namespace Microsoft.Azure.CosmosRepository;
internal sealed partial class DefaultRepository<TItem>
{
/// <inheritdoc/>
public async ValueTask<TResult> QueryAsync<TResult>(
public ValueTask<TResult> QueryAsync<TResult>(
ISpecification<TItem, TResult> specification,
CancellationToken cancellationToken = default)
where TResult : IQueryResult<TItem>
{
where TResult : IQueryResult<TItem> =>
QueryAsync(specification, new QueryRequestOptions(), cancellationToken);

/// <inheritdoc/>
public async ValueTask<TResult> QueryAsync<TResult>(
ISpecification<TItem, TResult> specification,
QueryRequestOptions requestOptions,
CancellationToken cancellationToken = default) where TResult : IQueryResult<TItem>
{
Container container = await containerProvider.GetContainerAsync()
.ConfigureAwait(false);

QueryRequestOptions options = new();

if (specification.UseContinuationToken)
{
options.MaxItemCount = specification.PageSize;
requestOptions.MaxItemCount = specification.PageSize;
}

IQueryable<TItem> query = container
.GetItemLinqQueryable<TItem>(
requestOptions: options,
continuationToken: specification.ContinuationToken,
requestOptions: requestOptions,
continuationToken: specification.ContinuationToken,
linqSerializerOptions: optionsMonitor.CurrentValue.SerializationOptions)
.Where(repositoryExpressionProvider.Default<TItem>());

Expand All @@ -45,6 +50,6 @@ await GetAllItemsAsync(query, specification.PageSize, cancellationToken)

return specification.PostProcessingAction(
items.AsReadOnly(), count.Resource, charge + count.RequestCharge,
continuationToken);
continuationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the MIT License.

// ReSharper disable once CheckNamespace


namespace Microsoft.Azure.CosmosRepository;

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ ValueTask<IPage<TItem>> PageAsync(
bool returnTotal = false,
CancellationToken cancellationToken = default);

/// <summary>
/// Offers a load more paging implementation for infinite scroll scenarios.
/// Allows for efficient paging making use of cosmos DBs continuation tokens, making this implementation cost effective.
/// </summary>
/// <param name="requestOptions">An <see cref="QueryRequestOptions"/> to specify query options for Cosmos SDK</param>
/// <param name="predicate">A filter criteria for the paging operation, if null it will get all <see cref="IItem"/>s</param>
/// <param name="pageSize">The size of the page to return from cosmos db.</param>
/// <param name="continuationToken">The token returned from a previous query, if null starts at the beginning of the data</param>
/// <param name="returnTotal">Specifies whether or not to return the total number of items that matched the query. This defaults to false as it can be a very expensive operation.</param>
/// <param name="cancellationToken">The cancellation token to use when making asynchronous operations.</param>
/// <returns>An <see cref="IPage{T}"/> of <see cref="IItem"/>s</returns>
/// <remarks>This method makes use of cosmos dbs continuation tokens for efficient, cost effective paging utilising low RUs</remarks>
ValueTask<IPage<TItem>> PageAsync(
QueryRequestOptions requestOptions,
Expression<Func<TItem, bool>>? predicate = null,
int pageSize = 25,
string? continuationToken = null,
bool returnTotal = false,
CancellationToken cancellationToken = default);

/// <summary>
/// Get items based on a specification.
/// The specification is used to define which filters are used, the order of the search results and how they are paged.
Expand All @@ -202,10 +222,49 @@ ValueTask<TResult> QueryAsync<TResult>(
CancellationToken cancellationToken = default)
where TResult : IQueryResult<TItem>;

/// <summary>
/// Get items based on a specification.
/// The specification is used to define which filters are used, the order of the search results and how they are paged.
/// Depending on how results are paged derive specification implementations from different classes:
/// For non paged results derive <see cref="DefaultSpecification{TItem}"/>
/// For continuation token derive <see cref="ContinuationTokenSpecification{T}"/>
/// For page number results derive <see cref="OffsetByPageNumberSpecification{T}"/>
/// </summary>
/// <typeparam name="TResult">Decides which paging information is retrieved. Use <see cref="ContinuationTokenSpecification{T}"/></typeparam>
/// <param name="specification">A specification used to filtering, ordering and paging. A <see cref="ISpecification{T, TResult}"/></param>
/// <param name="requestOptions">An <see cref="QueryRequestOptions"/> to specify query options for Cosmos SDK</param>
/// <param name="cancellationToken">The cancellation token to use when making asynchronous operations.</param>
/// <returns>The selected <typeparamref name="TResult"/> implementation that implements <see cref="IQueryResult{T}"/> of <see cref="IItem"/></returns>
/// <remarks>This method makes use of cosmos dbs continuation tokens for efficient, cost effective paging utilising low RUs</remarks>
ValueTask<TResult> QueryAsync<TResult>(
ISpecification<TItem, TResult> specification,
QueryRequestOptions requestOptions,
CancellationToken cancellationToken = default)
where TResult : IQueryResult<TItem>;

/// <summary>
/// Offers a load more paging implementation for infinite scroll scenarios.
/// Allows for efficient paging making use of cosmos DBs continuation tokens, making this implementation cost effective.
/// </summary>
/// <param name="predicate">A filter criteria for the paging operation, if null it will get all <see cref="IItem"/>s</param>
/// <param name="pageNumber">The page number to return from cosmos db.</param>
/// <param name="pageSize">The size of the page to return from cosmos db.</param>
/// <param name="returnTotal">Specifies whether or not to return the total number of items that matched the query. This defaults to false as it can be a very expensive operation.</param>
/// <param name="cancellationToken">The cancellation token to use when making asynchronous operations.</param>
/// <returns>An <see cref="IPageQueryResult{T}"/> of <see cref="IItem"/>s</returns>
/// <remarks>This method makes use of Cosmos DB's continuation tokens for efficient, cost effective paging utilizing low RUs</remarks>
ValueTask<IPageQueryResult<TItem>> PageAsync(
Expression<Func<TItem, bool>>? predicate = null,
int pageNumber = 1,
int pageSize = 25,
bool returnTotal = false,
CancellationToken cancellationToken = default);

/// <summary>
/// Offers a load more paging implementation for infinite scroll scenarios.
/// Allows for efficient paging making use of cosmos DBs continuation tokens, making this implementation cost effective.
/// </summary>
/// <param name="requestOptions">An <see cref="QueryRequestOptions"/> to specify query options for Cosmos SDK</param>
/// <param name="predicate">A filter criteria for the paging operation, if null it will get all <see cref="IItem"/>s</param>
/// <param name="pageNumber">The page number to return from cosmos db.</param>
/// <param name="pageSize">The size of the page to return from cosmos db.</param>
Expand All @@ -214,6 +273,7 @@ ValueTask<TResult> QueryAsync<TResult>(
/// <returns>An <see cref="IPageQueryResult{T}"/> of <see cref="IItem"/>s</returns>
/// <remarks>This method makes use of Cosmos DB's continuation tokens for efficient, cost effective paging utilizing low RUs</remarks>
ValueTask<IPageQueryResult<TItem>> PageAsync(
QueryRequestOptions requestOptions,
Expression<Func<TItem, bool>>? predicate = null,
int pageNumber = 1,
int pageSize = 25,
Expand Down Expand Up @@ -262,5 +322,49 @@ async IAsyncEnumerable<TItem> PageAsync(
}
}
}

/// <summary>
/// Wraps the existing paging support to return an <see cref="IAsyncEnumerable{T}"/>
/// where <c>T</c> is <typeparamref name="TItem"/>.
/// </summary>
/// <param name="predicate">A filter criteria for the paging operation, if null it will get all <see cref="IItem"/>s</param>
/// <param name="limit">The limit of how many items to yield. Defaults to <c>1,000</c>.</param>
/// <param name="cancellationToken">The optional <see cref="CancellationToken"/> used to </param>
/// <returns>An <see cref="IAsyncEnumerable{T}"/> where <c>T</c> is <typeparamref name="TItem"/>.</returns>
/// <remarks>This method makes use of Cosmos DB's continuation tokens for efficient, cost effective paging utilizing low RUs</remarks>
async IAsyncEnumerable<TItem> PageAsync(
QueryRequestOptions requestOptions,
Expression<Func<TItem, bool>>? predicate = null,
int limit = 1_000,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var collected = 0;
var currentPage = 0;
var hasMoreResults = true;

while (hasMoreResults && collected < limit
&& cancellationToken.IsCancellationRequested is false)
{
IPageQueryResult<TItem> page = await PageAsync(
requestOptions,
predicate,
pageNumber: ++currentPage,
25,
returnTotal: false,
cancellationToken);

hasMoreResults = page.HasNextPage.GetValueOrDefault();

foreach (TItem item in page.Items)
{
if (collected < limit)
{
yield return item;
}

collected++;
}
}
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


// ReSharper disable once CheckNamespace

namespace Microsoft.Azure.CosmosRepository;

/// <inheritdoc/>
Expand All @@ -22,4 +23,19 @@ public InMemoryRepository(ISpecificationEvaluator specificationEvaluator) =>

private void NotFound() => throw new CosmosException(string.Empty, HttpStatusCode.NotFound, 0, string.Empty, 0);
private void Conflict() => throw new CosmosException(string.Empty, HttpStatusCode.Conflict, 0, string.Empty, 0);

public ValueTask<IPage<TItem>> PageAsync(QueryRequestOptions requestOptions, Expression<Func<TItem, bool>>? predicate = null, int pageSize = 25, string? continuationToken = null, bool returnTotal = false, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

public ValueTask<TResult> QueryAsync<TResult>(ISpecification<TItem, TResult> specification, QueryRequestOptions requestOptions, CancellationToken cancellationToken = default) where TResult : IQueryResult<TItem>
{
throw new NotImplementedException();
}

public ValueTask<IPageQueryResult<TItem>> PageAsync(QueryRequestOptions requestOptions, Expression<Func<TItem, bool>>? predicate = null, int pageNumber = 1, int pageSize = 25, bool returnTotal = false, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}