[Feature/multi_tenancy] SdkClient interface with local and remote implementations for Connector #2459

@dbwiddis dbwiddis commented May 17, 2024

Description

  • Creates a new SdkClient Interface with Put, Get, and Delete operations (more to come later)
  • Adds implementations for both Local (NodeClient) and Remote (OpenSearch Java Client) clusters
  • Uses the destination-agnostic interface for Connector Put (Index), Get, and Delete actions
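As a rough illustration of what "destination-agnostic" means here, the sketch below uses hypothetical stand-in types (`DataObjectClientSketch`, `InMemoryClientSketch`), not the actual SdkClient classes from this PR. It assumes each operation returns a `CompletionStage`, and it replaces the real NodeClient and Java Client back ends with an in-memory map so the example stays self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the SdkClient interface in this PR.
interface DataObjectClientSketch {
    CompletionStage<String> putAsync(String index, String id, String json);
    CompletionStage<Optional<String>> getAsync(String index, String id);
    CompletionStage<Boolean> deleteAsync(String index, String id);
}

// In-memory stand-in for one implementation; a local or remote client
// would implement the same three methods against a real data store.
class InMemoryClientSketch implements DataObjectClientSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    private static String key(String index, String id) {
        return index + "/" + id;
    }

    @Override
    public CompletionStage<String> putAsync(String index, String id, String json) {
        return CompletableFuture.supplyAsync(() -> {
            store.put(key(index, id), json);
            return id;
        });
    }

    @Override
    public CompletionStage<Optional<String>> getAsync(String index, String id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(store.get(key(index, id))));
    }

    @Override
    public CompletionStage<Boolean> deleteAsync(String index, String id) {
        // True only if a document was actually removed.
        return CompletableFuture.supplyAsync(() -> store.remove(key(index, id)) != null);
    }
}

class SdkClientSketch {
    static final String INDEX = ".plugins-ml-connector";

    // The caller sees only the interface, never the concrete client.
    static Optional<String> putThenGet(DataObjectClientSketch client, String id, String json) {
        return client.putAsync(INDEX, id, json)
            .thenCompose(storedId -> client.getAsync(INDEX, storedId))
            .toCompletableFuture()
            .join();
    }

    public static void main(String[] args) {
        System.out.println(putThenGet(new InMemoryClientSketch(), "c1", "{\"name\":\"demo\"}"));
    }
}
```

Because `putThenGet` accepts the interface, swapping in a different implementation would not require changes to the calling code, which is the property the Connector actions rely on.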

Issues Resolved

Part of implementation in support of RFC opensearch-project/OpenSearch#13336

Previous Comments

PR #2430 was auto-closed due to a branch deletion; some earlier comments remain there.

Guided Review

  1. Start with the SdkClient interface and the request/response classes in the org.opensearch.sdk package. These are eventually intended to be moved outside of this plugin to a common repository (probably in the OpenSearch repo as a new module) but remain here during initial development to ease development iteration.
  2. The changes to the Plugin initialization (injecting the local client and a client module) will probably stay in a somewhat similar state, but the SdkClientModule is currently temporary (it creates an HTTP client without TLS/SSL), pending a more carefully thought-out, long-term pluggable/injectable solution.
  3. Glance at the org.opensearch.ml.action.connector changes to get a quick idea of how the new interface calls the clients. I have not yet done detailed exception handling here. That's a TODO below.
  4. Look at the LocalClusterIndicesClient which essentially wraps current NodeClient implementation.
  5. Look at the RemoteClusterIndicesClient which tries to make using a remote cluster via the Java Client transparent to the user.
  6. Look at everything else.

How to manually test locally

Check out this PR and create the zip file for it

gh pr checkout 2459
./gradlew assemble

Publish the latest OpenSearch 2.x branch snapshot to local (change darwin-tar to your own OS if needed)

cd ~/git/OpenSearch 
git fetch upstream
git checkout 2.x
git pull upstream 2.x
./gradlew clean localDistro
cd distribution/archives/darwin-tar/build/install/opensearch-2.15.0-SNAPSHOT

Install the plugin (change the path as necessary for your own local environment)

./bin/opensearch-plugin install file:///Users/widdisd/git/ml-commons/plugin/build/distributions/opensearch-ml-2.15.0.0-SNAPSHOT.zip

Start OpenSearch and test the local implementation using your favorite REST client (curl, Postman, etc.)

bin/opensearch

Ctrl-C to close. Switch to using the remote client by making this change to your environment:

export REMOTE_METADATA_ENDPOINT=http://127.0.0.1:9200

Restart OpenSearch and test the remote implementation using your favorite REST client (curl, Postman, etc.). The Connector Put, Get, and Delete will use the remote client even though they're still operating on the same cluster; other APIs will still work locally.

bin/opensearch
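The PR does not show how SdkClientModule reads this variable, so the following is only a minimal sketch of the selection logic the steps above imply: use the remote client when `REMOTE_METADATA_ENDPOINT` is set, otherwise fall back to the local one. The class and method names (`ClientSelectionSketch`, `remoteEndpoint`, `describe`) are hypothetical.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;

class ClientSelectionSketch {
    // Environment variable name taken from the manual test steps.
    static final String REMOTE_METADATA_ENDPOINT = "REMOTE_METADATA_ENDPOINT";

    // Returns the remote endpoint if one is configured; empty means "use the local client".
    static Optional<URI> remoteEndpoint(Map<String, String> env) {
        String value = env.get(REMOTE_METADATA_ENDPOINT);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(URI.create(value.trim()));
    }

    // Human-readable summary of which implementation would be chosen.
    static String describe(Map<String, String> env) {
        return remoteEndpoint(env).map(uri -> "remote:" + uri).orElse("local");
    }

    public static void main(String[] args) {
        System.out.println(describe(System.getenv()));
    }
}
```

Passing the environment in as a map, rather than calling `System.getenv()` inside the method, keeps the decision easy to unit test.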

TODO Items

Will continue to work on these in a new PR if this one needs to be merged to unblock other work

  • Fix a bug where shard stats return null on deletion
  • Update currently commented-out Connector Unit tests with proper mocks/tests of new functionality
  • Re-enable code coverage checks
  • Implement ability to specify a thread pool executor as part of the interface
  • Properly handle all exceptions
  • Extend functionality to Update and Search

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

dbwiddis added 15 commits May 17, 2024 15:11
Signed-off-by: Daniel Widdis <[email protected]>
Comment on lines +17 to +18
private final ShardId shardId;
private final ShardInfo shardInfo;
Contributor

Why are these fields required? Shard details are specific to opensearch which will make it difficult to extend the interface to non-opensearch data stores.

Member Author

I previously answered that question when you asked it here.

The return value to the end user is created from a DeleteResponse object, which requires a non-null ShardId:

this.shardId = Objects.requireNonNull(shardId);

So I have to have a Shard ID. I can create one, or I can pass along the actual one that is returned from cluster-based OpenSearch. Which do you prefer? I see no good reason to delete information that exists. It can be ignored for other implementations.

Similarly, for ShardInfo, the return value from the Delete Connector API looks like this:

{
    "_index": ".plugins-ml-connector",
    "_id": "qU00aowBH9rhI2IwnrH3",
    "_version": 2,
    "result": "deleted",
    "_shards": {
        "total": 1,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 2,
    "_primary_term": 1
}

I can either pass along those actual statistics or make them up. Which is easier to explain to a customer, that a statistic might be irrelevant for their storage option, or that we're taking away information they used to have?

Contributor

Line 127 calls mlIndicesHandler.initMLConnectorIndex(), which creates the connector index if it is missing. The mlIndicesHandler class needs to be updated to use SdkClient.

Member Author

I have not yet implemented any index creation/deletion code in the SDKClient.

This has to be developed incrementally. We're already at over 2000 lines of code being added.

@@ -673,7 +682,8 @@ public Collection<Object> createComponents(
     clusterManagerEventListener,
     mlCircuitBreakerService,
     mlModelAutoRedeployer,
-    cmHandler
+    cmHandler,
+    localClusterIndicesClient
Contributor

Instead of hard-coding the local indices client, we need to use the SdkClient interface so that the client is loaded dynamically, based on the SDK client initialized in SdkClientModule.

Collaborator

+1

Member Author

Please see my very lengthy initial comment on this PR. I discussed this.

  1. ... These are eventually intended to be moved outside of this plugin to a common repository (probably in the OpenSearch repo as a new module) but remain here during initial development to ease development iteration.
  2. The changes to the Plugin initialization (inject the local client and a client module) are probably going to stay in somewhat a similar state, but the SdkClientModule is currently temporary (creating an http client without TLS/SSL) pending a more properly thought out long term pluggable/injectable solution.

* @return the updated builder
*/
public Builder shardId(String indexName) {
this.shardId = new ShardId(indexName, Strings.UNKNOWN_UUID_VALUE, 0);
Collaborator

I'm trying to understand this part better. What's the significance of generating a random shard ID here? How can this help the end user?

Member Author

Answered above and also here, but I'll repeat:

The DeleteResponse object requires a non-null ShardId.

this.shardId = Objects.requireNonNull(shardId);

When there isn't one provided (such as DynamoDb) this code creates a simulated one.

Collaborator

@dhrubo-os, May 20, 2024

Yes, I saw your answer earlier too. I think for DDB / other non OS cases, if we just share a random shardID, that could be confusing for customers. I would prefer sharing any static String, maybe like Not Applicable or N/A, which can convey the actual meaning rather than causing unnecessary confusion.

Member Author

I would prefer sharing any static String, maybe like Not Applicable or N/A, which can convey the actual meaning rather than causing unnecessary confusion.

The Strings.UNKNOWN_UUID_VALUE is _na_.
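To make the exchange above concrete, here is a minimal sketch of a response builder that enforces a non-null shard ID and fills in a placeholder when only an index name is known. `ShardIdSketch` and `DeleteResultSketch` are hypothetical stand-ins and do not reproduce the real OpenSearch `ShardId` or the PR's `DeleteDataObjectResponse`; only the `_na_` value and the `requireNonNull` contract are taken from the thread.

```java
import java.util.Objects;

// Hypothetical stand-in for OpenSearch's ShardId (index name, index UUID, shard number).
final class ShardIdSketch {
    final String indexName;
    final String indexUuid;
    final int shard;

    ShardIdSketch(String indexName, String indexUuid, int shard) {
        this.indexName = Objects.requireNonNull(indexName);
        this.indexUuid = Objects.requireNonNull(indexUuid);
        this.shard = shard;
    }
}

final class DeleteResultSketch {
    // Same value the thread quotes for Strings.UNKNOWN_UUID_VALUE.
    static final String UNKNOWN_UUID = "_na_";

    final String id;
    final ShardIdSketch shardId;

    private DeleteResultSketch(String id, ShardIdSketch shardId) {
        this.id = Objects.requireNonNull(id);
        // Mirrors the non-null requirement quoted from DeleteResponse.
        this.shardId = Objects.requireNonNull(shardId);
    }

    static final class Builder {
        private String id;
        private ShardIdSketch shardId;

        Builder id(String id) {
            this.id = id;
            return this;
        }

        // Use the real shard ID when the data store provides one.
        Builder shardId(ShardIdSketch shardId) {
            this.shardId = shardId;
            return this;
        }

        // Otherwise build a placeholder from the index name alone.
        Builder shardId(String indexName) {
            this.shardId = new ShardIdSketch(indexName, UNKNOWN_UUID, 0);
            return this;
        }

        DeleteResultSketch build() {
            return new DeleteResultSketch(id, shardId);
        }
    }
}
```

With this shape, a data store without shards still produces a valid response, and the placeholder UUID is the fixed `_na_` marker rather than a random value.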


public class GetDataObjectResponse {
private final String id;
private final Optional<XContentParser> parser;
Collaborator

Why optional? Without a parser, can we parse the response?

Member Author

You still get a GetResponse even if you didn't find an object. The (Cluster) GetResponse has an isExists() method that checks whether a result is actually present. The (Java Client) GetResponse has a similar found() method that serves the same purpose.

I can either have a null parser and another boolean field that you have to check, or I can have an Optional. I dislike passing around nulls. So you check whether the Optional isPresent() and then call get() to obtain the parser.

The Optional not being present means the document/item/data was not found.
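A minimal sketch of that contract from the caller's side, using a hypothetical `GetResultSketch` in which a plain `String` source stands in for the `XContentParser`:

```java
import java.util.Optional;

// Hypothetical stand-in for GetDataObjectResponse; the String source
// takes the place of the Optional<XContentParser> field.
final class GetResultSketch {
    final String id;
    final Optional<String> source;

    private GetResultSketch(String id, Optional<String> source) {
        this.id = id;
        this.source = source;
    }

    static GetResultSketch found(String id, String source) {
        return new GetResultSketch(id, Optional.of(source));
    }

    // An empty Optional is the only "not found" signal; no extra boolean is needed.
    static GetResultSketch notFound(String id) {
        return new GetResultSketch(id, Optional.empty());
    }
}

class OptionalGetSketch {
    static String describe(GetResultSketch result) {
        return result.source
            .map(json -> "found " + result.id + ": " + json)
            .orElse("not found: " + result.id);
    }

    public static void main(String[] args) {
        System.out.println(describe(GetResultSketch.found("c1", "{\"name\":\"demo\"}")));
        System.out.println(describe(GetResultSketch.notFound("c2")));
    }
}
```

The caller never touches a null and cannot forget a separate "exists" check, because the value is only reachable through the Optional.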

Collaborator

The Optional not being present means the document/item/data was not found.

Should we add a comment regarding this in the code?

Collaborator

Sometimes, before parsing, we need to get information from the source, like this: https://github.com/opensearch-project/ml-commons/blob/main/plugin/src/main/java/org/opensearch/ml/action/models/GetModelTransportAction.java#L94-L96

How can we address this?

Member Author

Addressed in #2489. Please comment there if you think the javadoc needs more amplification on the fact that a non-present optional and empty source map mean that the Get call returned that the document didn't exist in the index.

if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new OpenSearchException(cause);
Collaborator

Will this change the current user experience? For example, we currently throw OpenSearchStatusException with a status code.

Member Author

I haven't yet aligned exception handling. That's on my to do list.

Please see the PR description where I mention this, and a few TODOs in the test classes.

I plan on not changing the user experience. I put in placeholders during my initial testing.
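One possible shape for that alignment, offered only as a sketch and not as what the PR implements: unwrap the `CompletionException` that asynchronous stages add, pass through any exception that already carries a status, and map the rest. `StatusExceptionSketch` is a hypothetical stand-in for `OpenSearchStatusException`, and `NoSuchElementException` stands in for a not-found error such as `IndexNotFoundException`.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for an exception that carries an HTTP-style status code.
class StatusExceptionSketch extends RuntimeException {
    final int status;

    StatusExceptionSketch(String message, int status, Throwable cause) {
        super(message, cause);
        this.status = status;
    }
}

class ExceptionMappingSketch {
    static StatusExceptionSketch toStatusException(Throwable throwable) {
        // Async stages wrap failures in CompletionException; look at the real cause.
        Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
            ? throwable.getCause()
            : throwable;
        if (cause instanceof InterruptedException) {
            // Preserve the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
        }
        if (cause instanceof StatusExceptionSketch) {
            // Already has a status: keep it so callers see the same code as before.
            return (StatusExceptionSketch) cause;
        }
        if (cause instanceof NoSuchElementException) {
            return new StatusExceptionSketch("Not found: " + cause.getMessage(), 404, cause);
        }
        return new StatusExceptionSketch("Internal error", 500, cause);
    }

    public static void main(String[] args) {
        int status = CompletableFuture.<String>supplyAsync(() -> {
            throw new NoSuchElementException("c1");
        }).handle((result, throwable) -> toStatusException(throwable).status).join();
        System.out.println(status);
    }
}
```

The pass-through branch is the one that matters for the question above: a FORBIDDEN or CONFLICT raised by the underlying client keeps its original status instead of being flattened into a generic failure.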

.whenComplete((r, throwable) -> {
context.restore();
if (throwable != null) {
actionListener.onFailure(new RuntimeException(throwable));
Collaborator

Why RuntimeException by default?

Member Author

See above that I haven't fully hashed out the exception handling. I used placeholders until I got things working. I left a rather detailed initial comment on this PR.

log.debug("Completed Get Connector Request, id:{}", connectorId);
if (throwable != null) {
Throwable cause = throwable.getCause() == null ? throwable : throwable.getCause();
if (cause instanceof IndexNotFoundException) {
Collaborator

What happens for ResourceNotFoundException?

Member Author

(Repeat comment on planning on aligning all exception handling with current user experience.)

 } else {
     actionListener
         .onFailure(
             new OpenSearchStatusException(
-                "You don't have permission to access this connector",
-                RestStatus.FORBIDDEN
+                "Failed to find connector with the provided connector id: " + connectorId,
Collaborator

I'm just wondering if this can break customer's current user experience or not. @ylwu-amzn what do you think?

Member Author

(Repeat comment on planning on aligning all exception handling with current user experience.)

.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString());
log.info("Retrieved data object");
return new GetDataObjectResponse.Builder().id(getResponse.getId()).parser(Optional.of(parser)).build();
} catch (OpenSearchStatusException | IndexNotFoundException notFound) {
Collaborator

So for RestStatus.UNAUTHORIZED, RestStatus.FORBIDDEN, RestStatus.CONFLICT, RestStatus.INTERNAL_SERVER_ERROR, RestStatus.BAD_REQUEST, we will throw notFound? This doesn't seem right to me.

Member Author

(Repeat comment on planning on aligning all exception handling with current user experience.)

);
return new DeleteDataObjectResponse.Builder()
.id(deleteResponse.id())
.shardId(deleteResponse.index())
Collaborator

[nit] If we are just trying to put some random value for other remote stores, let's add a comment.

Member Author

There is a comment in the DeleteDataObjectResponse javadocs. Where else should this be commented?

}
})
.build();
ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
Collaborator

Can this introduce any insecure deserialization?

Member Author

It's what we use in the Java Client. What are your particular security concerns?

Collaborator

I was wondering if we should consider configuring the ObjectMapper instance with the appropriate security settings, such as disabling the ALLOW_UNSAFE_POLYMORPHIC setting and limiting the classes that can be deserialized?

@dhrubo-os
Collaborator

I am merging this PR into the branch so that I can continue my development on top of it. @dbwiddis, please address any needed changes in a separate PR. Thanks.

@dhrubo-os dhrubo-os merged commit 59450ae into opensearch-project:feature/multi_tenancy May 20, 2024
9 of 12 checks passed
}).when(client).get(any(), any());
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
future.onResponse(getResponse);
when(client.get(any(GetRequest.class))).thenReturn(future);
Collaborator

Why client.get? Shouldn't we use sdkClient? Are you sure these tests are reflecting the actual behavior we have in the class?

Member Author

Why client.get? Shouldn't we use sdkClient?

  1. sdkClient is an actual instance instantiated with this mocked client.
  2. getConnectorTransportAction was instantiated with the sdkClient instance.
  3. Therefore, when getConnectorTransportAction.doExecute() is called on line 145, it internally executes sdkClient.getDataObjectAsync().
  4. The SDKClient implementations use the Client implementation returning an ActionFuture. That method internally implements the needed ActionListener as an ActionFuture. That client call inside the sdkClient instance field of the getConnectorTransportAction is what this mock imitates:
GetResponse getResponse = client.get(new GetRequest(request.index(), request.id())).actionGet();
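
The same wiring can be sketched without any mocking library. All names below are hypothetical stand-ins: `LowLevelClientSketch` plays the mocked Client, `WrappingClientSketch` plays the real sdkClient instance, and `GetActionSketch` plays getConnectorTransportAction. Only the innermost client is stubbed; the wrapper and the action run their real code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Plays the role of the mocked Client; returns null when nothing is found.
interface LowLevelClientSketch {
    String get(String index, String id);
}

// Plays the role of the real sdkClient instance built around the mock.
class WrappingClientSketch {
    private final LowLevelClientSketch client;

    WrappingClientSketch(LowLevelClientSketch client) {
        this.client = client;
    }

    CompletionStage<Optional<String>> getDataObjectAsync(String index, String id) {
        // The stubbed call happens here, inside the real wrapper.
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(client.get(index, id)));
    }
}

// Plays the role of getConnectorTransportAction, the class under test.
class GetActionSketch {
    private final WrappingClientSketch sdkClient;

    GetActionSketch(WrappingClientSketch sdkClient) {
        this.sdkClient = sdkClient;
    }

    String doExecute(String connectorId) {
        return sdkClient.getDataObjectAsync(".plugins-ml-connector", connectorId)
            .thenApply(source -> source.orElse("not found"))
            .toCompletableFuture()
            .join();
    }

    public static void main(String[] args) {
        LowLevelClientSketch stub = (index, id) -> "c1".equals(id) ? "{\"name\":\"demo\"}" : null;
        GetActionSketch action = new GetActionSketch(new WrappingClientSketch(stub));
        System.out.println(action.doExecute("c1"));
    }
}
```

Stubbing at the lowest layer means the test also exercises the wrapper's translation logic, which would be skipped if the wrapper itself were mocked.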

Are you sure these tests are reflecting the actual behavior we have in the class?

I'm not perfect, but in this case I'm pretty sure I did it right.

Collaborator

Thanks for the explanation.

return null;
}).when(client).get(any(), any());
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
future.onResponse(getResponse);
Collaborator

Shouldn't this be CompletionStage<GetDataObjectResponse>?

Member Author

Similar answer as above; this is the response of the mocked client wrapped by an actual sdkClient instance used to initialize a getConnectorTransportAction and is eventually invoked when doExecute() is called.

* @return the updated builder
*/
public Builder fetchSourceContext(FetchSourceContext fetchSourceContext) {
this.fetchSourceContext = fetchSourceContext;
Collaborator

what happens if the fetchSourceContext is null?

Member Author

That's the default value for the GetRequest if it's not provided. Although I did notice that I didn't pass this on to the GetRequest so I'll fix that.
