diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 6cdf5187d..335aaa79d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -7,9 +7,9 @@ import java.util.List; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import org.opensearch.flint.core.storage.FlintReader; import org.opensearch.flint.core.storage.FlintWriter; @@ -96,8 +96,8 @@ OptimisticTransaction startTransaction(String indexName, String dataSourc */ FlintWriter createWriter(String indexName); /** - * Create {@link RestHighLevelClient}. - * @return {@link RestHighLevelClient} + * Create {@link RestHighLevelClientWrapper}. + * @return {@link RestHighLevelClientWrapper} */ - public RestHighLevelClient createClient(); + public RestHighLevelClientWrapper createClient(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/IRestHighLevelClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/IRestHighLevelClient.java new file mode 100644 index 000000000..147999dfa --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/IRestHighLevelClient.java @@ -0,0 +1,4 @@ +package org.opensearch.flint.core.metrics; + +public interface IRestHighLevelClient { +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metrics/RestHighLevelClientWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/RestHighLevelClientWrapper.java new file mode 100644 index 000000000..cd68b1e71 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metrics/RestHighLevelClientWrapper.java @@ -0,0 +1,115 @@ +package org.opensearch.flint.core.metrics; + +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.CreateIndexResponse; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.client.RequestOptions; + +import java.io.Closeable; +import java.io.IOException; +import java.util.logging.Logger; + +public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable { + private static final Logger LOG = Logger.getLogger(RestHighLevelClientWrapper.class.getName()); + private final RestHighLevelClient client; + + public RestHighLevelClientWrapper(RestHighLevelClient client) { + this.client = client; + } + + public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException { + return execute("get", () -> client.get(getRequest, options)); + } + + public Boolean exists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { + return execute("exists", () -> client.indices().exists(getIndexRequest, options)); + } + + public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException { + return execute("createIndex", () -> client.indices().create(createIndexRequest, options)); + } + + public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException { + return execute("getIndex", () -> client.indices().get(getIndexRequest, options)); + } + + public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { + execute("deleteIndex", () -> client.indices().delete(deleteIndexRequest, options)); + } + + public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException { + return execute("index", () -> client.index(indexRequest, options)); + } + + public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { + return execute("bulk", () -> client.bulk(bulkRequest, options)); + } + + public DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException { + return execute("update", () -> client.update(updateRequest, options)); + } + + // Example wrapper for the delete method + public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { + return execute("delete", () -> client.delete(deleteRequest, options)); + } + + public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException { + return execute("search", () -> client.search(searchRequest, options)); + } + + public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException { + return execute("scroll", () -> client.scroll(searchScrollRequest, options)); + } + + public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException { + return execute("clearScroll", () -> client.clearScroll(clearScrollRequest, options)); + } + + + // Generic method to execute and log synchronous operations + private T execute(String operationName, IOCallable operation) throws IOException { + long startTime = System.currentTimeMillis(); + try { + T result = operation.call(); + logMetric(operationName + " - success", System.currentTimeMillis() - startTime); + return result; + } catch (Exception e) { + logMetric(operationName + " - failure", System.currentTimeMillis() - startTime); + throw e; + } + } + + private void logMetric(String operation, long duration) { + LOG.info(operation + ": " + duration + "ms"); + } + + + @FunctionalInterface + private interface IOCallable { + T call() throws IOException; + } + + @Override + public void close() throws IOException { + client.close(); + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 4e549df2b..772db9c81 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -49,6 +49,7 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -96,8 +97,8 @@ public OptimisticTransaction startTransaction(String indexName, String da LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; - try (RestHighLevelClient client = createClient()) { - if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { + try (RestHighLevelClientWrapper client = createClient()) { + if (client.exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); } else { if (forceInit) { @@ -130,13 +131,13 @@ public void createIndex(String indexName, FlintMetadata metadata) { protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(mapping, XContentType.JSON); if (settings.isDefined()) { request.settings(settings.get(), XContentType.JSON); } - client.indices().create(request, RequestOptions.DEFAULT); + client.createIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); } @@ -146,8 +147,8 @@ protected void createIndex(String indexName, String mapping, Option sett public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + try (RestHighLevelClientWrapper client = createClient()) { + return client.exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); } @@ -157,9 +158,9 @@ public boolean exists(String indexName) { public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) .map(index -> FlintMetadata.apply( @@ -175,9 +176,9 @@ public List getAllIndexMetadata(String indexNamePattern) { public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); @@ -191,10 +192,9 @@ public FlintMetadata getIndexMetadata(String indexName) { public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (RestHighLevelClient client = createClient()) { + try (RestHighLevelClientWrapper client = createClient()) { DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); - - client.indices().delete(request, RequestOptions.DEFAULT); + client.deleteIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); } @@ -233,7 +233,7 @@ public FlintWriter createWriter(String indexName) { } @Override - public RestHighLevelClient createClient() { + public RestHighLevelClientWrapper createClient() { RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); @@ -283,7 +283,7 @@ public RestHighLevelClient createClient() { final RequestConfigurator callback = new RequestConfigurator(options); restClientBuilder.setRequestConfigCallback(callback); - return new RestHighLevelClient(restClientBuilder); + return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); } /* diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index ab38a5f60..b0e79a0e8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -21,12 +21,12 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { @Override public Optional getLatest() { LOG.info("Fetching latest log entry with id " + latestId); - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { GetResponse response = client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -102,7 +102,7 @@ public Optional getLatest() { @Override public void purge() { LOG.info("Purging log entry with id " + latestId); - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { DeleteResponse response = client.delete( new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -150,8 +150,8 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { private FlintMetadataLogEntry writeLogEntry( FlintMetadataLogEntry logEntry, - CheckedFunction write) { - try (RestHighLevelClient client = flintClient.createClient()) { + CheckedFunction write) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { // Write (create or update) the doc DocWriteResponse response = write.apply(client); @@ -174,8 +174,8 @@ private FlintMetadataLogEntry writeLogEntry( private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); - try (RestHighLevelClient client = flintClient.createClient()) { - return client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); + try (RestHighLevelClientWrapper client = flintClient.createClient()) { + return client.exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index 472431bf1..a83f21e80 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -7,7 +7,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.RestHighLevelClient; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import org.opensearch.search.SearchHit; import java.io.IOException; @@ -24,14 +24,14 @@ public abstract class OpenSearchReader implements FlintReader { /** Search request source builder. */ private final SearchRequest searchRequest; - protected final RestHighLevelClient client; + protected final RestHighLevelClientWrapper client; /** * iterator of one-shot search result. */ private Iterator iterator = null; - public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) { + public OpenSearchReader(RestHighLevelClientWrapper client, SearchRequest searchRequest) { this.client = client; this.searchRequest = searchRequest; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d71014c20..3349abd7a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -11,10 +11,10 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -35,7 +35,7 @@ public class OpenSearchScrollReader extends OpenSearchReader { private String scrollId = null; - public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) { + public OpenSearchScrollReader(RestHighLevelClientWrapper client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) { super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize()))); this.options = options; this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration()); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 4a6424512..49a4fb9dd 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -3,10 +3,10 @@ import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import java.io.IOException; import java.util.logging.Level; @@ -30,7 +30,7 @@ public void upsert(String id, String doc) { // credentials may expire. // also, failure to close the client causes the job to be stuck in the running state as the client resource // is not released. - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -47,7 +47,7 @@ public void upsert(String id, String doc) { } public void update(String id, String doc) { - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -63,7 +63,7 @@ public void update(String id, String doc) { } public void updateIf(String id, String doc, long seqNo, long primaryTerm) { - try (RestHighLevelClient client = flintClient.createClient()) { + try (RestHighLevelClientWrapper client = flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = @@ -80,9 +80,9 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { } } - private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { + private void assertIndexExist(RestHighLevelClientWrapper client, String indexName) throws IOException { LOG.info("Checking if index exists " + indexName); - if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { + if (!client.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { String errorMsg = "Index not found " + indexName; LOG.log(Level.SEVERE, errorMsg); throw new IllegalStateException(errorMsg); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index 1e55084b2..0f7336435 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -11,8 +11,8 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.metrics.RestHighLevelClientWrapper; import org.opensearch.rest.RestStatus; import java.io.BufferedWriter; @@ -35,9 +35,9 @@ public class OpenSearchWriter extends FlintWriter { private StringBuilder sb; - private RestHighLevelClient client; + private RestHighLevelClientWrapper client; - public OpenSearchWriter(RestHighLevelClient client, String indexName, String refreshPolicy) { + public OpenSearchWriter(RestHighLevelClientWrapper client, String indexName, String refreshPolicy) { this.client = client; this.indexName = indexName; this.sb = new StringBuilder(); diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index ceacc7bcd..e70ac3ce8 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -41,7 +41,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { using(flintClient.createClient()) { client => val request = new GetIndexRequest(osIndexName) try { - val response = client.indices.get(request, RequestOptions.DEFAULT) + val response = client.getIndex(request, RequestOptions.DEFAULT) response.getMappings.get(osIndexName).source.string } catch { case e: Exception => @@ -71,7 +71,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { request.mapping(mapping, XContentType.JSON) try { - client.indices.create(request, RequestOptions.DEFAULT) + client.createIndex(request, RequestOptions.DEFAULT) logInfo(s"create $osIndexName successfully") } catch { case e: Exception => @@ -145,7 +145,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { using(flintClient.createClient()) { client => try { val request = new GetIndexRequest(indexName) - client.indices().exists(request, RequestOptions.DEFAULT) + client.exists(request, RequestOptions.DEFAULT) } catch { case e: Exception => throw new IllegalStateException(s"Failed to check if index $indexName exists", e)