Skip to content

Commit

Permalink
Nexus java SDK workflow operations (temporalio#2231)
Browse files Browse the repository at this point in the history
Nexus workflow operation support
  • Loading branch information
Quinn-With-Two-Ns committed Oct 10, 2024
1 parent 3ef1df8 commit 5615a65
Show file tree
Hide file tree
Showing 42 changed files with 3,239 additions and 33 deletions.
2 changes: 2 additions & 0 deletions docker/github/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ services:
- "6934:6934"
- "6935:6935"
- "6939:6939"
- "7243:7243"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ENABLE_ES=true"
- "ES_SEEDS=elasticsearch"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
- "FRONTEND_HTTP_PORT=7243"
depends_on:
- cassandra
- elasticsearch
Expand Down
2 changes: 1 addition & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ worker.removableBuildIdDurationSinceDefault:
system.enableNexus:
- value: true
component.nexusoperations.callback.endpoint.template:
- value: http://localhost:7243/api/v1/namespaces/{{.NamespaceName}}/nexus/callback
- value: http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.RootWorkflowClientInvoker;
import io.temporal.internal.client.WorkerFactoryRegistry;
import io.temporal.internal.client.WorkflowClientInternal;
Expand Down Expand Up @@ -567,4 +568,16 @@ public void registerWorkerFactory(WorkerFactory workerFactory) {
public void deregisterWorkerFactory(WorkerFactory workerFactory) {
workerFactoryRegistry.deregister(workerFactory);
}

@Override
public WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow) {
enforceNonWorkflowThread();
WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START_NEXUS, request);
try {
workflow.apply();
return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
} finally {
WorkflowInvocationHandler.closeAsyncInvocation();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package io.temporal.client;

import static io.temporal.internal.common.InternalUtils.createNexusBoundStub;

import com.google.common.base.Defaults;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
Expand All @@ -30,15 +32,15 @@
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.common.metadata.WorkflowMethodType;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.sync.StubMarker;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.UpdateMethod;
import io.temporal.workflow.WorkflowMethod;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Optional;
import java.util.*;

/**
* Dynamic implementation of a strongly typed workflow interface that can be used to start, signal
Expand All @@ -51,6 +53,7 @@ public enum InvocationType {
START,
EXECUTE,
SIGNAL_WITH_START,
START_NEXUS,
UPDATE_WITH_START
}

Expand Down Expand Up @@ -87,6 +90,9 @@ static <T> void initAsyncInvocation(InvocationType type, T value) {
} else if (type == InvocationType.SIGNAL_WITH_START) {
SignalWithStartBatchRequest batch = (SignalWithStartBatchRequest) value;
invocationContext.set(new SignalWithStartWorkflowInvocationHandler(batch));
} else if (type == InvocationType.START_NEXUS) {
NexusStartWorkflowRequest request = (NexusStartWorkflowRequest) value;
invocationContext.set(new StartNexusOperationInvocationHandler(request));
} else if (type == InvocationType.UPDATE_WITH_START) {
UpdateWithStartWorkflowOperation operation = (UpdateWithStartWorkflowOperation) value;
invocationContext.set(new UpdateWithStartInvocationHandler(operation));
Expand Down Expand Up @@ -400,6 +406,41 @@ public <R> R getResult(Class<R> resultClass) {
}
}

private static class StartNexusOperationInvocationHandler implements SpecificInvocationHandler {
private final NexusStartWorkflowRequest request;
private Object result;

public StartNexusOperationInvocationHandler(NexusStartWorkflowRequest request) {
this.request = request;
}

@Override
public InvocationType getInvocationType() {
return InvocationType.START_NEXUS;
}

@Override
public void invoke(
POJOWorkflowInterfaceMetadata workflowMetadata,
WorkflowStub untyped,
Method method,
Object[] args) {
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
if (workflowMethod == null) {
throw new IllegalArgumentException(
"Only on a method annotated with @WorkflowMethod can be used to start a Nexus operation.");
}

result = createNexusBoundStub(untyped, request).start(args);
}

@Override
@SuppressWarnings("unchecked")
public <R> R getResult(Class<R> resultClass) {
return (R) result;
}
}

private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler {

enum State {
Expand Down
99 changes: 80 additions & 19 deletions temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.client;

import com.google.common.base.Objects;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.common.*;
Expand Down Expand Up @@ -79,6 +80,8 @@ public static WorkflowOptions merge(
.setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy())
.setStaticSummary(o.getStaticSummary())
.setStaticDetails(o.getStaticDetails())
.setRequestId(o.getRequestId())
.setCompletionCallbacks(o.getCompletionCallbacks())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -112,12 +115,16 @@ public static final class Builder {

private Duration startDelay;

private WorkflowIdConflictPolicy workflowIdConflictpolicy;
private WorkflowIdConflictPolicy workflowIdConflictPolicy;

private String staticSummary;

private String staticDetails;

private String requestId;

private List<Callback> completionCallbacks;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand All @@ -138,9 +145,11 @@ private Builder(WorkflowOptions options) {
this.contextPropagators = options.contextPropagators;
this.disableEagerExecution = options.disableEagerExecution;
this.startDelay = options.startDelay;
this.workflowIdConflictpolicy = options.workflowIdConflictpolicy;
this.workflowIdConflictPolicy = options.workflowIdConflictPolicy;
this.staticSummary = options.staticSummary;
this.staticDetails = options.staticDetails;
this.requestId = options.requestId;
this.completionCallbacks = options.completionCallbacks;
}

/**
Expand Down Expand Up @@ -191,8 +200,8 @@ public Builder setWorkflowIdReusePolicy(WorkflowIdReusePolicy workflowIdReusePol
* <li><b>TerminateExisting</b> Terminate the running workflow before starting a new one.
* </ul>
*/
public Builder setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy workflowIdConflictpolicy) {
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
public Builder setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy workflowIdConflictPolicy) {
this.workflowIdConflictPolicy = workflowIdConflictPolicy;
return this;
}

Expand Down Expand Up @@ -413,6 +422,28 @@ public Builder setStaticDetails(String staticDetails) {
return this;
}

/**
* A unique identifier for this start request.
*
* <p>WARNING: Not intended for User Code.
*/
@Experimental
public Builder setRequestId(String requestId) {
this.requestId = requestId;
return this;
}

/**
* Callbacks to be called by the server when this workflow reaches a terminal state.
*
* <p>WARNING: Not intended for User Code.
*/
@Experimental
public Builder setCompletionCallbacks(List<Callback> completionCallbacks) {
this.completionCallbacks = completionCallbacks;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -429,9 +460,11 @@ public WorkflowOptions build() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy,
workflowIdConflictPolicy,
staticSummary,
staticDetails);
staticDetails,
requestId,
completionCallbacks);
}

