diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java index 0e47fa772..19db55e85 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkflowImplementationOptions.java @@ -43,6 +43,14 @@ public static Builder newBuilder() { return new Builder(); } + public static Builder newBuilder(WorkflowImplementationOptions options) { + return new Builder(options); + } + + public Builder toBuilder() { + return new Builder(this); + } + public static final class Builder { private Class[] failWorkflowExceptionTypes; @@ -55,6 +63,19 @@ public static final class Builder { private Builder() {} + private Builder(WorkflowImplementationOptions options) { + if (options == null) { + return; + } + this.failWorkflowExceptionTypes = options.getFailWorkflowExceptionTypes(); + this.activityOptions = options.getActivityOptions(); + this.defaultActivityOptions = options.getDefaultActivityOptions(); + this.localActivityOptions = options.getLocalActivityOptions(); + this.defaultLocalActivityOptions = options.getDefaultLocalActivityOptions(); + this.nexusServiceOptions = options.getNexusServiceOptions(); + this.defaultNexusServiceOptions = options.getDefaultNexusServiceOptions(); + } + /** * Optional: Sets how workflow worker deals with exceptions thrown from the workflow code which * include non-deterministic history events (presumably arising from non-deterministic workflow diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotTests.java index b062e748c..435fbd1f5 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotTests.java @@ -39,7 +39,6 @@ import io.temporal.worker.WorkerOptions; import io.temporal.worker.tuning.*; import io.temporal.workflow.*; -import io.temporal.workflow.nexus.BaseNexusTest; import io.temporal.workflow.shared.TestNexusServices; import java.time.Duration; import java.util.Map; @@ -49,7 +48,7 @@ import org.junit.Rule; import org.junit.Test; -public class WorkflowSlotTests extends BaseNexusTest { +public class WorkflowSlotTests { private final int MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE = 100; private final int MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE = 1000; private final int MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE = 10000; @@ -98,11 +97,6 @@ public void setup() { didFail = false; } - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - @After public void tearDown() { testWorkflowRule.getTestEnvironment().close(); @@ -181,7 +175,6 @@ public static class SleepingWorkflowImpl implements TestWorkflow { Workflow.newNexusServiceStub( TestNexusServices.TestNexusService1.class, NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) .setOperationOptions( NexusOperationOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofSeconds(10)) diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java index a2394be79..e9e8e260a 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java @@ -37,7 +37,6 @@ import io.temporal.workflow.NexusOperationOptions; import io.temporal.workflow.NexusServiceOptions; import io.temporal.workflow.Workflow; -import io.temporal.workflow.nexus.BaseNexusTest; import io.temporal.workflow.shared.TestNexusServices; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import java.time.Duration; @@ -47,7 +46,7 @@ import org.junit.Rule; import org.junit.Test; -public class CleanNexusWorkerShutdownTest extends BaseNexusTest { +public class CleanNexusWorkerShutdownTest { private static final String COMPLETED = "Completed"; private static final String INTERRUPTED = "Interrupted"; @@ -114,18 +113,12 @@ public void testShutdownNow() throws InterruptedException { assertTrue("Contains NexusOperationCompleted", found); } - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - public static class TestWorkflowImpl implements TestWorkflow1 { private final TestNexusServices.TestNexusService1 service = Workflow.newNexusServiceStub( TestNexusServices.TestNexusService1.class, NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) .setOperationOptions( NexusOperationOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofSeconds(10)) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java index f1a89e77d..fde1572a7 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java @@ -35,7 +35,7 @@ import org.junit.Rule; import org.junit.Test; -public class SyncOperationCancelledTest extends BaseNexusTest { +public class SyncOperationCancelledTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() @@ -43,11 +43,6 @@ public class SyncOperationCancelledTest extends BaseNexusTest { .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - @Test public void syncOperationImmediatelyCancelled() { TestWorkflows.TestWorkflow1 workflowStub = @@ -82,10 +77,7 @@ public String execute(String input) { .setScheduleToCloseTimeout(Duration.ofSeconds(10)) .build(); NexusServiceOptions serviceOptions = - NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) - .setOperationOptions(options) - .build(); + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); TestNexusServices.TestNexusService1 serviceStub = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); Workflow.newCancellationScope( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java index bec6de547..b6ec13642 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java @@ -34,7 +34,7 @@ import java.time.Duration; import org.junit.*; -public class SyncOperationFailTest extends BaseNexusTest { +public class SyncOperationFailTest { @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -56,11 +56,6 @@ public void failSyncOperation() { Assert.assertEquals("failed to call operation", applicationFailure.getOriginalMessage()); } - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - public static class TestNexus implements TestWorkflow1 { @Override public String execute(String endpoint) { @@ -70,10 +65,7 @@ public String execute(String endpoint) { .build(); NexusServiceOptions serviceOptions = - NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) - .setOperationOptions(options) - .build(); + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); TestNexusServices.TestNexusService1 testNexusService = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); try { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java index 22c5112b6..cce9ecc5a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java @@ -32,7 +32,7 @@ import org.junit.Rule; import org.junit.Test; -public class SyncOperationStubTest extends BaseNexusTest { +public class SyncOperationStubTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() @@ -40,11 +40,6 @@ public class SyncOperationStubTest extends BaseNexusTest { .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - @Test public void typedNexusServiceStub() { TestWorkflows.TestWorkflow1 workflowStub = @@ -61,10 +56,7 @@ public String execute(String input) { .setScheduleToCloseTimeout(Duration.ofSeconds(5)) .build(); NexusServiceOptions serviceOptions = - NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) - .setOperationOptions(options) - .build(); + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); // Try to call a synchronous operation in a blocking way TestNexusServices.TestNexusService1 serviceStub = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java index ccdd00a5d..e79731cba 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java @@ -35,7 +35,7 @@ import org.junit.Rule; import org.junit.Test; -public class SyncOperationTimeoutTest extends BaseNexusTest { +public class SyncOperationTimeoutTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() @@ -43,11 +43,6 @@ public class SyncOperationTimeoutTest extends BaseNexusTest { .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - @Test public void typedOperationTimeout() { TestWorkflows.TestWorkflow1 workflowStub = @@ -69,10 +64,7 @@ public String execute(String input) { .setScheduleToCloseTimeout(Duration.ofSeconds(1)) .build(); NexusServiceOptions serviceOptions = - NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) - .setOperationOptions(options) - .build(); + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); // Try to call a synchronous operation in a blocking way TestNexusServices.TestNexusService1 serviceStub = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java index 5a9fd0a65..27340fbec 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java @@ -29,22 +29,17 @@ import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; import org.junit.Assert; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; -public class UntypedSyncOperationStubTest extends BaseNexusTest { - @Rule - public SDKTestWorkflowRule testWorkflowRule = +public class UntypedSyncOperationStubTest { + @ClassRule + public static SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() .setWorkflowTypes(TestNexus.class) .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); - @Override - protected SDKTestWorkflowRule getTestWorkflowRule() { - return testWorkflowRule; - } - @Test public void untypedNexusServiceStub() { TestWorkflows.TestWorkflow1 workflowStub = @@ -62,7 +57,7 @@ public String execute(String name) { .build(); NexusServiceOptions serviceOptions = NexusServiceOptions.newBuilder() - .setEndpoint(getEndpointName()) + .setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName()) .setOperationOptions(options) .build(); NexusServiceStub serviceStub = @@ -90,7 +85,7 @@ public String execute(String name) { } @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public class TestNexusServiceImpl { + public static class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { // Implemented inline diff --git a/temporal-testing/build.gradle b/temporal-testing/build.gradle index 606f88fcc..98dddfa81 100644 --- a/temporal-testing/build.gradle +++ b/temporal-testing/build.gradle @@ -23,6 +23,8 @@ dependencies { implementation("com.jayway.jsonpath:json-path:$jsonPathVersion"){ exclude group: 'org.slf4j', module: 'slf4j-api' } + implementation "io.nexusrpc:nexus-sdk:$nexusVersion" + junit4Api 'junit:junit:4.13.2' diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java index b03e21cd4..0066a19c5 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java @@ -22,7 +22,9 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.IndexedValueType; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; +import io.temporal.common.Experimental; import io.temporal.common.WorkflowExecutionHistory; import io.temporal.serviceclient.OperatorServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -159,6 +161,24 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) { */ boolean registerSearchAttribute(String name, IndexedValueType type); + /** + * Register a Nexus Endpoint with the server. + * + * @param name Nexus Endpoint name + * @param taskQueue Task Queue to be used for the endpoint + * @return Endpoint object + */ + @Experimental + Endpoint createNexusEndpoint(String name, String taskQueue); + + /** + * Delete a Nexus Endpoint on the server. + * + * @param endpoint current endpoint to be deleted + */ + @Experimental + void deleteNexusEndpoint(Endpoint endpoint); + /** * @return the in-memory test Temporal service that is owned by this. * @deprecated use {{@link #getWorkflowServiceStubs()} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index 0c0868584..25cee90cb 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -22,14 +22,20 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ObjectArrays; +import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.uber.m3.tally.NoopScope; import com.uber.m3.tally.Scope; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.IndexedValueType; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; import io.temporal.api.operatorservice.v1.AddSearchAttributesRequest; +import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest; import io.temporal.api.testservice.v1.SleepRequest; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; @@ -219,6 +225,38 @@ public boolean registerSearchAttribute(String name, IndexedValueType type) { } } + @Override + public Endpoint createNexusEndpoint(String name, String taskQueue) { + EndpointSpec spec = + EndpointSpec.newBuilder() + .setName(name) + .setDescription( + Payload.newBuilder() + .setData( + ByteString.copyFromUtf8( + "Test Nexus endpoint created by the Java SDK WorkflowTestEnvironment"))) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(getNamespace()) + .setTaskQueue(taskQueue))) + .build(); + CreateNexusEndpointRequest request = + CreateNexusEndpointRequest.newBuilder().setSpec(spec).build(); + return operatorServiceStubs.blockingStub().createNexusEndpoint(request).getEndpoint(); + } + + public void deleteNexusEndpoint(Endpoint endpoint) { + operatorServiceStubs + .blockingStub() + .deleteNexusEndpoint( + io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest.newBuilder() + .setId(endpoint.getId()) + .setVersion(endpoint.getVersion()) + .build()); + } + @Deprecated public WorkflowServiceStubs getWorkflowService() { return getWorkflowServiceStubs(); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowExtension.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowExtension.java index 391eb2c9e..80a7a70ee 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowExtension.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowExtension.java @@ -20,11 +20,15 @@ package io.temporal.testing; +import static io.temporal.testing.internal.TestServiceUtils.applyNexusServiceOptions; + import com.uber.m3.tally.Scope; import io.temporal.api.enums.v1.IndexedValueType; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowOptions; +import io.temporal.common.Experimental; import io.temporal.common.metadata.POJOWorkflowImplMetadata; import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata; import io.temporal.serviceclient.WorkflowServiceStubsOptions; @@ -36,11 +40,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Parameter; import java.time.Instant; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import javax.annotation.Nonnull; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -89,15 +89,18 @@ public class TestWorkflowExtension private static final String TEST_ENVIRONMENT_KEY = "testEnvironment"; private static final String WORKER_KEY = "worker"; private static final String WORKFLOW_OPTIONS_KEY = "workflowOptions"; + private static final String NEXUS_ENDPOINT_KEY = "nexusEndpoint"; private final WorkerOptions workerOptions; private final WorkflowClientOptions workflowClientOptions; private final WorkerFactoryOptions workerFactoryOptions; private final Map, WorkflowImplementationOptions> workflowTypes; private final Object[] activityImplementations; + private final Object[] nexusServiceImplementations; private final boolean useExternalService; private final String target; private final boolean doNotStart; + private final boolean doNotSetupNexusEndpoint; private final long initialTimeMillis; private final boolean useTimeskipping; @Nonnull private final Map searchAttributes; @@ -117,9 +120,11 @@ private TestWorkflowExtension(Builder builder) { workerFactoryOptions = builder.workerFactoryOptions; workflowTypes = builder.workflowTypes; activityImplementations = builder.activityImplementations; + nexusServiceImplementations = builder.nexusServiceImplementations; useExternalService = builder.useExternalService; target = builder.target; doNotStart = builder.doNotStart; + doNotSetupNexusEndpoint = builder.doNotSetupNexusEndpoint; initialTimeMillis = builder.initialTimeMillis; useTimeskipping = builder.useTimeskipping; this.searchAttributes = builder.searchAttributes; @@ -212,13 +217,26 @@ public void beforeEach(ExtensionContext context) { String taskQueue = String.format("WorkflowTest-%s-%s", context.getDisplayName(), context.getUniqueId()); + String nexusEndpointName = String.format("WorkflowTestNexusEndpoint-%s", UUID.randomUUID()); + boolean createNexusEndpoint = + !doNotSetupNexusEndpoint && nexusServiceImplementations.length > 0; Worker worker = testEnvironment.newWorker(taskQueue, workerOptions); - workflowTypes.forEach((wft, o) -> worker.registerWorkflowImplementationTypes(o, wft)); + workflowTypes.forEach( + (wft, o) -> { + if (createNexusEndpoint) { + o = applyNexusServiceOptions(o, nexusServiceImplementations, nexusEndpointName); + } + worker.registerWorkflowImplementationTypes(o, wft); + }); worker.registerActivitiesImplementations(activityImplementations); + worker.registerNexusServiceImplementation(nexusServiceImplementations); if (!doNotStart) { testEnvironment.start(); } + if (createNexusEndpoint) { + setNexusEndpoint(context, testEnvironment.createNexusEndpoint(nexusEndpointName, taskQueue)); + } setTestEnvironment(context, testEnvironment); setWorker(context, worker); @@ -239,8 +257,12 @@ protected TestEnvironmentOptions createTestEnvOptions(long initialTimeMillis) { } @Override - public void afterEach(ExtensionContext context) throws Exception { + public void afterEach(ExtensionContext context) { + Endpoint endpoint = getNexusEndpoint(context); TestWorkflowEnvironment testEnvironment = getTestEnvironment(context); + if (endpoint != null && !testEnvironment.getOperatorServiceStubs().isShutdown()) { + testEnvironment.deleteNexusEndpoint(endpoint); + } testEnvironment.close(); } @@ -267,6 +289,14 @@ private void setWorker(ExtensionContext context, Worker worker) { getStore(context).put(WORKER_KEY, worker); } + private Endpoint getNexusEndpoint(ExtensionContext context) { + return getStore(context).get(NEXUS_ENDPOINT_KEY, Endpoint.class); + } + + private void setNexusEndpoint(ExtensionContext context, Endpoint endpoint) { + getStore(context).put(NEXUS_ENDPOINT_KEY, endpoint); + } + private WorkflowOptions getWorkflowOptions(ExtensionContext context) { return getStore(context).get(WORKFLOW_OPTIONS_KEY, WorkflowOptions.class); } @@ -284,6 +314,7 @@ private ExtensionContext.Store getStore(ExtensionContext context) { public static class Builder { private static final Object[] NO_ACTIVITIES = new Object[0]; + private static final Object[] NO_NEXUS_SERVICES = new Object[0]; private WorkerOptions workerOptions = WorkerOptions.getDefaultInstance(); private WorkflowClientOptions workflowClientOptions; @@ -291,9 +322,11 @@ public static class Builder { private String namespace = "UnitTest"; private Map, WorkflowImplementationOptions> workflowTypes = new HashMap<>(); private Object[] activityImplementations = NO_ACTIVITIES; + private Object[] nexusServiceImplementations = NO_NEXUS_SERVICES; private boolean useExternalService = false; private String target = null; private boolean doNotStart = false; + private boolean doNotSetupNexusEndpoint = false; private long initialTimeMillis; // Default to TestEnvironmentOptions isUseTimeskipping private boolean useTimeskipping = @@ -364,6 +397,22 @@ public Builder registerWorkflowImplementationTypes( return this; } + /** + * Specify Nexus service implementations to register with the Temporal workerIf any Nexus + * services are registered with the worker, the extension will automatically create a Nexus + * Endpoint for the test and the endpoint will be set on the per-service options and default + * options in {@link WorkflowImplementationOptions} if none are provided. + * + *

This can be disabled by setting {@link #setDoNotSetupNexusEndpoint(boolean)} to true. + * + * @see Worker#registerNexusServiceImplementation(Object...) + */ + @Experimental + public Builder setNexusServiceImplementation(Object... nexusServiceImplementations) { + this.nexusServiceImplementations = nexusServiceImplementations; + return this; + } + /** * Specify workflow implementation types to register with the Temporal worker. * @@ -429,6 +478,16 @@ public Builder setDoNotStart(boolean doNotStart) { return this; } + /** + * When set to true the {@link TestWorkflowEnvironment} will not automatically create a Nexus + * Endpoint. This is useful when you want to manually create a Nexus Endpoint for your test. + */ + @Experimental + public Builder setDoNotSetupNexusEndpoint(boolean doNotSetupNexusEndpoint) { + this.doNotSetupNexusEndpoint = doNotSetupNexusEndpoint; + return this; + } + /** * Set the initial time for the workflow virtual clock, milliseconds since epoch. * diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowRule.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowRule.java index 9f613a2a7..f09e34ff7 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowRule.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowRule.java @@ -20,10 +20,13 @@ package io.temporal.testing; +import static io.temporal.testing.internal.TestServiceUtils.applyNexusServiceOptions; + import com.uber.m3.tally.Scope; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.IndexedValueType; import io.temporal.api.history.v1.History; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; @@ -85,6 +88,7 @@ public class TestWorkflowRule implements TestRule { private final String namespace; private final boolean useExternalService; private final boolean doNotStart; + private final boolean doNotSetupNexusEndpoint; @Nullable private final Timeout globalTimeout; private final Class[] workflowTypes; @@ -102,6 +106,8 @@ public class TestWorkflowRule implements TestRule { @Nonnull private final Map searchAttributes; private String taskQueue; + private String nexusEndpointName; + private Endpoint nexusEndpoint; private final TestWorkflowEnvironment testEnvironment; private final TestWatcher watchman = new TestWatcher() { @@ -113,6 +119,7 @@ protected void failed(Throwable e, Description description) { private TestWorkflowRule(Builder builder) { this.doNotStart = builder.doNotStart; + this.doNotSetupNexusEndpoint = builder.doNotSetupNexusEndpoint; this.useExternalService = builder.useExternalService; this.namespace = (builder.namespace == null) ? RegisterTestNamespace.NAMESPACE : builder.namespace; @@ -181,6 +188,7 @@ public static class Builder { private String target; private boolean useExternalService; private boolean doNotStart; + private boolean doNotSetupNexusEndpoint; private long initialTimeMillis; // Default to TestEnvironmentOptions isUseTimeskipping private boolean useTimeskipping = @@ -241,6 +249,16 @@ public Builder setWorkflowTypes( return this; } + /** + * Specify Nexus service implementations to register with the Temporal worker. If any Nexus + * services are registered with the worker, the rule will automatically create a Nexus Endpoint + * for the test and the endpoint will be set on the per-service options and default options in + * {@link WorkflowImplementationOptions} if none are provided. + * + *

This can be disabled by setting {@link #setDoNotSetupNexusEndpoint(boolean)} to true. + * + * @see Worker#registerNexusServiceImplementation(Object...) + */ @Experimental public Builder setNexusServiceImplementation(Object... nexusServiceImplementations) { this.nexusServiceImplementations = nexusServiceImplementations; @@ -326,6 +344,16 @@ public Builder setDoNotStart(boolean doNotStart) { return this; } + /** + * When set to true the {@link TestWorkflowEnvironment} will not automatically create a Nexus + * Endpoint. This is useful when you want to manually create a Nexus Endpoint for your test. + */ + @Experimental + public Builder setDoNotSetupNexusEndpoint(boolean doNotSetupNexusEndpoint) { + this.doNotSetupNexusEndpoint = doNotSetupNexusEndpoint; + return this; + } + /** * Add a search attribute to be registered on the Temporal Server. * @@ -407,7 +435,15 @@ public void evaluate() throws Throwable { private String init(Description description) { String testMethod = description.getMethodName(); String taskQueue = "WorkflowTest-" + testMethod + "-" + UUID.randomUUID(); + nexusEndpointName = String.format("WorkflowTestNexusEndpoint-%s", UUID.randomUUID()); Worker worker = testEnvironment.newWorker(taskQueue, workerOptions); + WorkflowImplementationOptions workflowImplementationOptions = + this.workflowImplementationOptions; + if (!doNotSetupNexusEndpoint) { + workflowImplementationOptions = + applyNexusServiceOptions( + workflowImplementationOptions, nexusServiceImplementations, nexusEndpointName); + } worker.registerWorkflowImplementationTypes(workflowImplementationOptions, workflowTypes); worker.registerActivitiesImplementations(activityImplementations); worker.registerNexusServiceImplementation(nexusServiceImplementations); @@ -415,12 +451,18 @@ private String init(Description description) { } private void start() { + if (!doNotSetupNexusEndpoint && nexusServiceImplementations.length > 0) { + nexusEndpoint = testEnvironment.createNexusEndpoint(nexusEndpointName, taskQueue); + } if (!doNotStart) { testEnvironment.start(); } } protected void shutdown() { + if (nexusEndpoint != null && !testEnvironment.getOperatorServiceStubs().isShutdown()) { + testEnvironment.deleteNexusEndpoint(nexusEndpoint); + } testEnvironment.close(); } @@ -444,6 +486,13 @@ public String getTaskQueue() { return taskQueue; } + /** + * @return endpoint of the nexus service created for the test. + */ + public Endpoint getNexusEndpoint() { + return nexusEndpoint; + } + /** * @return client to the Temporal service used to start and query workflows. */ diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java index e1d37435c..bda485fdd 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java @@ -32,6 +32,7 @@ import io.temporal.api.enums.v1.IndexedValueType; import io.temporal.api.history.v1.History; import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowQueryException; @@ -263,6 +264,10 @@ public String getTaskQueue() { return testWorkflowRule.getTaskQueue(); } + public Endpoint getNexusEndpoint() { + return testWorkflowRule.getNexusEndpoint(); + } + public Worker getWorker() { return testWorkflowRule.getWorker(); } diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TestServiceUtils.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TestServiceUtils.java index fd54cea5d..2ef904332 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TestServiceUtils.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TestServiceUtils.java @@ -23,6 +23,7 @@ import static io.temporal.internal.common.InternalUtils.createNormalTaskQueue; import com.google.protobuf.ByteString; +import io.nexusrpc.handler.ServiceImplInstance; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; @@ -35,13 +36,53 @@ import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; public class TestServiceUtils { private TestServiceUtils() {} + public static WorkflowImplementationOptions applyNexusServiceOptions( + WorkflowImplementationOptions options, + Object[] nexusServiceImplementations, + String endpoint) { + Map newNexusServiceOptions = new HashMap<>(); + for (Object nexusService : nexusServiceImplementations) { + String serviceName = ServiceImplInstance.fromInstance(nexusService).getDefinition().getName(); + NexusServiceOptions serviceOptionWithEndpoint = + options.getNexusServiceOptions().get(serviceName); + if (serviceOptionWithEndpoint == null) { + serviceOptionWithEndpoint = NexusServiceOptions.newBuilder().build(); + } + serviceOptionWithEndpoint = + serviceOptionWithEndpoint.getEndpoint() == null + ? NexusServiceOptions.newBuilder(serviceOptionWithEndpoint) + .setEndpoint(endpoint) + .build() + : serviceOptionWithEndpoint; + newNexusServiceOptions.put(serviceName, serviceOptionWithEndpoint); + } + NexusServiceOptions defaultServiceOptions = + options.getDefaultNexusServiceOptions() == null + ? NexusServiceOptions.newBuilder().build() + : options.getDefaultNexusServiceOptions(); + if (defaultServiceOptions.getEndpoint() == null) { + options = + options.toBuilder() + .setDefaultNexusServiceOptions( + NexusServiceOptions.newBuilder(defaultServiceOptions) + .setEndpoint(endpoint) + .build()) + .build(); + } + return options.toBuilder().setNexusServiceOptions(newNexusServiceOptions).build(); + } + public static void startWorkflowExecution( String namespace, String taskqueueName, String workflowType, WorkflowServiceStubs service) throws Exception { diff --git a/temporal-testing/src/test/java/io/temporal/testing/junit5/TestWorkflowExtensionTest.java b/temporal-testing/src/test/java/io/temporal/testing/junit5/TestWorkflowExtensionTest.java index d03d34737..f4741da38 100644 --- a/temporal-testing/src/test/java/io/temporal/testing/junit5/TestWorkflowExtensionTest.java +++ b/temporal-testing/src/test/java/io/temporal/testing/junit5/TestWorkflowExtensionTest.java @@ -25,6 +25,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; import io.temporal.activity.Activity; import io.temporal.activity.ActivityInfo; import io.temporal.activity.ActivityInterface; @@ -35,9 +40,7 @@ import io.temporal.testing.TestWorkflowExtension; import io.temporal.testing.WorkflowInitialTime; import io.temporal.worker.Worker; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.*; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -55,9 +58,24 @@ public class TestWorkflowExtensionTest { TestWorkflowExtension.newBuilder() .registerWorkflowImplementationTypes(HelloWorkflowImpl.class) .setActivityImplementations(new HelloActivityImpl()) + .setNexusServiceImplementation(new TestNexusServiceImpl()) .setInitialTime(Instant.parse("2021-10-10T10:01:00Z")) .build(); + @Service + public interface TestNexusService { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusService.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync((ctx, details, name) -> "Hello, " + name + "!"); + } + } + @ActivityInterface public interface HelloActivity { String buildGreeting(String name); @@ -88,9 +106,20 @@ public static class HelloWorkflowImpl implements HelloWorkflow { HelloActivity.class, ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build()); + private final TestNexusService nexusService = + Workflow.newNexusServiceStub( + TestNexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + @Override public String sayHello(String name) { logger.info("Hello, {}", name); + nexusService.operation(name); Workflow.sleep(Duration.ofHours(1)); return helloActivity.buildGreeting(name); }