Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/multi_tenancy] Add UpdateDataObject interface, Client, and Connector Implementations #2520

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions common/src/main/java/org/opensearch/sdk/SdkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ default PutDataObjectResponse putDataObject(PutDataObjectRequest request) {
try {
return putDataObjectAsync(request).toCompletableFuture().join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new OpenSearchException(cause);
throw unwrapAndConvertToRuntime(e);
}
}

Expand Down Expand Up @@ -76,11 +72,37 @@ default GetDataObjectResponse getDataObject(GetDataObjectRequest request) {
try {
return getDataObjectAsync(request).toCompletableFuture().join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new OpenSearchException(cause);
throw unwrapAndConvertToRuntime(e);
}
}

/**
* Update a data object/document in a table/index.
* @param request A request identifying the data object to update
* @param executor the executor to use for asynchronous execution
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request, Executor executor);

/**
* Update a data object/document in a table/index.
* @param request A request identifying the data object to update
* @return A completion stage encapsulating the response or exception
*/
default CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request) {
return updateDataObjectAsync(request, ForkJoinPool.commonPool());
}

/**
* Update a data object/document in a table/index.
* @param request A request identifying the data object to update
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
*/
default UpdateDataObjectResponse updateDataObject(UpdateDataObjectRequest request) {
try {
return updateDataObjectAsync(request).toCompletableFuture().join();
} catch (CompletionException e) {
throw unwrapAndConvertToRuntime(e);
}
}

Expand Down Expand Up @@ -110,11 +132,18 @@ default DeleteDataObjectResponse deleteDataObject(DeleteDataObjectRequest reques
try {
return deleteDataObjectAsync(request).toCompletableFuture().join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new OpenSearchException(cause);
throw unwrapAndConvertToRuntime(e);
}
}

private static RuntimeException unwrapAndConvertToRuntime(CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (cause instanceof RuntimeException) {
return (RuntimeException) cause;
}
return new OpenSearchException(cause);
}
}
108 changes: 108 additions & 0 deletions common/src/main/java/org/opensearch/sdk/UpdateDataObjectRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import org.opensearch.core.xcontent.ToXContentObject;

public class UpdateDataObjectRequest {

private final String index;
private final String id;
private final ToXContentObject dataObject;

/**
* Instantiate this request with an index and data object.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the data object may be referred to as an item.
* @param index the index location to update the object
* @param id the document id
* @param dataObject the data object
*/
public UpdateDataObjectRequest(String index, String id, ToXContentObject dataObject) {
this.index = index;
this.id = id;
this.dataObject = dataObject;
}

/**
* Returns the index
* @return the index
*/
public String index() {
return this.index;
}

/**
* Returns the document id
* @return the id
*/
public String id() {
return this.id;
}

/**
* Returns the data object
* @return the data object
*/
public ToXContentObject dataObject() {
return this.dataObject;
}

/**
* Class for constructing a Builder for this Request Object
*/
public static class Builder {
private String index = null;
private String id = null;
private ToXContentObject dataObject = null;

/**
* Empty Constructor for the Builder object
*/
public Builder() {}

/**
* Add an index to this builder
* @param index the index to put the object
* @return the updated builder
*/
public Builder index(String index) {
this.index = index;
return this;
}

/**
* Add an id to this builder
* @param id the document id
* @return the updated builder
*/
public Builder id(String id) {
this.id = id;
return this;
}

/**
* Add a data object to this builder
* @param dataObject the data object
* @return the updated builder
*/
public Builder dataObject(ToXContentObject dataObject) {
this.dataObject = dataObject;
return this;
}

/**
* Builds the request
* @return A {@link UpdateDataObjectRequest}
*/
public UpdateDataObjectRequest build() {
return new UpdateDataObjectRequest(this.index, this.id, this.dataObject);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.shard.ShardId;

public class UpdateDataObjectResponse {
private final String id;
private final ShardId shardId;
private final ShardInfo shardInfo;
Comment on lines +17 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this information required? Can this be null for non-OpenSearch data stores?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can not be null, you have asked this same question before and it was answered here: #2459 (comment)

private final boolean updated;

/**
* Instantiate this request with an id and update status.
* <p>
* For data storage implementations other than OpenSearch, the id may be referred to as a primary key.
* @param id the document id
* @param shardId the shard id
* @param shardInfo the shard info
* @param updated Whether the object was updated.
*/
public UpdateDataObjectResponse(String id, ShardId shardId, ShardInfo shardInfo, boolean updated) {
this.id = id;
this.shardId = shardId;
this.shardInfo = shardInfo;
this.updated = updated;
}

/**
* Returns the document id
* @return the id
*/
public String id() {
return id;
}

/**
* Returns the shard id.
* @return the shard id, or a generated id if shards are not applicable
*/
public ShardId shardId() {
return shardId;
}

/**
* Returns the shard info.
* @return the shard info, or generated info if shards are not applicable
*/
public ShardInfo shardInfo() {
return shardInfo;
}

/**
* Returns whether update was successful
* @return true if update was successful
*/
public boolean updated() {
return updated;
}

/**
* Class for constructing a Builder for this Response Object
*/
public static class Builder {
private String id = null;
private ShardId shardId = null;
private ShardInfo shardInfo = null;
private boolean updated = false;

/**
* Empty Constructor for the Builder object
*/
public Builder() {}

/**
* Add an id to this builder
* @param id the id to add
* @return the updated builder
*/
public Builder id(String id) {
this.id = id;
return this;
}

/**
* Adds a shard id to this builder
* @param shardId the shard id to add
* @return the updated builder
*/
public Builder shardId(ShardId shardId) {
this.shardId = shardId;
return this;
}

/**
* Adds a generated shard id to this builder
* @param indexName the index name to generate a shard id
* @return the updated builder
*/
public Builder shardId(String indexName) {
this.shardId = new ShardId(indexName, Strings.UNKNOWN_UUID_VALUE, 0);
return this;
}

/**
* Adds shard information (statistics) to this builder
* @param shardInfo the shard info to add
* @return the updated builder
*/
public Builder shardInfo(ShardInfo shardInfo) {
this.shardInfo = shardInfo;
return this;
}
/**
* Add a updated status to this builder
* @param updated the updated status to add
* @return the updated builder
*/
public Builder updated(boolean updated) {
this.updated = updated;
return this;
}

/**
* Builds the object
* @return A {@link UpdateDataObjectResponse}
*/
public UpdateDataObjectResponse build() {
return new UpdateDataObjectResponse(this.id, this.shardId, this.shardInfo, this.updated);
}
}
}
Loading
Loading