Skip to content

Commit

Permalink
Move max workflows setting update consumer to settings class
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 7, 2024
1 parent de6c41a commit 9f361d8
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public Collection<Object> createComponents(
flowFrameworkSettings
);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler);
return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class FlowFrameworkSettings {
private volatile TimeValue retryDuration;
/** Max workflow steps that can be created*/
private volatile Integer maxWorkflowSteps;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
Expand Down Expand Up @@ -83,9 +85,11 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) {
this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings);
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
}

/**
Expand All @@ -111,4 +115,12 @@ public TimeValue getRetryDuration() {
public Integer getMaxWorkflowSteps() {
return maxWorkflowSteps;
}

/**
* Getter for max workflows
* @return max workflows
*/
public Integer getMaxWorkflows() {
return maxWorkflows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BaseRestHandler;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
Expand All @@ -22,8 +21,6 @@
public abstract class AbstractWorkflowAction extends BaseRestHandler {
/** Timeout for the request*/
protected volatile TimeValue requestTimeout;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;

/**
* Instantiates a new AbstractWorkflowAction
Expand All @@ -33,10 +30,8 @@ public abstract class AbstractWorkflowAction extends BaseRestHandler {
*/
public AbstractWorkflowAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);

clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,17 @@ public class RestCreateWorkflowAction extends AbstractWorkflowAction {
private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
private FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new RestCreateWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param flowFrameworkSettings The settings for the flow framework plugin
* @param settings Environment settings
* @param clusterService clusterService
*/
public RestCreateWorkflowAction(
FlowFrameworkSettings flowFrameworkFeatureEnabledSetting,
Settings settings,
ClusterService clusterService
) {
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings, Settings settings, ClusterService clusterService) {
super(settings, clusterService);
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
Expand All @@ -80,7 +76,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
Expand All @@ -96,14 +92,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
requestTimeout,
maxWorkflows
);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision, requestTimeout);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;
private final Settings settings;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;

/**
Expand All @@ -64,7 +64,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
* @param actionFilters action filters
* @param workflowProcessSorter the workflow process sorter
* @param flowFrameworkIndicesHandler The handler for the global context index
* @param settings Environment settings
* @param flowFrameworkSettings Plugin settings
* @param client The client used to make the request to OS
* @param pluginsService The plugin service
*/
Expand All @@ -74,14 +74,14 @@ public CreateWorkflowTransportAction(
ActionFilters actionFilters,
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Settings settings,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
PluginsService pluginsService
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.settings = settings;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.pluginsService = pluginsService;
}
Expand Down Expand Up @@ -116,9 +116,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

if (request.getWorkflowId() == null) {
// Throttle incoming requests
checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> {
checkMaxWorkflows(request.getRequestTimeout(), flowFrameworkSettings.getMaxWorkflows(), ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows();
String errorMessage = "Maximum workflows limit reached " + flowFrameworkSettings.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,13 @@ public class WorkflowRequest extends ActionRequest {
*/
private TimeValue requestTimeout;

/**
* Max workflows
*/
private Integer maxWorkflows;

/**
* Instantiates a new WorkflowRequest, set validation to false and set requestTimeout and maxWorkflows to null
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
this(workflowId, template, new String[] { "all" }, false, null, null);
this(workflowId, template, new String[] { "all" }, false, null);
}

/**
Expand All @@ -69,13 +64,8 @@ public WorkflowRequest(@Nullable String workflowId, @Nullable Template template)
* @param requestTimeout timeout of the request
* @param maxWorkflows max number of workflows
*/
public WorkflowRequest(
@Nullable String workflowId,
@Nullable Template template,
@Nullable TimeValue requestTimeout,
@Nullable Integer maxWorkflows
) {
this(workflowId, template, new String[] { "all" }, false, requestTimeout, maxWorkflows);
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, @Nullable TimeValue requestTimeout) {
this(workflowId, template, new String[] { "all" }, false, requestTimeout);
}

/**
Expand All @@ -85,22 +75,19 @@ public WorkflowRequest(
* @param validation flag to indicate if validation is necessary
* @param provision flag to indicate if provision is necessary
* @param requestTimeout timeout of the request
* @param maxWorkflows max number of workflows
*/
public WorkflowRequest(
@Nullable String workflowId,
@Nullable Template template,
String[] validation,
boolean provision,
@Nullable TimeValue requestTimeout,
@Nullable Integer maxWorkflows
@Nullable TimeValue requestTimeout
) {
this.workflowId = workflowId;
this.template = template;
this.validation = validation;
this.provision = provision;
this.requestTimeout = requestTimeout;
this.maxWorkflows = maxWorkflows;
}

/**
Expand All @@ -116,7 +103,6 @@ public WorkflowRequest(StreamInput in) throws IOException {
this.validation = in.readStringArray();
this.provision = in.readBoolean();
this.requestTimeout = in.readOptionalTimeValue();
this.maxWorkflows = in.readOptionalInt();
}

/**
Expand Down Expand Up @@ -161,14 +147,6 @@ public TimeValue getRequestTimeout() {
return requestTimeout;
}

/**
* Gets the max workflows
* @return the maxWorkflows
*/
public Integer getMaxWorkflows() {
return maxWorkflows;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -177,7 +155,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(validation);
out.writeBoolean(provision);
out.writeOptionalTimeValue(requestTimeout);
out.writeOptionalInt(maxWorkflows);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void tearDown() throws Exception {
public void testPlugin() throws IOException {
try (FlowFrameworkPlugin ffp = new FlowFrameworkPlugin()) {
assertEquals(
4,
5,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(8, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void setUp() throws Exception {
Stream.of(
FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED,
FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION,
FlowFrameworkSettings.MAX_WORKFLOW_STEPS
FlowFrameworkSettings.MAX_WORKFLOW_STEPS,
FlowFrameworkSettings.MAX_WORKFLOWS
)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
Expand All @@ -59,5 +60,6 @@ public void testSettings() throws IOException {
assertFalse(flowFrameworkSettings.isFlowFrameworkEnabled());
assertEquals(Optional.of(TimeValue.timeValueSeconds(5)), Optional.ofNullable(flowFrameworkSettings.getRetryDuration()));
assertEquals(Optional.of(50), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflowSteps()));
assertEquals(Optional.of(1000), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflows()));
}
}
Loading

0 comments on commit 9f361d8

Please sign in to comment.