Skip to content

Commit

Permalink
Add rest high level client wrapper
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 19, 2024
1 parent 6e163e7 commit 300501e
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import java.util.List;

import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

Expand Down Expand Up @@ -96,8 +96,8 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
*/
FlintWriter createWriter(String indexName);
/**
* Create {@link RestHighLevelClient}.
* @return {@link RestHighLevelClient}
* Create {@link RestHighLevelClientWrapper}.
* @return {@link RestHighLevelClientWrapper}
*/
public RestHighLevelClient createClient();
public RestHighLevelClientWrapper createClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.opensearch.flint.core.metrics;

public interface IRestHighLevelClient {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.flint.core.metrics;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Logger;

public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable {
private static final Logger LOG = Logger.getLogger(RestHighLevelClientWrapper.class.getName());
private final RestHighLevelClient client;

public RestHighLevelClientWrapper(RestHighLevelClient client) {
this.client = client;
}

public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
return execute("get", () -> client.get(getRequest, options));
}

public Boolean exists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute("exists", () -> client.indices().exists(getIndexRequest, options));
}

public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
return execute("createIndex", () -> client.indices().create(createIndexRequest, options));
}

public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute("getIndex", () -> client.indices().get(getIndexRequest, options));
}

public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute("deleteIndex", () -> client.indices().delete(deleteIndexRequest, options));
}

public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return execute("index", () -> client.index(indexRequest, options));
}

public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute("bulk", () -> client.bulk(bulkRequest, options));
}

public DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
return execute("update", () -> client.update(updateRequest, options));
}

// Example wrapper for the delete method
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return execute("delete", () -> client.delete(deleteRequest, options));
}

public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
return execute("search", () -> client.search(searchRequest, options));
}

public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
return execute("scroll", () -> client.scroll(searchScrollRequest, options));
}

public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
return execute("clearScroll", () -> client.clearScroll(clearScrollRequest, options));
}


// Generic method to execute and log synchronous operations
private <T> T execute(String operationName, IOCallable<T> operation) throws IOException {
long startTime = System.currentTimeMillis();
try {
T result = operation.call();
logMetric(operationName + " - success", System.currentTimeMillis() - startTime);
return result;
} catch (Exception e) {
logMetric(operationName + " - failure", System.currentTimeMillis() - startTime);
throw e;
}
}

private void logMetric(String operation, long duration) {
LOG.info(operation + ": " + duration + "ms");
}


@FunctionalInterface
private interface IOCallable<T> {
T call() throws IOException;
}

@Override
public void close() throws IOException {
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -96,8 +97,8 @@ public <T> OptimisticTransaction<T> startTransaction(String indexName, String da
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
try (RestHighLevelClient client = createClient()) {
if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
try (RestHighLevelClientWrapper client = createClient()) {
if (client.exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
if (forceInit) {
Expand Down Expand Up @@ -130,13 +131,13 @@ public void createIndex(String indexName, FlintMetadata metadata) {
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
try (RestHighLevelClientWrapper client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
if (settings.isDefined()) {
request.settings(settings.get(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
client.createIndex(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
}
Expand All @@ -146,8 +147,8 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
try (RestHighLevelClientWrapper client = createClient()) {
return client.exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e);
}
Expand All @@ -157,9 +158,9 @@ public boolean exists(String indexName) {
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
try (RestHighLevelClientWrapper client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.map(index -> FlintMetadata.apply(
Expand All @@ -175,9 +176,9 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
try (RestHighLevelClientWrapper client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
Expand All @@ -191,10 +192,9 @@ public FlintMetadata getIndexMetadata(String indexName) {
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
try (RestHighLevelClientWrapper client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

client.indices().delete(request, RequestOptions.DEFAULT);
client.deleteIndex(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e);
}
Expand Down Expand Up @@ -233,7 +233,7 @@ public FlintWriter createWriter(String indexName) {
}

@Override
public RestHighLevelClient createClient() {
public RestHighLevelClientWrapper createClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
Expand Down Expand Up @@ -283,7 +283,7 @@ public RestHighLevelClient createClient() {
final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClient(restClientBuilder);
return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
Expand Down Expand Up @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
@Override
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

Expand All @@ -102,7 +102,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -150,8 +150,8 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {

private FlintMetadataLogEntry writeLogEntry(
FlintMetadataLogEntry logEntry,
CheckedFunction<RestHighLevelClient, DocWriteResponse> write) {
try (RestHighLevelClient client = flintClient.createClient()) {
CheckedFunction<RestHighLevelClientWrapper, DocWriteResponse> write) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
// Write (create or update) the doc
DocWriteResponse response = write.apply(client);

Expand All @@ -174,8 +174,8 @@ private FlintMetadataLogEntry writeLogEntry(

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (RestHighLevelClient client = flintClient.createClient()) {
return client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
return client.exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;
import org.opensearch.search.SearchHit;

import java.io.IOException;
Expand All @@ -24,14 +24,14 @@ public abstract class OpenSearchReader implements FlintReader {
/** Search request source builder. */
private final SearchRequest searchRequest;

protected final RestHighLevelClient client;
protected final RestHighLevelClientWrapper client;

/**
* iterator of one-shot search result.
*/
private Iterator<SearchHit> iterator = null;

public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) {
public OpenSearchReader(RestHighLevelClientWrapper client, SearchRequest searchRequest) {
this.client = client;
this.searchRequest = searchRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
Expand All @@ -35,7 +35,7 @@ public class OpenSearchScrollReader extends OpenSearchReader {

private String scrollId = null;

public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
public OpenSearchScrollReader(RestHighLevelClientWrapper client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
this.options = options;
this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper;

import java.io.IOException;
import java.util.logging.Level;
Expand All @@ -30,7 +30,7 @@ public void upsert(String id, String doc) {
// credentials may expire.
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (RestHighLevelClient client = flintClient.createClient()) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -47,7 +47,7 @@ public void upsert(String id, String doc) {
}

public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -63,7 +63,7 @@ public void update(String id, String doc) {
}

public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
try (RestHighLevelClientWrapper client = flintClient.createClient()) {
assertIndexExist(client, indexName);
UpdateRequest
updateRequest =
Expand All @@ -80,9 +80,9 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
}
}

private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException {
private void assertIndexExist(RestHighLevelClientWrapper client, String indexName) throws IOException {
LOG.info("Checking if index exists " + indexName);
if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
if (!client.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
String errorMsg = "Index not found " + indexName;
LOG.log(Level.SEVERE, errorMsg);
throw new IllegalStateException(errorMsg);
Expand Down
Loading

0 comments on commit 300501e

Please sign in to comment.