From c874a38da4af5973aae241fd358854e0c450ca7e Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 30 Apr 2024 15:29:05 -0700 Subject: [PATCH] Abstract FlintClient Signed-off-by: Louis Chu --- .../opensearch/flint/core/FlintClient.java | 6 ++--- .../core/storage/FlintOpenSearchClient.java | 26 +++++++++++-------- .../storage/FlintOpenSearchMetadataLog.java | 8 +++--- .../flint/core/storage/OpenSearchUpdater.java | 2 +- .../scala/org/apache/spark/sql/OSClient.scala | 13 +++++----- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index ee78aa512..85a7d81a7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -104,8 +104,8 @@ OptimisticTransaction startTransaction(String indexName, String dataSourc FlintWriter createWriter(String indexName); /** - * Create {@link IRestHighLevelClient}. - * @return {@link IRestHighLevelClient} + * Create {@link Object}. + * @return {@link Object} */ - IRestHighLevelClient createClient(); + Object createClient(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index e71e3ded5..e16d38b8d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -101,7 +101,7 @@ public OptimisticTransaction startTransaction( String indexName, String dataSourceName, boolean forceInit) { LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); String metaLogIndexName = constructMetaLogIndexName(dataSourceName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); } else { @@ -135,7 +135,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(mapping, XContentType.JSON); if (settings.isDefined()) { @@ -151,7 +151,7 @@ protected void createIndex(String indexName, String mapping, Option sett public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { return client.doesIndexExist(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); @@ -162,7 +162,7 @@ public boolean exists(String indexName) { public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); @@ -181,7 +181,7 @@ public List getAllIndexMetadata(String indexNamePattern) { public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { GetIndexRequest request = new GetIndexRequest(osIndexName); GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); @@ -197,7 +197,7 @@ public FlintMetadata getIndexMetadata(String indexName) { public void updateIndex(String indexName, FlintMetadata metadata) { LOG.info("Updating Flint index " + indexName + " with metadata " + metadata); String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { PutMappingRequest request = new PutMappingRequest(osIndexName); request.source(metadata.getContent(), XContentType.JSON); client.updateIndexMapping(request, RequestOptions.DEFAULT); @@ -210,7 +210,7 @@ public void updateIndex(String indexName, FlintMetadata metadata) { public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); client.deleteIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { @@ -236,7 +236,7 @@ public FlintReader createReader(String indexName, String query) { XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); } - return new OpenSearchScrollReader(createClient(), + return new OpenSearchScrollReader(createOpenSearchClient(), sanitizeIndexName(indexName), new SearchSourceBuilder().query(queryBuilder), options); @@ -247,11 +247,15 @@ public FlintReader createReader(String indexName, String query) { public FlintWriter createWriter(String indexName) { LOG.info("Creating Flint index writer for " + indexName); - return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy()); + return new OpenSearchWriter(createOpenSearchClient(), sanitizeIndexName(indexName), options.getRefreshPolicy()); } @Override - public IRestHighLevelClient createClient() { + public Object createClient() { + return createOpenSearchClient(); + } + + private IRestHighLevelClient createOpenSearchClient() { RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); @@ -332,7 +336,7 @@ private FlintMetadata constructFlintMetadata(String indexName, String mapping, S : META_LOG_NAME_PREFIX + "_" + dataSourceName; Optional latest = Optional.empty(); - try (IRestHighLevelClient client = createClient()) { + try (IRestHighLevelClient client = createOpenSearchClient()) { if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { LOG.info("Found metadata log index " + metaLogIndexName); FlintOpenSearchMetadataLog metadataLog = diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 7195ae177..86d6a8010 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -77,7 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { @Override public Optional getLatest() { LOG.info("Fetching latest log entry with id " + latestId); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) { GetResponse response = client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -102,7 +102,7 @@ public Optional getLatest() { @Override public void purge() { LOG.info("Purging log entry with id " + latestId); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) { DeleteResponse response = client.delete( new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); @@ -151,7 +151,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { private FlintMetadataLogEntry writeLogEntry( FlintMetadataLogEntry logEntry, CheckedFunction write) { - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) { // Write (create or update) the doc DocWriteResponse response = write.apply(client); @@ -174,7 +174,7 @@ private FlintMetadataLogEntry writeLogEntry( private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) { return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 0d84b4956..4c46b324e 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -57,7 +57,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l // credentials may expire. // also, failure to close the client causes the job to be stuck in the running state as the client resource // is not released. - try (IRestHighLevelClient client = flintClient.createClient()) { + try (IRestHighLevelClient client = (IRestHighLevelClient) flintClient.createClient()) { assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id) .doc(doc, XContentType.JSON) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index f5e4ec2be..6c13aa2e2 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -42,7 +42,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { new SearchModule(Settings.builder.build, new ArrayList[SearchPlugin]).getNamedXContents) def getIndexMetadata(osIndexName: String): String = { - using(flintClient.createClient()) { client => + using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client => val request = new GetIndexRequest(osIndexName) try { val response = client.getIndex(request, RequestOptions.DEFAULT) @@ -70,7 +70,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { def createIndex(osIndexName: String, mapping: String): Unit = { logInfo(s"create $osIndexName") - using(flintClient.createClient()) { client => + using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client => val request = new CreateIndexRequest(osIndexName) request.mapping(mapping, XContentType.JSON) @@ -116,7 +116,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { new OpenSearchUpdater(indexName, flintClient) def getDoc(osIndexName: String, id: String): GetResponse = { - using(flintClient.createClient()) { client => + using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client => val request = new GetRequest(osIndexName, id) val result = Try(client.get(request, RequestOptions.DEFAULT)) result match { @@ -147,7 +147,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser) } new OpenSearchScrollReader( - flintClient.createClient(), + flintClient.createClient().asInstanceOf[IRestHighLevelClient], indexName, new SearchSourceBuilder().query(queryBuilder).sort(sort, SortOrder.ASC), flintOptions) @@ -157,7 +157,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { } def doesIndexExist(indexName: String): Boolean = { - using(flintClient.createClient()) { client => + using(flintClient.createClient().asInstanceOf[IRestHighLevelClient]) { client => try { val request = new GetIndexRequest(indexName) client.doesIndexExist(request, RequestOptions.DEFAULT) @@ -179,8 +179,9 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query) queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser) } + new OpenSearchQueryReader( - flintClient.createClient(), + flintClient.createClient().asInstanceOf[IRestHighLevelClient], indexName, new SearchSourceBuilder().query(queryBuilder).sort(sort, sortOrder)) } catch {