From 92b0fc20a385a9905d38ecd0e0bd873fd09271c2 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 25 Jul 2024 17:40:35 -0700 Subject: [PATCH] update Signed-off-by: Peng Huo --- .../opensearch/flint/core/FlintClient.java | 1 - .../opensearch/flint/core/FlintOptions.java | 5 +- .../opensearch/flint/core/JsonSchema.scala | 2 +- .../org/opensearch/flint/core/MetaData.scala | 12 +++- .../org/opensearch/flint/core/Schema.scala | 12 +++- .../org/opensearch/flint/core/Table.scala | 2 +- .../flint/core/storage/OpenSearchReader.java | 4 +- .../table/OpenSearchIndexShardTable.scala | 13 ++++- .../core/table/OpenSearchIndexTable.scala | 15 ++--- .../core/table/OpenSearchIndexTableSpec.scala | 55 +++++++++++++++---- .../spark/sql/flint/FlintReadOnlyTable.scala | 4 +- .../apache/spark/sql/flint/FlintScan.scala | 6 +- .../spark/sql/flint/FlintScanBuilder.scala | 2 +- .../sql/flint/config/FlintSparkConf.scala | 2 +- .../flint/datatype/FlintDataTypeSuite.scala | 2 +- .../opensearch/flint/OpenSearchSuite.scala | 18 ++++++ .../core/FlintOpenSearchClientSuite.scala | 48 ++++++++++++++-- 17 files changed, 160 insertions(+), 43 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 5f69fa82a..1a3775f0b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -8,7 +8,6 @@ import java.util.Map; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.storage.FlintReader; import org.opensearch.flint.core.storage.FlintWriter; /** diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 5b753dd5c..3c6e438c1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -95,7 +95,7 @@ public class FlintOptions implements Serializable { public static final String SUPPORT_SHARD = "read.support_shard"; - public static final String SUPPORT_SHARD_DEFAULT = "true"; + public static final String DEFAULT_SUPPORT_SHARD = "true"; public FlintOptions(Map options) { this.options = options; @@ -179,6 +179,7 @@ public String getCustomFlintMetadataLogServiceClass() { } public boolean supportShard() { - return options.getOrDefault(SUPPORT_SHARD, SUPPORT_SHARD_DEFAULT).equalsIgnoreCase(SUPPORT_SHARD_DEFAULT); + return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase( + DEFAULT_SUPPORT_SHARD); } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala b/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala index a0f3a3175..f631dda0c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.table +package org.opensearch.flint.core /** * Schema in OpenSearch index mapping format. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala index 3806879ba..98b7f8960 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala @@ -3,10 +3,20 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.table +package org.opensearch.flint.core import org.opensearch.flint.core.metadata.FlintMetadata +/** + * OpenSearch Table metadata. + * + * @param name + * name + * @param properties + * properties + * @param setting + * setting + */ case class MetaData(name: String, properties: String, setting: String) object MetaData { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala b/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala index b5cbf54cb..37cb2204f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala @@ -3,8 +3,18 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.table +package org.opensearch.flint.core +/** + * Table Schema. + */ trait Schema { + + /** + * Return table schema as Json. + * + * @return + * schema. + */ def asJson(): String } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala index 66c0eb35f..336ecd64a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.table +package org.opensearch.flint.core import org.opensearch.flint.core.storage.FlintReader diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index e2e831bd0..d5fb45f99 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core.storage; +import com.google.common.annotations.VisibleForTesting; import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -22,8 +23,9 @@ */ public abstract class OpenSearchReader implements FlintReader { + @VisibleForTesting /** Search request source builder. */ - private final SearchRequest searchRequest; + public final SearchRequest searchRequest; protected final IRestHighLevelClient client; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala index afde17136..8ee10d780 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala @@ -6,12 +6,21 @@ package org.opensearch.flint.core.table import org.opensearch.action.search.SearchRequest -import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.{FlintOptions, MetaData, Table} import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} -import org.opensearch.flint.table.{MetaData, OpenSearchIndexTable, Table} import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder +/** + * Represents an OpenSearch index shard. + * + * @param metaData + * MetaData containing information about the OpenSearch index. + * @param option + * FlintOptions containing configuration options for the Flint client. + * @param shardId + * Shard Id. + */ class OpenSearchIndexShardTable(metaData: MetaData, option: FlintOptions, shardId: Int) extends OpenSearchIndexTable(metaData, option) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala index 45ff263cd..554e0a674 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.table +package org.opensearch.flint.core.table import java.util.logging.Logger @@ -14,14 +14,11 @@ import org.json4s.JsonAST.JString import org.json4s.jackson.JsonMethods import org.json4s.native.Serialization import org.opensearch.action.search.SearchRequest -import org.opensearch.client.opensearch._types.Time -import org.opensearch.client.opensearch.core.pit.CreatePitRequest import org.opensearch.client.opensearch.indices.IndicesStatsRequest import org.opensearch.client.opensearch.indices.stats.IndicesStats -import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions} +import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions, JsonSchema, MetaData, Schema, Table} import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} -import org.opensearch.flint.core.table.OpenSearchIndexShardTable -import org.opensearch.flint.table.OpenSearchIndexTable.{maxSplitSizeBytes, LOG} +import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -117,7 +114,8 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab new SearchSourceBuilder() .query(FlintOpenSearchClient.queryBuilder(query)) .size(pageSize) - .sort("_doc", SortOrder.ASC))) + .sort("_doc", SortOrder.ASC) + .sort("_id", SortOrder.ASC))) } /** @@ -148,6 +146,9 @@ class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Tab object OpenSearchIndexTable { private val LOG = Logger.getLogger(classOf[OpenSearchIndexTable].getName) + /** + * Max OpenSearch Request Page size is 10MB. + */ val maxSplitSizeBytes = 10 * 1024 * 1024 } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala b/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala index 758f8f950..a30d2de8b 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala @@ -9,11 +9,10 @@ import java.util.Optional import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ -import org.opensearch.client.opensearch.core.pit.{CreatePitRequest, CreatePitResponse} -import org.opensearch.client.opensearch.indices.stats.IndicesStats -import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient} +import org.opensearch.client.opensearch.indices.{IndicesStatsRequest, IndicesStatsResponse} +import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient, JsonSchema, MetaData} import org.opensearch.flint.core.storage.{OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} -import org.opensearch.flint.table.{JsonSchema, MetaData, OpenSearchIndexTable} +import org.opensearch.search.builder.SearchSourceBuilder import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -27,15 +26,11 @@ class OpenSearchIndexTableSpec private val clientUtils = mockStatic(classOf[OpenSearchClientUtils]) private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS) - private val pitResponse = mock[CreatePitResponse] before { clientUtils .when(() => OpenSearchClientUtils.createClient(any(classOf[FlintOptions]))) .thenReturn(openSearchClient) - when(openSearchClient.createPit(any[CreatePitRequest])) - .thenReturn(pitResponse) - when(pitResponse.pitId()).thenReturn("") } def mockTable( @@ -46,7 +41,7 @@ class OpenSearchIndexTableSpec numberOfShards: Int = 1): OpenSearchIndexTable = { val metaData = mock[MetaData] val options = mock[FlintOptions] - val mockIndexStats = mock[IndicesStats](RETURNS_DEEP_STUBS) + val mockIndicesStatsResp = mock[IndicesStatsResponse](RETURNS_DEEP_STUBS) when(metaData.name).thenReturn("test-index") when(metaData.setting).thenReturn(s"""{"index.number_of_shards":"$numberOfShards"}""") @@ -56,11 +51,14 @@ class OpenSearchIndexTableSpec case None => when(options.getScrollSize).thenReturn(Optional.empty[Integer]()) } when(options.supportShard()).thenReturn(supportShard) - when(mockIndexStats.total().docs().count()).thenReturn(docCount) - when(mockIndexStats.total().store().sizeInBytes).thenReturn(storeSizeInBytes) + + when(openSearchClient.stats(any[IndicesStatsRequest])).thenReturn(mockIndicesStatsResp) + when(mockIndicesStatsResp.indices().get(any[String]).total().docs().count()) + .thenReturn(docCount) + when(mockIndicesStatsResp.indices().get(any[String]).total().store().sizeInBytes) + .thenReturn(storeSizeInBytes) new OpenSearchIndexTable(metaData, options) { - override lazy val indexStats: IndicesStats = mockIndexStats override lazy val maxResultWindow: Int = 10000 } } @@ -142,5 +140,38 @@ class OpenSearchIndexTableSpec val table = mockTable(None, 1000L, 10000000L, numberOfShards = 1) val reader = table.createReader(query) reader shouldBe a[OpenSearchSearchAfterQueryReader] + + val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest + searchRequest.indices() should contain("test-index") + + val sourceBuilder = searchRequest.source().asInstanceOf[SearchSourceBuilder] + sourceBuilder.query() should not be null + sourceBuilder.size() shouldBe table.pageSize + + val sorts = sourceBuilder.sorts() + sorts.size() shouldBe 2 + sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}") + sorts.get(1).toString should include("{\n \"_id\" : {\n \"order\" : \"asc\"\n }\n}") + } + + "OpenSearchIndexShardTable" should "create reader correctly" in { + val query = "" + val indexTable = mockTable(None, 1000L, 10000000L, numberOfShards = 3) + val table = indexTable.slice().head + val reader = table.createReader(query) + reader shouldBe a[OpenSearchSearchAfterQueryReader] + + val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest + searchRequest.indices() should contain("test-index") + + searchRequest.preference() shouldBe "_shards:0" + + val sourceBuilder = searchRequest.source() + sourceBuilder.query() should not be null + sourceBuilder.size() shouldBe indexTable.pageSize + + val sorts = sourceBuilder.sorts() + sorts.size() shouldBe 1 + sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}") } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala index eae0e6495..ed6902841 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala @@ -7,7 +7,7 @@ package org.apache.spark.sql.flint import java.util -import org.opensearch.flint.table.OpenSearchCluster +import org.opensearch.flint.core.table.OpenSearchCluster import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} @@ -38,7 +38,7 @@ class FlintReadOnlyTable( lazy val name: String = flintSparkConf.tableName() - lazy val tables: Seq[org.opensearch.flint.table.Table] = + lazy val tables: Seq[org.opensearch.flint.core.Table] = OpenSearchCluster.apply(name, flintSparkConf.flintOptions()) lazy val resolvedTablesSchema: StructType = tables.headOption diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala index 8140948f4..201e7c748 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala @@ -12,7 +12,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types.StructType case class FlintScan( - tables: Seq[org.opensearch.flint.table.Table], + tables: Seq[org.opensearch.flint.core.Table], schema: StructType, options: FlintSparkConf, pushedPredicates: Array[Predicate]) @@ -54,7 +54,7 @@ case class FlintScan( * Each OpenSearchSplit is backed by an OpenSearch index table. * * @param table - * {@link org.opensearch.flint.table.Table} + * {@link org.opensearch.flint.core.Table} */ -private[spark] case class OpenSearchSplit(table: org.opensearch.flint.table.Table) +private[spark] case class OpenSearchSplit(table: org.opensearch.flint.core.Table) extends InputPartition {} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala index fcebcefe0..0daecb280 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala @@ -13,7 +13,7 @@ import org.apache.spark.sql.flint.storage.FlintQueryCompiler import org.apache.spark.sql.types.StructType case class FlintScanBuilder( - tables: Seq[org.opensearch.flint.table.Table], + tables: Seq[org.opensearch.flint.core.Table], schema: StructType, options: FlintSparkConf) extends ScanBuilder diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 56de5d6b7..525172240 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -123,7 +123,7 @@ object FlintSparkConf { val SUPPORT_SHARD = FlintConfig(s"spark.datasource.flint.${FlintOptions.SUPPORT_SHARD}") .datasourceOption() .doc("indicate does index support shard or not") - .createWithDefault(String.valueOf(FlintOptions.SUPPORT_SHARD_DEFAULT)) + .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SUPPORT_SHARD)) val MAX_RETRIES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.MAX_RETRIES}") .datasourceOption() diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index ed401060e..9cf9c553f 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -70,7 +70,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { FlintDataType.deserialize(flintDataType) should contain theSameElementsAs sparkStructType } - ignore("deserialize unsupported flint data type throw exception") { + test("deserialize unsupported flint data type throw exception") { val unsupportedField = """{ "properties": { "rangeField": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala index 47bedb69c..e1e967ded 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -70,6 +70,11 @@ trait OpenSearchSuite extends BeforeAndAfterAll { | "number_of_replicas": "0" |}""".stripMargin + val multipleShardSetting = """{ + | "number_of_shards": "2", + | "number_of_replicas": "0" + |}""".stripMargin + def simpleIndex(indexName: String): Unit = { val mappings = """{ | "properties": { @@ -101,6 +106,19 @@ trait OpenSearchSuite extends BeforeAndAfterAll { | } |}""".stripMargin + val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin + index(indexName, multipleShardSetting, mappings, docs) + } + + def multipleShardAndDocIndex(indexName: String, N: Int): Unit = { + val mappings = """{ + | "properties": { + | "id": { + | "type": "integer" + | } + | } + |}""".stripMargin + val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin index(indexName, oneNodeSetting, mappings, docs) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 447a1aa5b..53188fb5a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -11,19 +11,15 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.mockito.Mockito.when -import org.opensearch.OpenSearchStatusException -import org.opensearch.client.json.jackson.JacksonJsonpMapper -import org.opensearch.client.opensearch.OpenSearchClient -import org.opensearch.client.transport.rest_client.RestClientTransport import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient -import org.opensearch.flint.table.{OpenSearchCluster, Table} +import org.opensearch.flint.core.table.OpenSearchCluster import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE} +import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers { @@ -198,6 +194,46 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M } } + it should "read docs from index with multiple shard successfully" in { + val indexName = "t0001" + val expectedCount = 5 + withIndexName(indexName) { + multipleShardAndDocIndex(indexName, expectedCount) + val match_all = null + val reader = createTable(indexName, options).createReader(match_all) + + var totalCount = 0 + while (reader.hasNext) { + reader.next() + totalCount += 1 + } + totalCount shouldBe expectedCount + } + } + + it should "read docs from shard table successfully" in { + val indexName = "t0001" + val expectedCount = 5 + withIndexName(indexName) { + multipleShardAndDocIndex(indexName, expectedCount) + val match_all = null + val totalCount = createTable(indexName, options) + .slice() + .map(shardTable => { + val reader = shardTable.createReader(match_all) + var count = 0 + while (reader.hasNext) { + reader.next() + count += 1 + } + count + }) + .sum + + totalCount shouldBe expectedCount + } + } + it should "write docs to index successfully " in { val indexName = "t0001" withIndexName(indexName) {