Skip to content

Commit

Permalink
Add support for entities from client shim
Browse files Browse the repository at this point in the history
  • Loading branch information
jviau committed Oct 12, 2023
1 parent 586884b commit e1d523a
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

namespace Microsoft.DurableTask.Client;

Expand Down Expand Up @@ -58,17 +60,51 @@ public static IDurableTaskClientBuilder UseOrchestrationService(
builder.Services.AddOptions<ShimDurableTaskClientOptions>(builder.Name)
.PostConfigure<IServiceProvider>((opt, sp) =>
{
if (opt.Client is not null)
{
return;
}
// Try to resolve client from service container.
opt.Client = sp.GetService<IOrchestrationServiceClient>()
?? sp.GetService<IOrchestrationService>() as IOrchestrationServiceClient;
ConfigureClient(sp, opt);
ConfigureEntities(builder.Name, sp, opt);
})
.Validate(x => x.Client is not null, "ShimDurableTaskClientOptions.Client must not be null.");
.Validate(x => x.Client is not null, "ShimDurableTaskClientOptions.Client must not be null.")
.Validate(
x => !x.EnableEntitySupport || x.Entities.Queries is not null,
"ShimDurableTaskClientOptions.Entities.Queries must not be null when entity support is enabled.");

return builder.UseBuildTarget<ShimDurableTaskClient, ShimDurableTaskClientOptions>();
}

static void ConfigureClient(IServiceProvider services, ShimDurableTaskClientOptions options)
{
if (options.Client is not null)
{
return;
}

// Try to resolve client from service container.
options.Client = services.GetService<IOrchestrationServiceClient>()
?? services.GetService<IOrchestrationService>() as IOrchestrationServiceClient;
}

static void ConfigureEntities(string name, IServiceProvider services, ShimDurableTaskClientOptions options)
{
if (options.Entities.Queries is null)
{
options.Entities.Queries = services.GetService<EntityBackendQueries>()
?? GetEntityService(services, options)?.EntityBackendQueries;
}

if (options.Entities.MaxSignalDelayTime is null)
{
EntityBackendProperties? properties = services.GetService<IOptionsMonitor<EntityBackendProperties>>()?.Get(name)
?? GetEntityService(services, options)?.EntityBackendProperties;
options.Entities.MaxSignalDelayTime = properties?.MaximumSignalDelayTime;
}
}

static IEntityOrchestrationService? GetEntityService(
IServiceProvider services, ShimDurableTaskClientOptions options)
{
return services.GetService<IEntityOrchestrationService>()
?? services.GetService<IOrchestrationService>() as IEntityOrchestrationService
?? services.GetService<IOrchestrationServiceClient>() as IEntityOrchestrationService
?? options.Client as IEntityOrchestrationService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,38 @@ namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
/// </summary>
class ShimDurableEntityClient : DurableEntityClient
{
readonly IOrchestrationServiceClient client;
readonly IEntityOrchestrationService service;
readonly DataConverter converter;
readonly ShimDurableTaskClientOptions options;

/// <summary>
/// Initializes a new instance of the <see cref="ShimDurableEntityClient"/> class.
/// </summary>
/// <param name="name">The name of this client.</param>
/// <param name="client">The orchestration service client.</param>
/// <param name="service">The entity service.</param>
/// <param name="converter">The data converter.</param>
public ShimDurableEntityClient(
string name, IOrchestrationServiceClient client, IEntityOrchestrationService service, DataConverter converter)
/// <param name="options">The client options..</param>
public ShimDurableEntityClient(string name, ShimDurableTaskClientOptions options)
: base(name)
{
this.client = Check.NotNull(client);
this.service = Check.NotNull(service);
this.converter = Check.NotNull(converter);
this.options = Check.NotNull(options);
}

EntityBackendQueries Queries => this.options.Entities.Queries!;

DataConverter Converter => this.options.DataConverter;

/// <inheritdoc/>
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(
CleanEntityStorageRequest? request = null,
bool continueUntilComplete = true,
CancellationToken cancellation = default)
{
CleanEntityStorageRequest r = request ?? CleanEntityStorageRequest.Default;
EntityBackendQueries.CleanEntityStorageResult result = await this.service
.EntityBackendQueries!.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = r.RemoveEmptyEntities,
ReleaseOrphanedLocks = r.ReleaseOrphanedLocks,
ContinuationToken = r.ContinuationToken,
},
cancellation);
EntityBackendQueries.CleanEntityStorageResult result = await this.Queries.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = r.RemoveEmptyEntities,
ReleaseOrphanedLocks = r.ReleaseOrphanedLocks,
ContinuationToken = r.ContinuationToken,
},
cancellation);

