diff --git a/CHANGELOG.md b/CHANGELOG.md index 18e51383..eb8d1690 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ ## v1.0.0 - Added `SuspendInstanceAsync` and `ResumeInstanceAsync` to `DurableTaskClient`. +- Rename `DurableTaskClient` methods + - `TerminateAsync` -> `TerminateInstanceAsync` + - `PurgeInstances` -> `PurgeAllInstancesAsync` + - `PurgeInstanceMetadataAsync` -> `PurgeInstanceAsync` + - `GetInstanceMetadataAsync` -> `GetInstanceAsync` + - `GetInstances` -> `GetAllInstancesAsync` - `TaskOrchestrationContext.CreateReplaySafeLogger` now creates `ILogger` directly (as opposed to wrapping an existing `ILogger`). - Durable Functions class-based syntax now resolves `ITaskActivity` instances from `IServiceProvider`, if available there. - `DurableTaskClient` methods have been touched up to ensure `CancellationToken` is included, as well as is the last parameter. diff --git a/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs b/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs index f591a3b7..860901f4 100644 --- a/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs +++ b/src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs @@ -30,7 +30,7 @@ public static Task PurgeInstancesAsync( { Check.NotNull(client); PurgeInstancesFilter filter = new(createdFrom, createdTo, statuses); - return client.PurgeInstancesAsync(filter, cancellation); + return client.PurgeAllInstancesAsync(filter, cancellation); } /// diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 9cf3e822..9fb8d120 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -73,7 +73,7 @@ public virtual Task ScheduleNewOrchestrationInstanceAsync( /// and health of the backend task hub, and whether a start time was provided via . /// /// The task associated with this method completes after the orchestration instance was successfully scheduled. You - /// can use the to query the status of the + /// can use the to query the status of the /// scheduled instance, the method to wait /// for the instance to transition out of the status, or the /// method to wait for the instance to @@ -140,10 +140,67 @@ public virtual Task RaiseEventAsync( public abstract Task RaiseEventAsync( string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default); - /// - public virtual Task TerminateAsync( + /// + public virtual Task WaitForInstanceStartAsync( + string instanceId, CancellationToken cancellation) + => this.WaitForInstanceStartAsync(instanceId, false, cancellation); + + /// + /// Waits for an orchestration to start running and returns a + /// object that contains metadata about the started instance. + /// + /// + /// + /// A "started" orchestration instance is any instance not in the + /// state. + /// + /// If an orchestration instance is already running when this method is called, the method will return immediately. + /// + /// + /// The unique ID of the orchestration instance to wait for. + /// + /// Specify true to fetch the orchestration instance's inputs, outputs, and custom status, or false to + /// omit them. The default value is false to minimize the network bandwidth, serialization, and memory costs + /// associated with fetching the instance metadata. + /// + /// A that can be used to cancel the wait operation. + /// + /// Returns a record that describes the orchestration instance and its execution + /// status or null if no instance with ID is found. + /// + public abstract Task WaitForInstanceStartAsync( + string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); + + /// + public virtual Task WaitForInstanceCompletionAsync( string instanceId, CancellationToken cancellation) - => this.TerminateAsync(instanceId, null, cancellation); + => this.WaitForInstanceCompletionAsync(instanceId, false, cancellation); + + /// + /// Waits for an orchestration to complete and returns a + /// object that contains metadata about the started instance. + /// + /// + /// + /// A "completed" orchestration instance is any instance in one of the terminal states. For example, the + /// , , or + /// states. + /// + /// Orchestrations are long-running and could take hours, days, or months before completing. + /// Orchestrations can also be eternal, in which case they'll never complete unless terminated. + /// In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are + /// enforced using the parameter. + /// + /// If an orchestration instance is already complete when this method is called, the method will return immediately. + /// + /// + /// + public abstract Task WaitForInstanceCompletionAsync( + string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); + + /// + public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken cancellation) + => this.TerminateInstanceAsync(instanceId, null, cancellation); /// /// Terminates a running orchestration instance and updates its runtime status to @@ -175,17 +232,16 @@ public virtual Task TerminateAsync( /// termination of the orchestration once enqueued. /// /// A task that completes when the terminate message is enqueued. - public abstract Task TerminateAsync( + public abstract Task TerminateInstanceAsync( string instanceId, object? output = null, CancellationToken cancellation = default); - /// - public virtual Task WaitForInstanceStartAsync( - string instanceId, CancellationToken cancellation) - => this.WaitForInstanceStartAsync(instanceId, false, cancellation); + /// + public virtual Task SuspendInstanceAsync(string instanceId, CancellationToken cancellation) + => this.SuspendInstanceAsync(instanceId, null, cancellation); /// - /// Suspends an orchestration instance, halting processing of it until is used - /// to resume the orchestration. + /// Suspends an orchestration instance, halting processing of it until + /// is used to resume the orchestration. /// /// The instance ID of the orchestration to suspend. /// The optional suspension reason. @@ -197,8 +253,12 @@ public virtual Task WaitForInstanceStartAsync( public abstract Task SuspendInstanceAsync( string instanceId, string? reason = null, CancellationToken cancellation = default); + /// + public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken cancellation) + => this.ResumeInstanceAsync(instanceId, null, cancellation); + /// - /// Resumes an orchestration instance that was suspended via . + /// Resumes an orchestration instance that was suspended via . /// /// The instance ID of the orchestration to resume. /// The optional resume reason. @@ -210,63 +270,10 @@ public abstract Task SuspendInstanceAsync( public abstract Task ResumeInstanceAsync( string instanceId, string? reason = null, CancellationToken cancellation = default); - /// - /// Waits for an orchestration to start running and returns a - /// object that contains metadata about the started instance. - /// - /// - /// - /// A "started" orchestration instance is any instance not in the - /// state. - /// - /// If an orchestration instance is already running when this method is called, the method will return immediately. - /// - /// - /// The unique ID of the orchestration instance to wait for. - /// - /// Specify true to fetch the orchestration instance's inputs, outputs, and custom status, or false to - /// omit them. The default value is false to minimize the network bandwidth, serialization, and memory costs - /// associated with fetching the instance metadata. - /// - /// A that can be used to cancel the wait operation. - /// - /// Returns a record that describes the orchestration instance and its execution - /// status or null if no instance with ID is found. - /// - public abstract Task WaitForInstanceStartAsync( - string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); - - /// - public virtual Task WaitForInstanceCompletionAsync( - string instanceId, CancellationToken cancellation) - => this.WaitForInstanceCompletionAsync(instanceId, false, cancellation); - - /// - /// Waits for an orchestration to complete and returns a - /// object that contains metadata about the started instance. - /// - /// - /// - /// A "completed" orchestration instance is any instance in one of the terminal states. For example, the - /// , , or - /// states. - /// - /// Orchestrations are long-running and could take hours, days, or months before completing. - /// Orchestrations can also be eternal, in which case they'll never complete unless terminated. - /// In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are - /// enforced using the parameter. - /// - /// If an orchestration instance is already complete when this method is called, the method will return immediately. - /// - /// - /// - public abstract Task WaitForInstanceCompletionAsync( - string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); - - /// - public virtual Task GetInstanceMetadataAsync( + /// + public virtual Task GetInstancesAsync( string instanceId, CancellationToken cancellation) - => this.GetInstanceMetadataAsync(instanceId, false, cancellation); + => this.GetInstancesAsync(instanceId, false, cancellation); /// /// Fetches orchestration instance metadata from the configured durable store. @@ -278,15 +285,15 @@ public abstract Task WaitForInstanceCompletionAsync( /// memory costs associated with fetching the instance metadata. /// /// - public abstract Task GetInstanceMetadataAsync( + public abstract Task GetInstancesAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default); /// /// Queries orchestration instances. /// - /// Filters down the instances included in the query. + /// Filters down the instances included in the query. /// An async pageable of the query results. - public abstract AsyncPageable GetInstances(OrchestrationQuery? query = null); + public abstract AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null); /// /// Purges orchestration instance metadata from the durable store. @@ -314,7 +321,7 @@ public abstract Task WaitForInstanceCompletionAsync( /// value of 1 or 0, depending on whether the target /// instance was successfully purged. /// - public abstract Task PurgeInstanceMetadataAsync( + public abstract Task PurgeInstanceAsync( string instanceId, CancellationToken cancellation = default); /// @@ -328,7 +335,7 @@ public abstract Task PurgeInstanceMetadataAsync( /// This method returns a object after the operation has completed with a /// indicating the number of orchestration instances that were purged. /// - public abstract Task PurgeInstancesAsync( + public abstract Task PurgeAllInstancesAsync( PurgeInstancesFilter filter, CancellationToken cancellation = default); // TODO: Create task hub diff --git a/src/Client/Core/OrchestrationMetadata.cs b/src/Client/Core/OrchestrationMetadata.cs index e2394696..3fc43b0a 100644 --- a/src/Client/Core/OrchestrationMetadata.cs +++ b/src/Client/Core/OrchestrationMetadata.cs @@ -11,7 +11,7 @@ namespace Microsoft.DurableTask.Client; /// /// /// Instances of this class are produced by methods in the class, such as -/// , +/// , /// and /// . /// @@ -117,7 +117,7 @@ public OrchestrationMetadata(string name, string instanceId) /// /// /// This method can only be used when inputs and outputs are explicitly requested from the - /// or + /// or /// method that produced /// this object. /// @@ -143,7 +143,7 @@ public OrchestrationMetadata(string name, string instanceId) /// /// /// This method can only be used when inputs and outputs are explicitly requested from the - /// or + /// or /// method that produced /// this object. /// @@ -169,7 +169,7 @@ public OrchestrationMetadata(string name, string instanceId) /// /// /// This method can only be used when inputs and outputs are explicitly requested from the - /// or + /// or /// method that produced /// this object. /// diff --git a/src/Client/Core/PurgeInstancesFilter.cs b/src/Client/Core/PurgeInstancesFilter.cs index b3793a80..04c7f9c3 100644 --- a/src/Client/Core/PurgeInstancesFilter.cs +++ b/src/Client/Core/PurgeInstancesFilter.cs @@ -10,6 +10,8 @@ namespace Microsoft.DurableTask.Client; /// Date created to. /// The statuses. public record PurgeInstancesFilter( - DateTimeOffset? CreatedFrom, DateTimeOffset? CreatedTo, IEnumerable? Statuses) + DateTimeOffset? CreatedFrom = null, + DateTimeOffset? CreatedTo = null, + IEnumerable? Statuses = null) { } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 98a40f88..c3a97797 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -116,7 +116,7 @@ public override async Task RaiseEventAsync( } /// - public override async Task TerminateAsync( + public override async Task TerminateInstanceAsync( string instanceId, object? output = null, CancellationToken cancellation = default) { if (string.IsNullOrEmpty(instanceId)) @@ -181,7 +181,7 @@ await this.sidecarClient.ResumeInstanceAsync( } /// - public override async Task GetInstanceMetadataAsync( + public override async Task GetInstancesAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) { if (string.IsNullOrEmpty(instanceId)) @@ -207,7 +207,7 @@ await this.sidecarClient.ResumeInstanceAsync( } /// - public override AsyncPageable GetInstances(OrchestrationQuery? query = null) + public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null) { return Pageable.Create(async (continuation, pageSize, cancellation) => { @@ -215,23 +215,23 @@ public override AsyncPageable GetInstances(OrchestrationQ { Query = new P.InstanceQuery { - CreatedTimeFrom = query?.CreatedFrom?.ToTimestamp(), - CreatedTimeTo = query?.CreatedTo?.ToTimestamp(), - FetchInputsAndOutputs = query?.FetchInputsAndOutputs ?? false, - InstanceIdPrefix = query?.InstanceIdPrefix, - MaxInstanceCount = pageSize ?? query?.PageSize ?? OrchestrationQuery.DefaultPageSize, - ContinuationToken = continuation ?? query?.ContinuationToken, + CreatedTimeFrom = filter?.CreatedFrom?.ToTimestamp(), + CreatedTimeTo = filter?.CreatedTo?.ToTimestamp(), + FetchInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false, + InstanceIdPrefix = filter?.InstanceIdPrefix, + MaxInstanceCount = pageSize ?? filter?.PageSize ?? OrchestrationQuery.DefaultPageSize, + ContinuationToken = continuation ?? filter?.ContinuationToken, }, }; - if (query?.Statuses is not null) + if (filter?.Statuses is not null) { - request.Query.RuntimeStatus.AddRange(query.Statuses.Select(x => x.ToGrpcStatus())); + request.Query.RuntimeStatus.AddRange(filter.Statuses.Select(x => x.ToGrpcStatus())); } - if (query?.TaskHubNames is not null) + if (filter?.TaskHubNames is not null) { - request.Query.TaskHubNames.AddRange(query.TaskHubNames); + request.Query.TaskHubNames.AddRange(filter.TaskHubNames); } try @@ -239,7 +239,7 @@ public override AsyncPageable GetInstances(OrchestrationQ P.QueryInstancesResponse response = await this.sidecarClient.QueryInstancesAsync( request, cancellationToken: cancellation); - bool getInputsAndOutputs = query?.FetchInputsAndOutputs ?? false; + bool getInputsAndOutputs = filter?.FetchInputsAndOutputs ?? false; IReadOnlyList values = response.OrchestrationState .Select(x => this.CreateMetadata(x, getInputsAndOutputs)) .ToList(); @@ -249,7 +249,7 @@ public override AsyncPageable GetInstances(OrchestrationQ catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { throw new OperationCanceledException( - $"The {nameof(this.GetInstances)} operation was canceled.", e, cancellation); + $"The {nameof(this.GetInstancesAsync)} operation was canceled.", e, cancellation); } }); } @@ -305,7 +305,7 @@ public override async Task WaitForInstanceCompletionAsync } /// - public override Task PurgeInstanceMetadataAsync( + public override Task PurgeInstanceAsync( string instanceId, CancellationToken cancellation = default) { this.logger.PurgingInstanceMetadata(instanceId); @@ -315,7 +315,7 @@ public override Task PurgeInstanceMetadataAsync( } /// - public override Task PurgeInstancesAsync( + public override Task PurgeAllInstancesAsync( PurgeInstancesFilter filter, CancellationToken cancellation = default) { this.logger.PurgingInstances(filter); @@ -385,7 +385,7 @@ async Task PurgeInstancesCoreAsync( catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { throw new OperationCanceledException( - $"The {nameof(this.PurgeInstancesAsync)} operation was canceled.", e, cancellation); + $"The {nameof(this.PurgeAllInstancesAsync)} operation was canceled.", e, cancellation); } } diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index 4cc04346..0732c1a5 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -77,24 +77,24 @@ public override ValueTask DisposeAsync() throw new NotImplementedException(); } - public override Task GetInstanceMetadataAsync( + public override Task GetInstancesAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) { throw new NotImplementedException(); } - public override AsyncPageable GetInstances(OrchestrationQuery? query = null) + public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null) { throw new NotImplementedException(); } - public override Task PurgeInstanceMetadataAsync( + public override Task PurgeInstanceAsync( string instanceId, CancellationToken cancellation = default) { throw new NotImplementedException(); } - public override Task PurgeInstancesAsync( + public override Task PurgeAllInstancesAsync( PurgeInstancesFilter filter, CancellationToken cancellation = default) { throw new NotImplementedException(); @@ -127,7 +127,7 @@ public override Task SuspendInstanceAsync( throw new NotImplementedException(); } - public override Task TerminateAsync( + public override Task TerminateInstanceAsync( string instanceId, object? output = null, CancellationToken cancellation = default) { throw new NotImplementedException(); diff --git a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs index 9c85b716..d92250c0 100644 --- a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs @@ -107,24 +107,24 @@ public override ValueTask DisposeAsync() throw new NotImplementedException(); } - public override Task GetInstanceMetadataAsync( + public override Task GetInstancesAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) { throw new NotImplementedException(); } - public override AsyncPageable GetInstances(OrchestrationQuery? query = null) + public override AsyncPageable GetAllInstancesAsync(OrchestrationQuery? filter = null) { throw new NotImplementedException(); } - public override Task PurgeInstanceMetadataAsync( + public override Task PurgeInstanceAsync( string instanceId, CancellationToken cancellation = default) { throw new NotImplementedException(); } - public override Task PurgeInstancesAsync( + public override Task PurgeAllInstancesAsync( PurgeInstancesFilter filter, CancellationToken cancellation = default) { throw new NotImplementedException(); @@ -157,7 +157,7 @@ public override Task SuspendInstanceAsync( throw new NotImplementedException(); } - public override Task TerminateAsync( + public override Task TerminateInstanceAsync( string instanceId, object? output = null, CancellationToken cancellation = default) { throw new NotImplementedException(); diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 99919540..ea8aef02 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -44,12 +44,12 @@ static void AssertMetadata(OrchestrationMetadata metadata, string instanceId, Or OrchestrationName, input: shouldThrow); await server.Client.WaitForInstanceStartAsync(instanceId, default); - OrchestrationMetadata? metadata = await server.Client.GetInstanceMetadataAsync(instanceId, false); + OrchestrationMetadata? metadata = await server.Client.GetInstancesAsync(instanceId, false); AssertMetadata(metadata!, instanceId, OrchestrationRuntimeStatus.Running); await server.Client.RaiseEventAsync(instanceId, "event", default); await server.Client.WaitForInstanceCompletionAsync(instanceId, default); - metadata = await server.Client.GetInstanceMetadataAsync(instanceId, false); + metadata = await server.Client.GetInstancesAsync(instanceId, false); AssertMetadata( metadata!, instanceId, @@ -101,7 +101,7 @@ static async Task ForEachOrchestrationAsync(Func func) await ForEachOrchestrationAsync( x => server.Client.ScheduleNewOrchestrationInstanceAsync( OrchestrationName, input: false, new StartOrchestrationOptions(x))); - AsyncPageable pageable = server.Client.GetInstances(query); + AsyncPageable pageable = server.Client.GetAllInstancesAsync(query); await ForEachOrchestrationAsync(x => server.Client.WaitForInstanceStartAsync(x, default)); List metadata = await pageable.ToListAsync(); @@ -129,7 +129,7 @@ await server.Client.ScheduleNewOrchestrationInstanceAsync( OrchestrationName, input: false, new StartOrchestrationOptions($"GetInstances_AsPages_EndToEnd-{i}")); } - AsyncPageable pageable = server.Client.GetInstances(query); + AsyncPageable pageable = server.Client.GetAllInstancesAsync(query); List> pages = await pageable.AsPages(pageSizeHint: 5).ToListAsync(); pages.Should().HaveCount(5); pages.ForEach(p => p.Values.Should().HaveCount(p.ContinuationToken is null ? 1 : 5)); diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 6e8dfe38..b6af1559 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -325,7 +325,7 @@ public async Task Termination() OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken); var expectedOutput = new { quote = "I'll be back." }; - await server.Client.TerminateAsync(instanceId, expectedOutput); + await server.Client.TerminateInstanceAsync(instanceId, expectedOutput); metadata = await server.Client.WaitForInstanceCompletionAsync( instanceId, getInputsAndOutputs: true, this.TimeoutToken);