Skip to content

Commit

Permalink
Dotnet Update Features (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jan 31, 2024
1 parent 42a71ad commit 69ca251
Show file tree
Hide file tree
Showing 14 changed files with 745 additions and 47 deletions.
2 changes: 1 addition & 1 deletion dotnet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<PackageReference Include="Temporalio" Version="0.1.0-beta2">
<PackageReference Include="Temporalio" Version="1.0.0">
<!--
We have to make sure this isn't included transitively so it can be
overridden.
Expand Down
2 changes: 1 addition & 1 deletion features/update/activities/feature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Task<int> DoActivitiesAsync() =>
Enumerable.Range(0, 5).Select(_ =>
Workflow.ExecuteActivityAsync(
() => MyActivities.MyActivity(),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(5) } ))).
new() { StartToCloseTimeout = TimeSpan.FromSeconds(5) }))).
ContinueWith(vals => Enumerable.Sum(vals.Result));
}

Expand Down
60 changes: 60 additions & 0 deletions features/update/basic/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace update.basic;

using Temporalio.Exceptions;
using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowUpdate]
public async Task<string> MyUpdate(string _)
{
shutdown = true;
return "hi";
}

[WorkflowUpdateValidator(nameof(MyUpdate))]
public void ValidateMyUpdate(string arg)
{
if (arg == "invalid")
{
throw new ApplicationFailureException("invalid");
}
}
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>();

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();

var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

try
{
await handle.ExecuteUpdateAsync(wf => wf.MyUpdate("invalid"));
throw new Exception("Expected to fail");
}
catch (WorkflowUpdateFailedException)
{
// Expected
}

Assert.Equal("hi", await handle.ExecuteUpdateAsync(wf => wf.MyUpdate("valid")));
return handle;
}
}
64 changes: 64 additions & 0 deletions features/update/basic_async/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
namespace update.basic_async;

using Temporalio.Exceptions;
using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowUpdate]
public async Task<string> MyUpdate(string _)
{
shutdown = true;
return "hi";
}

[WorkflowUpdateValidator(nameof(MyUpdate))]
public void ValidateMyUpdate(string arg)
{
if (arg == "invalid")
{
throw new ApplicationFailureException("invalid");
}
}
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>();

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();

var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

var badUpdateHandle = await handle.StartUpdateAsync(wf => wf.MyUpdate("invalid"));

try
{
await badUpdateHandle.GetResultAsync();
throw new Exception("Expected to fail");
}
catch (WorkflowUpdateFailedException)
{
// Expected
}

var goodUpdateHandle = await handle.StartUpdateAsync(wf => wf.MyUpdate("valid"));
Assert.Equal("hi", await goodUpdateHandle.GetResultAsync());

return handle;
}
}
70 changes: 70 additions & 0 deletions features/update/client_interceptor/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
namespace update.client_interceptor;

using Temporalio.Client.Interceptors;
using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class MyOutboundClientInterceptor : IClientInterceptor
{
public ClientOutboundInterceptor InterceptClient(
ClientOutboundInterceptor nextInterceptor) =>
new ClientOutbound(nextInterceptor);

private sealed class ClientOutbound : ClientOutboundInterceptor
{
public ClientOutbound(ClientOutboundInterceptor next) : base(next)
{
}

public override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>(
StartWorkflowUpdateInput input)
{
var newInput = input with { Args = new object[] { "intercepted" } };
return base.StartWorkflowUpdateAsync<TResult>(newInput);
}
}
}

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowUpdate]
public async Task<string> MyUpdate(string arg)
{
shutdown = true;
return arg;
}
}

public void ConfigureClient(Runner runner, TemporalClientConnectOptions options)
{
options.Interceptors = new[] { new MyOutboundClientInterceptor() };
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options)
{
options.AddWorkflow<MyWorkflow>();
}

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();

var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

Assert.Equal("intercepted", await handle.ExecuteUpdateAsync(wf => wf.MyUpdate("Enchicat")));

return handle;
}
}
52 changes: 52 additions & 0 deletions features/update/deduplication/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace update.deduplication;

using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowUpdate]
public async Task<int> MyUpdate(bool exit)
{
shutdown = exit;
Count++;
return Count;
}

[WorkflowQuery]
public int Count { get; set; }
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options)
{
options.AddWorkflow<MyWorkflow>();
}

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();

var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

var updateId = "myid";
await handle.ExecuteUpdateAsync(wf => wf.MyUpdate(false), new() {UpdateID = updateId});
Assert.Equal(1, await handle.QueryAsync(wf => wf.Count));
await handle.ExecuteUpdateAsync(wf => wf.MyUpdate(false), new() {UpdateID = updateId});
Assert.Equal(1, await handle.QueryAsync(wf => wf.Count));
await handle.ExecuteUpdateAsync(wf => wf.MyUpdate(true));

return handle;
}
}
71 changes: 71 additions & 0 deletions features/update/non_durable_reject/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using Temporalio.Api.Enums.V1;

namespace update.non_durable_reject;

using Temporalio.Exceptions;
using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowUpdate]
public async Task<string> MyUpdate(string _)
{
shutdown = true;
return "hi";
}

[WorkflowUpdateValidator(nameof(MyUpdate))]
public void ValidateMyUpdate(string arg)
{
if (arg == "invalid")
{
throw new ApplicationFailureException("invalid");
}
}
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>();

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();

var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

for (var i = 0; i < 5; i++)
{
try
{
await handle.ExecuteUpdateAsync(wf => wf.MyUpdate("invalid"));
}
catch (WorkflowUpdateFailedException)
{
// Expected
}
}

await handle.ExecuteUpdateAsync(wf => wf.MyUpdate("valid"));
await handle.GetResultAsync();

// Verify there are no rejections written to history
var history = await handle.FetchHistoryAsync();
Assert.DoesNotContain(history.Events,
e => e.EventType == EventType.WorkflowExecutionUpdateRejected);

return handle;
}
}
Loading

0 comments on commit 69ca251

Please sign in to comment.