From d270b41f571e24996b68c7ca55bde36c906b4f70 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 5 Aug 2024 09:59:25 -0700 Subject: [PATCH] address comments Signed-off-by: Peng Huo --- docs/index.md | 1 + .../core/RestHighLevelClientWrapper.java | 2 +- .../opensearch/flint/core/FlintOptions.java | 7 +++++ .../org/opensearch/flint/core/Table.scala | 31 +++++++++++++++++++ .../core/storage/FlintOpenSearchClient.java | 29 ----------------- .../table/OpenSearchIndexShardTable.scala | 4 +-- .../core/table/OpenSearchIndexTable.scala | 15 +++------ .../spark/sql/flint/FlintScanBuilder.scala | 4 +-- .../ApplyFlintSparkCoveringIndexSuite.scala | 5 --- 9 files changed, 48 insertions(+), 50 deletions(-) diff --git a/docs/index.md b/docs/index.md index 249e7a770..981ed16f0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -532,6 +532,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway). - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown. +- `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. - `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance. - `spark.flint.index.hybridscan.enabled`: default is false. 
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index b9c58fd8d..f26a6c158 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -126,7 +126,7 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options @Override public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { - return execute(OS_WRITE_OP_METRIC_PREFIX, + return execute(OS_READ_OP_METRIC_PREFIX, () -> { OpenSearchClient openSearchClient = new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index a57573828..2678a8f67 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -186,6 +186,13 @@ public String getCustomFlintMetadataLogServiceClass() { return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, ""); } + /** + * FIXME, This is workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard + * operation, but shard info is exposed in index settings. Remove this setting when AOSS fix + * the bug. 
+ * + * @return true if the index supports shard operations, false otherwise + */ public boolean supportShard() { return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase( DEFAULT_SUPPORT_SHARD); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala index 336ecd64a..a714542e7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala @@ -5,7 +5,17 @@ package org.opensearch.flint.core +import java.io.IOException +import java.util + +import com.google.common.base.Strings +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentType} +import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS import org.opensearch.flint.core.storage.FlintReader +import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder} +import org.opensearch.plugins.SearchPlugin +import org.opensearch.search.SearchModule /** * A OpenSearch Table. @@ -52,3 +62,24 @@ trait Table extends Serializable { */ def schema(): Schema } + +object Table { + + /** + * {@link NamedXContentRegistry} from {@link SearchModule} used to construct {@link
+ */ + val xContentRegistry = new NamedXContentRegistry( + new SearchModule(Settings.builder.build, new util.ArrayList[SearchPlugin]).getNamedXContents) + + @throws[IOException] + def queryBuilder(query: String): QueryBuilder = { + if (!Strings.isNullOrEmpty(query)) { + val parser = + XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query) + AbstractQueryBuilder.parseInnerQueryBuilder(parser) + } else { + new MatchAllQueryBuilder + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index a9cf7b288..1a7c976c2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -5,7 +5,6 @@ package org.opensearch.flint.core.storage; -import com.google.common.base.Strings; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; @@ -14,21 +13,14 @@ import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.index.query.AbstractQueryBuilder; -import org.opensearch.index.query.MatchAllQueryBuilder; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.search.SearchModule; import scala.Option; import java.io.IOException; -import java.util.ArrayList; 
import java.util.Arrays; import java.util.Locale; import java.util.Map; @@ -37,8 +29,6 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; - /** * Flint client implementation for OpenSearch storage. */ @@ -46,14 +36,6 @@ public class FlintOpenSearchClient implements FlintClient { private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName()); - /** - * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string. - */ - public final static NamedXContentRegistry - xContentRegistry = - new NamedXContentRegistry(new SearchModule(Settings.builder().build(), - new ArrayList<>()).getNamedXContents()); - /** * Invalid index name characters to percent-encode, * excluding '*' because it's reserved for pattern matching. @@ -67,17 +49,6 @@ public FlintOpenSearchClient(FlintOptions options) { this.options = options; } - public static QueryBuilder queryBuilder(String query) throws IOException { - QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - if (!Strings.isNullOrEmpty(query)) { - XContentParser - parser = - XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); - queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); - } - return queryBuilder; - } - @Override public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala index 8ee10d780..594db748b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala @@ -7,7 +7,7 @@ 
package org.opensearch.flint.core.table import org.opensearch.action.search.SearchRequest import org.opensearch.flint.core.{FlintOptions, MetaData, Table} -import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} +import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -35,7 +35,7 @@ class OpenSearchIndexShardTable(metaData: MetaData, option: FlintOptions, shardI .indices(name) .source( new SearchSourceBuilder() - .query(FlintOpenSearchClient.queryBuilder(query)) + .query(Table.queryBuilder(query)) .size(pageSize) .sort("_doc", SortOrder.ASC)) .preference(s"_shards:$shardId")) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala index 554e0a674..d016fe9b7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.core.table -import java.util.logging.Logger - import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} @@ -16,8 +14,8 @@ import org.json4s.native.Serialization import org.opensearch.action.search.SearchRequest import org.opensearch.client.opensearch.indices.IndicesStatsRequest import org.opensearch.client.opensearch.indices.stats.IndicesStats -import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions, JsonSchema, MetaData, Schema, Table} -import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} +import org.opensearch.flint.core._ +import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, 
OpenSearchSearchAfterQueryReader} import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -55,7 +53,7 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab if (option.getScrollSize.isPresent) { option.getScrollSize.get() } else { - val docCount = indexStats.total().docs().count() + val docCount = indexStats.primaries().docs().count() if (docCount == 0) { maxResultWindow } else { @@ -102,8 +100,6 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab * The query string. * @return * A FlintReader instance. - * @throws UnsupportedOperationException - * if this method is called. */ override def createReader(query: String): FlintReader = { new OpenSearchSearchAfterQueryReader( @@ -112,8 +108,8 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab .indices(name) .source( new SearchSourceBuilder() - .query(FlintOpenSearchClient.queryBuilder(query)) - .size(pageSize) + .query(Table.queryBuilder(query)) + .size(pageSize) .sort("_doc", SortOrder.ASC) .sort("_id", SortOrder.ASC))) } @@ -144,7 +140,6 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab } object OpenSearchIndexTable { - private val LOG = Logger.getLogger(classOf[OpenSearchIndexTable].getName) /** * Max OpenSearch Request Page size is 10MB. 
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala index 0daecb280..0c6f7d700 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala @@ -22,10 +22,8 @@ case class FlintScanBuilder( private var pushedPredicate = Array.empty[Predicate] - lazy val scan = FlintScan(tables, schema, options, pushedPredicate) - override def build(): Scan = { - scan + FlintScan(tables, schema, options, pushedPredicate) } override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index f63c27557..a590eccb1 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -9,7 +9,6 @@ import scala.collection.JavaConverters._ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} -import org.opensearch.client.opensearch.core.pit.{CreatePitRequest, CreatePitResponse} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} @@ -38,7 +37,6 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { /** Mock IRestHighLevelClient to avoid looking for real OpenSearch cluster */ private 
val clientUtils = mockStatic(classOf[OpenSearchClientUtils]) private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS) - private val pitResponse = mock[CreatePitResponse] /** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. */ private val flint = mock[FlintSpark](RETURNS_DEEP_STUBS) @@ -278,9 +276,6 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { indexes.foreach { index => when(client.getAllIndexMetadata(index.name())) .thenReturn(Map.apply(index.name() -> index.metadata()).asJava) - when(openSearchClient.createPit(any[CreatePitRequest])) - .thenReturn(pitResponse) - when(pitResponse.pitId()).thenReturn("") } rule.apply(plan) }