Skip to content

Commit

Permalink
Use Go SDK dev server and add a couple of .NET update features (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Nov 1, 2023
1 parent 7fb593e commit af5dd85
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 307 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jobs:
feature-tests-dotnet:
uses: ./.github/workflows/dotnet.yaml
with:
version: 0.1.0-beta1
version: 0.1.0-beta2
version-is-repo-ref: false
features-repo-ref: ${{ github.head_ref }}
features-repo-path: ${{ github.event.pull_request.head.repo.full_name }}
Expand All @@ -144,4 +144,4 @@ jobs:
ts-ver: 'v1.5.2'
java-ver: 'v1.22.0'
py-ver: 'v1.0.0'
cs-ver: 'v0.1.0-beta1'
cs-ver: 'v0.1.0-beta2'
17 changes: 11 additions & 6 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"github.com/google/uuid"
"github.com/pmezard/go-difflib/difflib"
"github.com/temporalio/features/harness/go/cmd"
"github.com/temporalio/features/harness/go/devserver"
"github.com/temporalio/features/harness/go/harness"
"github.com/temporalio/features/harness/go/history"
"github.com/temporalio/features/sdkbuild"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/testsuite"
)

const (
Expand Down Expand Up @@ -195,17 +195,22 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error {

// If the server is not set, start it ourselves
if r.config.Server == "" {
server, err := devserver.Start(devserver.Options{
Log: r.log,
server, err := testsuite.StartDevServer(ctx, testsuite.DevServerOptions{
// TODO(cretz): Configurable?
LogLevel: "error",
Namespace: r.config.Namespace,
LogLevel: "error",
ClientOptions: &client.Options{Namespace: r.config.Namespace},
ExtraArgs: []string{
"--dynamic-config-value", "system.forceSearchAttributesCacheRefreshOnRead=true",
"--dynamic-config-value", "system.enableActivityEagerExecution=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
},
})
if err != nil {
return fmt.Errorf("failed starting devserver: %w", err)
}
defer server.Stop()
r.config.Server = server.FrontendHostPort
r.config.Server = server.FrontendHostPort()
r.log.Info("Started server", "HostPort", r.config.Server)
} else {
// Wait for namespace to become available
Expand Down
4 changes: 3 additions & 1 deletion dotnet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<PackageReference Include="Temporalio" Version="0.1.0-beta1">
<PackageReference Include="Temporalio" Version="0.1.0-beta2">
<!--
We have to make sure this isn't included transitively so it can be
overridden.
-->
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="XUnit.Assert" Version="2.5.3" />
</ItemGroup>

<ItemGroup>
<Using Include="Microsoft.Extensions.Logging" />
<Using Include="Xunit" />
</ItemGroup>

</Project>
53 changes: 53 additions & 0 deletions features/update/activities/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace update.activities;

using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;
using Xunit;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowSignal]
public async Task ShutdownAsync() => shutdown = true;

[WorkflowUpdate]
public Task<int> DoActivitiesAsync() =>
// Run 5 activities and sum the results
Task.WhenAll(
Enumerable.Range(0, 5).Select(_ =>
Workflow.ExecuteActivityAsync(
() => MyActivities.MyActivity(),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(5) } ))).
ContinueWith(vals => Enumerable.Sum(vals.Result));
}

class MyActivities
{
[Activity]
public static int MyActivity() => 6;
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>().AddAllActivities<MyActivities>(null);

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfUpdateNotSupportedAsync();
var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());
Assert.Equal(30, await handle.ExecuteUpdateAsync(wf => wf.DoActivitiesAsync()));
await handle.SignalAsync(wf => wf.ShutdownAsync());
return handle;
}
}
92 changes: 92 additions & 0 deletions features/update/async_accepted/feature.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace update.async_accepted;

using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Exceptions;
using Temporalio.Features.Harness;
using Temporalio.Worker;
using Temporalio.Workflows;
using Xunit;

