Skip to content

Commit

Permalink
Create a Config XContent model for Config index (opensearch-project#679)
Browse files Browse the repository at this point in the history
* Create a Config XContent model for Config index

Signed-off-by: Daniel Widdis <[email protected]>

* Move XContent builder into lambda

Signed-off-by: Daniel Widdis <[email protected]>

* Improve test coverage

Signed-off-by: Daniel Widdis <[email protected]>

* Even more test coverage

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored May 2, 2024
1 parent 28ea3dd commit 40d9efc
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Maintenance
### Refactoring
- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642))
- Create a Config XContent model for Config index ([#679](https://github.com/opensearch-project/flow-framework/pull/679))
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
client,
clusterService,
Expand Down
100 changes: 100 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/Config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
import java.time.Instant;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CREATE_TIME;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY;

/**
* Flow Framework Configuration
*/
public class Config implements ToXContentObject {

private final String masterKey;
private final Instant createTime;

/**
* Instantiate this object
*
* @param masterKey The encryption master key
* @param createTime The config creation time
*/
public Config(String masterKey, Instant createTime) {
this.masterKey = masterKey;
this.createTime = createTime;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(MASTER_KEY, this.masterKey);
xContentBuilder.field(CREATE_TIME, this.createTime.toEpochMilli());
return xContentBuilder.endObject();
}

/**
* Parse raw xContent into a Config instance.
*
* @param parser xContent based content parser
* @return an instance of the config
* @throws IOException if content can't be parsed correctly
*/
public static Config parse(XContentParser parser) throws IOException {
String masterKey = null;
Instant createTime = Instant.now();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case MASTER_KEY:
masterKey = parser.text();
break;
case CREATE_TIME:
createTime = ParseUtils.parseInstant(parser);
break;
default:
throw new FlowFrameworkException(
"Unable to parse field [" + fieldName + "] in a config object.",
RestStatus.BAD_REQUEST
);
}
}
if (masterKey == null) {
throw new FlowFrameworkException("The config object requires a master key.", RestStatus.BAD_REQUEST);
}
return new Config(masterKey, createTime);
}

/**
* @return the masterKey
*/
public String masterKey() {
return masterKey;
}

/**
* @return the createTime
*/
public Instant createTime() {
return createTime;
}
}
58 changes: 43 additions & 15 deletions src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Config;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowNode;
Expand All @@ -42,8 +48,8 @@
import com.amazonaws.encryptionsdk.CryptoResult;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
import static org.opensearch.flowframework.common.CommonValue.CREATE_TIME;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY;

