Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise queries #204

Merged
2 changes: 1 addition & 1 deletion eng/proto
8 changes: 5 additions & 3 deletions src/Abstractions/Entities/TaskEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ public abstract void SignalEntity(
/// </summary>
/// <param name="name">The name of the orchestration to start.</param>
/// <param name="options">The options for starting the orchestration.</param>
public virtual void StartOrchestration(TaskName name, StartOrchestrationOptions options)
=> this.StartOrchestration(name, null, options);
/// <returns>The instance id for the new orchestration.</returns>
public virtual string ScheduleNewOrchestration(TaskName name, StartOrchestrationOptions options)
=> this.ScheduleNewOrchestration(name, null, options);

/// <summary>
/// Starts an orchestration.
/// </summary>
/// <param name="name">The name of the orchestration to start.</param>
/// <param name="input">The input for the orchestration.</param>
/// <param name="options">The options for starting the orchestration.</param>
public abstract void StartOrchestration(
/// <returns>The instance id for the new orchestration.</returns>
public abstract string ScheduleNewOrchestration(
TaskName name, object? input = null, StartOrchestrationOptions? options = null);
}
17 changes: 14 additions & 3 deletions src/Client/Core/Entities/CleanEntityStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@ namespace Microsoft.DurableTask.Client.Entities;
public readonly record struct CleanEntityStorageRequest
{
/// <summary>
/// Gets a value indicating whether to remove empty entities.
/// Gets the default request parameters. The default is meant to represent
/// "maximal" cleaning that is safe to call at all times.
/// </summary>
public static CleanEntityStorageRequest Default => new()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also add a parameterless constructor and set all the bool properties to true there. This doesn't change default - but just an idea for you to consider.

{
RemoveEmptyEntities = true,
ReleaseOrphanedLocks = true,
ContinuationToken = null,
};

/// <summary>
/// Gets a value indicating whether to remove empty entities. Defaults to true.
/// </summary>
/// <remarks>
/// An entity is considered empty, and is removed, if it has no state, is not locked.
/// </remarks>
public bool RemoveEmptyEntities { get; init; }

/// <summary>
/// Gets a value indicating whether to release orphaned locks or not.
/// Gets a value indicating whether to release orphaned locks or not. Defaults to true.
/// </summary>
/// <remarks>
/// Locks are considered orphaned, and are released, and if the orchestration that holds them is not in state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Locks are considered orphaned, and are released, and if the orchestration that holds them is not in state
/// Locks are considered orphaned, and are released, if the orchestration that holds them is not in state

Expand All @@ -28,7 +39,7 @@ public readonly record struct CleanEntityStorageRequest
public bool ReleaseOrphanedLocks { get; init; }

/// <summary>
/// Gets the continuation token to resume a previous <see cref="CleanEntityStorageRequest"/>.
/// Gets the continuation token to resume a previous <see cref="CleanEntityStorageRequest"/>. Defaults to null.
/// </summary>
public string? ContinuationToken { get; init; }
}
Expand Down
2 changes: 1 addition & 1 deletion src/Client/Core/Entities/DurableEntityClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public virtual Task SignalEntityAsync(
/// <param name="cancellation">The cancellation token to cancel the operation.</param>
/// <returns>A task that completes when the operation is finished.</returns>
public abstract Task<CleanEntityStorageResult> CleanEntityStorageAsync(
CleanEntityStorageRequest request = default,
CleanEntityStorageRequest? request = null,
bool continueUntilComplete = true,
CancellationToken cancellation = default);
}
10 changes: 10 additions & 0 deletions src/Client/Core/Entities/EntityMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public EntityMetadata(EntityInstanceId id, TState? state)
/// </summary>
public DateTimeOffset LastModifiedTime { get; init; }

/// <summary>
/// Gets the size of the backlog queue, if there is a backlog, and if that metric is supported by the backend.
/// </summary>
public int BacklogQueueSize { get; init; }

/// <summary>
/// Gets the instance id of the orchestration that has locked this entity, or null if the entity is not locked.
/// </summary>
public string? LockedBy { get; init; }

/// <summary>
/// Gets a value indicating whether this metadata response includes the entity state.
/// </summary>
Expand Down
15 changes: 13 additions & 2 deletions src/Client/Core/Entities/EntityQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,20 @@ public string? InstanceIdStartsWith
public bool IncludeState { get; init; } = true;

/// <summary>
/// Gets the size of each page to return.
/// Gets a value indicating whether to include metadata about transient entities. Defaults to false.
/// </summary>
public int PageSize { get; init; } = DefaultPageSize;
/// <remarks> Transient entities are entities that do not have an application-defined state, but for which the storage provider is
/// tracking metadata for synchronization purposes.
/// For example, a transient entity may be observed when the entity is in the process of being created or deleted, or
/// when the entity has been locked by a critical section. By default, transient entities are not included in queries since they are
/// considered to "not exist" from the perspective of the user application.
/// </remarks>
public bool IncludeTransient { get; init; }

/// <summary>
/// Gets the size of each page to return. If null, the page size is determined by the backend.
/// </summary>
public int? PageSize { get; init; }

/// <summary>
/// Gets the continuation token to resume a previous query.
Expand Down
42 changes: 31 additions & 11 deletions src/Client/Grpc/GrpcDurableEntityClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ public override AsyncPageable<EntityMetadata<TState>> GetAllEntitiesAsync<TState

/// <inheritdoc/>
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(
CleanEntityStorageRequest request = default,
CleanEntityStorageRequest? request = null,
bool continueUntilComplete = true,
CancellationToken cancellation = default)
{
string? continuationToken = request.ContinuationToken;
CleanEntityStorageRequest req = request ?? CleanEntityStorageRequest.Default;
string? continuationToken = req.ContinuationToken;
int emptyEntitiesRemoved = 0;
int orphanedLocksReleased = 0;

Expand All @@ -104,8 +105,8 @@ public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(
P.CleanEntityStorageResponse response = await this.sidecarClient.CleanEntityStorageAsync(
new P.CleanEntityStorageRequest
{
RemoveEmptyEntities = request.RemoveEmptyEntities,
ReleaseOrphanedLocks = request.ReleaseOrphanedLocks,
RemoveEmptyEntities = req.RemoveEmptyEntities,
ReleaseOrphanedLocks = req.ReleaseOrphanedLocks,
ContinuationToken = continuationToken,
},
cancellationToken: cancellation);
Expand Down Expand Up @@ -165,6 +166,7 @@ AsyncPageable<TMetadata> GetAllEntitiesCoreAsync<TMetadata>(
where TMetadata : class
{
bool includeState = filter?.IncludeState ?? true;
bool includeTransient = filter?.IncludeTransient ?? false;
string startsWith = filter?.InstanceIdStartsWith ?? string.Empty;
DateTimeOffset? lastModifiedFrom = filter?.LastModifiedFrom;
DateTimeOffset? lastModifiedTo = filter?.LastModifiedTo;
Expand All @@ -184,6 +186,7 @@ AsyncPageable<TMetadata> GetAllEntitiesCoreAsync<TMetadata>(
LastModifiedFrom = lastModifiedFrom?.ToTimestamp(),
LastModifiedTo = lastModifiedTo?.ToTimestamp(),
IncludeState = includeState,
IncludeTransient = includeTransient,
PageSize = pageSize,
ContinuationToken = continuation ?? filter?.ContinuationToken,
},
Expand All @@ -208,25 +211,42 @@ EntityMetadata ToEntityMetadata(P.EntityMetadata metadata, bool includeState)
{
var coreEntityId = DTCore.Entities.EntityId.FromString(metadata.InstanceId);
EntityInstanceId entityId = new(coreEntityId.Name, coreEntityId.Key);
bool hasState = metadata.SerializedState != null;

SerializedData? data = includeState ? new(metadata.SerializedState, this.dataConverter) : null;
SerializedData? data = (includeState && hasState) ? new(metadata.SerializedState!, this.dataConverter) : null;
return new EntityMetadata(entityId, data)
{
LastModifiedTime = metadata.LastModifiedTime.ToDateTimeOffset(),
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}

EntityMetadata<T> ToEntityMetadata<T>(P.EntityMetadata metadata, bool includeState)
{
var coreEntityId = DTCore.Entities.EntityId.FromString(metadata.InstanceId);
EntityInstanceId entityId = new(coreEntityId.Name, coreEntityId.Key);

T? data = includeState ? this.dataConverter.Deserialize<T>(metadata.SerializedState) : default;
DateTimeOffset lastModified = metadata.LastModifiedTime.ToDateTimeOffset();
bool hasState = metadata.SerializedState != null;

// Use separate constructors to ensure default structs get correct state inclusion value.
return includeState
? new EntityMetadata<T>(entityId, data) { LastModifiedTime = lastModified }
: new EntityMetadata<T>(entityId) { LastModifiedTime = lastModified };
if (includeState && hasState)
{
T? data = includeState ? this.dataConverter.Deserialize<T>(metadata.SerializedState) : default;
return new EntityMetadata<T>(entityId, data)
{
LastModifiedTime = lastModified,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}
else
{
return new EntityMetadata<T>(entityId)
{
LastModifiedTime = lastModified,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}
}
}
2 changes: 2 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
Input = operationAction.StartNewOrchestration.Input,
InstanceId = operationAction.StartNewOrchestration.InstanceId,
Version = operationAction.StartNewOrchestration.Version,
ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
};
default:
throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
Expand Down Expand Up @@ -571,6 +572,7 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
Input = startNewOrchestrationAction.Input,
Version = startNewOrchestrationAction.Version,
InstanceId = startNewOrchestrationAction.InstanceId,
ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
};
break;
}
Expand Down
7 changes: 5 additions & 2 deletions src/Worker/Core/Shims/TaskEntityShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,20 @@ public override void SignalEntity(EntityInstanceId id, string operationName, obj
});
}

public override void StartOrchestration(TaskName name, object? input = null, StartOrchestrationOptions? options = null)
public override string ScheduleNewOrchestration(TaskName name, object? input = null, StartOrchestrationOptions? options = null)
{
Check.NotEntity(true, options?.InstanceId);

string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N");
this.operationActions.Add(new StartNewOrchestrationOperationAction()
{
Name = name.Name,
Version = name.Version,
InstanceId = Guid.NewGuid().ToString("N"),
InstanceId = instanceId,
Input = this.dataConverter.Serialize(input),
ScheduledStartTime = options?.StartAt?.UtcDateTime,
});
return instanceId;
}
}

Expand Down
Loading