Commit 92b0fc2

update
Signed-off-by: Peng Huo <[email protected]>
penghuo committed Jul 26, 2024
1 parent b49d8d3 commit 92b0fc2
Showing 17 changed files with 160 additions and 43 deletions.
@@ -8,7 +8,6 @@
import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
-import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

/**
@@ -95,7 +95,7 @@ public class FlintOptions implements Serializable {

public static final String SUPPORT_SHARD = "read.support_shard";

-public static final String SUPPORT_SHARD_DEFAULT = "true";
+public static final String DEFAULT_SUPPORT_SHARD = "true";

public FlintOptions(Map<String, String> options) {
this.options = options;
@@ -179,6 +179,7 @@ public String getCustomFlintMetadataLogServiceClass() {
}

public boolean supportShard() {
-return options.getOrDefault(SUPPORT_SHARD, SUPPORT_SHARD_DEFAULT).equalsIgnoreCase(SUPPORT_SHARD_DEFAULT);
+return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase(
+DEFAULT_SUPPORT_SHARD);
}
}
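Note on the rename above: behavior is unchanged. `supportShard()` still returns true unless the option is explicitly set to a value other than "true". A minimal usage sketch, assuming only the constructor and option key visible in this diff (the option map itself is illustrative):

```scala
import java.util.{Map => JMap}
import org.opensearch.flint.core.FlintOptions

// Hypothetical caller: disable shard-level reads via the option key
// behind FlintOptions.SUPPORT_SHARD ("read.support_shard").
val options = new FlintOptions(JMap.of("read.support_shard", "false"))
assert(!options.supportShard()) // any value other than "true" disables shard support
```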
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.table
+package org.opensearch.flint.core

/**
* Schema in OpenSearch index mapping format.
@@ -3,10 +3,20 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.table
+package org.opensearch.flint.core

import org.opensearch.flint.core.metadata.FlintMetadata

+/**
+ * OpenSearch Table metadata.
+ *
+ * @param name
+ *   name
+ * @param properties
+ *   properties
+ * @param setting
+ *   setting
+ */
case class MetaData(name: String, properties: String, setting: String)

object MetaData {
12 changes: 11 additions & 1 deletion flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala
@@ -3,8 +3,18 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.table
+package org.opensearch.flint.core

+/**
+ * Table Schema.
+ */
trait Schema {
+
+  /**
+   * Return table schema as Json.
+   *
+   * @return
+   *   schema.
+   */
def asJson(): String
}
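The commit references a `JsonSchema` implementation of this trait in other files, but its definition is not shown in this diff. A minimal sketch of what such an implementation could look like, assuming the schema is carried as a raw index-mapping JSON string:

```scala
package org.opensearch.flint.core

// Hypothetical minimal implementation of Schema: wraps an OpenSearch
// index-mapping JSON string and returns it verbatim from asJson().
case class JsonSchema(json: String) extends Schema {
  override def asJson(): String = json
}
```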
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.table
+package org.opensearch.flint.core

import org.opensearch.flint.core.storage.FlintReader

@@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

+import com.google.common.annotations.VisibleForTesting;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
@@ -22,8 +23,9 @@
*/
public abstract class OpenSearchReader implements FlintReader {

+@VisibleForTesting
/** Search request source builder. */
-private final SearchRequest searchRequest;
+public final SearchRequest searchRequest;

protected final IRestHighLevelClient client;

@@ -6,12 +6,21 @@
package org.opensearch.flint.core.table

import org.opensearch.action.search.SearchRequest
-import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.{FlintOptions, MetaData, Table}
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
-import org.opensearch.flint.table.{MetaData, OpenSearchIndexTable, Table}
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

+/**
+ * Represents an OpenSearch index shard.
+ *
+ * @param metaData
+ *   MetaData containing information about the OpenSearch index.
+ * @param option
+ *   FlintOptions containing configuration options for the Flint client.
+ * @param shardId
+ *   Shard Id.
+ */
class OpenSearchIndexShardTable(metaData: MetaData, option: FlintOptions, shardId: Int)
extends OpenSearchIndexTable(metaData, option) {

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.table
+package org.opensearch.flint.core.table

import java.util.logging.Logger

@@ -14,14 +14,11 @@ import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods
import org.json4s.native.Serialization
import org.opensearch.action.search.SearchRequest
-import org.opensearch.client.opensearch._types.Time
-import org.opensearch.client.opensearch.core.pit.CreatePitRequest
import org.opensearch.client.opensearch.indices.IndicesStatsRequest
-import org.opensearch.client.opensearch.indices.stats.IndicesStats
-import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}
+import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions, JsonSchema, MetaData, Schema, Table}
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
import org.opensearch.flint.core.table.OpenSearchIndexShardTable
-import org.opensearch.flint.table.OpenSearchIndexTable.{maxSplitSizeBytes, LOG}
+import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

@@ -117,7 +114,8 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
new SearchSourceBuilder()
.query(FlintOpenSearchClient.queryBuilder(query))
.size(pageSize)
-.sort("_doc", SortOrder.ASC)))
+.sort("_doc", SortOrder.ASC)
+.sort("_id", SortOrder.ASC)))
}

/**
@@ -148,6 +146,9 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab
object OpenSearchIndexTable {
private val LOG = Logger.getLogger(classOf[OpenSearchIndexTable].getName)

+/**
+ * Max OpenSearch Request Page size is 10MB.
+ */
val maxSplitSizeBytes = 10 * 1024 * 1024
}

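The functional change in this file is the added `_id` sort. `_doc` order alone is not unique across an index, so a reader paginating with search_after could repeat or skip documents at page boundaries; adding `_id` as a tiebreaker makes the sort key total. A sketch of the pagination loop this supports, assuming a generic search callback rather than the repo's actual reader:

```scala
import org.opensearch.action.search.{SearchRequest, SearchResponse}
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// Sketch: search_after pagination over the total order (_doc, _id).
// `search` stands in for the actual client call.
def readAllPages(search: SearchRequest => SearchResponse, index: String, pageSize: Int): Unit = {
  val source = new SearchSourceBuilder()
    .size(pageSize)
    .sort("_doc", SortOrder.ASC)
    .sort("_id", SortOrder.ASC) // tiebreaker: makes the sort key unique
  var lastSort: Array[AnyRef] = null
  var exhausted = false
  while (!exhausted) {
    if (lastSort != null) source.searchAfter(lastSort) // resume after the last hit
    val hits = search(new SearchRequest(index).source(source)).getHits.getHits
    if (hits.isEmpty) exhausted = true
    else lastSort = hits.last.getSortValues
  }
}
```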
@@ -9,11 +9,10 @@ import java.util.Optional

import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
-import org.opensearch.client.opensearch.core.pit.{CreatePitRequest, CreatePitResponse}
-import org.opensearch.client.opensearch.indices.stats.IndicesStats
-import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
+import org.opensearch.client.opensearch.indices.{IndicesStatsRequest, IndicesStatsResponse}
+import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient, JsonSchema, MetaData}
import org.opensearch.flint.core.storage.{OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
-import org.opensearch.flint.table.{JsonSchema, MetaData, OpenSearchIndexTable}
+import org.opensearch.search.builder.SearchSourceBuilder
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -27,15 +26,11 @@ class OpenSearchIndexTableSpec

private val clientUtils = mockStatic(classOf[OpenSearchClientUtils])
private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS)
-private val pitResponse = mock[CreatePitResponse]

before {
clientUtils
.when(() => OpenSearchClientUtils.createClient(any(classOf[FlintOptions])))
.thenReturn(openSearchClient)
-when(openSearchClient.createPit(any[CreatePitRequest]))
-.thenReturn(pitResponse)
-when(pitResponse.pitId()).thenReturn("")
}

def mockTable(
@@ -46,7 +41,7 @@
numberOfShards: Int = 1): OpenSearchIndexTable = {
val metaData = mock[MetaData]
val options = mock[FlintOptions]
-val mockIndexStats = mock[IndicesStats](RETURNS_DEEP_STUBS)
+val mockIndicesStatsResp = mock[IndicesStatsResponse](RETURNS_DEEP_STUBS)

when(metaData.name).thenReturn("test-index")
when(metaData.setting).thenReturn(s"""{"index.number_of_shards":"$numberOfShards"}""")
@@ -56,11 +51,14 @@
case None => when(options.getScrollSize).thenReturn(Optional.empty[Integer]())
}
when(options.supportShard()).thenReturn(supportShard)
-when(mockIndexStats.total().docs().count()).thenReturn(docCount)
-when(mockIndexStats.total().store().sizeInBytes).thenReturn(storeSizeInBytes)

+when(openSearchClient.stats(any[IndicesStatsRequest])).thenReturn(mockIndicesStatsResp)
+when(mockIndicesStatsResp.indices().get(any[String]).total().docs().count())
+.thenReturn(docCount)
+when(mockIndicesStatsResp.indices().get(any[String]).total().store().sizeInBytes)
+.thenReturn(storeSizeInBytes)

new OpenSearchIndexTable(metaData, options) {
-override lazy val indexStats: IndicesStats = mockIndexStats
override lazy val maxResultWindow: Int = 10000
}
}
@@ -142,5 +140,38 @@
val table = mockTable(None, 1000L, 10000000L, numberOfShards = 1)
val reader = table.createReader(query)
reader shouldBe a[OpenSearchSearchAfterQueryReader]
+
+val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest
+searchRequest.indices() should contain("test-index")
+
+val sourceBuilder = searchRequest.source().asInstanceOf[SearchSourceBuilder]
+sourceBuilder.query() should not be null
+sourceBuilder.size() shouldBe table.pageSize
+
+val sorts = sourceBuilder.sorts()
+sorts.size() shouldBe 2
+sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}")
+sorts.get(1).toString should include("{\n \"_id\" : {\n \"order\" : \"asc\"\n }\n}")
}
+
+"OpenSearchIndexShardTable" should "create reader correctly" in {
+val query = ""
+val indexTable = mockTable(None, 1000L, 10000000L, numberOfShards = 3)
+val table = indexTable.slice().head
+val reader = table.createReader(query)
+reader shouldBe a[OpenSearchSearchAfterQueryReader]
+
+val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest
+searchRequest.indices() should contain("test-index")
+
+searchRequest.preference() shouldBe "_shards:0"
+
+val sourceBuilder = searchRequest.source()
+sourceBuilder.query() should not be null
+sourceBuilder.size() shouldBe indexTable.pageSize
+
+val sorts = sourceBuilder.sorts()
+sorts.size() shouldBe 1
+sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}")
+}
}
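For reference on the `_shards:0` assertion above: the shard table pins its query to a single shard through the standard search preference parameter, `_shards:<shardId>`, and within one shard `_doc` order is already unique, which is why the shard reader carries a single sort. A hedged sketch of that request shape (the helper name is illustrative):

```scala
import org.opensearch.action.search.SearchRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// Sketch: a per-shard search request of the shape the test asserts.
def shardRequest(index: String, shardId: Int, pageSize: Int): SearchRequest =
  new SearchRequest(index)
    .preference(s"_shards:$shardId") // restrict the query to one shard
    .source(
      new SearchSourceBuilder()
        .size(pageSize)
        .sort("_doc", SortOrder.ASC)) // unique within a single shard
```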
@@ -7,7 +7,7 @@ package org.apache.spark.sql.flint

import java.util

-import org.opensearch.flint.table.OpenSearchCluster
+import org.opensearch.flint.core.table.OpenSearchCluster

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
@@ -38,7 +38,7 @@ class FlintReadOnlyTable(

lazy val name: String = flintSparkConf.tableName()

-lazy val tables: Seq[org.opensearch.flint.table.Table] =
+lazy val tables: Seq[org.opensearch.flint.core.Table] =
OpenSearchCluster.apply(name, flintSparkConf.flintOptions())

lazy val resolvedTablesSchema: StructType = tables.headOption
@@ -12,7 +12,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types.StructType

case class FlintScan(
-tables: Seq[org.opensearch.flint.table.Table],
+tables: Seq[org.opensearch.flint.core.Table],
schema: StructType,
options: FlintSparkConf,
pushedPredicates: Array[Predicate])
@@ -54,7 +54,7 @@ case class FlintScan(
* Each OpenSearchSplit is backed by an OpenSearch index table.
*
* @param table
-* {@link org.opensearch.flint.table.Table}
+* {@link org.opensearch.flint.core.Table}
*/
-private[spark] case class OpenSearchSplit(table: org.opensearch.flint.table.Table)
+private[spark] case class OpenSearchSplit(table: org.opensearch.flint.core.Table)
extends InputPartition {}
@@ -13,7 +13,7 @@ import org.apache.spark.sql.flint.storage.FlintQueryCompiler
import org.apache.spark.sql.types.StructType

case class FlintScanBuilder(
-tables: Seq[org.opensearch.flint.table.Table],
+tables: Seq[org.opensearch.flint.core.Table],
schema: StructType,
options: FlintSparkConf)
extends ScanBuilder
@@ -123,7 +123,7 @@ object FlintSparkConf {
val SUPPORT_SHARD = FlintConfig(s"spark.datasource.flint.${FlintOptions.SUPPORT_SHARD}")
.datasourceOption()
.doc("indicate does index support shard or not")
-.createWithDefault(String.valueOf(FlintOptions.SUPPORT_SHARD_DEFAULT))
+.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SUPPORT_SHARD))

val MAX_RETRIES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.MAX_RETRIES}")
.datasourceOption()
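The renamed default surfaces in Spark as `spark.datasource.flint.read.support_shard`. A sketch of overriding it on a read, assuming the data source is registered under the name `flint` (the session setup and index name are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative: disable shard-level splits for one Flint read.
val spark = SparkSession.builder().appName("flint-read-example").getOrCreate()
val df = spark.read
  .format("flint")
  .option("read.support_shard", "false") // key behind FlintOptions.SUPPORT_SHARD
  .load("test-index")
```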
@@ -70,7 +70,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
FlintDataType.deserialize(flintDataType) should contain theSameElementsAs sparkStructType
}

-ignore("deserialize unsupported flint data type throw exception") {
+test("deserialize unsupported flint data type throw exception") {
val unsupportedField = """{
"properties": {
"rangeField": {
@@ -70,6 +70,11 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
| "number_of_replicas": "0"
|}""".stripMargin

+val multipleShardSetting = """{
+| "number_of_shards": "2",
+| "number_of_replicas": "0"
+|}""".stripMargin
+
def simpleIndex(indexName: String): Unit = {
val mappings = """{
| "properties": {
@@ -101,6 +106,19 @@
| }
|}""".stripMargin

+val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin
+index(indexName, multipleShardSetting, mappings, docs)
+}
+
+def multipleShardAndDocIndex(indexName: String, N: Int): Unit = {
+val mappings = """{
+| "properties": {
+| "id": {
+| "type": "integer"
+| }
+| }
+|}""".stripMargin
+
val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin
index(indexName, oneNodeSetting, mappings, docs)
}
