Skip to content

Commit

Permalink
Adding JQ expression support for http call
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
  • Loading branch information
fjtirado committed Nov 4, 2024
1 parent 0863b31 commit fbea79b
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 35 deletions.
6 changes: 6 additions & 0 deletions impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<artifactId>serverlessworkflow-impl</artifactId>
<properties>
<version.org.glassfish.jersey>3.1.9</version.org.glassfish.jersey>
<version.net.thisptr>1.0.1</version.net.thisptr>
</properties>
<dependencies>
<dependency>
Expand All @@ -25,6 +26,11 @@
<artifactId>jersey-media-json-jackson</artifactId>
<version>${version.org.glassfish.jersey}</version>
</dependency>
<dependency>
<groupId>net.thisptr</groupId>
<artifactId>jackson-jq</artifactId>
<version>${version.net.thisptr}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {

protected final T task;
protected final ExpressionFactory exprFactory;

protected AbstractTaskExecutor(T task) {
protected AbstractTaskExecutor(T task, ExpressionFactory exprFactory) {
this.task = task;
this.exprFactory = exprFactory;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@
import io.serverlessworkflow.api.types.CallTask;
import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.impl.jq.JQExpressionFactory;

public class DefaultTaskExecutorFactory implements TaskExecutorFactory {

protected DefaultTaskExecutorFactory() {}
private final ExpressionFactory exprFactory;

private static TaskExecutorFactory instance = new DefaultTaskExecutorFactory();
private static TaskExecutorFactory instance =
new DefaultTaskExecutorFactory(JQExpressionFactory.get());

public static TaskExecutorFactory get() {
return instance;
}

public static TaskExecutorFactory get(ExpressionFactory factory) {
return new DefaultTaskExecutorFactory(factory);
}

protected DefaultTaskExecutorFactory(ExpressionFactory exprFactory) {
this.exprFactory = exprFactory;
}

public TaskExecutor<? extends TaskBase> getTaskExecutor(Task task) {

if (task.getCallTask() != null) {
CallTask callTask = task.getCallTask();
if (callTask.getCallHTTP() != null) {
return new HttpExecutor(callTask.getCallHTTP());
return new HttpExecutor(callTask.getCallHTTP(), exprFactory);
}
}
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
Expand Down
22 changes: 22 additions & 0 deletions impl/src/main/java/io/serverlessworkflow/impl/Expression.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;

public interface Expression {
JsonNode eval(JsonNode input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public interface ExpressionFactory {

Expression getExpression(String expression);
}
40 changes: 40 additions & 0 deletions impl/src/main/java/io/serverlessworkflow/impl/ExpressionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public class ExpressionUtils {

private static final String EXPR_PREFIX = "${";
private static final String EXPR_SUFFIX = "}";

private ExpressionUtils() {}

public static String trimExpr(String expr) {
expr = expr.trim();
if (expr.startsWith(EXPR_PREFIX)) {
expr = trimExpr(expr, EXPR_PREFIX, EXPR_SUFFIX);
}
return expr.trim();
}

private static String trimExpr(String expr, String prefix, String suffix) {
expr = expr.substring(prefix.length());
if (expr.endsWith(suffix)) {
expr = expr.substring(0, expr.length() - suffix.length());
}
return expr;
}
}
95 changes: 79 additions & 16 deletions impl/src/main/java/io/serverlessworkflow/impl/HttpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.CallHTTP;
import io.serverlessworkflow.api.types.Endpoint;
import io.serverlessworkflow.api.types.EndpointUri;
import io.serverlessworkflow.api.types.HTTPArguments;
import io.serverlessworkflow.api.types.UriTemplate;
import io.serverlessworkflow.api.types.WithHTTPHeaders;
import io.serverlessworkflow.api.types.WithHTTPQuery;
import jakarta.ws.rs.HttpMethod;
Expand All @@ -27,40 +30,33 @@
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation.Builder;
import jakarta.ws.rs.client.WebTarget;
import java.net.URI;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;

public class HttpExecutor extends AbstractTaskExecutor<CallHTTP> {

private static final Client client = ClientBuilder.newClient();

public HttpExecutor(CallHTTP task) {
super(task);
private final Function<JsonNode, WebTarget> targetSupplier;

public HttpExecutor(CallHTTP task, ExpressionFactory factory) {
super(task, factory);
this.targetSupplier = getTargetSupplier(task.getWith().getEndpoint());
}

@Override
protected JsonNode internalExecute(JsonNode node) {
HTTPArguments httpArgs = task.getWith();
// missing checks
String uri =
httpArgs
.getEndpoint()
.getEndpointConfiguration()
.getUri()
.getLiteralEndpointURI()
.getLiteralUriTemplate();
WebTarget target = client.target(uri);
WithHTTPQuery query = httpArgs.getQuery();
WebTarget target = targetSupplier.apply(node);
if (query != null) {
for (Entry<String, Object> entry : query.getAdditionalProperties().entrySet()) {
target = target.queryParam(entry.getKey(), entry.getValue());
}
}
Builder request =
target
.resolveTemplates(
JsonUtils.mapper().convertValue(node, new TypeReference<Map<String, Object>>() {}))
.request();
Builder request = target.request();
WithHTTPHeaders headers = httpArgs.getHeaders();
if (headers != null) {
headers.getAdditionalProperties().forEach(request::header);
Expand All @@ -73,4 +69,71 @@ protected JsonNode internalExecute(JsonNode node) {
return request.post(Entity.json(httpArgs.getBody()), JsonNode.class);
}
}

private Function<JsonNode, WebTarget> getTargetSupplier(Endpoint endpoint) {
if (endpoint.getEndpointConfiguration() != null) {
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
if (uri.getLiteralEndpointURI() != null) {
return getURISupplier(uri.getLiteralEndpointURI());
} else if (uri.getExpressionEndpointURI() != null) {
return new ExpressionURISupplier(uri.getExpressionEndpointURI());
}
} else if (endpoint.getRuntimeExpression() != null) {
return new ExpressionURISupplier(endpoint.getRuntimeExpression());
} else if (endpoint.getUriTemplate() != null) {
return getURISupplier(endpoint.getUriTemplate());
}
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
}

private Function<JsonNode, WebTarget> getURISupplier(UriTemplate template) {
if (template.getLiteralUri() != null) {
return new URISupplier(template.getLiteralUri());
} else if (template.getLiteralUriTemplate() != null) {
return new URITemplateSupplier(template.getLiteralUriTemplate());
}
throw new IllegalArgumentException("Invalid uritemplate definition " + template);
}

private class URISupplier implements Function<JsonNode, WebTarget> {
private final URI uri;

public URISupplier(URI uri) {
this.uri = uri;
}

@Override
public WebTarget apply(JsonNode input) {
return client.target(uri);
}
}

private class URITemplateSupplier implements Function<JsonNode, WebTarget> {
private final String uri;

public URITemplateSupplier(String uri) {
this.uri = uri;
}

@Override
public WebTarget apply(JsonNode input) {
return client
.target(uri)
.resolveTemplates(
JsonUtils.mapper().convertValue(input, new TypeReference<Map<String, Object>>() {}));
}
}

private class ExpressionURISupplier implements Function<JsonNode, WebTarget> {
private Expression expr;

public ExpressionURISupplier(String expr) {
this.expr = exprFactory.getExpression(expr);
}

@Override
public WebTarget apply(JsonNode input) {
return client.target(expr.eval(input).asText());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ public class WorkflowDefinition {

private WorkflowDefinition(
Workflow workflow,
TaskExecutorFactory factory,
TaskExecutorFactory taskFactory,
Collection<WorkflowExecutionListener> listeners) {
this.workflow = workflow;
this.factory = factory;
this.taskFactory = taskFactory;
this.listeners = listeners;
}

private final Workflow workflow;
private final Collection<WorkflowExecutionListener> listeners;
private final TaskExecutorFactory factory;
private final TaskExecutorFactory taskFactory;
private final Map<JsonPointer, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();

public static class Builder {
private final Workflow workflow;
private TaskExecutorFactory factory = DefaultTaskExecutorFactory.get();
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get();
private Collection<WorkflowExecutionListener> listeners;

private Builder(Workflow workflow) {
Expand All @@ -64,14 +64,14 @@ public Builder withListener(WorkflowExecutionListener listener) {
}

public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
this.factory = factory;
this.taskFactory = factory;
return this;
}

public WorkflowDefinition build() {
return new WorkflowDefinition(
workflow,
factory,
taskFactory,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners));
Expand All @@ -83,7 +83,7 @@ public static Builder builder(Workflow workflow) {
}

public WorkflowInstance execute(Object input) {
return new WorkflowInstance(factory, JsonUtils.fromValue(input));
return new WorkflowInstance(taskFactory, JsonUtils.fromValue(input));
}

enum State {
Expand All @@ -97,7 +97,6 @@ public class WorkflowInstance {
private final JsonNode input;
private JsonNode output;
private State state;
private TaskExecutorFactory factory;

private JsonPointer currentPos;

Expand All @@ -106,7 +105,6 @@ private WorkflowInstance(TaskExecutorFactory factory, JsonNode input) {
this.output = object();
this.state = State.STARTED;
this.currentPos = JsonPointer.compile("/");
this.factory = factory;
processDo(workflow.getDo());
}

Expand All @@ -119,7 +117,7 @@ private void processDo(List<TaskItem> tasks) {
this.output =
MergeUtils.merge(
taskExecutors
.computeIfAbsent(currentPos, k -> factory.getTaskExecutor(task.getTask()))
.computeIfAbsent(currentPos, k -> taskFactory.getTaskExecutor(task.getTask()))
.apply(input),
output);
listeners.forEach(l -> l.onTaskEnded(currentPos, task.getTask()));
Expand Down
Loading

0 comments on commit fbea79b

Please sign in to comment.