/**
Expand All @@ -453,9 +486,11 @@ public WorkflowOptions validateBuildWithDefaults() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy,
workflowIdConflictPolicy,
staticSummary,
staticDetails);
staticDetails,
requestId,
completionCallbacks);
}
}

Expand Down Expand Up @@ -487,12 +522,16 @@ public WorkflowOptions validateBuildWithDefaults() {

private final Duration startDelay;

private final WorkflowIdConflictPolicy workflowIdConflictpolicy;
private final WorkflowIdConflictPolicy workflowIdConflictPolicy;

private final String staticSummary;

private final String staticDetails;

private final String requestId;

private final List<Callback> completionCallbacks;

private WorkflowOptions(
String workflowId,
WorkflowIdReusePolicy workflowIdReusePolicy,
Expand All @@ -508,9 +547,11 @@ private WorkflowOptions(
List<ContextPropagator> contextPropagators,
boolean disableEagerExecution,
Duration startDelay,
WorkflowIdConflictPolicy workflowIdConflictpolicy,
WorkflowIdConflictPolicy workflowIdConflictPolicy,
String staticSummary,
String staticDetails) {
String staticDetails,
String requestId,
List<Callback> completionCallbacks) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -525,9 +566,11 @@ private WorkflowOptions(
this.contextPropagators = contextPropagators;
this.disableEagerExecution = disableEagerExecution;
this.startDelay = startDelay;
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
this.workflowIdConflictPolicy = workflowIdConflictPolicy;
this.staticSummary = staticSummary;
this.staticDetails = staticDetails;
this.requestId = requestId;
this.completionCallbacks = completionCallbacks;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -596,7 +639,17 @@ public boolean isDisableEagerExecution() {
}

public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() {
return workflowIdConflictpolicy;
return workflowIdConflictPolicy;
}

@Experimental
public String getRequestId() {
return requestId;
}

@Experimental
public List<Callback> getCompletionCallbacks() {
return completionCallbacks;
}

public String getStaticSummary() {
Expand Down Expand Up @@ -630,9 +683,11 @@ public boolean equals(Object o) {
&& Objects.equal(contextPropagators, that.contextPropagators)
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
&& Objects.equal(startDelay, that.startDelay)
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy)
&& Objects.equal(workflowIdConflictPolicy, that.workflowIdConflictPolicy)
&& Objects.equal(staticSummary, that.staticSummary)
&& Objects.equal(staticDetails, that.staticDetails);
&& Objects.equal(staticDetails, that.staticDetails)
&& Objects.equal(requestId, that.requestId)
&& Objects.equal(completionCallbacks, that.completionCallbacks);
}

@Override
Expand All @@ -652,9 +707,11 @@ public int hashCode() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy,
workflowIdConflictPolicy,
staticSummary,
staticDetails);
staticDetails,
requestId,
completionCallbacks);
}

@Override
Expand Down Expand Up @@ -691,12 +748,16 @@ public String toString() {
+ disableEagerExecution
+ ", startDelay="
+ startDelay
+ ", workflowIdConflictpolicy="
+ workflowIdConflictpolicy
+ ", workflowIdConflictPolicy="
+ workflowIdConflictPolicy
+ ", staticSummary="
+ staticSummary
+ ", staticDetails="
+ staticDetails
+ ", requestId="
+ requestId
+ ", completionCallbacks="
+ completionCallbacks
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,12 @@ <R> CompletableFuture<R> getResultAsync(
void terminate(@Nullable String reason, Object... details);

Optional<WorkflowOptions> getOptions();

/**
* Creates a new stub that can be used to start a new run of the same workflow type with the same
* options.
*
* @param options new options to use for the stub
*/
WorkflowStub newInstance(WorkflowOptions options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ public Optional<WorkflowOptions> getOptions() {
return Optional.ofNullable(options);
}

@Override
public WorkflowStub newInstance(WorkflowOptions options) {
return new WorkflowStubImpl(
clientOptions, workflowClientInvoker, workflowType.orElse(null), options);
}

private void checkStarted() {
if (execution.get() == null || execution.get().getWorkflowId() == null) {
throw new IllegalStateException("Null workflowId. Was workflow started?");
Expand Down
Loading

0 comments on commit 5615a65

Please sign in to comment.