return new()
{
Expand All @@ -69,13 +65,13 @@ public override AsyncPageable<EntityMetadata<T>> GetAllEntitiesAsync<T>(EntityQu
/// <inheritdoc/>
public override async Task<EntityMetadata?> GetEntityAsync(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert(await this.service.EntityBackendQueries!.GetEntityAsync(
=> this.Convert(await this.Queries.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
public override async Task<EntityMetadata<T>?> GetEntityAsync<T>(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert<T>(await this.service.EntityBackendQueries!.GetEntityAsync(
=> this.Convert<T>(await this.Queries.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
Expand All @@ -90,7 +86,7 @@ public override async Task SignalEntityAsync(
Check.NotNull(id.Key);

DateTimeOffset? scheduledTime = options?.SignalTime;
string? serializedInput = this.converter.Serialize(input);
string? serializedInput = this.Converter.Serialize(input);

EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal(
new OrchestrationInstance() { InstanceId = id.ToString() },
Expand All @@ -99,10 +95,10 @@ public override async Task SignalEntityAsync(
serializedInput,
EntityMessageEvent.GetCappedScheduledTime(
DateTime.UtcNow,
this.service.EntityBackendProperties!.MaximumSignalDelayTime,
this.options.Entities.MaxSignalDelayTimeOrDefault,
scheduledTime?.UtcDateTime));

await this.client.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
}

AsyncPageable<TMetadata> GetAllEntitiesAsync<TMetadata>(
Expand All @@ -119,19 +115,18 @@ AsyncPageable<TMetadata> GetAllEntitiesAsync<TMetadata>(
return Pageable.Create(async (continuation, size, cancellation) =>
{
size ??= filter?.PageSize;
EntityBackendQueries.EntityQueryResult result = await this.service
.EntityBackendQueries!.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = startsWith,
LastModifiedFrom = lastModifiedFrom,
LastModifiedTo = lastModifiedTo,
IncludeTransient = includeTransient,
IncludeState = includeState,
ContinuationToken = continuation,
PageSize = size,
},
cancellation);
EntityBackendQueries.EntityQueryResult result = await this.Queries.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = startsWith,
LastModifiedFrom = lastModifiedFrom,
LastModifiedTo = lastModifiedTo,
IncludeTransient = includeTransient,
IncludeState = includeState,
ContinuationToken = continuation,
PageSize = size,
},
cancellation);
return new Page<TMetadata>(result.Results.Select(select).ToList(), result.ContinuationToken);
});
Expand All @@ -141,7 +136,7 @@ EntityMetadata<T> Convert<T>(EntityBackendQueries.EntityMetadata metadata)
{
return new(
new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key),
this.converter.Deserialize<T>(metadata.SerializedState))
this.Converter.Deserialize<T>(metadata.SerializedState))
{
LastModifiedTime = metadata.LastModifiedTime,
BacklogQueueSize = metadata.BacklogQueueSize,
Expand All @@ -161,7 +156,7 @@ EntityMetadata<T> Convert<T>(EntityBackendQueries.EntityMetadata metadata)

EntityMetadata Convert(EntityBackendQueries.EntityMetadata metadata)
{
SerializedData? data = metadata.SerializedState is null ? null : new(metadata.SerializedState, this.converter);
SerializedData? data = metadata.SerializedState is null ? null : new(metadata.SerializedState, this.Converter);
return new(new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key), data)
{
LastModifiedTime = metadata.LastModifiedTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,20 @@ public override DurableEntityClient Entities
{
get
{
if (!this.options.EnableEntitySupport)
{
throw new InvalidOperationException("Entity support is not enabled.");
}

if (this.entities is null)
{
if (this.options.Client is not IEntityOrchestrationService entityService)
if (this.options.Entities.Queries is null)
{
throw new NotSupportedException(
"The configured IOrchestrationServiceClient does not support entities.");
}

this.entities = new(this.Name, this.options.Client, entityService, this.options.DataConverter);
this.entities = new(this.Name, this.options);
}

return this.entities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.Entities;

namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;

Expand All @@ -15,4 +17,33 @@ public sealed class ShimDurableTaskClientOptions : DurableTaskClientOptions
/// If not manually set, this will be resolved from the <see cref="IServiceProvider" />, if available.
/// </summary>
public IOrchestrationServiceClient? Client { get; set; }

/// <summary>
/// Gets the <see cref="ShimDurableTaskEntityOptions"/> to configure entity support.
/// </summary>
public ShimDurableTaskEntityOptions Entities { get; } = new();
}

/// <summary>
/// Options for entities.
/// </summary>
public class ShimDurableTaskEntityOptions
{
/// <summary>
/// Gets or sets the <see cref="EntityBackendQueries"/> to use in the <see cref="DurableEntityClient" />.
/// If not set manually, this will attempt to be resolved automatically by looking for
/// <see cref="IEntityOrchestrationService"/> in the <see cref="IServiceProvider"/>.
/// </summary>
public EntityBackendQueries? Queries { get; set; }

/// <summary>
/// Gets or sets the maximum time span to use for signal delay. If not set manually, will attempt to be resolved
/// through the service container. This will finally default to 3 days if it cannot be any other means.
/// </summary>
public TimeSpan? MaxSignalDelayTime { get; set; }

/// <summary>
/// Gets the max signal delay time.
/// </summary>
internal TimeSpan MaxSignalDelayTimeOrDefault => this.MaxSignalDelayTime ?? TimeSpan.FromDays(3);
}

0 comments on commit e1d523a

Please sign in to comment.