diff --git a/docker/github/docker-compose.yaml b/docker/github/docker-compose.yaml index 650007bf9..a2d270b00 100644 --- a/docker/github/docker-compose.yaml +++ b/docker/github/docker-compose.yaml @@ -34,11 +34,13 @@ services: - "6934:6934" - "6935:6935" - "6939:6939" + - "7243:7243" environment: - "CASSANDRA_SEEDS=cassandra" - "ENABLE_ES=true" - "ES_SEEDS=elasticsearch" - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" + - "FRONTEND_HTTP_PORT=7243" depends_on: - cassandra - elasticsearch diff --git a/docker/github/dynamicconfig/development.yaml b/docker/github/dynamicconfig/development.yaml index f3fa56512..7a0b9db0e 100644 --- a/docker/github/dynamicconfig/development.yaml +++ b/docker/github/dynamicconfig/development.yaml @@ -23,7 +23,7 @@ worker.removableBuildIdDurationSinceDefault: system.enableNexus: - value: true component.nexusoperations.callback.endpoint.template: - - value: http://localhost:7243/api/v1/namespaces/{{.NamespaceName}}/nexus/callback + - value: http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback component.callbacks.allowedAddresses: - value: - Pattern: "localhost:7243" diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index 0e02603f0..f53daeb2f 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -37,6 +37,7 @@ import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.common.interceptors.WorkflowClientInterceptor; import io.temporal.internal.WorkflowThreadMarker; +import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.client.RootWorkflowClientInvoker; import io.temporal.internal.client.WorkerFactoryRegistry; import io.temporal.internal.client.WorkflowClientInternal; @@ -567,4 +568,16 @@ public void registerWorkerFactory(WorkerFactory workerFactory) { public void deregisterWorkerFactory(WorkerFactory workerFactory) { workerFactoryRegistry.deregister(workerFactory); } + + @Override + public WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow) { + enforceNonWorkflowThread(); + WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START_NEXUS, request); + try { + workflow.apply(); + return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class); + } finally { + WorkflowInvocationHandler.closeAsyncInvocation(); + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index 0bfa46048..1497ed91a 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -20,6 +20,8 @@ package io.temporal.client; +import static io.temporal.internal.common.InternalUtils.createNexusBoundStub; + import com.google.common.base.Defaults; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; @@ -30,6 +32,7 @@ import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata; import io.temporal.common.metadata.POJOWorkflowMethodMetadata; import io.temporal.common.metadata.WorkflowMethodType; +import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.sync.StubMarker; import io.temporal.workflow.QueryMethod; import io.temporal.workflow.SignalMethod; @@ -37,8 +40,7 @@ import io.temporal.workflow.WorkflowMethod; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; -import java.util.Objects; -import java.util.Optional; +import java.util.*; /** * Dynamic implementation of a strongly typed workflow interface that can be used to start, signal @@ -51,6 +53,7 @@ public enum InvocationType { START, EXECUTE, SIGNAL_WITH_START, + START_NEXUS, UPDATE_WITH_START } @@ -87,6 +90,9 @@ static void initAsyncInvocation(InvocationType type, T value) { } else if (type == InvocationType.SIGNAL_WITH_START) { SignalWithStartBatchRequest batch = (SignalWithStartBatchRequest) value; invocationContext.set(new SignalWithStartWorkflowInvocationHandler(batch)); + } else if (type == InvocationType.START_NEXUS) { + NexusStartWorkflowRequest request = (NexusStartWorkflowRequest) value; + invocationContext.set(new StartNexusOperationInvocationHandler(request)); } else if (type == InvocationType.UPDATE_WITH_START) { UpdateWithStartWorkflowOperation operation = (UpdateWithStartWorkflowOperation) value; invocationContext.set(new UpdateWithStartInvocationHandler(operation)); @@ -400,6 +406,41 @@ public R getResult(Class resultClass) { } } + private static class StartNexusOperationInvocationHandler implements SpecificInvocationHandler { + private final NexusStartWorkflowRequest request; + private Object result; + + public StartNexusOperationInvocationHandler(NexusStartWorkflowRequest request) { + this.request = request; + } + + @Override + public InvocationType getInvocationType() { + return InvocationType.START_NEXUS; + } + + @Override + public void invoke( + POJOWorkflowInterfaceMetadata workflowMetadata, + WorkflowStub untyped, + Method method, + Object[] args) { + WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); + if (workflowMethod == null) { + throw new IllegalArgumentException( + "Only on a method annotated with @WorkflowMethod can be used to start a Nexus operation."); + } + + result = createNexusBoundStub(untyped, request).start(args); + } + + @Override + @SuppressWarnings("unchecked") + public R getResult(Class resultClass) { + return (R) result; + } + } + private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler { enum State { diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java index fe7954c76..f3c770c3b 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java @@ -21,6 +21,7 @@ package io.temporal.client; import com.google.common.base.Objects; +import io.temporal.api.common.v1.Callback; import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; import io.temporal.common.*; @@ -79,6 +80,8 @@ public static WorkflowOptions merge( .setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy()) .setStaticSummary(o.getStaticSummary()) .setStaticDetails(o.getStaticDetails()) + .setRequestId(o.getRequestId()) + .setCompletionCallbacks(o.getCompletionCallbacks()) .validateBuildWithDefaults(); } @@ -112,12 +115,16 @@ public static final class Builder { private Duration startDelay; - private WorkflowIdConflictPolicy workflowIdConflictpolicy; + private WorkflowIdConflictPolicy workflowIdConflictPolicy; private String staticSummary; private String staticDetails; + private String requestId; + + private List completionCallbacks; + private Builder() {} private Builder(WorkflowOptions options) { @@ -138,9 +145,11 @@ private Builder(WorkflowOptions options) { this.contextPropagators = options.contextPropagators; this.disableEagerExecution = options.disableEagerExecution; this.startDelay = options.startDelay; - this.workflowIdConflictpolicy = options.workflowIdConflictpolicy; + this.workflowIdConflictPolicy = options.workflowIdConflictPolicy; this.staticSummary = options.staticSummary; this.staticDetails = options.staticDetails; + this.requestId = options.requestId; + this.completionCallbacks = options.completionCallbacks; } /** @@ -191,8 +200,8 @@ public Builder setWorkflowIdReusePolicy(WorkflowIdReusePolicy workflowIdReusePol *
  • TerminateExisting Terminate the running workflow before starting a new one. * */ - public Builder setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy workflowIdConflictpolicy) { - this.workflowIdConflictpolicy = workflowIdConflictpolicy; + public Builder setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy workflowIdConflictPolicy) { + this.workflowIdConflictPolicy = workflowIdConflictPolicy; return this; } @@ -413,6 +422,28 @@ public Builder setStaticDetails(String staticDetails) { return this; } + /** + * A unique identifier for this start request. + * + *

    WARNING: Not intended for User Code. + */ + @Experimental + public Builder setRequestId(String requestId) { + this.requestId = requestId; + return this; + } + + /** + * Callbacks to be called by the server when this workflow reaches a terminal state. + * + *

    WARNING: Not intended for User Code. + */ + @Experimental + public Builder setCompletionCallbacks(List completionCallbacks) { + this.completionCallbacks = completionCallbacks; + return this; + } + public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -429,9 +460,11 @@ public WorkflowOptions build() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy, + workflowIdConflictPolicy, staticSummary, - staticDetails); + staticDetails, + requestId, + completionCallbacks); } /** @@ -453,9 +486,11 @@ public WorkflowOptions validateBuildWithDefaults() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy, + workflowIdConflictPolicy, staticSummary, - staticDetails); + staticDetails, + requestId, + completionCallbacks); } } @@ -487,12 +522,16 @@ public WorkflowOptions validateBuildWithDefaults() { private final Duration startDelay; - private final WorkflowIdConflictPolicy workflowIdConflictpolicy; + private final WorkflowIdConflictPolicy workflowIdConflictPolicy; private final String staticSummary; private final String staticDetails; + private final String requestId; + + private final List completionCallbacks; + private WorkflowOptions( String workflowId, WorkflowIdReusePolicy workflowIdReusePolicy, @@ -508,9 +547,11 @@ private WorkflowOptions( List contextPropagators, boolean disableEagerExecution, Duration startDelay, - WorkflowIdConflictPolicy workflowIdConflictpolicy, + WorkflowIdConflictPolicy workflowIdConflictPolicy, String staticSummary, - String staticDetails) { + String staticDetails, + String requestId, + List completionCallbacks) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; this.workflowRunTimeout = workflowRunTimeout; @@ -525,9 +566,11 @@ private WorkflowOptions( this.contextPropagators = contextPropagators; this.disableEagerExecution = disableEagerExecution; this.startDelay = startDelay; - this.workflowIdConflictpolicy = workflowIdConflictpolicy; + this.workflowIdConflictPolicy = workflowIdConflictPolicy; this.staticSummary = staticSummary; this.staticDetails = staticDetails; + this.requestId = requestId; + this.completionCallbacks = completionCallbacks; } public String getWorkflowId() { @@ -596,7 +639,17 @@ public boolean isDisableEagerExecution() { } public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() { - return workflowIdConflictpolicy; + return workflowIdConflictPolicy; + } + + @Experimental + public String getRequestId() { + return requestId; + } + + @Experimental + public List getCompletionCallbacks() { + return completionCallbacks; } public String getStaticSummary() { @@ -630,9 +683,11 @@ public boolean equals(Object o) { && Objects.equal(contextPropagators, that.contextPropagators) && Objects.equal(disableEagerExecution, that.disableEagerExecution) && Objects.equal(startDelay, that.startDelay) - && Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy) + && Objects.equal(workflowIdConflictPolicy, that.workflowIdConflictPolicy) && Objects.equal(staticSummary, that.staticSummary) - && Objects.equal(staticDetails, that.staticDetails); + && Objects.equal(staticDetails, that.staticDetails) + && Objects.equal(requestId, that.requestId) + && Objects.equal(completionCallbacks, that.completionCallbacks); } @Override @@ -652,9 +707,11 @@ public int hashCode() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy, + workflowIdConflictPolicy, staticSummary, - staticDetails); + staticDetails, + requestId, + completionCallbacks); } @Override @@ -691,12 +748,16 @@ public String toString() { + disableEagerExecution + ", startDelay=" + startDelay - + ", workflowIdConflictpolicy=" - + workflowIdConflictpolicy + + ", workflowIdConflictPolicy=" + + workflowIdConflictPolicy + ", staticSummary=" + staticSummary + ", staticDetails=" + staticDetails + + ", requestId=" + + requestId + + ", completionCallbacks=" + + completionCallbacks + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index a8cf4a6ae..b99da3e45 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -383,4 +383,12 @@ CompletableFuture getResultAsync( void terminate(@Nullable String reason, Object... details); Optional getOptions(); + + /** + * Creates a new stub that can be used to start a new run of the same workflow type with the same + * options. + * + * @param options new options to use for the stub + */ + WorkflowStub newInstance(WorkflowOptions options); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 91dde73de..d3b1a245f 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -447,6 +447,12 @@ public Optional getOptions() { return Optional.ofNullable(options); } + @Override + public WorkflowStub newInstance(WorkflowOptions options) { + return new WorkflowStubImpl( + clientOptions, workflowClientInvoker, workflowType.orElse(null), options); + } + private void checkStarted() { if (execution.get() == null || execution.get().getWorkflowId() == null) { throw new IllegalStateException("Null workflowId. Was workflow started?"); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartWorkflowRequest.java b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartWorkflowRequest.java new file mode 100644 index 000000000..6de97fc34 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/NexusStartWorkflowRequest.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.client; + +import java.util.Map; + +public final class NexusStartWorkflowRequest { + private final String requestId; + private final String callbackUrl; + private final Map callbackHeaders; + private final String taskQueue; + + public NexusStartWorkflowRequest( + String requestId, String callbackUrl, Map callbackHeaders, String taskQueue) { + this.requestId = requestId; + this.callbackUrl = callbackUrl; + this.callbackHeaders = callbackHeaders; + this.taskQueue = taskQueue; + } + + public String getRequestId() { + return requestId; + } + + public String getCallbackUrl() { + return callbackUrl; + } + + public Map getCallbackHeaders() { + return callbackHeaders; + } + + public String getTaskQueue() { + return taskQueue; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java index 83f27b8bb..cafba9f24 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java @@ -20,8 +20,10 @@ package io.temporal.internal.client; +import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowClient; import io.temporal.worker.WorkerFactory; +import io.temporal.workflow.Functions; /** * From OOP point of view, there is no reason for this interface not to extend {@link @@ -35,4 +37,6 @@ public interface WorkflowClientInternal { void registerWorkerFactory(WorkerFactory workerFactory); void deregisterWorkerFactory(WorkerFactory workerFactory); + + WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java index e9382a8c1..9282d6a18 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java @@ -88,6 +88,14 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest( request.setWorkflowIdConflictPolicy(options.getWorkflowIdConflictPolicy()); } + if (options.getRequestId() != null) { + request.setRequestId(options.getRequestId()); + } + + if (options.getCompletionCallbacks() != null) { + options.getCompletionCallbacks().forEach(request::addCompletionCallbacks); + } + String taskQueue = options.getTaskQueue(); if (taskQueue != null && !taskQueue.isEmpty()) { request.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index babc0b0eb..73ff12ac7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -21,8 +21,13 @@ package io.temporal.internal.common; import com.google.common.base.Defaults; +import io.temporal.api.common.v1.Callback; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.taskqueue.v1.TaskQueue; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import java.util.Arrays; /** Utility functions shared by the implementation code. */ public final class InternalUtils { @@ -49,6 +54,39 @@ public static Object getValueOrDefault(Object value, Class valueClass) { return Defaults.defaultValue(valueClass); } + /** + * Creates a new stub that is bound to the same workflow as the given stub, but with the Nexus + * callback URL and headers set. + * + * @param stub the stub to create a new stub from + * @param request the request containing the Nexus callback URL and headers + * @return a new stub bound to the same workflow as the given stub, but with the Nexus callback + * URL and headers set + */ + public static WorkflowStub createNexusBoundStub( + WorkflowStub stub, NexusStartWorkflowRequest request) { + if (!stub.getOptions().isPresent()) { + throw new IllegalArgumentException("Options are expected to be set on the stub"); + } + WorkflowOptions options = stub.getOptions().get(); + WorkflowOptions.Builder nexusWorkflowOptions = + WorkflowOptions.newBuilder() + .setRequestId(request.getRequestId()) + .setCompletionCallbacks( + Arrays.asList( + Callback.newBuilder() + .setNexus( + Callback.Nexus.newBuilder() + .setUrl(request.getCallbackUrl()) + .putAllHeader(request.getCallbackHeaders()) + .build()) + .build())); + if (options.getTaskQueue() == null) { + nexusWorkflowOptions.setTaskQueue(request.getTaskQueue()); + } + return stub.newInstance(nexusWorkflowOptions.build()); + } + /** Prohibit instantiation */ private InternalUtils() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java new file mode 100644 index 000000000..99ca97b3e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +/** + * Thread local store of the context object passed to a nexus operation implementation. Not to be + * used directly. + */ +public final class CurrentNexusOperationContext { + private static final ThreadLocal CURRENT = new ThreadLocal<>(); + + public static NexusOperationContextImpl get() { + NexusOperationContextImpl result = CURRENT.get(); + if (result == null) { + throw new IllegalStateException( + "NexusOperationContext can be used only inside of nexus operation handler " + + "implementation methods and in the same thread that invoked the operation."); + } + return CURRENT.get(); + } + + public static void set(NexusOperationContextImpl context) { + if (context == null) { + throw new IllegalArgumentException("null context"); + } + if (CURRENT.get() != null) { + throw new IllegalStateException("current already set"); + } + CURRENT.set(context); + } + + public static void unset() { + CURRENT.set(null); + } + + private CurrentNexusOperationContext() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java new file mode 100644 index 000000000..56565f4ff --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import io.temporal.nexus.NexusOperationContext; + +public final class NexusInternal { + private NexusInternal() {} + + public static NexusOperationContext getOperationContext() { + return CurrentNexusOperationContext.get(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java new file mode 100644 index 000000000..2d887711e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; +import io.temporal.nexus.NexusOperationContext; + +public class NexusOperationContextImpl implements NexusOperationContext { + private final String taskQueue; + private final WorkflowClient client; + private final Scope metricsScope; + + public NexusOperationContextImpl(String taskQueue, WorkflowClient client, Scope metricsScope) { + this.taskQueue = taskQueue; + this.client = client; + this.metricsScope = metricsScope; + } + + @Override + public Scope getMetricsScope() { + return metricsScope; + } + + public WorkflowClient getWorkflowClient() { + return client; + } + + public String getTaskQueue() { + return taskQueue; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 3e893cf0d..426cc7f68 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -109,6 +109,9 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } } + CurrentNexusOperationContext.set( + new NexusOperationContextImpl(taskQueue, client, metricsScope)); + switch (request.getVariantCase()) { case START_OPERATION: StartOperationResponse startResponse = @@ -150,6 +153,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException if (timeoutTask != null) { timeoutTask.cancel(false); } + CurrentNexusOperationContext.unset(); } } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/Nexus.java b/temporal-sdk/src/main/java/io/temporal/nexus/Nexus.java new file mode 100644 index 000000000..eb06ac057 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/Nexus.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.temporal.common.Experimental; +import io.temporal.internal.nexus.NexusInternal; +import io.temporal.internal.sync.WorkflowInternal; + +/** This class contains methods exposing Temporal APIs for Nexus Operations */ +@Experimental +public final class Nexus { + /** + * Can be used to get information about a Nexus Operation. This static method relies on a + * thread-local variable and works only in the original Nexus thread. + */ + public static NexusOperationContext getOperationContext() { + return NexusInternal.getOperationContext(); + } + + /** + * Use this to rethrow a checked exception from a Nexus Operation instead of adding the exception + * to a method signature. + * + * @return Never returns; always throws. Throws original exception if e is {@link + * RuntimeException} or {@link Error}. + */ + public static RuntimeException wrap(Throwable e) { + return WorkflowInternal.wrap(e); + } + + /** Prohibits instantiation. */ + private Nexus() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java new file mode 100644 index 000000000..f40a251fb --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; + +/** + * Context object passed to a Nexus operation implementation. Use {@link + * Nexus#getExecutionContext()} from a Nexus Operation implementation to access. + */ +@Experimental +public interface NexusOperationContext { + + /** + * Get scope for reporting business metrics in a nexus handler. This scope is tagged with the + * service and operation. + * + *

    The original metrics scope is set through {@link + * WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up. + */ + Scope getMetricsScope(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java b/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java new file mode 100644 index 000000000..d3f5c9610 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import io.temporal.common.Experimental; +import javax.annotation.Nullable; + +/** + * Function interface for {@link WorkflowClientOperationHandlers#sync} representing a call made for + * every operation call that takes a {@link WorkflowClient}. + */ +@FunctionalInterface +@Experimental +public interface SynchronousWorkflowClientOperationFunction { + @Nullable + R apply( + OperationContext ctx, OperationStartDetails details, WorkflowClient client, @Nullable T input) + throws OperationUnsuccessfulException; +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java new file mode 100644 index 000000000..c41a9f0e7 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.nexusrpc.handler.*; +import io.nexusrpc.handler.OperationHandler; +import io.temporal.client.WorkflowClient; +import io.temporal.common.Experimental; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.NexusOperationContextImpl; + +/** WorkflowClientOperationHandlers can be used to create Temporal specific OperationHandlers */ +@Experimental +public final class WorkflowClientOperationHandlers { + /** + * Helper to create {@link io.nexusrpc.handler.OperationHandler} instances that take a {@link + * io.temporal.client.WorkflowClient}. + */ + public static OperationHandler sync( + SynchronousWorkflowClientOperationFunction func) { + return io.nexusrpc.handler.OperationHandler.sync( + (OperationContext ctx, OperationStartDetails details, T input) -> { + NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); + return func.apply(ctx, details, nexusCtx.getWorkflowClient(), input); + }); + } + + /** + * Maps a workflow method to an {@link io.nexusrpc.handler.OperationHandler}. + * + * @param startMethod returns the workflow method reference to call + * @return Operation handler to be used as an {@link OperationImpl} + */ + public static OperationHandler fromWorkflowMethod( + WorkflowMethodFactory startMethod) { + return new RunWorkflowOperation<>( + (OperationContext context, OperationStartDetails details, WorkflowClient client, T input) -> + WorkflowHandle.fromWorkflowMethod( + startMethod.apply(context, details, client, input), input)); + } + + /** + * Maps a workflow handle to an {@link io.nexusrpc.handler.OperationHandler}. + * + * @param handleFactory returns the workflow handle that will be mapped to the call + * @return Operation handler to be used as an {@link OperationImpl} + */ + public static OperationHandler fromWorkflowHandle( + WorkflowHandleFactory handleFactory) { + return new RunWorkflowOperation<>(handleFactory); + } + + /** Prohibits instantiation. */ + private WorkflowClientOperationHandlers() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandle.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandle.java new file mode 100644 index 000000000..fd299f809 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandle.java @@ -0,0 +1,284 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.temporal.client.*; +import io.temporal.workflow.Functions; + +/** WorkflowHandle is a readonly representation of a workflow run backing a Nexus operation. */ +public final class WorkflowHandle { + /** + * Create a handle to a zero argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod(Functions.Proc workflow) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(workflow)); + } + + /** + * Create a handle to a one argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc1 workflow, A1 arg1) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1))); + } + + /** + * Create a handle to a two argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc2 workflow, A1 arg1, A2 arg2) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2))); + } + + /** + * Create a handle to a three argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc3 workflow, A1 arg1, A2 arg2, A3 arg3) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3))); + } + + /** + * Create a handle to a four argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 third workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc4 workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4))); + } + + /** + * Create a handle to a five argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc5 workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5))); + } + + /** + * Create a handle to a five argument workflow with void return type + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument + * @param arg6 fifth workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Proc6 workflow, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6))); + } + + /** + * Create a handle to a zero argument workflow + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod(Functions.Func workflow) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(() -> workflow.apply())); + } + + /** + * Create a handle to a one argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func1 workflow, A1 arg1) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1))); + } + + /** + * Create a handle to a two argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow function parameter + * @param arg2 second workflow function parameter + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func2 workflow, A1 arg1, A2 arg2) { + return new WorkflowHandle(new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2))); + } + + /** + * Create a handle to a three argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow function parameter + * @param arg2 second workflow function parameter + * @param arg3 third workflow function parameter + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func3 workflow, A1 arg1, A2 arg2, A3 arg3) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3))); + } + + /** + * Create a handle to a four argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow function parameter + * @param arg2 second workflow function parameter + * @param arg3 third workflow function parameter + * @param arg4 fourth workflow function parameter + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func4 workflow, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4))); + } + + /** + * Create a handle to a five argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow function parameter + * @param arg2 second workflow function parameter + * @param arg3 third workflow function parameter + * @param arg4 fourth workflow function parameter + * @param arg5 fifth workflow function parameter + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func5 workflow, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5))); + } + + /** + * Create a handle to a six argument workflow. + * + * @param workflow The only supported value is method reference to a proxy created through {@link + * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow function parameter + * @param arg2 second workflow function parameter + * @param arg3 third workflow function parameter + * @param arg4 fourth workflow function parameter + * @param arg5 fifth workflow function parameter + * @param arg6 sixth workflow function parameter + * @return WorkflowHandle + */ + public static WorkflowHandle fromWorkflowMethod( + Functions.Func6 workflow, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6) { + return new WorkflowHandle( + new WorkflowMethodMethodInvoker(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6))); + } + + /** + * Create a WorkflowHandle from an untyped workflow stub. + * + * @param stub The workflow stub to use + * @param resultClass class of the workflow return value + * @param args arguments to start the workflow + * @return WorkflowHandle + */ + static WorkflowHandle fromWorkflowStub( + WorkflowStub stub, Class resultClass, Object... args) { + return new WorkflowHandle(new WorkflowStubHandleInvoker(stub, args)); + } + + final WorkflowHandleInvoker invoker; + + WorkflowHandleInvoker getInvoker() { + return invoker; + } + + /** Prohibits outside instantiation. */ + private WorkflowHandle(WorkflowHandleInvoker invoker) { + this.invoker = invoker; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java new file mode 100644 index 000000000..a5b0e14be --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import javax.annotation.Nullable; + +/** + * Function interface for {@link + * WorkflowClientOperationHandlers#fromWorkflowHandle(WorkflowHandleFactory)} representing the + * workflow to associate with each operation call. + */ +@FunctionalInterface +public interface WorkflowHandleFactory { + /** + * Invoked every operation start call and expected to return a workflow handle to a workflow stub + * through the provided {@link WorkflowClient}. + */ + @Nullable + WorkflowHandle apply( + OperationContext context, OperationStartDetails details, WorkflowClient client, T input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleInvoker.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleInvoker.java new file mode 100644 index 000000000..7a74e0b9b --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleInvoker.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.internal.client.NexusStartWorkflowRequest; + +interface WorkflowHandleInvoker { + WorkflowExecution invoke(NexusStartWorkflowRequest request); +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java new file mode 100644 index 000000000..f59ff75db --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.workflow.Functions; +import javax.annotation.Nullable; + +/** + * Function interface for {@link + * WorkflowClientOperationHandlers#fromWorkflowMethod(WorkflowMethodFactory)} representing the + * workflow method to invoke for every operation call. + */ +@FunctionalInterface +public interface WorkflowMethodFactory { + /** + * Invoked every operation start call and expected to return a workflow method reference to a + * proxy created through {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)} using the + * provided {@link WorkflowClient}. + */ + @Nullable + Functions.Func1 apply( + OperationContext context, OperationStartDetails details, WorkflowClient client, T input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java new file mode 100644 index 000000000..8512646eb --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import io.temporal.internal.client.WorkflowClientInternal; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.NexusOperationContextImpl; +import io.temporal.workflow.Functions; + +class WorkflowMethodMethodInvoker implements WorkflowHandleInvoker { + private Functions.Proc workflow; + + public WorkflowMethodMethodInvoker(Functions.Proc workflow) { + this.workflow = workflow; + } + + @Override + public WorkflowExecution invoke(NexusStartWorkflowRequest request) { + NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); + return ((WorkflowClientInternal) nexusCtx.getWorkflowClient().getInternal()) + .startNexus(request, workflow); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java new file mode 100644 index 000000000..e1cbc702f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import io.nexusrpc.OperationInfo; +import io.nexusrpc.handler.*; +import io.nexusrpc.handler.OperationHandler; +import io.temporal.client.WorkflowClient; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.NexusOperationContextImpl; + +class RunWorkflowOperation implements OperationHandler { + private final WorkflowHandleFactory handleFactory; + + RunWorkflowOperation(WorkflowHandleFactory handleFactory) { + this.handleFactory = handleFactory; + } + + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails operationStartDetails, T input) { + NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); + + WorkflowHandle handle = + handleFactory.apply(ctx, operationStartDetails, nexusCtx.getWorkflowClient(), input); + + NexusStartWorkflowRequest nexusRequest = + new NexusStartWorkflowRequest( + operationStartDetails.getRequestId(), + operationStartDetails.getCallbackUrl(), + operationStartDetails.getCallbackHeaders(), + nexusCtx.getTaskQueue()); + return OperationStartResult.async(handle.getInvoker().invoke(nexusRequest).getWorkflowId()); + } + + @Override + public R fetchResult( + OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public OperationInfo fetchInfo( + OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void cancel( + OperationContext operationContext, OperationCancelDetails operationCancelDetails) { + WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient(); + client.newUntypedWorkflowStub(operationCancelDetails.getOperationId()).cancel(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowStubHandleInvoker.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowStubHandleInvoker.java new file mode 100644 index 000000000..d2acf151a --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowStubHandleInvoker.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.nexus; + +import static io.temporal.internal.common.InternalUtils.createNexusBoundStub; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.client.NexusStartWorkflowRequest; + +class WorkflowStubHandleInvoker implements WorkflowHandleInvoker { + final Object[] args; + final WorkflowStub stub; + + WorkflowStubHandleInvoker(WorkflowStub stub, Object[] args) { + this.args = args; + this.stub = stub; + } + + @Override + public WorkflowExecution invoke(NexusStartWorkflowRequest request) { + return createNexusBoundStub(stub, request).start(args); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java index 094f9799f..b7d0cb0fa 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java @@ -100,9 +100,9 @@ public void startOneArgsFuncWithDefault() { testWorkflowRule.getWorkflowClient().newWorkflowStub(Test1ArgWorkflowFunc.class, options); // Use worker that polls on a task queue configured through @WorkflowMethod annotation of // func1 - assertResult(1, WorkflowClient.start(stubF1::func1, 1)); + assertResult(1, WorkflowClient.start(stubF1::func1, "1")); Assert.assertEquals( - 1, stubF1.func1(1)); // Check that duplicated start just returns the result. + "1", stubF1.func1("1")); // Check that duplicated start just returns the result. } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/ExecuteTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/ExecuteTest.java index 2981a7be3..53b370a2c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/ExecuteTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/ExecuteTest.java @@ -41,8 +41,9 @@ public void testExecute() throws ExecutionException, InterruptedException { Assert.assertEquals("func", WorkflowClient.execute(stubF::func).get()); Test1ArgWorkflowFunc stubF1 = testWorkflowRule.newWorkflowStubTimeoutOptions(Test1ArgWorkflowFunc.class); - Assert.assertEquals(1, (int) WorkflowClient.execute(stubF1::func1, 1).get()); - Assert.assertEquals(1, stubF1.func1(1)); // Check that duplicated start just returns the result. + Assert.assertEquals("1", WorkflowClient.execute(stubF1::func1, "1").get()); + Assert.assertEquals( + "1", stubF1.func1("1")); // Check that duplicated start just returns the result. Test2ArgWorkflowFunc stubF2 = testWorkflowRule.newWorkflowStubTimeoutOptions(Test2ArgWorkflowFunc.class); Assert.assertEquals("12", WorkflowClient.execute(stubF2::func2, "1", 2).get()); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowIdReusePolicyTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowIdReusePolicyTest.java index 9eb28ccbc..17b18d17c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowIdReusePolicyTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowIdReusePolicyTest.java @@ -52,12 +52,12 @@ public void testWorkflowIdResuePolicy() { testWorkflowRule .getWorkflowClient() .newWorkflowStub(Test1ArgWorkflowFunc.class, workflowOptions); - Assert.assertEquals(1, stubF1_1.func1(1)); + Assert.assertEquals("1", stubF1_1.func1("1")); Test1ArgWorkflowFunc stubF1_2 = testWorkflowRule .getWorkflowClient() .newWorkflowStub(Test1ArgWorkflowFunc.class, workflowOptions); - Assert.assertEquals(1, stubF1_2.func1(2)); + Assert.assertEquals("1", stubF1_2.func1("2")); // Setting WorkflowIdReusePolicy to AllowDuplicate will trigger new run. workflowOptions = @@ -70,7 +70,7 @@ public void testWorkflowIdResuePolicy() { testWorkflowRule .getWorkflowClient() .newWorkflowStub(Test1ArgWorkflowFunc.class, workflowOptions); - Assert.assertEquals(2, stubF1_3.func1(2)); + Assert.assertEquals("2", stubF1_3.func1("2")); // Setting WorkflowIdReusePolicy to RejectDuplicate or AllowDuplicateFailedOnly does not work as // expected. See https://github.com/uber/cadence-java-client/issues/295. diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildAsyncWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildAsyncWorkflowTest.java index d6c06fb5e..4df10bb30 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildAsyncWorkflowTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildAsyncWorkflowTest.java @@ -57,7 +57,7 @@ public String execute(String taskQueue) { Assert.assertEquals("func", Async.function(stubF::func).get()); Test1ArgWorkflowFunc stubF1 = Workflow.newChildWorkflowStub(Test1ArgWorkflowFunc.class, workflowOptions); - assertEquals(1, (int) Async.function(stubF1::func1, 1).get()); + assertEquals("1", Async.function(stubF1::func1, "1").get()); Test2ArgWorkflowFunc stubF2 = Workflow.newChildWorkflowStub(Test2ArgWorkflowFunc.class, workflowOptions); assertEquals("12", Async.function(stubF2::func2, "1", 2).get()); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java new file mode 100644 index 000000000..9697e24b6 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class AsyncWorkflowOperationTest extends BaseNexusTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class, TestOperationWorkflow.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void testWorkflowOperation() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("Hello from operation workflow " + testWorkflowRule.getTaskQueue(), result); + } + + @Test + public void testWorkflowOperationReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testAsyncWorkflowOperationTestHistory.json", AsyncWorkflowOperationTest.TestNexus.class); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + // Try to call an asynchronous operation in a blocking way + TestNexusServices.TestNexusService1 serviceStub = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + // Try to call an asynchronous operation in a blocking way + String asyncResult = serviceStub.operation(input); + // Try to call an asynchronous operation in a non-blocking way + Promise asyncPromise = Async.function(serviceStub::operation, input); + Assert.assertEquals(asyncPromise.get(), asyncResult); + // Try to call an asynchronous operation in a non-blocking way using a handle + NexusOperationHandle asyncOpHandle = + Workflow.startNexusOperation(serviceStub::operation, "block"); + NexusOperationExecution asyncExec = asyncOpHandle.getExecution().get(); + // Execution id is present for an asynchronous operations + Assert.assertTrue("Operation id should be present", asyncExec.getOperationId().isPresent()); + // Result should only be completed if the operation is completed + Assert.assertFalse("Result should not be completed", asyncOpHandle.getResult().isCompleted()); + // Unblock the operation + Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get()) + .unblock(); + // Wait for the operation to complete + Assert.assertEquals("Hello from operation workflow block", asyncOpHandle.getResult().get()); + return asyncResult; + } + } + + @WorkflowInterface + public interface OperationWorkflow { + @WorkflowMethod + String execute(String arg); + + @SignalMethod + void unblock(); + } + + public static class TestOperationWorkflow implements OperationWorkflow { + boolean unblocked = false; + + @Override + public String execute(String arg) { + if (arg.equals("block")) { + Workflow.await(() -> unblocked); + } + return "Hello from operation workflow " + arg; + } + + @Override + public void unblock() { + unblocked = true; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowMethod( + (context, details, client, input) -> + client.newWorkflowStub( + OperationWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::execute); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java new file mode 100644 index 000000000..154e500de --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class ParallelWorkflowOperationTest extends BaseNexusTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class, TestOperationWorkflow.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void testParallelOperations() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("0123456789", result); + } + + @Test + public void testParallelOperationsReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testParallelWorkflowOperationTestHistory.json", TestNexus.class); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + TestNexusServices.TestNexusService1 serviceStub = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + List> asyncResult = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + NexusOperationHandle h = + Workflow.startNexusOperation(serviceStub::operation, String.valueOf(i)); + asyncResult.add(h.getResult()); + } + StringBuilder result = new StringBuilder(); + for (Promise promise : asyncResult) { + result.append(promise.get()); + } + return result.toString(); + } + } + + @WorkflowInterface + public interface OperationWorkflow { + @WorkflowMethod + String execute(String arg); + } + + public static class TestOperationWorkflow implements OperationWorkflow { + @Override + public String execute(String arg) { + Workflow.sleep(Workflow.newRandom().ints(1, 1000).findFirst().getAsInt()); + return arg; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowMethod( + (context, details, client, input) -> + client.newWorkflowStub( + OperationWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::execute); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java new file mode 100644 index 000000000..c26327b93 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; + +import com.google.common.collect.ImmutableMap; +import com.uber.m3.tally.RootScopeBuilder; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.common.reporter.TestStatsReporter; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.serviceclient.MetricsTag; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.MetricsType; +import io.temporal.worker.WorkerMetricsTag; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import java.time.Duration; +import java.util.Map; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SyncClientOperationTest extends BaseNexusTest { + private final TestStatsReporter reporter = new TestStatsReporter(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setMetricsScope( + new RootScopeBuilder() + .reporter(reporter) + .reportEvery(com.uber.m3.util.Duration.ofMillis(10))) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void syncClientOperation() { + TestUpdatedWorkflow workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class); + Assert.assertTrue(workflowStub.execute().startsWith("Update ID:")); + + // Test metrics all tasks should have + Map nexusWorkerTags = + ImmutableMap.builder() + .putAll(MetricsTag.defaultTags(NAMESPACE)) + .put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.NEXUS_WORKER.getValue()) + .put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue()) + .buildKeepingLast(); + reporter.assertTimer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, nexusWorkerTags); + Map operationTags = + ImmutableMap.builder() + .putAll(nexusWorkerTags) + .put(MetricsTag.NEXUS_SERVICE, "TestNexusService1") + .put(MetricsTag.NEXUS_OPERATION, "operation") + .buildKeepingLast(); + reporter.assertTimer(MetricsType.NEXUS_EXEC_LATENCY, operationTags); + reporter.assertTimer(MetricsType.NEXUS_TASK_E2E_LATENCY, operationTags); + // Test our custom metric + reporter.assertCounter("operation", operationTags, 1); + } + + @WorkflowInterface + public interface TestUpdatedWorkflow { + + @WorkflowMethod + String execute(); + + @UpdateMethod + String update(String arg); + } + + public static class TestNexus implements TestUpdatedWorkflow { + @Override + public String execute() { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(1)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + // Try to call a synchronous operation in a blocking way + TestNexusServices.TestNexusService1 serviceStub = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + return serviceStub.operation(Workflow.getInfo().getWorkflowId()); + } + + @Override + public String update(String arg) { + return "Update ID: " + Workflow.getCurrentUpdateInfo().get().getUpdateId(); + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + // Implemented inline + return WorkflowClientOperationHandlers.sync( + (ctx, details, client, id) -> { + Nexus.getOperationContext().getMetricsScope().counter("operation").inc(1); + return client + .newWorkflowStub(TestUpdatedWorkflow.class, id) + .update("Update from operation"); + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java index ea9c10b16..22c5112b6 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java @@ -79,7 +79,7 @@ public String execute(String input) { NexusOperationExecution syncExec = syncOpHandle.getExecution().get(); // Execution id is not present for synchronous operations Assert.assertFalse( - "Execution id should not be present", syncExec.getOperationId().isPresent()); + "Operation id should not be present", syncExec.getOperationId().isPresent()); // Result should always be completed for a synchronous operations when the Execution // is resolved Assert.assertTrue("Result should be completed", syncOpHandle.getResult().isCompleted()); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java new file mode 100644 index 000000000..65f3e2d9c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java @@ -0,0 +1,181 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.nexus.WorkflowHandle; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowHandleFuncTest extends BaseNexusTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceFuncImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void handleTests() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("funcinputinput2input23input234input2345input23456", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + + TestNexusServiceFunc serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceFunc.class, serviceOptions); + StringBuilder result = new StringBuilder(); + for (int i = 0; i < 7; i++) { + result.append(serviceStub.operation(i)); + } + return result.toString(); + } + } + + @Service + public interface TestNexusServiceFunc { + @Operation + String operation(Integer input); + } + + @ServiceImpl(service = TestNexusServiceFunc.class) + public class TestNexusServiceFuncImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowHandle( + (context, details, client, input) -> { + switch (input) { + case 0: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func); + case 1: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func1, + "input"); + case 2: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func2, + "input", + 2); + case 3: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func3, + "input", + 2, + 3); + case 4: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func4, + "input", + 2, + 3, + 4); + case 5: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func5, + "input", + 2, + 3, + 4, + 5); + case 6: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test6ArgWorkflowFunc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::func6, + "input", + 2, + 3, + 4, + 5, + 6); + default: + return null; + } + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java new file mode 100644 index 000000000..fd5da0981 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.nexus.WorkflowHandle; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowHandleProcTest extends BaseNexusTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceFuncImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Test + public void handleTests() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("success", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + + TestNexusServiceProc serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceProc.class, serviceOptions); + for (int i = 0; i < 7; i++) { + serviceStub.operation(i); + } + return "success"; + } + } + + @Service + public interface TestNexusServiceProc { + @Operation + Void operation(Integer input); + } + + @ServiceImpl(service = TestNexusServiceProc.class) + public class TestNexusServiceFuncImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowHandle( + (context, details, client, input) -> { + switch (input) { + case 0: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc); + case 1: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc1, + "input"); + case 2: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc2, + "input", + 2); + case 3: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc3, + "input", + 2, + 3); + case 4: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc4, + "input", + 2, + 3, + 4); + case 5: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc5, + "input", + 2, + 3, + 4, + 5); + case 6: + return WorkflowHandle.fromWorkflowMethod( + client.newWorkflowStub( + TestMultiArgWorkflowFunctions.Test6ArgWorkflowProc.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) + ::proc6, + "input", + 2, + 3, + 4, + 5, + 6); + default: + return null; + } + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java index 76b0bba38..55c4c3387 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java @@ -44,7 +44,7 @@ public interface TestNoArgsWorkflowFunc extends TestUpdateFunc { public interface Test1ArgWorkflowFunc extends TestUpdateFunc { @WorkflowMethod(name = "func1") - int func1(int input); + String func1(String input); } @WorkflowInterface @@ -162,7 +162,7 @@ public String func() { } @Override - public int func1(int a1) { + public String func1(String a1) { return a1; } diff --git a/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json b/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json new file mode 100644 index 000000000..f860a9c5e --- /dev/null +++ b/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json @@ -0,0 +1,555 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-09-21T22:04:51.255938Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049652", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "TestWorkflow1" + }, + "taskQueue": { + "name": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + } + ] + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "200s", + "workflowTaskTimeout": "5s", + "originalExecutionRunId": "c55c636c-5344-4b8d-b22a-2a4c4eee8fa3", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "firstExecutionRunId": "c55c636c-5344-4b8d-b22a-2a4c4eee8fa3", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "b3e872e4-74e6-4e76-a444-077b812e9a1b" + } + }, + { + "eventId": "2", + "eventTime": "2024-09-21T22:04:51.256089Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049653", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-09-21T22:04:51.259445Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049659", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "9bdeae08-5113-413d-8281-12b88b525c69", + "historySizeBytes": "508" + } + }, + { + "eventId": "4", + "eventTime": "2024-09-21T22:04:51.367145Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049663", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "sdkMetadata": { + "langUsedFlags": [ + 1 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-09-21T22:04:51.367233Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049664", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "73d3da51-b437-4202-a980-516c44bb7a84", + "endpointId": "941a6908-7da9-4ae9-902d-6a1821f0ed85" + } + }, + { + "eventId": "6", + "eventTime": "2024-09-21T22:04:51.394909Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049679", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "5", + "operationId": "53539d2b-c80d-4c6f-abf0-f8fd73c14410", + "requestId": "73d3da51-b437-4202-a980-516c44bb7a84" + } + }, + { + "eventId": "7", + "eventTime": "2024-09-21T22:04:51.394926Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049680", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "8", + "eventTime": "2024-09-21T22:04:51.395768Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049684", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "e9686c4b-9502-444d-a991-b7578a55ed1a", + "historySizeBytes": "1318" + } + }, + { + "eventId": "9", + "eventTime": "2024-09-21T22:04:51.402442Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049695", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "10", + "eventTime": "2024-09-21T22:04:51.399172Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049696", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "5", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + }, + "requestId": "73d3da51-b437-4202-a980-516c44bb7a84" + } + }, + { + "eventId": "11", + "eventTime": "2024-09-21T22:04:51.402476Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049697", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "12", + "eventTime": "2024-09-21T22:04:51.404313Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049701", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "c4c512f3-3b89-422a-b114-0f98d520287d", + "historySizeBytes": "1891" + } + }, + { + "eventId": "13", + "eventTime": "2024-09-21T22:04:51.410790Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049705", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "14", + "eventTime": "2024-09-21T22:04:51.410809Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049706", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "13", + "requestId": "aedc84ce-5870-4a58-9aa6-dc166e56cc0d", + "endpointId": "941a6908-7da9-4ae9-902d-6a1821f0ed85" + } + }, + { + "eventId": "15", + "eventTime": "2024-09-21T22:04:51.416236Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049719", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "14", + "operationId": "92f0b987-c98f-4bd9-98b5-bd53ac85a92a", + "requestId": "aedc84ce-5870-4a58-9aa6-dc166e56cc0d" + } + }, + { + "eventId": "16", + "eventTime": "2024-09-21T22:04:51.416250Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049720", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "17", + "eventTime": "2024-09-21T22:04:51.416855Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049724", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "16", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "cbe3345b-3a4b-4782-97e1-4ea9e2230375", + "historySizeBytes": "2697" + } + }, + { + "eventId": "18", + "eventTime": "2024-09-21T22:04:51.419110Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049735", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "16", + "startedEventId": "17", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "19", + "eventTime": "2024-09-21T22:04:51.419961Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049737", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "14", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + }, + "requestId": "aedc84ce-5870-4a58-9aa6-dc166e56cc0d" + } + }, + { + "eventId": "20", + "eventTime": "2024-09-21T22:04:51.419976Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049738", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "21", + "eventTime": "2024-09-21T22:04:51.420653Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049742", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "20", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "1371ab54-9c3b-4d9d-9721-4638f46b5838", + "historySizeBytes": "3270" + } + }, + { + "eventId": "22", + "eventTime": "2024-09-21T22:04:51.423700Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049746", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "20", + "startedEventId": "21", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "23", + "eventTime": "2024-09-21T22:04:51.423718Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049747", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJsb2NrIg==" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "22", + "requestId": "e89c12bc-cc30-4ba0-ab9f-70974b4f052f", + "endpointId": "941a6908-7da9-4ae9-902d-6a1821f0ed85" + } + }, + { + "eventId": "24", + "eventTime": "2024-09-21T22:04:51.427450Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049760", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "23", + "operationId": "aaaf6547-84de-4d79-bc0c-6b59a64ea061", + "requestId": "e89c12bc-cc30-4ba0-ab9f-70974b4f052f" + } + }, + { + "eventId": "25", + "eventTime": "2024-09-21T22:04:51.427463Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049761", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "26", + "eventTime": "2024-09-21T22:04:51.428158Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049765", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "25", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "45fee988-4673-41cc-b677-4d05890fae42", + "historySizeBytes": "4010" + } + }, + { + "eventId": "27", + "eventTime": "2024-09-21T22:04:51.436705Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049772", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "25", + "startedEventId": "26", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "28", + "eventTime": "2024-09-21T22:04:51.436729Z", + "eventType": "EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED", + "taskId": "1049773", + "signalExternalWorkflowExecutionInitiatedEventAttributes": { + "workflowTaskCompletedEventId": "27", + "namespaceId": "64b5c2fb-2aae-4b9c-a3e1-564c49fab4b8", + "workflowExecution": { + "workflowId": "aaaf6547-84de-4d79-bc0c-6b59a64ea061" + }, + "signalName": "unblock", + "header": {} + } + }, + { + "eventId": "29", + "eventTime": "2024-09-21T22:04:51.438172Z", + "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049781", + "externalWorkflowExecutionSignaledEventAttributes": { + "initiatedEventId": "28", + "namespace": "UnitTest", + "namespaceId": "64b5c2fb-2aae-4b9c-a3e1-564c49fab4b8", + "workflowExecution": { + "workflowId": "aaaf6547-84de-4d79-bc0c-6b59a64ea061" + } + } + }, + { + "eventId": "30", + "eventTime": "2024-09-21T22:04:51.438174Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049782", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "31", + "eventTime": "2024-09-21T22:04:51.438656Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049789", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "30", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "29b1a0d7-d39d-493c-bf05-256d3ed99412", + "historySizeBytes": "4617" + } + }, + { + "eventId": "32", + "eventTime": "2024-09-21T22:04:51.442440Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049794", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "30", + "startedEventId": "31", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "33", + "eventTime": "2024-09-21T22:04:51.443830Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049803", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "23", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IGJsb2NrIg==" + }, + "requestId": "e89c12bc-cc30-4ba0-ab9f-70974b4f052f" + } + }, + { + "eventId": "34", + "eventTime": "2024-09-21T22:04:51.443840Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049804", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "11805@Quinn-Klassens-MacBook-Pro.local:c294db9c-9a58-41e8-97bc-31888b633bab", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-fdb273b6-de24-4867-ab48-9166b135d230" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "35", + "eventTime": "2024-09-21T22:04:51.444745Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049808", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "34", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "requestId": "f5fad756-9be6-4ff9-8a9f-1d3bee637720", + "historySizeBytes": "5122" + } + }, + { + "eventId": "36", + "eventTime": "2024-09-21T22:04:51.447003Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049812", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "34", + "startedEventId": "35", + "identity": "11805@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "37", + "eventTime": "2024-09-21T22:04:51.447018Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1049813", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tZmRiMjczYjYtZGUyNC00ODY3LWFiNDgtOTE2NmIxMzVkMjMwIg==" + } + ] + }, + "workflowTaskCompletedEventId": "36" + } + } + ] +} \ No newline at end of file diff --git a/temporal-sdk/src/test/resources/testParallelWorkflowOperationTestHistory.json b/temporal-sdk/src/test/resources/testParallelWorkflowOperationTestHistory.json new file mode 100644 index 000000000..2131fd08b --- /dev/null +++ b/temporal-sdk/src/test/resources/testParallelWorkflowOperationTestHistory.json @@ -0,0 +1,742 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-09-21T21:58:59.291887Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049233", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "TestWorkflow1" + }, + "taskQueue": { + "name": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IldvcmtmbG93VGVzdC10ZXN0UGFyYWxsZWxPcGVyYXRpb25zLTFlMjUxNTM4LTQ4MTQtNDUzNS1iOTk5LWU4NmQ4ZjE3YTY5MSI=" + } + ] + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "200s", + "workflowTaskTimeout": "5s", + "originalExecutionRunId": "cf054c98-0160-450e-9f01-061d861b5a54", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "firstExecutionRunId": "cf054c98-0160-450e-9f01-061d861b5a54", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "531437b0-31dd-4222-bf02-96d6cbd8885b" + } + }, + { + "eventId": "2", + "eventTime": "2024-09-21T21:58:59.291967Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049234", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-09-21T21:58:59.295147Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049240", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "requestId": "7404551c-c4ce-4b89-88aa-856cce2feb74", + "historySizeBytes": "512" + } + }, + { + "eventId": "4", + "eventTime": "2024-09-21T21:58:59.409523Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049244", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "sdkMetadata": { + "langUsedFlags": [ + 1 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-09-21T21:58:59.409555Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049245", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjAi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "96b76c12-c846-4cce-9290-74ee9b1d79ab", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "6", + "eventTime": "2024-09-21T21:58:59.409595Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049246", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "c4ff6b12-5e3e-4838-919d-377854b1ea73", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "7", + "eventTime": "2024-09-21T21:58:59.409608Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049247", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjIi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "9dd9cf59-393d-4ecb-8270-9aa0b72adc8c", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "8", + "eventTime": "2024-09-21T21:58:59.409617Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049248", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjMi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "db9fb6d0-e7de-460f-b518-2d30678e8011", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "9", + "eventTime": "2024-09-21T21:58:59.409627Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049249", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjQi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "6bd05424-845c-426c-9798-00af06ad27d9", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "10", + "eventTime": "2024-09-21T21:58:59.409636Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049250", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjUi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "c2a445d2-b548-456f-91c6-d7319b2733b5", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "11", + "eventTime": "2024-09-21T21:58:59.409646Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049251", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjYi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "4a0df3c4-e1e0-4877-b3bd-c5185d18027d", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "12", + "eventTime": "2024-09-21T21:58:59.409665Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049252", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijci" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "6eb32f69-5c3a-4259-a386-d5d2845aca77", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "13", + "eventTime": "2024-09-21T21:58:59.409673Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049253", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijgi" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "68692641-f77f-46a0-accd-4916de8bcc15", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "14", + "eventTime": "2024-09-21T21:58:59.409681Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1049254", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijki" + }, + "scheduleToCloseTimeout": "10s", + "workflowTaskCompletedEventId": "4", + "requestId": "6d69714a-df8f-4c7a-a2e3-a4eed95355b2", + "endpointId": "6870540a-d056-4f6d-a372-3ad23fedf16e" + } + }, + { + "eventId": "15", + "eventTime": "2024-09-21T21:58:59.441823Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049335", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "7", + "operationId": "b99dd681-6005-4120-a9f5-77f69e59ca8a", + "requestId": "9dd9cf59-393d-4ecb-8270-9aa0b72adc8c" + } + }, + { + "eventId": "16", + "eventTime": "2024-09-21T21:58:59.441839Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049336", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "6114@Quinn-Klassens-MacBook-Pro.local:5bff66e2-9eaf-49f2-aa60-1fa7e7cf7848", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "17", + "eventTime": "2024-09-21T21:58:59.443506Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049341", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "6", + "operationId": "26fc52ac-66c6-46c2-ba32-8f524e03b80b", + "requestId": "c4ff6b12-5e3e-4838-919d-377854b1ea73" + } + }, + { + "eventId": "18", + "eventTime": "2024-09-21T21:58:59.443897Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049343", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "9", + "operationId": "d4b13a56-dc16-4bac-af5e-b505c61ba569", + "requestId": "6bd05424-845c-426c-9798-00af06ad27d9" + } + }, + { + "eventId": "19", + "eventTime": "2024-09-21T21:58:59.444123Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049345", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "14", + "operationId": "1f3aced5-a1ff-4a0d-a02e-065c1b54179c", + "requestId": "6d69714a-df8f-4c7a-a2e3-a4eed95355b2" + } + }, + { + "eventId": "20", + "eventTime": "2024-09-21T21:58:59.444333Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049347", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "8", + "operationId": "0b4f0b4e-54d3-4185-8a0a-485f27462bc5", + "requestId": "db9fb6d0-e7de-460f-b518-2d30678e8011" + } + }, + { + "eventId": "21", + "eventTime": "2024-09-21T21:58:59.444519Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049349", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "5", + "operationId": "0777cf0f-3ea0-4e94-8b4e-9580ea4e6ed2", + "requestId": "96b76c12-c846-4cce-9290-74ee9b1d79ab" + } + }, + { + "eventId": "22", + "eventTime": "2024-09-21T21:58:59.444697Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049351", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "11", + "operationId": "7b694b12-6488-4317-8bec-c893f7f1c07b", + "requestId": "4a0df3c4-e1e0-4877-b3bd-c5185d18027d" + } + }, + { + "eventId": "23", + "eventTime": "2024-09-21T21:58:59.444885Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049353", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "13", + "operationId": "3afdbf97-3338-450f-914a-14a60dece2ab", + "requestId": "68692641-f77f-46a0-accd-4916de8bcc15" + } + }, + { + "eventId": "24", + "eventTime": "2024-09-21T21:58:59.445060Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049355", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "12", + "operationId": "e276e526-7fc3-42c5-b12b-2c22ed98b301", + "requestId": "6eb32f69-5c3a-4259-a386-d5d2845aca77" + } + }, + { + "eventId": "25", + "eventTime": "2024-09-21T21:58:59.445416Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1049357", + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "10", + "operationId": "e0561cad-d066-46ae-918c-34c52aae8aaa", + "requestId": "c2a445d2-b548-456f-91c6-d7319b2733b5" + } + }, + { + "eventId": "26", + "eventTime": "2024-09-21T21:58:59.446001Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049359", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "16", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "requestId": "152c4c51-fa99-4241-9707-2cd4e174d276", + "historySizeBytes": "4537" + } + }, + { + "eventId": "27", + "eventTime": "2024-09-21T21:58:59.455430Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049411", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "16", + "startedEventId": "26", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "28", + "eventTime": "2024-09-21T21:59:00.458552Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049480", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "9", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjQi" + }, + "requestId": "6bd05424-845c-426c-9798-00af06ad27d9" + } + }, + { + "eventId": "29", + "eventTime": "2024-09-21T21:59:00.458567Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049481", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "6114@Quinn-Klassens-MacBook-Pro.local:5bff66e2-9eaf-49f2-aa60-1fa7e7cf7848", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "30", + "eventTime": "2024-09-21T21:59:00.461771Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049508", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "29", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "requestId": "f8078c43-d081-4139-8eb8-29e178ddcbbe", + "historySizeBytes": "5005" + } + }, + { + "eventId": "31", + "eventTime": "2024-09-21T21:59:00.467152Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049560", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "29", + "startedEventId": "30", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "32", + "eventTime": "2024-09-21T21:59:00.464054Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049561", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "14", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijki" + }, + "requestId": "6d69714a-df8f-4c7a-a2e3-a4eed95355b2" + } + }, + { + "eventId": "33", + "eventTime": "2024-09-21T21:59:00.467168Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049562", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "6114@Quinn-Klassens-MacBook-Pro.local:5bff66e2-9eaf-49f2-aa60-1fa7e7cf7848", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "34", + "eventTime": "2024-09-21T21:59:00.469553Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049580", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "6", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + }, + "requestId": "c4ff6b12-5e3e-4838-919d-377854b1ea73" + } + }, + { + "eventId": "35", + "eventTime": "2024-09-21T21:59:00.472643Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049601", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "7", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjIi" + }, + "requestId": "9dd9cf59-393d-4ecb-8270-9aa0b72adc8c" + } + }, + { + "eventId": "36", + "eventTime": "2024-09-21T21:59:00.474830Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049615", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "5", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjAi" + }, + "requestId": "96b76c12-c846-4cce-9290-74ee9b1d79ab" + } + }, + { + "eventId": "37", + "eventTime": "2024-09-21T21:59:00.476132Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049624", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "33", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "requestId": "f3014e31-1105-4efc-b197-d28c6bd6c59c", + "historySizeBytes": "5767" + } + }, + { + "eventId": "38", + "eventTime": "2024-09-21T21:59:00.480427Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049632", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "33", + "startedEventId": "37", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "39", + "eventTime": "2024-09-21T21:59:00.478102Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049633", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "12", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijci" + }, + "requestId": "6eb32f69-5c3a-4259-a386-d5d2845aca77" + } + }, + { + "eventId": "40", + "eventTime": "2024-09-21T21:59:00.478529Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049634", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "13", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijgi" + }, + "requestId": "68692641-f77f-46a0-accd-4916de8bcc15" + } + }, + { + "eventId": "41", + "eventTime": "2024-09-21T21:59:00.478862Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049635", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "11", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjYi" + }, + "requestId": "4a0df3c4-e1e0-4877-b3bd-c5185d18027d" + } + }, + { + "eventId": "42", + "eventTime": "2024-09-21T21:59:00.479198Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049636", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "8", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjMi" + }, + "requestId": "db9fb6d0-e7de-460f-b518-2d30678e8011" + } + }, + { + "eventId": "43", + "eventTime": "2024-09-21T21:59:00.479584Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", + "taskId": "1049637", + "nexusOperationCompletedEventAttributes": { + "scheduledEventId": "10", + "result": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjUi" + }, + "requestId": "c2a445d2-b548-456f-91c6-d7319b2733b5" + } + }, + { + "eventId": "44", + "eventTime": "2024-09-21T21:59:00.480440Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049638", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "6114@Quinn-Klassens-MacBook-Pro.local:5bff66e2-9eaf-49f2-aa60-1fa7e7cf7848", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testParallelOperations-1e251538-4814-4535-b999-e86d8f17a691" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "45", + "eventTime": "2024-09-21T21:59:00.497213Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049642", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "44", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "requestId": "14fbd347-f7de-40e3-a532-5a5a88e6b78f", + "historySizeBytes": "6627" + } + }, + { + "eventId": "46", + "eventTime": "2024-09-21T21:59:00.500049Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049646", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "44", + "startedEventId": "45", + "identity": "6114@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "47", + "eventTime": "2024-09-21T21:59:00.500065Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1049647", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjAxMjM0NTY3ODki" + } + ] + }, + "workflowTaskCompletedEventId": "46" + } + } + ] +} \ No newline at end of file diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/LongPollUtil.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/LongPollUtil.java index 79c99c1bf..3398681cb 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/LongPollUtil.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/LongPollUtil.java @@ -30,6 +30,7 @@ static boolean isLongPoll( MethodDescriptor method, CallOptions callOptions) { if (method == WorkflowServiceGrpc.getPollWorkflowTaskQueueMethod() || method == WorkflowServiceGrpc.getPollActivityTaskQueueMethod() + || method == WorkflowServiceGrpc.getPollNexusTaskQueueMethod() || method == WorkflowServiceGrpc.getUpdateWorkflowExecutionMethod() || method == WorkflowServiceGrpc.getExecuteMultiOperationMethod() || method == WorkflowServiceGrpc.getPollWorkflowExecutionUpdateMethod()) { diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 69541eae6..92175b4c8 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -715,6 +715,7 @@ private static void scheduleNexusOperation( .setPayload(attr.getInput()) .addLinks(link) .setCallback("http://test-env/operations") + .setRequestId(UUID.randomUUID().toString()) // The test server uses this to lookup the operation .putCallbackHeader( "operation-reference", ref.toBytes().toStringUtf8()))); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java index 6d04a006f..d1e907ad2 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java @@ -188,6 +188,11 @@ public Optional getOptions() { return next.getOptions(); } + @Override + public WorkflowStub newInstance(WorkflowOptions options) { + return new TimeLockingWorkflowStub(locker, next.newInstance(options)); + } + /** Unlocks time skipping before blocking calls and locks back after completion. */ private class TimeLockingFuture extends CompletableFuture {