Expand All @@ -60,19 +66,22 @@ public class EncryptorUtils {
// https://github.com/aws/aws-encryption-sdk-java/issues/1879
private static final String WRAPPING_ALGORITHM = "AES/GCM/NOPADDING";

private ClusterService clusterService;
private Client client;
private final ClusterService clusterService;
private final Client client;
private String masterKey;
private final NamedXContentRegistry xContentRegistry;

/**
* Instantiates a new EncryptorUtils object
* @param clusterService the cluster service
* @param client the node client
* @param xContentRegistry the OpenSearch XContent Registry
*/
public EncryptorUtils(ClusterService clusterService, Client client) {
public EncryptorUtils(ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry) {
this.masterKey = null;
this.clusterService = clusterService;
this.client = client;
this.xContentRegistry = xContentRegistry;
}

/**
Expand Down Expand Up @@ -247,22 +256,21 @@ public void initializeMasterKey(ActionListener<Boolean> listener) {
// This is necessary in case of global context index restoration from snapshot, will need to use the same master key to decrypt
// stored credentials
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

// Using the master_key string as the document id
GetRequest getRequest = new GetRequest(CONFIG_INDEX).id(MASTER_KEY);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (!getResponse.isExists()) {

// Generate new key and index
final String generatedKey = generateMasterKey();
Config config = new Config(generateMasterKey(), Instant.now());
IndexRequest masterKeyIndexRequest = new IndexRequest(CONFIG_INDEX).id(MASTER_KEY)
.source(Map.ofEntries(Map.entry(MASTER_KEY, generatedKey), Map.entry(CREATE_TIME, Instant.now().toEpochMilli())))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
masterKeyIndexRequest.source(config.toXContent(builder, ToXContent.EMPTY_PARAMS));
}
client.index(masterKeyIndexRequest, ActionListener.wrap(indexResponse -> {
context.restore();
// Set generated key to master
logger.info("Config has been initialized successfully");
this.masterKey = generatedKey;
this.masterKey = config.masterKey();
listener.onResponse(true);
}, indexException -> {
logger.error("Failed to index config", indexException);
Expand All @@ -272,9 +280,20 @@ public void initializeMasterKey(ActionListener<Boolean> listener) {
} else {
context.restore();
// Set existing key to master
logger.info("Config has already been initialized");
this.masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
listener.onResponse(true);
logger.debug("Config has already been initialized, fetching key");
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(
xContentRegistry,
getResponse.getSourceAsBytesRef()
)
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Config config = Config.parse(parser);
this.masterKey = config.masterKey();
listener.onResponse(true);
} catch (FlowFrameworkException e) {
listener.onFailure(e);
}
}
}, getRequestException -> {
logger.error("Failed to search for config from config index", getRequestException);
Expand Down Expand Up @@ -303,7 +322,16 @@ void initializeMasterKeyIfAbsent() {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (response.isExists()) {
this.masterKey = (String) response.getSourceAsMap().get(MASTER_KEY);
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(
xContentRegistry,
response.getSourceAsBytesRef()
)
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Config config = Config.parse(parser);
this.masterKey = config.masterKey();
}
} else {
throw new FlowFrameworkException("Master key has not been initialized in config index", RestStatus.NOT_FOUND);
}
Expand Down
81 changes: 81 additions & 0 deletions src/test/java/org/opensearch/flowframework/model/ConfigTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.mockito.Mockito.mock;

public class ConfigTests extends OpenSearchTestCase {
private NamedXContentRegistry xContentRegistry;

@Override
public void setUp() throws Exception {
super.setUp();
this.xContentRegistry = mock(NamedXContentRegistry.class);
}

public void testConfig() throws IOException {
String masterKey = "foo";
Instant createTime = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Config config = new Config(masterKey, createTime);

assertEquals(masterKey, config.masterKey());
assertEquals(createTime, config.createTime());

BytesReference bytesRef;
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = config.toXContent(builder, ToXContent.EMPTY_PARAMS);
bytesRef = BytesReference.bytes(source);
}
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, bytesRef)) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
config = Config.parse(parser);
}
assertEquals(masterKey, config.masterKey());
assertEquals(createTime, config.createTime());
}

public void testBadConfig() throws IOException {
BytesReference bytesRef;
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.startObject().endObject();
bytesRef = BytesReference.bytes(builder);
}
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, bytesRef)) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
FlowFrameworkException e = assertThrows(FlowFrameworkException.class, () -> Config.parse(parser));
assertEquals("The config object requires a master key.", e.getMessage());
}

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.startObject().field("foo", "bar").endObject();
bytesRef = BytesReference.bytes(builder);
}
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, bytesRef)) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
FlowFrameworkException e = assertThrows(FlowFrameworkException.class, () -> Config.parse(parser));
assertEquals("Unable to parse field [foo] in a config object.", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -50,8 +51,8 @@

public class GetWorkflowTransportActionTests extends OpenSearchTestCase {

private ThreadPool threadPool;
private Client client;
private NamedXContentRegistry xContentRegistry;
private GetWorkflowTransportAction getTemplateTransportAction;
private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private Template template;
Expand All @@ -60,10 +61,10 @@ public class GetWorkflowTransportActionTests extends OpenSearchTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
this.threadPool = mock(ThreadPool.class);
this.client = mock(Client.class);
this.xContentRegistry = mock(NamedXContentRegistry.class);
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
this.encryptorUtils = new EncryptorUtils(mock(ClusterService.class), client);
this.encryptorUtils = new EncryptorUtils(mock(ClusterService.class), client, xContentRegistry);
this.getTemplateTransportAction = new GetWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
Expand Down
Loading

0 comments on commit 40d9efc

Please sign in to comment.