class Feature : IFeature
{
[Workflow]
class MyWorkflow
{
private bool shutdown;
private bool finishUpdate;

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown);

[WorkflowSignal]
public async Task ShutdownAsync() => shutdown = true;

[WorkflowSignal]
public async Task FinishUpdateAsync() => finishUpdate = true;

[WorkflowUpdate]
public async Task<int> SuccessfulUpdateAsync()
{
await Workflow.WaitConditionAsync(() => finishUpdate);
finishUpdate = false;
return 123;
}

[WorkflowUpdate]
public async Task FailureUpdateAsync()
{
await Workflow.WaitConditionAsync(() => finishUpdate);
finishUpdate = false;
throw new ApplicationFailureException("Intentional failure");
}
}

class MyActivities
{
[Activity]
public static int MyActivity() => 6;
}

public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) =>
options.AddWorkflow<MyWorkflow>().AddAllActivities<MyActivities>(null);

public async Task<WorkflowHandle?> ExecuteAsync(Runner runner)
{
await runner.SkipIfAsyncUpdateNotSupportedAsync();

// Start workflow
var handle = await runner.Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
runner.NewWorkflowOptions());

// Start update
var updateHandle1 = await handle.StartUpdateAsync(wf => wf.SuccessfulUpdateAsync());
// Send signal to finish the update
await handle.SignalAsync(wf => wf.FinishUpdateAsync());
// Confirm result
Assert.Equal(123, await updateHandle1.GetResultAsync());
// Create another handle and confirm its result is the same
Assert.Equal(123, await handle.GetUpdateHandle<int>(updateHandle1.Id).GetResultAsync());

// Start a failed update
var updateHandle2 = await handle.StartUpdateAsync(wf => wf.FailureUpdateAsync());
// Send signal to finish the update
await handle.SignalAsync(wf => wf.FinishUpdateAsync());
// Confirm failure
var exc = await Assert.ThrowsAsync<WorkflowUpdateFailedException>(
() => updateHandle2.GetResultAsync());
Assert.Equal("Intentional failure", exc.InnerException?.Message);

// Start an update but cancel/timeout waiting on its result
var updateHandle3 = await handle.StartUpdateAsync(wf => wf.SuccessfulUpdateAsync());
// Wait for result only for 100ms
using var tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromMilliseconds(100));
await Assert.ThrowsAsync<OperationCanceledException>(() =>
updateHandle3.GetResultAsync(new() { CancellationToken = tokenSource.Token }));

