Skip to content

Commit

Permalink
Add Nexus support to TestWorkflowExtension (temporalio#2238)
Browse files Browse the repository at this point in the history
Add Nexus support to TestWorkflowExtension
  • Loading branch information
Quinn-With-Two-Ns committed Oct 10, 2024
1 parent 5615a65 commit 3f6d1ff
Show file tree
Hide file tree
Showing 16 changed files with 290 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public static Builder newBuilder() {
return new Builder();
}

public static Builder newBuilder(WorkflowImplementationOptions options) {
return new Builder(options);
}

public Builder toBuilder() {
return new Builder(this);
}

public static final class Builder {

private Class<? extends Throwable>[] failWorkflowExceptionTypes;
Expand All @@ -55,6 +63,19 @@ public static final class Builder {

private Builder() {}

private Builder(WorkflowImplementationOptions options) {
if (options == null) {
return;
}
this.failWorkflowExceptionTypes = options.getFailWorkflowExceptionTypes();
this.activityOptions = options.getActivityOptions();
this.defaultActivityOptions = options.getDefaultActivityOptions();
this.localActivityOptions = options.getLocalActivityOptions();
this.defaultLocalActivityOptions = options.getDefaultLocalActivityOptions();
this.nexusServiceOptions = options.getNexusServiceOptions();
this.defaultNexusServiceOptions = options.getDefaultNexusServiceOptions();
}

/**
* Optional: Sets how workflow worker deals with exceptions thrown from the workflow code which
* include non-deterministic history events (presumably arising from non-deterministic workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.temporal.worker.WorkerOptions;
import io.temporal.worker.tuning.*;
import io.temporal.workflow.*;
import io.temporal.workflow.nexus.BaseNexusTest;
import io.temporal.workflow.shared.TestNexusServices;
import java.time.Duration;
import java.util.Map;
Expand All @@ -49,7 +48,7 @@
import org.junit.Rule;
import org.junit.Test;

public class WorkflowSlotTests extends BaseNexusTest {
public class WorkflowSlotTests {
private final int MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE = 100;
private final int MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE = 1000;
private final int MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE = 10000;
Expand Down Expand Up @@ -98,11 +97,6 @@ public void setup() {
didFail = false;
}

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@After
public void tearDown() {
testWorkflowRule.getTestEnvironment().close();
Expand Down Expand Up @@ -181,7 +175,6 @@ public static class SleepingWorkflowImpl implements TestWorkflow {
Workflow.newNexusServiceStub(
TestNexusServices.TestNexusService1.class,
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.temporal.workflow.NexusOperationOptions;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.nexus.BaseNexusTest;
import io.temporal.workflow.shared.TestNexusServices;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.time.Duration;
Expand All @@ -47,7 +46,7 @@
import org.junit.Rule;
import org.junit.Test;

public class CleanNexusWorkerShutdownTest extends BaseNexusTest {
public class CleanNexusWorkerShutdownTest {

private static final String COMPLETED = "Completed";
private static final String INTERRUPTED = "Interrupted";
Expand Down Expand Up @@ -114,18 +113,12 @@ public void testShutdownNow() throws InterruptedException {
assertTrue("Contains NexusOperationCompleted", found);
}

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

public static class TestWorkflowImpl implements TestWorkflow1 {

private final TestNexusServices.TestNexusService1 service =
Workflow.newNexusServiceStub(
TestNexusServices.TestNexusService1.class,
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,14 @@
import org.junit.Rule;
import org.junit.Test;

public class SyncOperationCancelledTest extends BaseNexusTest {
public class SyncOperationCancelledTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void syncOperationImmediatelyCancelled() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand Down Expand Up @@ -82,10 +77,7 @@ public String execute(String input) {
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
Workflow.newCancellationScope(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.time.Duration;
import org.junit.*;

public class SyncOperationFailTest extends BaseNexusTest {
public class SyncOperationFailTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand All @@ -56,11 +56,6 @@ public void failSyncOperation() {
Assert.assertEquals("failed to call operation", applicationFailure.getOriginalMessage());
}

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

public static class TestNexus implements TestWorkflow1 {
@Override
public String execute(String endpoint) {
Expand All @@ -70,10 +65,7 @@ public String execute(String endpoint) {
.build();

NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
TestNexusServices.TestNexusService1 testNexusService =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,14 @@
import org.junit.Rule;
import org.junit.Test;

public class SyncOperationStubTest extends BaseNexusTest {
public class SyncOperationStubTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void typedNexusServiceStub() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand All @@ -61,10 +56,7 @@ public String execute(String input) {
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
// Try to call a synchronous operation in a blocking way
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,14 @@
import org.junit.Rule;
import org.junit.Test;

public class SyncOperationTimeoutTest extends BaseNexusTest {
public class SyncOperationTimeoutTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void typedOperationTimeout() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand All @@ -69,10 +64,7 @@ public String execute(String input) {
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setOperationOptions(options)
.build();
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
// Try to call a synchronous operation in a blocking way
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,17 @@
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.ClassRule;
import org.junit.Test;

public class UntypedSyncOperationStubTest extends BaseNexusTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
public class UntypedSyncOperationStubTest {
@ClassRule
public static SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Override
protected SDKTestWorkflowRule getTestWorkflowRule() {
return testWorkflowRule;
}

@Test
public void untypedNexusServiceStub() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand All @@ -62,7 +57,7 @@ public String execute(String name) {
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(getEndpointName())
.setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName())
.setOperationOptions(options)
.build();
NexusServiceStub serviceStub =
Expand Down Expand Up @@ -90,7 +85,7 @@ public String execute(String name) {
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
public static class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
// Implemented inline
Expand Down
2 changes: 2 additions & 0 deletions temporal-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ dependencies {
implementation("com.jayway.jsonpath:json-path:$jsonPathVersion"){
exclude group: 'org.slf4j', module: 'slf4j-api'
}
implementation "io.nexusrpc:nexus-sdk:$nexusVersion"


junit4Api 'junit:junit:4.13.2'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.IndexedValueType;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.serviceclient.OperatorServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -159,6 +161,24 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
*/
boolean registerSearchAttribute(String name, IndexedValueType type);

/**
* Register a Nexus Endpoint with the server.
*
* @param name Nexus Endpoint name
* @param taskQueue Task Queue to be used for the endpoint
* @return Endpoint object
*/
@Experimental
Endpoint createNexusEndpoint(String name, String taskQueue);

/**
* Delete a Nexus Endpoint on the server.
*
* @param endpoint current endpoint to be deleted
*/
@Experimental
void deleteNexusEndpoint(Endpoint endpoint);

/**
* @return the in-memory test Temporal service that is owned by this.
* @deprecated use {{@link #getWorkflowServiceStubs()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.Scope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.IndexedValueType;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.nexus.v1.EndpointSpec;
import io.temporal.api.nexus.v1.EndpointTarget;
import io.temporal.api.operatorservice.v1.AddSearchAttributesRequest;
import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest;
import io.temporal.api.testservice.v1.SleepRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
Expand Down Expand Up @@ -219,6 +225,38 @@ public boolean registerSearchAttribute(String name, IndexedValueType type) {
}
}

@Override
public Endpoint createNexusEndpoint(String name, String taskQueue) {
EndpointSpec spec =
EndpointSpec.newBuilder()
.setName(name)
.setDescription(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
"Test Nexus endpoint created by the Java SDK WorkflowTestEnvironment")))
.setTarget(
EndpointTarget.newBuilder()
.setWorker(
EndpointTarget.Worker.newBuilder()
.setNamespace(getNamespace())
.setTaskQueue(taskQueue)))
.build();
CreateNexusEndpointRequest request =
CreateNexusEndpointRequest.newBuilder().setSpec(spec).build();
return operatorServiceStubs.blockingStub().createNexusEndpoint(request).getEndpoint();
}

public void deleteNexusEndpoint(Endpoint endpoint) {
operatorServiceStubs
.blockingStub()
.deleteNexusEndpoint(
io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest.newBuilder()
.setId(endpoint.getId())
.setVersion(endpoint.getVersion())
.build());
}

@Deprecated
public WorkflowServiceStubs getWorkflowService() {
return getWorkflowServiceStubs();
Expand Down
Loading

0 comments on commit 3f6d1ff

Please sign in to comment.