Skip to content

Commit

Permalink
Add workflow metadata query
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 30, 2024
1 parent 0b192d3 commit a2607e4
Show file tree
Hide file tree
Showing 19 changed files with 441 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public interface WorkflowClient {
/** Use this constant as a query type to get a workflow stack trace. */
String QUERY_TYPE_STACK_TRACE = "__stack_trace";

/** Use this constant as a query type to get the workflow metadata. */
String QUERY_TYPE_WORKFLOW_METADATA = "__temporal_workflow_metadata";

/** Replays workflow to the current state and returns empty result or error if replay failed. */
String QUERY_TYPE_REPLAY_ONLY = "__replay_only";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ public Header getHeader() {

final class SignalRegistrationRequest {
private final String signalType;
private final String description;
private final HandlerUnfinishedPolicy unfinishedPolicy;
private final Class<?>[] argTypes;
private final Type[] genericArgTypes;
Expand All @@ -464,19 +465,37 @@ public SignalRegistrationRequest(
Type[] genericArgTypes,
Functions.Proc1<Object[]> callback) {
this.signalType = signalType;
this.description = "";
this.unfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
this.callback = callback;
}

// Kept for backward compatibility
public SignalRegistrationRequest(
String signalType,
HandlerUnfinishedPolicy unfinishedPolicy,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Proc1<Object[]> callback) {
this.signalType = signalType;
this.description = "";
this.unfinishedPolicy = unfinishedPolicy;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
this.callback = callback;
}

public SignalRegistrationRequest(
String signalType,
String description,
HandlerUnfinishedPolicy unfinishedPolicy,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Proc1<Object[]> callback) {
this.signalType = signalType;
this.description = description;
this.unfinishedPolicy = unfinishedPolicy;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
Expand All @@ -487,6 +506,11 @@ public String getSignalType() {
return signalType;
}

@Experimental
public String getDescription() {
return description;
}

public HandlerUnfinishedPolicy getUnfinishedPolicy() {
return unfinishedPolicy;
}
Expand Down Expand Up @@ -519,20 +543,40 @@ public List<SignalRegistrationRequest> getRequests() {
@Experimental
final class UpdateRegistrationRequest {
private final String updateName;
private final String description;
private final HandlerUnfinishedPolicy unfinishedPolicy;
private final Class<?>[] argTypes;
private final Type[] genericArgTypes;
private final Functions.Func1<Object[], Object> executeCallback;
private final Functions.Proc1<Object[]> validateCallback;

// Kept for backward compatibility
public UpdateRegistrationRequest(
String updateName,
HandlerUnfinishedPolicy unfinishedPolicy,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Proc1<Object[]> validateCallback,
Functions.Func1<Object[], Object> executeCallback) {
this.updateName = updateName;
this.description = "";
this.unfinishedPolicy = unfinishedPolicy;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
this.validateCallback = validateCallback;
this.executeCallback = executeCallback;
}

public UpdateRegistrationRequest(
String updateName,
String description,
HandlerUnfinishedPolicy unfinishedPolicy,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Proc1<Object[]> validateCallback,
Functions.Func1<Object[], Object> executeCallback) {
this.updateName = updateName;
this.description = description;
this.unfinishedPolicy = unfinishedPolicy;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
Expand All @@ -544,6 +588,11 @@ public String getUpdateName() {
return updateName;
}

@Experimental
public String getDescription() {
return description;
}

public HandlerUnfinishedPolicy getUnfinishedPolicy() {
return unfinishedPolicy;
}
Expand Down Expand Up @@ -580,16 +629,32 @@ public List<UpdateRegistrationRequest> getRequests() {

final class RegisterQueryInput {
private final String queryType;
private final String description;
private final Class<?>[] argTypes;
private final Type[] genericArgTypes;
private final Functions.Func1<Object[], Object> callback;

// Kept for backward compatibility
public RegisterQueryInput(
String queryType,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Func1<Object[], Object> callback) {
this.queryType = queryType;
this.description = "";
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
this.callback = callback;
}

public RegisterQueryInput(
String queryType,
String description,
Class<?>[] argTypes,
Type[] genericArgTypes,
Functions.Func1<Object[], Object> callback) {
this.queryType = queryType;
this.description = description;
this.argTypes = argTypes;
this.genericArgTypes = genericArgTypes;
this.callback = callback;
Expand All @@ -599,6 +664,11 @@ public String getQueryType() {
return queryType;
}

@Experimental
public String getDescription() {
return description;
}

public Class<?>[] getArgTypes() {
return argTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ final class POJOWorkflowMethod {
private final WorkflowMethodType type;
private final Method method;
private final Optional<String> nameFromAnnotation;
private final Optional<String> descriptionFromAnnotation;

POJOWorkflowMethod(Method method) {
this.method = Objects.requireNonNull(method);
Expand All @@ -43,6 +44,7 @@ final class POJOWorkflowMethod {
int count = 0;
WorkflowMethodType type = null;
String name = null;
String description = null;
if (workflowMethod != null) {
type = WorkflowMethodType.WORKFLOW;
count++;
Expand All @@ -56,6 +58,7 @@ final class POJOWorkflowMethod {
}
count++;
name = signalMethod.name();
description = signalMethod.description();
}
if (queryMethod != null) {
type = WorkflowMethodType.QUERY;
Expand All @@ -65,11 +68,13 @@ final class POJOWorkflowMethod {
}
count++;
name = queryMethod.name();
description = queryMethod.description();
}
if (updateMethod != null) {
type = WorkflowMethodType.UPDATE;
count++;
name = updateMethod.name();
description = updateMethod.description();
}
if (updateValidatorMethod != null) {
type = WorkflowMethodType.UPDATE_VALIDATOR;
Expand All @@ -86,13 +91,19 @@ final class POJOWorkflowMethod {
throw new IllegalArgumentException(
method
+ " must contain exactly one annotation "
+ "of @WorkflowMethod, @QueryMethod or @SignalMethod");
+ "of @WorkflowMethod, @QueryMethod @UpdateMethod or @SignalMethod");
}
if (Strings.isNullOrEmpty(name)) {
this.nameFromAnnotation = Optional.empty();
} else {
this.nameFromAnnotation = Optional.of(name);
}

if (Strings.isNullOrEmpty(description)) {
this.descriptionFromAnnotation = Optional.empty();
} else {
this.descriptionFromAnnotation = Optional.of(description);
}
this.type = Objects.requireNonNull(type);
}

Expand All @@ -108,6 +119,10 @@ public Optional<String> getNameFromAnnotation() {
return nameFromAnnotation;
}

public Optional<String> getDescriptionFromAnnotation() {
return descriptionFromAnnotation;
}

/** Compare and hash on method only. */
@Override
public boolean equals(Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class POJOWorkflowMethodMetadata {

private final POJOWorkflowMethod workflowMethod;
private final String name;
private final String description;
private final Class<?> workflowInterface;

POJOWorkflowMethodMetadata(POJOWorkflowMethod methodMetadata, Class<?> workflowInterface) {
Expand All @@ -47,6 +48,7 @@ public final class POJOWorkflowMethodMetadata {
} else {
this.name = nameFromAnnotation.orElse(methodMetadata.getMethod().getName());
}
this.description = workflowMethod.getDescriptionFromAnnotation().orElse("");
}

public WorkflowMethodType getType() {
Expand All @@ -62,6 +64,10 @@ public String getName() {
return name;
}

public String getDescription() {
return description;
}

public Method getWorkflowMethod() {
return workflowMethod.getMethod();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
package io.temporal.internal.sync;

import io.temporal.api.common.v1.Payloads;
import io.temporal.api.sdk.v1.WorkflowInteractionDefinition;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.DynamicQueryHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,4 +102,25 @@ public void registerDynamicQueryHandler(
WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput input) {
dynamicQueryHandler = input.getHandler();
}

public List<WorkflowInteractionDefinition> getQueryHandlers() {
List<WorkflowInteractionDefinition> handlers = new ArrayList<>(queryCallbacks.size() + 1);
for (Map.Entry<String, WorkflowOutboundCallsInterceptor.RegisterQueryInput> entry :
queryCallbacks.entrySet()) {
WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = entry.getValue();
handlers.add(
WorkflowInteractionDefinition.newBuilder()
.setName(handler.getQueryType())
.setDescription(handler.getDescription())
.build());
}
if (dynamicQueryHandler != null) {
handlers.add(
WorkflowInteractionDefinition.newBuilder()
.setDescription(dynamicQueryHandler.getDescription())
.build());
}
handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName));
return handlers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.sync;

import io.temporal.api.common.v1.Payloads;
import io.temporal.api.sdk.v1.WorkflowInteractionDefinition;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DataConverterException;
import io.temporal.common.converter.EncodedValues;
Expand Down Expand Up @@ -160,6 +161,27 @@ private void logSerializationException(
Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
}

public List<WorkflowInteractionDefinition> getSignalHandlers() {
List<WorkflowInteractionDefinition> handlers = new ArrayList<>(signalCallbacks.size() + 1);
for (Map.Entry<String, WorkflowOutboundCallsInterceptor.SignalRegistrationRequest> entry :
signalCallbacks.entrySet()) {
WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler = entry.getValue();
handlers.add(
WorkflowInteractionDefinition.newBuilder()
.setName(handler.getSignalType())
.setDescription(handler.getDescription())
.build());
}
if (dynamicSignalHandler != null) {
handlers.add(
WorkflowInteractionDefinition.newBuilder()
.setDescription(dynamicSignalHandler.getDescription())
.build());
}
handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName));
return handlers;
}

private static class SignalData {
private final String signalName;
private final Optional<Payloads> payload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.temporal.internal.statemachines.UpdateProtocolCallback;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.payload.context.WorkflowSerializationContext;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.UpdateInfo;
import java.util.List;
Expand Down Expand Up @@ -69,6 +70,7 @@ class SyncWorkflow implements ReplayWorkflow {
private WorkflowExecutionHandler workflowProc;
private DeterministicRunner runner;
private DataConverter dataConverter;
private DataConverter dataConverterWithWorkflowContext;

public SyncWorkflow(
String namespace,
Expand All @@ -92,6 +94,9 @@ public SyncWorkflow(
this.cache = cache;
this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
this.dataConverter = dataConverter;
this.dataConverterWithWorkflowContext =
dataConverter.withContext(
new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
this.workflowContext =
new SyncWorkflowContext(
namespace,
Expand Down Expand Up @@ -238,6 +243,9 @@ public Optional<Payloads> query(WorkflowQuery query) {
// converter
return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
}
if (WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA.equals(query.getQueryType())) {
return dataConverterWithWorkflowContext.toPayloads(workflowContext.getWorkflowMetadata());
}
Optional<Payloads> args =
query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args);
Expand Down
Loading

0 comments on commit a2607e4

Please sign in to comment.