Skip to content

Commit

Permalink
[APIPUB-69] - Implements Reverse Paging. (#70)
Browse files Browse the repository at this point in the history
* SPIKE:  Find approach for reverse paging

* fixes

* Code organization

* Code cleanup

* Trying to implement tests

* More tests

* Fixes after rebase

* Camel case on ApiPublisherSettings
  • Loading branch information
DavidJGapCR authored Sep 5, 2024
1 parent 0ca8baf commit f331deb
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/EdFi.Tools.ApiPublisher.Cli/apiPublisherSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"changeVersionPagingWindowSize": 25000,
"enableRateLimit": false,
"rateLimitNumberExecutions": 100,
"rateLimitTimeLimitMinutes": 1
"rateLimitTimeLimitMinutes": 1,
"useReversePaging": false
},
"authorizationFailureHandling": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,18 @@ protected override void Load(ContainerBuilder builder)
// Register resource page message producer using a ChangeVersion paging strategy
if (options.UseChangeVersionPaging)
{
builder.RegisterType<EdFiApiChangeVersionPagingStreamResourcePageMessageProducer>()
if (options.UseReversePaging)
{
builder.RegisterType<EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer>()
.As<IStreamResourcePageMessageProducer>()
.SingleInstance();
}
else
{
builder.RegisterType<EdFiApiChangeVersionPagingStreamResourcePageMessageProducer>()
.As<IStreamResourcePageMessageProducer>()
.SingleInstance();
}
}
// Register resource page message producer using a limit/offset paging strategy
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,26 @@ public async Task<IEnumerable<TProcessDataMessage>> HandleStreamResourcePageAsyn
break;
}

