From 42530a8bbc9ca72b82724504c076d8fefa5ef1ba Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 16:28:59 -0700 Subject: [PATCH] update the flint client createIndex api add metadata creation parameters Signed-off-by: YANGDB --- .../flint/core/metadata/FlintMetadata.scala | 22 ++++++++++++------- .../core/storage/FlintOpenSearchClient.java | 10 +++++---- .../core/metadata/FlintMetadataSuite.scala | 2 +- .../apache/spark/sql/flint/FlintTable.scala | 2 +- .../opensearch/flint/spark/FlintSpark.scala | 14 +++++++----- .../covering/FlintSparkCoveringIndex.scala | 7 +++--- .../spark/mv/FlintSparkMaterializedView.scala | 7 +++--- .../FlintSparkCoveringIndexSuite.scala | 6 ++--- .../mv/FlintSparkMaterializedViewSuite.scala | 22 +++++++++---------- .../FlintSparkSkippingIndexSuite.scala | 2 +- .../core/FlintOpenSearchClientSuite.scala | 8 +++---- .../FlintSparkCoveringIndexITSuite.scala | 2 +- .../FlintSparkMaterializedViewITSuite.scala | 10 +++++---- .../FlintSparkSkippingIndexITSuite.scala | 6 ++--- 14 files changed, 67 insertions(+), 53 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 98b9dbfec..ae34b82df 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -48,24 +48,30 @@ case class FlintMetadata( * @return * JSON content */ - def getContent: String = { + def getContent(includeProperties: Boolean = true): String = { try { buildJson(builder => { // Add _meta field objectField(builder, "_meta") { builder .field("version", version.version) - .field("targetName", targetName.getOrElse(name) ) .field("name", name) .field("kind", kind) .field("source", source) .field("indexedColumns", indexedColumns) + // Only add targetName if it's not empty + targetName.foreach(tn => builder.field("targetName", tn)) optionalObjectField(builder, "options", options) - optionalObjectField(builder, "properties", properties) + + if (includeProperties) { + optionalObjectField(builder, "properties", properties) + } } // Add properties (schema) field - builder.field("properties", schema) + if (includeProperties) { + builder.field("properties", schema) + } }) } catch { case e: Exception => @@ -111,7 +117,7 @@ object FlintMetadata { innerFieldName match { case "version" => builder.version(FlintVersion.apply(parser.text())) case "name" => builder.name(parser.text()) - case "targetName" => builder.targetName(parser.text()) + case "targetName" => builder.targetName(Option.apply(parser.text())) case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) case "indexedColumns" => @@ -144,8 +150,8 @@ object FlintMetadata { */ class Builder { private var version: FlintVersion = FlintVersion.current() - private var name: String = "" private var targetName: Option[String] = None + private var name: String = "" private var kind: String = "" private var source: String = "" private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() @@ -164,8 +170,8 @@ object FlintMetadata { this } - def targetName(name: String): this.type = { - this.targetName = Option(name) + def targetName(name: Option[String]): this.type = { + this.targetName = name this } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 8ce130c35..59395137d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -110,10 +110,12 @@ public void createIndex(String indexName, FlintMetadata metadata) { String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); - request.mapping(metadata.getContent(), XContentType.JSON); - metadata.targetName().exists(name -> { - return request.alias(new Alias(toLowercase(metadata.name()))); - }); + boolean includeMappingProperties = true; + if(metadata.targetName().nonEmpty()) { + request.alias(new Alias(toLowercase(metadata.name()))); + includeMappingProperties = false; + } + request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON); Option settings = metadata.indexSettings(); if (settings.isDefined()) { request.settings(settings.get(), XContentType.JSON); diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index dc2f5fe6a..991eb51cb 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -62,6 +62,6 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") val metadata = builder.build() - metadata.getContent should matchJson(testMetadataJson) + metadata.getContent() should matchJson(testMetadataJson) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala index c078f7fb6..318cb9dab 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala @@ -46,7 +46,7 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio FlintClientBuilder .build(flintSparkConf.flintOptions()) .getIndexMetadata(name) - .getContent) + .getContent()) } } schema diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index e112a354d..a1016a2a5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -86,13 +86,15 @@ class FlintSpark(val spark: SparkSession) { if (targetName.nonEmpty) { //use targetIndex as the index to store the acceleration data flintClient.alias(targetName.get, indexName, index.metadata()) - } else if (flintClient.exists(indexName)) { - if (!ignoreIfExists) { - throw new IllegalStateException(s"Flint index $indexName already exists") - } } else { - val metadata = index.metadata() - flintClient.createIndex(indexName, metadata) + if (flintClient.exists(indexName)) { + if (!ignoreIfExists) { + throw new IllegalStateException(s"Flint index $indexName already exists") + } + } else { + val metadata = index.metadata() + flintClient.createIndex(indexName, metadata) + } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 60d0fef93..7ed63713d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -59,6 +59,7 @@ case class FlintSparkCoveringIndex( metadataBuilder(this) .name(indexName) + .targetName(targetIndexName) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -104,7 +105,7 @@ object FlintSparkCoveringIndex { /** Builder class for covering index build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { - private var targetIndexName: String = "" + private var targetIndexName: Option[String] = None private var indexName: String = "" private var indexedColumns: Map[String, String] = Map() @@ -130,7 +131,7 @@ object FlintSparkCoveringIndex { * index builder */ def targetName(indexName: String): Builder = { - this.targetIndexName = indexName + this.targetIndexName = Option.apply(indexName) this } @@ -163,6 +164,6 @@ object FlintSparkCoveringIndex { } override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(Option.apply(targetIndexName), indexName, tableName, indexedColumns, indexOptions) + new FlintSparkCoveringIndex(targetIndexName, indexName, tableName, indexedColumns, indexOptions) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 567bc13e4..a449f0bc7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -69,6 +69,7 @@ case class FlintSparkMaterializedView( metadataBuilder(this) .name(mvName) + .targetName(targetIndexName) .source(query) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -159,7 +160,7 @@ object FlintSparkMaterializedView { /** Builder class for MV build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { - private var targetIndexName: String = "" + private var targetIndexName: Option[String] = None private var mvName: String = "" private var query: String = "" @@ -172,7 +173,7 @@ object FlintSparkMaterializedView { * index builder */ def targetName(indexName: String): Builder = { - this.targetIndexName = indexName + this.targetIndexName = Option.apply(indexName) this } @@ -211,7 +212,7 @@ object FlintSparkMaterializedView { field.name -> field.dataType.typeName } .toMap - FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, outputSchema, indexOptions) + FlintSparkMaterializedView(targetIndexName, mvName, query, outputSchema, indexOptions) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 8c144b46b..cc17d5190 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -13,12 +13,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("get covering index name") { val index = - new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) + new FlintSparkCoveringIndex(None, "ci", "spark_catalog.default.test", Map("name" -> "string")) index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } test("should fail if get index name without full table name") { - val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) + val index = new FlintSparkCoveringIndex(None, "ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { index.name() } @@ -26,7 +26,7 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { - new FlintSparkCoveringIndex("ci", "default.test", Map.empty) + new FlintSparkCoveringIndex(None, "ci", "default.test", Map.empty) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c28495c69..08b33343c 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -36,20 +36,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testQuery = "SELECT 1" test("get name") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "mv", testQuery, Map.empty).name() the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("default.mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "default.mv", testQuery, Map.empty).name() } test("get metadata") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map("test_col" -> "integer")) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer")) val metadata = mv.metadata() metadata.name shouldBe mv.mvName @@ -64,7 +64,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val indexSettings = """{"number_of_shards": 2}""" val indexOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "index_settings" -> indexSettings)) - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer"), @@ -77,12 +77,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { } test("build batch data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.build(spark, None).collect() shouldBe Array(Row(1)) } test("should fail if build given other source data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame])) } @@ -100,7 +100,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -127,7 +127,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -145,7 +145,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, s"SELECT name, age FROM $testTable WHERE age > 30", Map.empty) @@ -164,7 +164,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name", Map.empty) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..12dc81c34 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -345,7 +345,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { } private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = { - val actual = parse(metadata.getContent) \ "properties" + val actual = parse(metadata.getContent()) \ "properties" assert(actual == parse(expected)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 5c799128c..779e025f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -46,7 +46,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M |""".stripMargin val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn(content) + when(metadata.getContent()).thenReturn(content) when(metadata.indexSettings).thenReturn(None) flintClient.createIndex(indexName, metadata) @@ -58,7 +58,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent()).thenReturn("{}") when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) @@ -73,14 +73,14 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent()).thenReturn("{}") when(metadata.indexSettings).thenReturn(None) flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent should not be empty) + allMetadata.forEach(metadata => metadata.getContent() should not be empty) allMetadata.forEach(metadata => metadata.indexSettings should not be empty) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a4b0069dd..cbb71a98f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -44,7 +44,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "version": "${current()}", | "name": "name_and_age", diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 81b3819d9..e7fb7a1f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -22,6 +22,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.mv_test" private val testMvName = "spark_catalog.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) + private val testTargetIndex = "existing_index" private val testQuery = s""" | SELECT @@ -53,7 +54,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", @@ -87,12 +88,12 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { |""".stripMargin) } - test("create materialized view using existing OpebnSearch index successfully") { + test("create materialized view using existing OpenSearch index successfully") { val indexOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) flint .materializedView() - .targetName("existing_index") + .targetName(testTargetIndex) .name(testMvName) .query(testQuery) .options(indexOptions) @@ -100,12 +101,13 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex("existing_index") index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", | "name": "spark_catalog.default.mv_test_metrics", | "kind": "mv", + | "targetName": "$testTargetIndex", | "source": "$testQuery", | "indexedColumns": [ | { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..efcec8ee6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -52,7 +52,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_test_skipping_index", | "version": "${current()}", @@ -123,7 +123,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined val optionJson = compact(render( - parse(index.get.metadata().getContent) \ "_meta" \ "options")) + parse(index.get.metadata().getContent()) \ "_meta" \ "options")) optionJson should matchJson(""" | { | "auto_refresh": "true", @@ -449,7 +449,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson( + index.get.metadata().getContent() should matchJson( s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index",