await handle.SignalAsync(wf => wf.ShutdownAsync());
return handle;
}
}
4 changes: 4 additions & 0 deletions harness/dotnet/Temporalio.Features.Harness/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ private static async Task RunCommandAsync(InvocationContext ctx)
{
await new Runner(client, taskQueue, feature, loggerFactory).RunAsync(ctx.GetCancellationToken());
}
catch (TestSkippedException e)
{
logger.LogInformation("Feature {Feature} skipped: {Reason}", feature.Dir, e.Message);
}
catch(Exception e)
{
logger.LogError(e, "Feature {Feature} failed", feature.Dir);
Expand Down
98 changes: 91 additions & 7 deletions harness/dotnet/Temporalio.Features.Harness/Runner.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
namespace Temporalio.Features.Harness;

using Temporalio.Client;
using Temporalio.Exceptions;
using Temporalio.Worker;
using Temporalio.Workflows;

/// <summary>
/// Runner for running features.
/// </summary>
public class Runner
{
private bool? maybeUpdateSupported;

internal Runner(
ITemporalClient client,
string taskQueue,
Expand Down Expand Up @@ -64,15 +68,15 @@ public Task<WorkflowHandle> StartSingleParameterlessWorkflowAsync()
{
var workflow = WorkerOptions.Workflows.SingleOrDefault() ??
throw new InvalidOperationException("Must have a single workflow");
return Client.StartWorkflowAsync(
workflow.Name!,
Array.Empty<object?>(),
new(id: $"{PreparedFeature.Dir}-{Guid.NewGuid()}", taskQueue: WorkerOptions.TaskQueue!)
{
ExecutionTimeout = TimeSpan.FromMinutes(1)
});
return Client.StartWorkflowAsync(workflow.Name!, Array.Empty<object?>(), NewWorkflowOptions());
}

public WorkflowOptions NewWorkflowOptions() =>
new(id: $"{PreparedFeature.Dir}-{Guid.NewGuid()}", taskQueue: WorkerOptions.TaskQueue!)
{
ExecutionTimeout = TimeSpan.FromMinutes(1)
};

/// <summary>
/// Checks the current history for the given handle using the replayer.
/// </summary>
Expand Down Expand Up @@ -101,4 +105,84 @@ public async Task CheckCurrentHistoryAsync(WorkflowHandle handle)
throw new InvalidOperationException("Replay failed", e);
}
}

/// <summary>
/// Throw skip exception if update not supported.
/// </summary>
/// <returns>Task for completion.</returns>
/// <exception cref="TestSkippedException">If update not supported.</exception>
public async Task SkipIfUpdateNotSupportedAsync()
{
if (await CheckUpdateSupportedAsync())
{
return;
}
throw new TestSkippedException("Update not supported");
}

/// <summary>
/// Check if update not supported.
/// </summary>
/// <returns>True if supported, false if not.</returns>
public Task<bool> CheckUpdateSupportedAsync() =>
CheckUpdateSupportCallAsync(() =>
Client.GetWorkflowHandle("does-not-exist").ExecuteUpdateAsync(
"does-not-exist", Array.Empty<object?>()));

/// <summary>
/// Throw skip exception if async update not supported.
/// </summary>
/// <returns>Task for completion.</returns>
/// <exception cref="TestSkippedException">If async update not supported.</exception>
public async Task SkipIfAsyncUpdateNotSupportedAsync()
{
if (await CheckAsyncUpdateSupportedAsync())
{
return;
}
throw new TestSkippedException("Async update not supported");
}

/// <summary>
/// Check if async update not supported.
/// </summary>
/// <returns>True if supported, false if not.</returns>
public Task<bool> CheckAsyncUpdateSupportedAsync() =>
CheckUpdateSupportCallAsync(() =>
Client.GetWorkflowHandle("does-not-exist").StartUpdateAsync(
"does-not-exist", Array.Empty<object?>()));

private async Task<bool> CheckUpdateSupportCallAsync(Func<Task> failingFunc)
{
// Don't care about races
if (maybeUpdateSupported == null)
{
try
{
try
{
await failingFunc();
throw new InvalidOperationException("Unexpected success");
}
catch (AggregateException e)
{
// Bug with agg exception: https://github.com/temporalio/sdk-dotnet/issues/151
throw e.InnerExceptions.Single();
}
}
catch (RpcException e) when (e.Code == RpcException.StatusCode.NotFound)
{
// Not found workflow means update does exist
maybeUpdateSupported = true;
}
catch (RpcException e) when (
e.Code == RpcException.StatusCode.Unimplemented || e.Code == RpcException.StatusCode.PermissionDenied)
{
// Not implemented or permission denied means not supported,
// everything else is an error
maybeUpdateSupported = false;
}
}
return maybeUpdateSupported.Value;
}
}
16 changes: 16 additions & 0 deletions harness/dotnet/Temporalio.Features.Harness/TestSkippedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Temporalio.Features.Harness;

/// <summary>
/// Exception used to skip a feature.
/// </summary>
public class TestSkippedException : Exception
{
/// <summary>
/// Create exception.
/// </summary>
/// <param name="message">Reason for skipping.</param>
public TestSkippedException(string message)
: base(message)
{
}
}
Loading

0 comments on commit af5dd85

Please sign in to comment.