Skip to content

Commit

Permalink
Nexus worker support (temporalio#2214)
Browse files Browse the repository at this point in the history
Nexus worker support
  • Loading branch information
Quinn-With-Two-Ns committed Oct 10, 2024
1 parent e01a49e commit 3ef1df8
Show file tree
Hide file tree
Showing 49 changed files with 2,929 additions and 95 deletions.
10 changes: 9 additions & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ history.MaxBufferedQueryCount:
worker.buildIdScavengerEnabled:
- value: true
worker.removableBuildIdDurationSinceDefault:
- value: 1
- value: 1
system.enableNexus:
- value: true
component.nexusoperations.callback.endpoint.template:
- value: http://localhost:7243/api/v1/namespaces/{{.NamespaceName}}/nexus/callback
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
AllowInsecure: true
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ public static Duration parseRequestTimeout(String timeout) {
try {
if (timeout.endsWith("m")) {
return Duration.ofMillis(
Math.round(
10e3 * Double.parseDouble(timeout.substring(0, timeout.length() - 1)) / 60.0));
} else if (timeout.endsWith("s")) {
return Duration.ofMillis(
Math.round(10e3 * Double.parseDouble(timeout.substring(0, timeout.length() - 1))));
Math.round(1e3 * 60 * Double.parseDouble(timeout.substring(0, timeout.length() - 1))));
} else if (timeout.endsWith("ms")) {
return Duration.ofMillis(
Math.round(Double.parseDouble(timeout.substring(0, timeout.length() - 2))));
} else if (timeout.endsWith("s")) {
return Duration.ofMillis(
Math.round(1e3 * Double.parseDouble(timeout.substring(0, timeout.length() - 1))));
} else {
throw new IllegalArgumentException("Invalid timeout format: " + timeout);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.nexus;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.uber.m3.tally.Scope;
import io.nexusrpc.FailureInfo;
import io.nexusrpc.Header;
import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.handler.*;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.nexus.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.worker.TypeAlreadyRegisteredException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class NexusTaskHandlerImpl implements NexusTaskHandler {
private final DataConverter dataConverter;
private final String namespace;
private final String taskQueue;
private final WorkflowClient client;
private ServiceHandler serviceHandler;
private final Map<String, ServiceImplInstance> serviceImplInstances =
Collections.synchronizedMap(new HashMap<>());
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

public NexusTaskHandlerImpl(
WorkflowClient client, String namespace, String taskQueue, DataConverter dataConverter) {
this.client = client;
this.namespace = namespace;
this.taskQueue = taskQueue;
this.dataConverter = dataConverter;
}

@Override
public boolean start() {
if (serviceImplInstances.isEmpty()) {
return false;
}
ServiceHandler.Builder serviceHandlerBuilder =
ServiceHandler.newBuilder().setSerializer(new PayloadSerializer(dataConverter));
serviceImplInstances.forEach((name, instance) -> serviceHandlerBuilder.addInstance(instance));
serviceHandler = serviceHandlerBuilder.build();
return true;
}

@Override
public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException {
Request request = task.getResponse().getRequest();
Map<String, String> headers = request.getHeaderMap();
if (headers == null) {
headers = Collections.emptyMap();
}

OperationContext.Builder ctx = OperationContext.newBuilder();
headers.forEach(ctx::putHeader);
OperationMethodCanceller canceller = new OperationMethodCanceller();
ctx.setMethodCanceller(canceller);

ScheduledFuture<?> timeoutTask = null;
AtomicBoolean timedOut = new AtomicBoolean(false);
try {
String timeoutString = headers.get(Header.REQUEST_TIMEOUT);
if (timeoutString != null) {
try {
Duration timeout = NexusUtil.parseRequestTimeout(timeoutString);
timeoutTask =
scheduler.schedule(
() -> {
timedOut.set(true);
canceller.cancel("timeout");
},
timeout.toMillis(),
java.util.concurrent.TimeUnit.MILLISECONDS);
} catch (IllegalArgumentException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.BAD_REQUEST.toString())
.setFailure(
Failure.newBuilder().setMessage("cannot parse request timeout").build())
.build());
}
}

switch (request.getVariantCase()) {
case START_OPERATION:
StartOperationResponse startResponse =
handleStartOperation(ctx, request.getStartOperation());
return new Result(Response.newBuilder().setStartOperation(startResponse).build());
case CANCEL_OPERATION:
CancelOperationResponse cancelResponse =
handleCancelledOperation(ctx, request.getCancelOperation());
return new Result(Response.newBuilder().setCancelOperation(cancelResponse).build());
default:
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.NOT_IMPLEMENTED.toString())
.setFailure(Failure.newBuilder().setMessage("unknown request type").build())
.build());
}
} catch (OperationHandlerException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setFailure(createFailure(e.getFailureInfo()))
.build());
} catch (Throwable e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.INTERNAL.toString())
.setFailure(
Failure.newBuilder()
.setMessage("internal error")
.setDetails(ByteString.copyFromUtf8(e.toString()))
.build())
.build());
} finally {
// If the task timed out, we should not send a response back to the server
if (timedOut.get()) {
throw new TimeoutException("Nexus task completed after timeout.");
}
canceller.cancel("");
if (timeoutTask != null) {
timeoutTask.cancel(false);
}
}
}