// Perform limit/offset final page check (for need for possible continuation)
if (message.IsFinalPage && JArray.Parse(responseContent).Count == limit)
if (!options.UseReversePaging)
{
if (_logger.IsEnabled(LogEventLevel.Debug))
// Perform limit/offset final page check (for need for possible continuation)
if (message.IsFinalPage && JArray.Parse(responseContent).Count == limit)
{
_logger.Debug($"{message.ResourceUrl}: Final page was full. Attempting to retrieve more data.");
}
if (_logger.IsEnabled(LogEventLevel.Debug))
{
_logger.Debug($"{message.ResourceUrl}: Final page was full. Attempting to retrieve more data.");
}

// Looks like there could be more data
offset += limit;
// Looks like there could be more data
offset += limit;

continue;
continue;
}
}
else
{
break;
}

}
catch (RateLimiterRejectedException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
$"{message.ResourceUrl}: Retrieving total count of items in change versions {message.ChangeWindow.MinChangeVersion} to {message.ChangeWindow.MaxChangeVersion}.");
}
else
{
{
_logger.Information($"{message.ResourceUrl}: Retrieving total count of items.");
}

Expand All @@ -59,7 +59,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
int limit = message.PageSize;

var pageMessages = new List<StreamResourcePageMessage<TProcessDataMessage>>();

if (totalCount > 0)
{
var noOfPartitions = Math.Ceiling((decimal)(message.ChangeWindow.MaxChangeVersion - message.ChangeWindow.MinChangeVersion)
Expand All @@ -72,7 +72,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
{
long changeVersionWindowEndValue = (changeVersionWindowStartValue > 0 ?
changeVersionWindowStartValue - 1 : changeVersionWindowStartValue) + options.ChangeVersionPagingWindowSize;

if (changeVersionWindowEndValue > message.ChangeWindow.MaxChangeVersion)
{
changeVersionWindowEndValue = message.ChangeWindow.MaxChangeVersion;
Expand Down Expand Up @@ -134,4 +134,4 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P

return pageMessages;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

using EdFi.Tools.ApiPublisher.Core.Configuration;
using EdFi.Tools.ApiPublisher.Core.Counting;
using EdFi.Tools.ApiPublisher.Core.Processing;
using EdFi.Tools.ApiPublisher.Core.Processing.Handlers;
using EdFi.Tools.ApiPublisher.Core.Processing.Messages;
using Serilog;
using System.Threading.Tasks.Dataflow;

namespace EdFi.Tools.ApiPublisher.Connections.Api.Processing.Source.MessageProducers;

public class EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer : IStreamResourcePageMessageProducer
{
private readonly ISourceTotalCountProvider _sourceTotalCountProvider;
private readonly ILogger _logger = Log.ForContext(typeof(EdFiApiLimitOffsetPagingStreamResourcePageMessageProducer));

public EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer(ISourceTotalCountProvider sourceTotalCountProvider)
{
_sourceTotalCountProvider = sourceTotalCountProvider;
}

public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> ProduceMessagesAsync<TProcessDataMessage>(
StreamResourceMessage message,
Options options,
ITargetBlock<ErrorItemMessage> errorHandlingBlock,
Func<StreamResourcePageMessage<TProcessDataMessage>, string, IEnumerable<TProcessDataMessage>> createProcessDataMessages,
CancellationToken cancellationToken)
{
if (message.ChangeWindow?.MaxChangeVersion != default(long) && message.ChangeWindow?.MaxChangeVersion != null)
{
_logger.Information(
$"{message.ResourceUrl}: Retrieving total count of items in change versions {message.ChangeWindow.MinChangeVersion} to {message.ChangeWindow.MaxChangeVersion}.");
}
else
{
_logger.Information($"{message.ResourceUrl}: Retrieving total count of items.");
}

// Get total count of items in source resource for change window (if applicable)
var (totalCountSuccess, totalCount) = await _sourceTotalCountProvider.TryGetTotalCountAsync(
message.ResourceUrl,
options,
message.ChangeWindow,
errorHandlingBlock,
cancellationToken);

if (!totalCountSuccess)
{
// Allow processing to continue without performing additional work on this resource.
return Enumerable.Empty<StreamResourcePageMessage<TProcessDataMessage>>();
}

_logger.Information($"{message.ResourceUrl}: Total count = {totalCount}");

int limit = message.PageSize;

var pageMessages = new List<StreamResourcePageMessage<TProcessDataMessage>>();

if (totalCount > 0)
{
var noOfPartitions = Math.Ceiling((decimal)(message.ChangeWindow.MaxChangeVersion - message.ChangeWindow.MinChangeVersion)
/ options.ChangeVersionPagingWindowSize);

int changeVersionWindow = 0;
long changeVersionWindowStartValue = message.ChangeWindow.MinChangeVersion;

while (changeVersionWindow < noOfPartitions)
{
long changeVersionWindowEndValue = (changeVersionWindowStartValue > 0 ?
changeVersionWindowStartValue - 1 : changeVersionWindowStartValue) + options.ChangeVersionPagingWindowSize;

if (changeVersionWindowEndValue > message.ChangeWindow.MaxChangeVersion)
{
changeVersionWindowEndValue = message.ChangeWindow.MaxChangeVersion;
}
var changeWindow = new ChangeWindow
{
MinChangeVersion = changeVersionWindowStartValue,
MaxChangeVersion = changeVersionWindowEndValue
};
changeVersionWindowStartValue = changeVersionWindowEndValue + 1;

// Get total count of items in source resource for change window (if applicable)
var (totalCountOnWindowSuccess, totalCountOnWindow) = await _sourceTotalCountProvider.TryGetTotalCountAsync(
message.ResourceUrl,
options,
changeWindow,
errorHandlingBlock,
cancellationToken);

if (!totalCountOnWindowSuccess)
{
continue;
}

bool isLastOne = false;
long offsetOnWindow = totalCountOnWindow - limit;
if (offsetOnWindow < 0)
{
offsetOnWindow = 0;
isLastOne = true;
}

int limitOnWindow = totalCountOnWindow < limit ? (int)totalCountOnWindow : limit;
while ((offsetOnWindow >= 0 || isLastOne == true) && totalCountOnWindow > 0 && limitOnWindow > 0)
{
var pageMessage = new StreamResourcePageMessage<TProcessDataMessage>
{
// Resource-specific context
ResourceUrl = message.ResourceUrl,
PostAuthorizationFailureRetry = message.PostAuthorizationFailureRetry,

// Page-strategy specific context
Limit = limitOnWindow,
Offset = offsetOnWindow,

// Global processing context
ChangeWindow = changeWindow,
CreateProcessDataMessages = createProcessDataMessages,

CancellationSource = message.CancellationSource,
};

pageMessages.Add(pageMessage);
offsetOnWindow -= limit;
if (isLastOne)
break;
if (offsetOnWindow < 0)
{
limitOnWindow = limit + (int)offsetOnWindow;
offsetOnWindow = 0;
isLastOne = true;
}
}
changeVersionWindow++;

}
}

// Flag the last page for special "continuation" processing
if (pageMessages.Any())
{
// Page-strategy specific context
pageMessages.Last().IsFinalPage = true;
}

return pageMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,7 @@ public int MaxDegreeOfParallelismForPostResourceItem
public int RateLimitNumberExecutions { get; set; } = 100;

public int RateLimitTimeLimitMinutes { get; set; } = 1;

public bool UseReversePaging { get; set; } = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public IConfigurationBuilder Create(string[] commandLineArgs)
["--enableRateLimit"] = "Options:EnableRateLimit",
["--rateLimitNumberExecutions"] = "Options:RateLimitNumberExecutions",
["--rateLimitTimeLimitMinutes"] = "Options:RateLimitTimeLimitMinutes",
["--useReversePaging"] = "Options:UseReversePaging",

// Resource selection (comma delimited paths - e.g. "/ed-fi/students,/ed-fi/studentSchoolAssociations")
["--include"] = "Connections:Source:Include",
Expand Down
11 changes: 8 additions & 3 deletions src/EdFi.Tools.ApiPublisher.Tests/Helpers/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public static ChangeProcessor CreateChangeProcessorWithDefaultDependencies(
IFakeHttpRequestHandler fakeSourceRequestHandler,
ApiConnectionDetails targetApiConnectionDetails,
IFakeHttpRequestHandler fakeTargetRequestHandler,
INodeJSService nodeJsService = null)
INodeJSService nodeJsService = null,
bool withReversePaging = false)
{
EdFiApiClient SourceApiClientFactory() =>
new EdFiApiClient(
Expand Down Expand Up @@ -300,8 +301,12 @@ EdFiApiClient TargetApiClientFactory() =>

var streamingResourceProcessor = new StreamingResourceProcessor(
new StreamResourceBlockFactory(
new EdFiApiLimitOffsetPagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider))),
(withReversePaging) ?
new EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider)) :
new EdFiApiLimitOffsetPagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider))
),
new StreamResourcePagesBlockFactory(new EdFiApiStreamResourcePageMessageHandler(sourceEdFiApiClientProvider)),
sourceApiConnectionDetails);

Expand Down
Loading

0 comments on commit f331deb

Please sign in to comment.