Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Aug 5, 2024
1 parent eb69d2e commit d270b41
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 50 deletions.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version.
- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
- `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
- `spark.flint.index.hybridscan.enabled`: default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX,
return execute(OS_READ_OP_METRIC_PREFIX,
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public String getCustomFlintMetadataLogServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
}

/**
* FIXME, This is workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard
* operation, but shard info is exposed in index settings. Remove this setting when AOSS fix
* the bug.
*
* @return
*/
public boolean supportShard() {
return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase(
DEFAULT_SUPPORT_SHARD);
Expand Down
31 changes: 31 additions & 0 deletions flint-core/src/main/scala/org/opensearch/flint/core/Table.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@

package org.opensearch.flint.core

import java.io.IOException
import java.util

import com.google.common.base.Strings
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentType}
import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS
import org.opensearch.flint.core.storage.FlintReader
import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder}
import org.opensearch.plugins.SearchPlugin
import org.opensearch.search.SearchModule

/**
* A OpenSearch Table.
Expand Down Expand Up @@ -52,3 +62,24 @@ trait Table extends Serializable {
*/
def schema(): Schema
}

object Table {

/**
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link
* QueryBuilder} from DSL query string.
*/
val xContentRegistry = new NamedXContentRegistry(
new SearchModule(Settings.builder.build, new util.ArrayList[SearchPlugin]).getNamedXContents)

@throws[IOException]
def queryBuilder(query: String): QueryBuilder = {
if (!Strings.isNullOrEmpty(query)) {
val parser =
XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query)
AbstractQueryBuilder.parseInnerQueryBuilder(parser)
} else {
new MatchAllQueryBuilder
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.flint.core.storage;

import com.google.common.base.Strings;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.CreateIndexRequest;
Expand All @@ -14,21 +13,14 @@
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import scala.Option;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
Expand All @@ -37,23 +29,13 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;

/**
* Flint client implementation for OpenSearch storage.
*/
public class FlintOpenSearchClient implements FlintClient {

private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName());

/**
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string.
*/
public final static NamedXContentRegistry
xContentRegistry =
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());

/**
* Invalid index name characters to percent-encode,
* excluding '*' because it's reserved for pattern matching.
Expand All @@ -67,17 +49,6 @@ public FlintOpenSearchClient(FlintOptions options) {
this.options = options;
}

public static QueryBuilder queryBuilder(String query) throws IOException {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
if (!Strings.isNullOrEmpty(query)) {
XContentParser
parser =
XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query);
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return queryBuilder;
}

@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.opensearch.flint.core.table

import org.opensearch.action.search.SearchRequest
import org.opensearch.flint.core.{FlintOptions, MetaData, Table}
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

Expand Down Expand Up @@ -35,7 +35,7 @@ class OpenSearchIndexShardTable(metaData: MetaData, option: FlintOptions, shardI
.indices(name)
.source(
new SearchSourceBuilder()
.query(FlintOpenSearchClient.queryBuilder(query))
.query(Table.queryBuilder(query))
.size(pageSize)
.sort("_doc", SortOrder.ASC))
.preference(s"_shards:$shardId"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.core.table

import java.util.logging.Logger

import scala.collection.JavaConverters._

import org.json4s.{Formats, NoTypeHints}
Expand All @@ -16,8 +14,8 @@ import org.json4s.native.Serialization
import org.opensearch.action.search.SearchRequest
import org.opensearch.client.opensearch.indices.IndicesStatsRequest
import org.opensearch.client.opensearch.indices.stats.IndicesStats
import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions, JsonSchema, MetaData, Schema, Table}
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
import org.opensearch.flint.core._
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -55,7 +53,7 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
if (option.getScrollSize.isPresent) {
option.getScrollSize.get()
} else {
val docCount = indexStats.total().docs().count()
val docCount = indexStats.primaries().docs().count()
if (docCount == 0) {
maxResultWindow
} else {
Expand Down Expand Up @@ -102,8 +100,6 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
* The query string.
* @return
* A FlintReader instance.
* @throws UnsupportedOperationException
* if this method is called.
*/
override def createReader(query: String): FlintReader = {
new OpenSearchSearchAfterQueryReader(
Expand All @@ -112,8 +108,8 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
.indices(name)
.source(
new SearchSourceBuilder()
.query(FlintOpenSearchClient.queryBuilder(query))
.size(pageSize)
.query(Table.queryBuilder(query))
.size((pageSize))
.sort("_doc", SortOrder.ASC)
.sort("_id", SortOrder.ASC)))
}
Expand Down Expand Up @@ -144,7 +140,6 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
}

object OpenSearchIndexTable {
private val LOG = Logger.getLogger(classOf[OpenSearchIndexTable].getName)

/**
* Max OpenSearch Request Page size is 10MB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ case class FlintScanBuilder(

private var pushedPredicate = Array.empty[Predicate]

lazy val scan = FlintScan(tables, schema, options, pushedPredicate)

override def build(): Scan = {
scan
FlintScan(tables, schema, options, pushedPredicate)
}

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import scala.collection.JavaConverters._

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
import org.opensearch.client.opensearch.core.pit.{CreatePitRequest, CreatePitResponse}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient}
Expand Down Expand Up @@ -38,7 +37,6 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
/** Mock IRestHighLevelClient to avoid looking for real OpenSearch cluster */
private val clientUtils = mockStatic(classOf[OpenSearchClientUtils])
private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS)
private val pitResponse = mock[CreatePitResponse]

/** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. */
private val flint = mock[FlintSpark](RETURNS_DEEP_STUBS)
Expand Down Expand Up @@ -278,9 +276,6 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
indexes.foreach { index =>
when(client.getAllIndexMetadata(index.name()))
.thenReturn(Map.apply(index.name() -> index.metadata()).asJava)
when(openSearchClient.createPit(any[CreatePitRequest]))
.thenReturn(pitResponse)
when(pitResponse.pitId()).thenReturn("")
}
rule.apply(plan)
}
Expand Down

0 comments on commit d270b41

Please sign in to comment.