private Failure createFailure(FailureInfo failInfo) {
Failure.Builder failure = Failure.newBuilder();
if (failInfo.getMessage() != null) {
failure.setMessage(failInfo.getMessage());
}
if (failInfo.getDetailsJson() != null) {
failure.setDetails(ByteString.copyFromUtf8(failInfo.getDetailsJson()));
}
if (!failInfo.getMetadata().isEmpty()) {
failure.putAllMetadata(failInfo.getMetadata());
}
return failure.build();
}

private CancelOperationResponse handleCancelledOperation(
OperationContext.Builder ctx, CancelOperationRequest task) {
ctx.setService(task.getService()).setOperation(task.getOperation());

OperationCancelDetails operationCancelDetails =
OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build();

serviceHandler.cancelOperation(ctx.build(), operationCancelDetails);

return CancelOperationResponse.newBuilder().build();
}

private StartOperationResponse handleStartOperation(
OperationContext.Builder ctx, StartOperationRequest task)
throws InvalidProtocolBufferException {
ctx.setService(task.getService()).setOperation(task.getOperation());

OperationStartDetails.Builder operationStartDetails =
OperationStartDetails.newBuilder()
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());

StartOperationResponse.Builder startResponseBuilder = StartOperationResponse.newBuilder();
try {
OperationStartResult<HandlerResultContent> result =
serviceHandler.startOperation(ctx.build(), operationStartDetails.build(), input.build());
if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.build());
} else {
startResponseBuilder.setAsyncSuccess(
StartOperationResponse.Async.newBuilder()
.setOperationId(result.getAsyncOperationId())
.build());
}
} catch (OperationUnsuccessfulException e) {
startResponseBuilder.setOperationError(
UnsuccessfulOperationError.newBuilder()
.setOperationState(e.getState().toString().toLowerCase())
.setFailure(
Failure.newBuilder()
.setMessage(e.getFailureInfo().getMessage())
.putAllMetadata(e.getFailureInfo().getMetadata())
.build())
.build());
}
return startResponseBuilder.build();
}

public void registerNexusServiceImplementations(Object[] nexusServiceImplementation) {
for (Object nexusService : nexusServiceImplementation) {
registerNexusService(nexusService);
}
}

private void registerNexusService(Object nexusService) {
if (nexusService instanceof Class) {
throw new IllegalArgumentException("Nexus service object instance expected, not the class");
}
ServiceImplInstance instance = ServiceImplInstance.fromInstance(nexusService);
if (serviceImplInstances.put(instance.getDefinition().getName(), instance) != null) {
throw new TypeAlreadyRegisteredException(
instance.getDefinition().getName(),
"\""
+ instance.getDefinition().getName()
+ "\" service type is already registered with the worker");
}
}

public CompletionStage<Void> shutdown(ShutdownManager shutdownManager, boolean unused) {
return shutdownManager.shutdownExecutorNow(
scheduler, "NexusTaskHandlerImpl#scheduler", Duration.ofSeconds(5));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.nexus;

import com.google.protobuf.InvalidProtocolBufferException;
import io.nexusrpc.Serializer;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverter;
import java.lang.reflect.Type;
import java.util.Optional;
import javax.annotation.Nullable;

/**
* PayloadSerializer is a serializer that converts objects to and from {@link
* io.nexusrpc.Serializer.Content} objects by using the {@link DataConverter} to convert objects to
* and from {@link Payload} objects.
*/
class PayloadSerializer implements Serializer {
DataConverter dataConverter;

PayloadSerializer(DataConverter dataConverter) {
this.dataConverter = dataConverter;
}

@Override
public Content serialize(@Nullable Object o) {
Optional<Payload> payload = dataConverter.toPayload(o);
Content.Builder content = Content.newBuilder();
content.setData(payload.get().toByteArray());
return content.build();
}

@Override
public @Nullable Object deserialize(Content content, Type type) {
try {
Payload payload = Payload.parseFrom(content.getData());
return dataConverter.fromPayload(payload, type.getClass(), type);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ private void cancelNexusOperationCommand() {
Failure canceledFailure =
Failure.newBuilder()
.setSource(JAVA_SDK)
.setMessage("operation canceled before it was started")
.setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
.build();
NexusOperationFailureInfo nexusFailureInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ public SyncWorkflowContext(
workflowImplementationOptions.getDefaultLocalActivityOptions();
this.localActivityOptionsMap =
new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
// TODO(SDK-1829) Add NexusServiceOptions to WorkflowImplementationOptions
this.defaultNexusServiceOptions = NexusServiceOptions.getDefaultInstance();
this.nexusServiceOptionsMap = new HashMap<>();
this.defaultNexusServiceOptions =
workflowImplementationOptions.getDefaultNexusServiceOptions();
this.nexusServiceOptionsMap =
new HashMap<>(workflowImplementationOptions.getNexusServiceOptions());
}
this.workflowImplementationOptions =
workflowImplementationOptions == null
Expand Down
Loading

0 comments on commit 3ef1df8

Please sign in to comment.