Skip to content

Commit

Permalink
Add support for entities from client shim
Browse files Browse the repository at this point in the history
  • Loading branch information
jviau committed Oct 12, 2023
1 parent 8a0c761 commit 586884b
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
182 changes: 182 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;

namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;

/// <summary>
/// A shim client for interacting with entities backend via <see cref="IOrchestrationServiceClient"/>.
/// </summary>
class ShimDurableEntityClient : DurableEntityClient
{
readonly IOrchestrationServiceClient client;
readonly IEntityOrchestrationService service;
readonly DataConverter converter;

/// <summary>
/// Initializes a new instance of the <see cref="ShimDurableEntityClient"/> class.
/// </summary>
/// <param name="name">The name of this client.</param>
/// <param name="client">The orchestration service client.</param>
/// <param name="service">The entity service.</param>
/// <param name="converter">The data converter.</param>
public ShimDurableEntityClient(
string name, IOrchestrationServiceClient client, IEntityOrchestrationService service, DataConverter converter)
: base(name)
{
this.client = Check.NotNull(client);
this.service = Check.NotNull(service);
this.converter = Check.NotNull(converter);
}

/// <inheritdoc/>
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(
CleanEntityStorageRequest? request = null,
bool continueUntilComplete = true,
CancellationToken cancellation = default)
{
CleanEntityStorageRequest r = request ?? CleanEntityStorageRequest.Default;
EntityBackendQueries.CleanEntityStorageResult result = await this.service
.EntityBackendQueries!.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = r.RemoveEmptyEntities,
ReleaseOrphanedLocks = r.ReleaseOrphanedLocks,
ContinuationToken = r.ContinuationToken,
},
cancellation);

return new()
{
EmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
OrphanedLocksReleased = result.OrphanedLocksReleased,
ContinuationToken = result.ContinuationToken,
};
}

/// <inheritdoc/>
public override AsyncPageable<EntityMetadata> GetAllEntitiesAsync(EntityQuery? filter = null)
=> this.GetAllEntitiesAsync(this.Convert, filter);

/// <inheritdoc/>
public override AsyncPageable<EntityMetadata<T>> GetAllEntitiesAsync<T>(EntityQuery? filter = null)
=> this.GetAllEntitiesAsync(this.Convert<T>, filter);

/// <inheritdoc/>
public override async Task<EntityMetadata?> GetEntityAsync(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert(await this.service.EntityBackendQueries!.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
public override async Task<EntityMetadata<T>?> GetEntityAsync<T>(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert<T>(await this.service.EntityBackendQueries!.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
public override async Task SignalEntityAsync(
EntityInstanceId id,
string operationName,
object? input = null,
SignalEntityOptions? options = null,
CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);

DateTimeOffset? scheduledTime = options?.SignalTime;
string? serializedInput = this.converter.Serialize(input);

EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal(
new OrchestrationInstance() { InstanceId = id.ToString() },
Guid.NewGuid(),
operationName,
serializedInput,
EntityMessageEvent.GetCappedScheduledTime(
DateTime.UtcNow,
this.service.EntityBackendProperties!.MaximumSignalDelayTime,
scheduledTime?.UtcDateTime));

await this.client.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
}

AsyncPageable<TMetadata> GetAllEntitiesAsync<TMetadata>(
Func<EntityBackendQueries.EntityMetadata, TMetadata> select,
EntityQuery? filter)
where TMetadata : notnull
{
bool includeState = filter?.IncludeState ?? true;
bool includeTransient = filter?.IncludeTransient ?? false;
string startsWith = filter?.InstanceIdStartsWith ?? string.Empty;
DateTime? lastModifiedFrom = filter?.LastModifiedFrom?.UtcDateTime;
DateTime? lastModifiedTo = filter?.LastModifiedTo?.UtcDateTime;

return Pageable.Create(async (continuation, size, cancellation) =>
{
size ??= filter?.PageSize;
EntityBackendQueries.EntityQueryResult result = await this.service
.EntityBackendQueries!.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = startsWith,
LastModifiedFrom = lastModifiedFrom,
LastModifiedTo = lastModifiedTo,
IncludeTransient = includeTransient,
IncludeState = includeState,
ContinuationToken = continuation,
PageSize = size,
},
cancellation);
return new Page<TMetadata>(result.Results.Select(select).ToList(), result.ContinuationToken);
});
}

EntityMetadata<T> Convert<T>(EntityBackendQueries.EntityMetadata metadata)
{
return new(
new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key),
this.converter.Deserialize<T>(metadata.SerializedState))
{
LastModifiedTime = metadata.LastModifiedTime,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}

EntityMetadata<T>? Convert<T>(EntityBackendQueries.EntityMetadata? metadata)
{
if (metadata is null)
{
return null;
}

return this.Convert<T>(metadata.Value);
}

EntityMetadata Convert(EntityBackendQueries.EntityMetadata metadata)
{
SerializedData? data = metadata.SerializedState is null ? null : new(metadata.SerializedState, this.converter);
return new(new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key), data)
{
LastModifiedTime = metadata.LastModifiedTime,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}

EntityMetadata? Convert(EntityBackendQueries.EntityMetadata? metadata)
{
if (metadata is null)
{
return null;
}

return this.Convert(metadata.Value);
}
}
23 changes: 23 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

using System.Diagnostics.CodeAnalysis;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Core = DurableTask.Core;
Expand All @@ -18,6 +20,7 @@ namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
class ShimDurableTaskClient : DurableTaskClient
{
readonly ShimDurableTaskClientOptions options;
ShimDurableEntityClient? entities;

/// <summary>
/// Initializes a new instance of the <see cref="ShimDurableTaskClient"/> class.
Expand All @@ -42,6 +45,26 @@ public ShimDurableTaskClient(string name, ShimDurableTaskClientOptions options)
this.options = Check.NotNull(options);
}

/// <inheritdoc/>
public override DurableEntityClient Entities
{
get
{
if (this.entities is null)
{
if (this.options.Client is not IEntityOrchestrationService entityService)
{
throw new NotSupportedException(
"The configured IOrchestrationServiceClient does not support entities.");
}

this.entities = new(this.Name, this.options.Client, entityService, this.options.DataConverter);
}

return this.entities;
}
}

DataConverter DataConverter => this.options.DataConverter;

IOrchestrationServiceClient Client => this.options.Client!;
Expand Down

0 comments on commit 586884b

Please sign in to comment.