From af5dd85926f37a40eb14b00eac39fb49c920704c Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 1 Nov 2023 11:14:47 -0500 Subject: [PATCH] Use Go SDK dev server and add a couple of .NET update features (#364) --- .github/workflows/ci.yaml | 4 +- cmd/run.go | 17 +- dotnet.csproj | 4 +- features/update/activities/feature.cs | 53 ++++ features/update/async_accepted/feature.cs | 92 +++++++ .../dotnet/Temporalio.Features.Harness/App.cs | 4 + .../Temporalio.Features.Harness/Runner.cs | 98 +++++++- .../TestSkippedException.cs | 16 ++ harness/go/devserver/devserver.go | 238 ------------------ harness/go/devserver/freeport.go | 53 ---- 10 files changed, 272 insertions(+), 307 deletions(-) create mode 100644 features/update/activities/feature.cs create mode 100644 features/update/async_accepted/feature.cs create mode 100644 harness/dotnet/Temporalio.Features.Harness/TestSkippedException.cs delete mode 100644 harness/go/devserver/devserver.go delete mode 100644 harness/go/devserver/freeport.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 83f62864..a8cd7467 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -129,7 +129,7 @@ jobs: feature-tests-dotnet: uses: ./.github/workflows/dotnet.yaml with: - version: 0.1.0-beta1 + version: 0.1.0-beta2 version-is-repo-ref: false features-repo-ref: ${{ github.head_ref }} features-repo-path: ${{ github.event.pull_request.head.repo.full_name }} @@ -144,4 +144,4 @@ jobs: ts-ver: 'v1.5.2' java-ver: 'v1.22.0' py-ver: 'v1.0.0' - cs-ver: 'v0.1.0-beta1' + cs-ver: 'v0.1.0-beta2' diff --git a/cmd/run.go b/cmd/run.go index 21ca805f..95b283c0 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -20,13 +20,13 @@ import ( "github.com/google/uuid" "github.com/pmezard/go-difflib/difflib" "github.com/temporalio/features/harness/go/cmd" - "github.com/temporalio/features/harness/go/devserver" "github.com/temporalio/features/harness/go/harness" "github.com/temporalio/features/harness/go/history" "github.com/temporalio/features/sdkbuild" "github.com/urfave/cli/v2" "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/testsuite" ) const ( @@ -195,17 +195,22 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { // If the server is not set, start it ourselves if r.config.Server == "" { - server, err := devserver.Start(devserver.Options{ - Log: r.log, + server, err := testsuite.StartDevServer(ctx, testsuite.DevServerOptions{ // TODO(cretz): Configurable? - LogLevel: "error", - Namespace: r.config.Namespace, + LogLevel: "error", + ClientOptions: &client.Options{Namespace: r.config.Namespace}, + ExtraArgs: []string{ + "--dynamic-config-value", "system.forceSearchAttributesCacheRefreshOnRead=true", + "--dynamic-config-value", "system.enableActivityEagerExecution=true", + "--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true", + "--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true", + }, }) if err != nil { return fmt.Errorf("failed starting devserver: %w", err) } defer server.Stop() - r.config.Server = server.FrontendHostPort + r.config.Server = server.FrontendHostPort() r.log.Info("Started server", "HostPort", r.config.Server) } else { // Wait for namespace to become available diff --git a/dotnet.csproj b/dotnet.csproj index bd5a6157..151fb48e 100644 --- a/dotnet.csproj +++ b/dotnet.csproj @@ -23,17 +23,19 @@ - + all + + \ No newline at end of file diff --git a/features/update/activities/feature.cs b/features/update/activities/feature.cs new file mode 100644 index 00000000..07a61221 --- /dev/null +++ b/features/update/activities/feature.cs @@ -0,0 +1,53 @@ +namespace update.activities; + +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Features.Harness; +using Temporalio.Worker; +using Temporalio.Workflows; +using Xunit; + +class Feature : IFeature +{ + [Workflow] + class MyWorkflow + { + private bool shutdown; + + [WorkflowRun] + public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown); + + [WorkflowSignal] + public async Task ShutdownAsync() => shutdown = true; + + [WorkflowUpdate] + public Task DoActivitiesAsync() => + // Run 5 activities and sum the results + Task.WhenAll( + Enumerable.Range(0, 5).Select(_ => + Workflow.ExecuteActivityAsync( + () => MyActivities.MyActivity(), + new() { StartToCloseTimeout = TimeSpan.FromSeconds(5) } ))). + ContinueWith(vals => Enumerable.Sum(vals.Result)); + } + + class MyActivities + { + [Activity] + public static int MyActivity() => 6; + } + + public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) => + options.AddWorkflow().AddAllActivities(null); + + public async Task ExecuteAsync(Runner runner) + { + await runner.SkipIfUpdateNotSupportedAsync(); + var handle = await runner.Client.StartWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + runner.NewWorkflowOptions()); + Assert.Equal(30, await handle.ExecuteUpdateAsync(wf => wf.DoActivitiesAsync())); + await handle.SignalAsync(wf => wf.ShutdownAsync()); + return handle; + } +} \ No newline at end of file diff --git a/features/update/async_accepted/feature.cs b/features/update/async_accepted/feature.cs new file mode 100644 index 00000000..a6135272 --- /dev/null +++ b/features/update/async_accepted/feature.cs @@ -0,0 +1,92 @@ +namespace update.async_accepted; + +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Exceptions; +using Temporalio.Features.Harness; +using Temporalio.Worker; +using Temporalio.Workflows; +using Xunit; + +class Feature : IFeature +{ + [Workflow] + class MyWorkflow + { + private bool shutdown; + private bool finishUpdate; + + [WorkflowRun] + public Task RunAsync() => Workflow.WaitConditionAsync(() => shutdown); + + [WorkflowSignal] + public async Task ShutdownAsync() => shutdown = true; + + [WorkflowSignal] + public async Task FinishUpdateAsync() => finishUpdate = true; + + [WorkflowUpdate] + public async Task SuccessfulUpdateAsync() + { + await Workflow.WaitConditionAsync(() => finishUpdate); + finishUpdate = false; + return 123; + } + + [WorkflowUpdate] + public async Task FailureUpdateAsync() + { + await Workflow.WaitConditionAsync(() => finishUpdate); + finishUpdate = false; + throw new ApplicationFailureException("Intentional failure"); + } + } + + class MyActivities + { + [Activity] + public static int MyActivity() => 6; + } + + public void ConfigureWorker(Runner runner, TemporalWorkerOptions options) => + options.AddWorkflow().AddAllActivities(null); + + public async Task ExecuteAsync(Runner runner) + { + await runner.SkipIfAsyncUpdateNotSupportedAsync(); + + // Start workflow + var handle = await runner.Client.StartWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + runner.NewWorkflowOptions()); + + // Start update + var updateHandle1 = await handle.StartUpdateAsync(wf => wf.SuccessfulUpdateAsync()); + // Send signal to finish the update + await handle.SignalAsync(wf => wf.FinishUpdateAsync()); + // Confirm result + Assert.Equal(123, await updateHandle1.GetResultAsync()); + // Create another handle and confirm its result is the same + Assert.Equal(123, await handle.GetUpdateHandle(updateHandle1.Id).GetResultAsync()); + + // Start a failed update + var updateHandle2 = await handle.StartUpdateAsync(wf => wf.FailureUpdateAsync()); + // Send signal to finish the update + await handle.SignalAsync(wf => wf.FinishUpdateAsync()); + // Confirm failure + var exc = await Assert.ThrowsAsync( + () => updateHandle2.GetResultAsync()); + Assert.Equal("Intentional failure", exc.InnerException?.Message); + + // Start an update but cancel/timeout waiting on its result + var updateHandle3 = await handle.StartUpdateAsync(wf => wf.SuccessfulUpdateAsync()); + // Wait for result only for 100ms + using var tokenSource = new CancellationTokenSource(); + tokenSource.CancelAfter(TimeSpan.FromMilliseconds(100)); + await Assert.ThrowsAsync(() => + updateHandle3.GetResultAsync(new() { CancellationToken = tokenSource.Token })); + + await handle.SignalAsync(wf => wf.ShutdownAsync()); + return handle; + } +} \ No newline at end of file diff --git a/harness/dotnet/Temporalio.Features.Harness/App.cs b/harness/dotnet/Temporalio.Features.Harness/App.cs index 4df91d79..b1b99ec4 100644 --- a/harness/dotnet/Temporalio.Features.Harness/App.cs +++ b/harness/dotnet/Temporalio.Features.Harness/App.cs @@ -90,6 +90,10 @@ private static async Task RunCommandAsync(InvocationContext ctx) { await new Runner(client, taskQueue, feature, loggerFactory).RunAsync(ctx.GetCancellationToken()); } + catch (TestSkippedException e) + { + logger.LogInformation("Feature {Feature} skipped: {Reason}", feature.Dir, e.Message); + } catch(Exception e) { logger.LogError(e, "Feature {Feature} failed", feature.Dir); diff --git a/harness/dotnet/Temporalio.Features.Harness/Runner.cs b/harness/dotnet/Temporalio.Features.Harness/Runner.cs index 580b7ba3..dad615cb 100644 --- a/harness/dotnet/Temporalio.Features.Harness/Runner.cs +++ b/harness/dotnet/Temporalio.Features.Harness/Runner.cs @@ -1,13 +1,17 @@ namespace Temporalio.Features.Harness; using Temporalio.Client; +using Temporalio.Exceptions; using Temporalio.Worker; +using Temporalio.Workflows; /// /// Runner for running features. /// public class Runner { + private bool? maybeUpdateSupported; + internal Runner( ITemporalClient client, string taskQueue, @@ -64,15 +68,15 @@ public Task StartSingleParameterlessWorkflowAsync() { var workflow = WorkerOptions.Workflows.SingleOrDefault() ?? throw new InvalidOperationException("Must have a single workflow"); - return Client.StartWorkflowAsync( - workflow.Name!, - Array.Empty(), - new(id: $"{PreparedFeature.Dir}-{Guid.NewGuid()}", taskQueue: WorkerOptions.TaskQueue!) - { - ExecutionTimeout = TimeSpan.FromMinutes(1) - }); + return Client.StartWorkflowAsync(workflow.Name!, Array.Empty(), NewWorkflowOptions()); } + public WorkflowOptions NewWorkflowOptions() => + new(id: $"{PreparedFeature.Dir}-{Guid.NewGuid()}", taskQueue: WorkerOptions.TaskQueue!) + { + ExecutionTimeout = TimeSpan.FromMinutes(1) + }; + /// /// Checks the current history for the given handle using the replayer. /// @@ -101,4 +105,84 @@ public async Task CheckCurrentHistoryAsync(WorkflowHandle handle) throw new InvalidOperationException("Replay failed", e); } } + + /// + /// Throw skip exception if update not supported. + /// + /// Task for completion. + /// If update not supported. + public async Task SkipIfUpdateNotSupportedAsync() + { + if (await CheckUpdateSupportedAsync()) + { + return; + } + throw new TestSkippedException("Update not supported"); + } + + /// + /// Check if update not supported. + /// + /// True if supported, false if not. + public Task CheckUpdateSupportedAsync() => + CheckUpdateSupportCallAsync(() => + Client.GetWorkflowHandle("does-not-exist").ExecuteUpdateAsync( + "does-not-exist", Array.Empty())); + + /// + /// Throw skip exception if async update not supported. + /// + /// Task for completion. + /// If async update not supported. + public async Task SkipIfAsyncUpdateNotSupportedAsync() + { + if (await CheckAsyncUpdateSupportedAsync()) + { + return; + } + throw new TestSkippedException("Async update not supported"); + } + + /// + /// Check if async update not supported. + /// + /// True if supported, false if not. + public Task CheckAsyncUpdateSupportedAsync() => + CheckUpdateSupportCallAsync(() => + Client.GetWorkflowHandle("does-not-exist").StartUpdateAsync( + "does-not-exist", Array.Empty())); + + private async Task CheckUpdateSupportCallAsync(Func failingFunc) + { + // Don't care about races + if (maybeUpdateSupported == null) + { + try + { + try + { + await failingFunc(); + throw new InvalidOperationException("Unexpected success"); + } + catch (AggregateException e) + { + // Bug with agg exception: https://github.com/temporalio/sdk-dotnet/issues/151 + throw e.InnerExceptions.Single(); + } + } + catch (RpcException e) when (e.Code == RpcException.StatusCode.NotFound) + { + // Not found workflow means update does exist + maybeUpdateSupported = true; + } + catch (RpcException e) when ( + e.Code == RpcException.StatusCode.Unimplemented || e.Code == RpcException.StatusCode.PermissionDenied) + { + // Not implemented or permission denied means not supported, + // everything else is an error + maybeUpdateSupported = false; + } + } + return maybeUpdateSupported.Value; + } } \ No newline at end of file diff --git a/harness/dotnet/Temporalio.Features.Harness/TestSkippedException.cs b/harness/dotnet/Temporalio.Features.Harness/TestSkippedException.cs new file mode 100644 index 00000000..be621de6 --- /dev/null +++ b/harness/dotnet/Temporalio.Features.Harness/TestSkippedException.cs @@ -0,0 +1,16 @@ +namespace Temporalio.Features.Harness; + +/// +/// Exception used to skip a feature. +/// +public class TestSkippedException : Exception +{ + /// + /// Create exception. + /// + /// Reason for skipping. + public TestSkippedException(string message) + : base(message) + { + } +} \ No newline at end of file diff --git a/harness/go/devserver/devserver.go b/harness/go/devserver/devserver.go deleted file mode 100644 index 8f526860..00000000 --- a/harness/go/devserver/devserver.go +++ /dev/null @@ -1,238 +0,0 @@ -package devserver - -import ( - "archive/tar" - "archive/zip" - "bytes" - "compress/gzip" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "os/exec" - "path/filepath" - "runtime" - "strconv" - "strings" - - "github.com/temporalio/features/harness/go/harness" - "go.temporal.io/sdk/log" -) - -// DefaultVersion is the default DevServer version when not provided. -const DefaultVersion = "v0.2.0" - -// DevServer is a running DevServer instance. -type DevServer struct { - // The frontend host:port for use with Temporal SDK client. - FrontendHostPort string - cmd *exec.Cmd -} - -// DevServer start options. -type Options struct { - // This logger is only used by this process, not DevServer - Log log.Logger - // Defaults to random free port - GetFrontendPort func() (int, error) - // Defaults to "default" - Namespace string - // Defaults to DefaultVersion - Version string - // Defaults to unset - LogLevel string - // TODO(cretz): Other DevServer options? -} - -// Start a DevServer server. This may download the server if not already -// downloaded. -func Start(options Options) (*DevServer, error) { - if options.Log == nil { - options.Log = harness.DefaultLogger - } - if options.GetFrontendPort == nil { - options.GetFrontendPort = func() (port int, err error) { - prov := newPortProvider() - defer prov.Close() - port, err = prov.GetFreePort() - return - } - } - if options.Namespace == "" { - options.Namespace = "default" - } - if options.Version == "" { - options.Version = DefaultVersion - } else if !strings.HasPrefix(options.Version, "v") { - return nil, fmt.Errorf("version must have 'v' prefix") - } - - // Download if necessary - exePath, err := options.loadExePath() - if err != nil { - return nil, err - } - - // DevServer has no way to give us the port they chose, so we have to find - // our own free port - port, err := options.GetFrontendPort() - if err != nil { - return nil, fmt.Errorf("failed getting free port: %w", err) - } - portStr := strconv.Itoa(port) - - // Start - args := []string{ - "server", "start-dev", - "--headless", "--namespace", options.Namespace, "--port", portStr, - "--dynamic-config-value", "system.forceSearchAttributesCacheRefreshOnRead=true", - "--dynamic-config-value", "system.enableActivityEagerExecution=true", - } - if options.LogLevel != "" { - args = append(args, "--log-level", options.LogLevel) - } - cmd := exec.Command(exePath, args...) - cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr - options.Log.Info("Starting DevServer", "ExePath", exePath, "Args", args) - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed starting: %w", err) - } - - err = harness.WaitNamespaceAvailable(context.Background(), options.Log, - "127.0.0.1:"+portStr, options.Namespace, "", "") - if err != nil { - return nil, err - } - return &DevServer{FrontendHostPort: "127.0.0.1:" + portStr, cmd: cmd}, nil -} - -// Stop the running DevServer server and wait for it to stop. This errors if -// DevServer returned a failed exit code. -func (t *DevServer) Stop() error { - if err := t.cmd.Process.Kill(); err != nil { - return err - } - return t.cmd.Wait() -} - -func (o *Options) loadExePath() (string, error) { - // Build path based on version and check if already present - exePath := filepath.Join(os.TempDir(), "features-cli-"+o.Version) - if runtime.GOOS == "windows" { - exePath += ".exe" - } - if _, err := os.Stat(exePath); err == nil { - return exePath, nil - } - - // Build info URL - platform := runtime.GOOS - if platform != "windows" && platform != "darwin" && platform != "linux" { - return "", fmt.Errorf("unrecognized platform %v", platform) - } - arch := runtime.GOARCH - if arch != "amd64" && arch != "arm64" { - return "", fmt.Errorf("unrecognized architecture %v", arch) - } - infoURL := fmt.Sprintf("https://temporal.download/cli/%v?platform=%v&arch=%v", o.Version, platform, arch) - - // Get info - info := struct { - ArchiveURL string `json:"archiveUrl"` - FileToExtract string `json:"fileToExtract"` - }{} - resp, err := http.Get(infoURL) - if err != nil { - return "", fmt.Errorf("failed fetching info: %w", err) - } - b, err := io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - return "", fmt.Errorf("failed fetching info body: %w", err) - } else if resp.StatusCode != 200 { - return "", fmt.Errorf("failed fetching info, status: %v, body: %s", resp.Status, b) - } else if err = json.Unmarshal(b, &info); err != nil { - return "", fmt.Errorf("failed unmarshaling info: %w", err) - } - - // Download and extract - o.Log.Info("Downloading CLI", "Url", info.ArchiveURL, "ExePath", exePath) - resp, err = http.Get(info.ArchiveURL) - if err != nil { - return "", fmt.Errorf("failed downloading: %w", err) - } - defer resp.Body.Close() - // We want to download to a temporary file then rename. A better system-wide - // atomic downloader would use a common temp file and check whether it exists - // and wait on it, but doing multiple downloads in racy situations is - // good/simple enough for now. - f, err := os.CreateTemp("", "cli-downloading-") - if err != nil { - return "", fmt.Errorf("failed creating temp file: %w", err) - } - if strings.HasSuffix(info.ArchiveURL, ".tar.gz") { - err = extractTarball(resp.Body, info.FileToExtract, f) - } else if strings.HasSuffix(info.ArchiveURL, ".zip") { - err = extractZip(resp.Body, info.FileToExtract, f) - } else { - err = fmt.Errorf("unrecognized file extension on %v", info.ArchiveURL) - } - f.Close() - if err != nil { - return "", err - } - // Chmod it if not Windows - if runtime.GOOS != "windows" { - if err := os.Chmod(f.Name(), 0777); err != nil { - return "", fmt.Errorf("failed chmod'ing file: %w", err) - } - } - if err = os.Rename(f.Name(), exePath); err != nil { - return "", fmt.Errorf("failed moving file: %w", err) - } - return exePath, nil -} - -func extractTarball(r io.Reader, toExtract string, w io.Writer) error { - r, err := gzip.NewReader(r) - if err != nil { - return err - } - tarRead := tar.NewReader(r) - for { - h, err := tarRead.Next() - if err != nil { - // This can be EOF which means we never found our file - return err - } else if h.Name == toExtract { - _, err = io.Copy(w, tarRead) - return err - } - } -} - -func extractZip(r io.Reader, toExtract string, w io.Writer) error { - // Instead of using a third party zip streamer, and since Go stdlib doesn't - // support streaming read, we'll just put the entire archive in memory for now - b, err := io.ReadAll(r) - if err != nil { - return err - } - zipRead, err := zip.NewReader(bytes.NewReader(b), int64(len(b))) - if err != nil { - return err - } - for _, file := range zipRead.File { - if file.Name == toExtract { - r, err := file.Open() - if err != nil { - return err - } - _, err = io.Copy(w, r) - return err - } - } - return fmt.Errorf("could not find file in zip archive") -} diff --git a/harness/go/devserver/freeport.go b/harness/go/devserver/freeport.go deleted file mode 100644 index ccd8283c..00000000 --- a/harness/go/devserver/freeport.go +++ /dev/null @@ -1,53 +0,0 @@ -package devserver - -import ( - "fmt" - "net" -) - -// Modified from Temporalite which itself modified from -// https://github.com/phayes/freeport/blob/95f893ade6f232a5f1511d61735d89b1ae2df543/freeport.go - -func newPortProvider() *portProvider { - return &portProvider{} -} - -type portProvider struct { - listeners []*net.TCPListener -} - -// GetFreePort asks the kernel for a free open port that is ready to use. -func (p *portProvider) GetFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") - if err != nil { - if addr, err = net.ResolveTCPAddr("tcp6", "[::1]:0"); err != nil { - panic(fmt.Sprintf("cli: failed to get free port: %v", err)) - } - } - - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - - p.listeners = append(p.listeners, l) - - return l.Addr().(*net.TCPAddr).Port, nil -} - -func (p *portProvider) MustGetFreePort() int { - port, err := p.GetFreePort() - if err != nil { - panic(err) - } - return port -} - -func (p *portProvider) Close() error { - for _, l := range p.listeners { - if err := l.Close(); err != nil { - return err - } - } - return nil -}