Skip to content

Commit

Permalink
Update search call in Update and Delete Connector actions using new API
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jun 16, 2024
1 parent 4aecac9 commit 804ca81
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 151 deletions.
47 changes: 28 additions & 19 deletions common/src/main/java/org/opensearch/sdk/SdkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
*/
package org.opensearch.sdk;

import org.opensearch.OpenSearchException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

import org.opensearch.OpenSearchException;

public interface SdkClient {

/**
Expand All @@ -36,7 +37,7 @@ default CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectR
/**
* Create/Put/Index a data object/document into a table/index.
* @param request A request encapsulating the data object to store
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception.
*/
default PutDataObjectResponse putDataObject(PutDataObjectRequest request) {
try {
Expand All @@ -48,16 +49,18 @@ default PutDataObjectResponse putDataObject(PutDataObjectRequest request) {

/**
* Read/Get a data object/document from a table/index.
* @param request A request identifying the data object to retrieve
*
* @param request A request identifying the data object to retrieve
* @param executor the executor to use for asynchronous execution
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request, Executor executor);

/**
* Read/Get a data object/document from a table/index.
*
* @param request A request identifying the data object to retrieve
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A completion stage encapsulating the response or exception
*/
default CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request) {
return getDataObjectAsync(request, ForkJoinPool.commonPool());
Expand All @@ -66,7 +69,7 @@ default CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectR
/**
* Read/Get a data object/document from a table/index.
* @param request A request identifying the data object to retrieve
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception.
*/
default GetDataObjectResponse getDataObject(GetDataObjectRequest request) {
try {
Expand All @@ -78,14 +81,16 @@ default GetDataObjectResponse getDataObject(GetDataObjectRequest request) {

/**
* Update a data object/document in a table/index.
* @param request A request identifying the data object to update
*
* @param request A request identifying the data object to update
* @param executor the executor to use for asynchronous execution
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request, Executor executor);

/**
* Update a data object/document in a table/index.
*
* @param request A request identifying the data object to update
* @return A completion stage encapsulating the response or exception
*/
Expand All @@ -96,7 +101,7 @@ default CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDa
/**
* Update a data object/document in a table/index.
* @param request A request identifying the data object to update
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception.
*/
default UpdateDataObjectResponse updateDataObject(UpdateDataObjectRequest request) {
try {
Expand All @@ -108,14 +113,16 @@ default UpdateDataObjectResponse updateDataObject(UpdateDataObjectRequest reques

/**
* Delete a data object/document from a table/index.
* @param request A request identifying the data object to delete
*
* @param request A request identifying the data object to delete
* @param executor the executor to use for asynchronous execution
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request, Executor executor);

/**
* Delete a data object/document from a table/index.
*
* @param request A request identifying the data object to delete
* @return A completion stage encapsulating the response or exception
*/
Expand All @@ -126,7 +133,7 @@ default CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDa
/**
* Delete a data object/document from a table/index.
* @param request A request identifying the data object to delete
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception.
*/
default DeleteDataObjectResponse deleteDataObject(DeleteDataObjectRequest request) {
try {
Expand All @@ -137,26 +144,28 @@ default DeleteDataObjectResponse deleteDataObject(DeleteDataObjectRequest reques
}

/**
* Search for a data object/document in a table/index.
* @param request A request identifying the data object to retrieve
* Search for data objects/documents in a table/index.
*
* @param request A request identifying the data objects to search for
* @param executor the executor to use for asynchronous execution
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDataObjectRequest request, Executor executor);

/**
* Search for a data object/document in a table/index.
* @param request A request identifying the data object to retrieve
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* Search for data objects/documents in a table/index.
*
* @param request A request identifying the data objects to search for
* @return A completion stage encapsulating the response or exception
*/
default CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDataObjectRequest request) {
return searchDataObjectAsync(request, ForkJoinPool.commonPool());
}

/**
* Search for a data object/document in a table/index.
* @param request A request identifying the data object to retrieve
* @return A response on success. Throws {@link OpenSearchException} wrapping the cause on exception.
* Search for data objects/documents in a table/index.
* @param request A request identifying the data objects to search for
* @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception.
*/
default SearchDataObjectResponse searchDataObject(SearchDataObjectRequest request) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ public class SearchDataObjectRequest {
private final SearchSourceBuilder searchSourceBuilder;

/**
* Instantiate this request with an index and id
* Instantiate this request with an optional list of indices and search source
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param indices the indices to search for the object
* @param searchSourceBuilder the context to use when fetching _source
* For data storage implementations other than OpenSearch, an index may be referred to as a table
*
* @param indices the indices to search for the object
* @param searchSourceBuilder the search body containing the query
*/
public SearchDataObjectRequest(String[] indices, SearchSourceBuilder searchSourceBuilder) {
this.indices = indices;
Expand Down Expand Up @@ -72,7 +73,7 @@ public Builder indices(String... indices) {
*/
public Builder searchSourceBuilder(SearchSourceBuilder searchSourceBuilder) {
this.searchSourceBuilder = searchSourceBuilder;

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.core.xcontent.XContentParser;


public class SearchDataObjectResponse {
private final XContentParser parser;

Expand All @@ -21,7 +20,7 @@ public class SearchDataObjectResponse {
public SearchDataObjectResponse(XContentParser parser) {
this.parser = parser;
}

/**
* Returns the parser
* @return the parser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
Expand All @@ -40,6 +40,7 @@
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
import org.opensearch.sdk.SdkClient;
import org.opensearch.sdk.SearchDataObjectRequest;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -117,22 +118,41 @@ private void handleConnectorAccessValidationFailure(String connectorId, Exceptio

private void checkForModelsUsingConnector(String connectorId, String tenantId, ActionListener<DeleteResponse> actionListener) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
SearchRequest searchRequest = new SearchRequest(ML_MODEL_INDEX);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery(MLModel.CONNECTOR_ID_FIELD, connectorId));
if (mlFeatureEnabledSetting.isMultiTenancyEnabled()) {
sourceBuilder.query(QueryBuilders.matchQuery(TENANT_ID, tenantId));
}
searchRequest.source(sourceBuilder);
// TODO: Use SDK client not client.
client.search(searchRequest, ActionListener.runBefore(ActionListener.wrap(searchResponse -> {
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length == 0) {
deleteConnector(connectorId, actionListener);
} else {
handleModelsUsingConnector(searchHits, connectorId, actionListener);
}
}, e -> handleSearchFailure(connectorId, e, actionListener)), context::restore));

SearchDataObjectRequest searchDataObjectRequest = new SearchDataObjectRequest.Builder()
.indices(ML_MODEL_INDEX)
.searchSourceBuilder(sourceBuilder)
.build();
sdkClient
.searchDataObjectAsync(searchDataObjectRequest, client.threadPool().executor(GENERAL_THREAD_POOL))
.whenComplete((sr, st) -> {
context.restore();
if (sr != null) {
try {
SearchResponse searchResponse = SearchResponse.fromXContent(sr.parser());
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length == 0) {
deleteConnector(connectorId, actionListener);
} else {
handleModelsUsingConnector(searchHits, connectorId, actionListener);
}
} catch (Exception e) {
log.error("Failed to parse search response", e);
actionListener
.onFailure(
new OpenSearchStatusException("Failed to parse search response", RestStatus.INTERNAL_SERVER_ERROR)
);
}
} else {
Throwable cause = st.getCause() == null ? st : st.getCause();
handleSearchFailure(connectorId, cause, actionListener);
}
});
} catch (Exception e) {
log.error("Failed to check for models using connector: " + connectorId, e);
actionListener.onFailure(e);
Expand All @@ -156,13 +176,17 @@ private void handleModelsUsingConnector(SearchHit[] searchHits, String connector
);
}

private void handleSearchFailure(String connectorId, Exception e, ActionListener<DeleteResponse> actionListener) {
if (e instanceof IndexNotFoundException) {
private void handleSearchFailure(String connectorId, Throwable cause, ActionListener<DeleteResponse> actionListener) {
if (cause instanceof IndexNotFoundException) {
deleteConnector(connectorId, actionListener);
return;
}
log.error("Failed to search for models using connector: {}", connectorId, e);
actionListener.onFailure(e);
log.error("Failed to search for models using connector: {}", connectorId, cause);
if (cause instanceof Exception) {
actionListener.onFailure((Exception) cause);
} else {
actionListener.onFailure(new OpenSearchException(cause));
}
}

private void deleteConnector(String connectorId, ActionListener<DeleteResponse> actionListener) {
Expand Down
Loading

0 comments on commit 804ca81

Please sign in to comment.