Skip to content

Commit

Permalink
Remove redundant timeout setting
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 7, 2024
1 parent 9f361d8 commit 628d882
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
Expand All @@ -84,8 +83,6 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkSettings flowFrameworkSettings;

private ClusterService clusterService;

/**
* Instantiate this plugin.
*/
Expand All @@ -106,7 +103,6 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
Expand Down Expand Up @@ -141,7 +137,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkflowAction(flowFrameworkSettings, settings, clusterService),
new RestCreateWorkflowAction(flowFrameworkSettings),
new RestDeleteWorkflowAction(flowFrameworkSettings),
new RestProvisionWorkflowAction(flowFrameworkSettings),
new RestDeprovisionWorkflowAction(flowFrameworkSettings),
Expand All @@ -168,7 +164,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, TASK_REQUEST_RETRY_DURATION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -23,6 +21,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -40,7 +39,7 @@
/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends AbstractWorkflowAction {
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";
Expand All @@ -50,11 +49,8 @@ public class RestCreateWorkflowAction extends AbstractWorkflowAction {
/**
* Instantiates a new RestCreateWorkflowAction
* @param flowFrameworkSettings The settings for the flow framework plugin
* @param settings Environment settings
* @param clusterService clusterService
*/
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings, Settings settings, ClusterService clusterService) {
super(settings, clusterService);
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings) {
this.flowFrameworkSettings = flowFrameworkSettings;
}

Expand Down Expand Up @@ -92,7 +88,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision, requestTimeout);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision);

Check warning on line 91 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L91

Added line #L91 was not covered by tests

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -116,7 +115,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

if (request.getWorkflowId() == null) {
// Throttle incoming requests
checkMaxWorkflows(request.getRequestTimeout(), flowFrameworkSettings.getMaxWorkflows(), ActionListener.wrap(max -> {
checkMaxWorkflows(flowFrameworkSettings.getMaxWorkflows(), ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + flowFrameworkSettings.getMaxWorkflows();
logger.error(errorMessage);
Expand Down Expand Up @@ -243,16 +242,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

/**
* Checks if the max workflows limit has been reachesd
* @param requestTimeOut request time out
* @param maxWorkflow max workflows
* @param internalListener listener for search request
*/
protected void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, ActionListener<Boolean> internalListener) {
protected void checkMaxWorkflows(Integer maxWorkflow, ActionListener<Boolean> internalListener) {
if (!flowFrameworkIndicesHandler.doesIndexExist(CommonValue.GLOBAL_CONTEXT_INDEX)) {
internalListener.onResponse(true);
} else {
QueryBuilder query = QueryBuilders.matchAllQuery();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0);

Check warning on line 253 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L253

Added line #L253 was not covered by tests

SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.flowframework.model.Template;
Expand Down Expand Up @@ -43,29 +42,13 @@ public class WorkflowRequest extends ActionRequest {
*/
private boolean provision;

/**
* Timeout for request
*/
private TimeValue requestTimeout;

/**
* Instantiates a new WorkflowRequest, set validation to false and set requestTimeout and maxWorkflows to null
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
this(workflowId, template, new String[] { "all" }, false, null);
}

/**
* Instantiates a new WorkflowRequest and set validation to false
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
* @param requestTimeout timeout of the request
* @param maxWorkflows max number of workflows
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, @Nullable TimeValue requestTimeout) {
this(workflowId, template, new String[] { "all" }, false, requestTimeout);
this(workflowId, template, new String[] { "all" }, false);
}

/**
Expand All @@ -74,20 +57,12 @@ public WorkflowRequest(@Nullable String workflowId, @Nullable Template template,
* @param template the use case template which describes the workflow
* @param validation flag to indicate if validation is necessary
* @param provision flag to indicate if provision is necessary
* @param requestTimeout timeout of the request
*/
public WorkflowRequest(
@Nullable String workflowId,
@Nullable Template template,
String[] validation,
boolean provision,
@Nullable TimeValue requestTimeout
) {
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, String[] validation, boolean provision) {
this.workflowId = workflowId;
this.template = template;
this.validation = validation;
this.provision = provision;
this.requestTimeout = requestTimeout;
}

/**
Expand All @@ -102,7 +77,6 @@ public WorkflowRequest(StreamInput in) throws IOException {
this.template = templateJson == null ? null : Template.parse(templateJson);
this.validation = in.readStringArray();
this.provision = in.readBoolean();
this.requestTimeout = in.readOptionalTimeValue();
}

/**
Expand Down Expand Up @@ -139,27 +113,17 @@ public boolean isProvision() {
return this.provision;
}

/**
* Gets the timeout of the request
* @return the requestTimeout
*/
public TimeValue getRequestTimeout() {
return requestTimeout;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(workflowId);
out.writeOptionalString(template == null ? null : template.toJson());
out.writeStringArray(validation);
out.writeBoolean(provision);
out.writeOptionalTimeValue(requestTimeout);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -63,7 +62,7 @@ public void setUp() throws Exception {

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
Expand All @@ -85,7 +84,7 @@ public void testPlugin() throws IOException {
assertEquals(8, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(8, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(5, ffp.getSettings().size());
assertEquals(4, ffp.getSettings().size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -89,7 +88,7 @@ public void testWorkflowStepFactoryHasValidators() throws IOException {

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@

import org.opensearch.Version;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
Expand All @@ -35,10 +31,7 @@
import java.util.Map;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class RestCreateWorkflowActionTests extends OpenSearchTestCase {
Expand All @@ -49,20 +42,11 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase {
private String updateWorkflowPath;
private NodeClient nodeClient;
private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
private Settings settings;
private ClusterService clusterService;

@Override
public void setUp() throws Exception {
super.setUp();
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class);
settings = Settings.builder()
.put(WORKFLOW_REQUEST_TIMEOUT.getKey(), TimeValue.timeValueMillis(10))
.put(MAX_WORKFLOWS.getKey(), 2)
.build();

ClusterSettings clusterSettings = TestHelpers.clusterSetting(settings, WORKFLOW_REQUEST_TIMEOUT, MAX_WORKFLOWS);
clusterService = spy(new ClusterService(settings, clusterSettings, null));

when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);

Expand All @@ -88,8 +72,7 @@ public void setUp() throws Exception {

// Invalid template configuration, wrong field name
this.invalidTemplate = template.toJson().replace("use_case", "invalid");
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService);
this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.createWorkflowPath = String.format(Locale.ROOT, "%s", WORKFLOW_URI);
this.updateWorkflowPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id");
this.nodeClient = mock(NodeClient.class);
Expand Down
Loading

0 comments on commit 628d882

Please sign in to comment.