diff --git a/docs/opensearch-table.md b/docs/opensearch-table.md
index 3a12b01e3..2f617eb1c 100644
--- a/docs/opensearch-table.md
+++ b/docs/opensearch-table.md
@@ -49,9 +49,12 @@ df.show()
 ```
 
 ## Limitation
-### Unsupported catalog operation
+### Catalog operations
 - List Tables: Not supported.
 - Create Table: Not supported.
 - Alter Table: Not supported.
 - Drop Table: Not supported.
 - Rename Table: Not supported.
+
+### Table operations
+- Tables support read operations only, for instance SELECT and DESCRIBE.
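To make the read-only limitation concrete, here is a minimal usage sketch. The catalog name `dev` and the index name `my_index` are placeholders rather than names introduced by this patch; the expected error text matches the assertion used in `OpenSearchCatalogITSuite` further down.

```scala
// Reads work: SELECT and DESCRIBE are served by FlintReadOnlyTable's scan builder.
spark.sql("SELECT accountId, eventName, eventSource FROM dev.default.my_index").show()
spark.sql("DESC dev.default.my_index").show()

// Writes are rejected at analysis time because the table returned by the catalog
// no longer implements SupportsWrite. Expected: org.apache.spark.sql.AnalysisException
// with a message containing "Table does not support writes: my_index".
spark.sql("INSERT INTO dev.default.my_index VALUES ('123', 'event', 'source')")
```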
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
index 71ea965ab..3594f41de 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
@@ -10,7 +10,7 @@ import org.apache.spark.opensearch.catalog.OpenSearchCatalog.OPENSEARCH_PREFIX
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.flint.FlintTable
+import org.apache.spark.sql.flint.FlintReadOnlyTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -55,7 +55,7 @@ class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {
       removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX))
     conf.put("path", ident.name())
 
-    FlintTable(conf, Option.empty)
+    new FlintReadOnlyTable(conf, Option.empty)
   }
 
   override def createTable(
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala
new file mode 100644
index 000000000..9c2651af3
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql.flint
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.opensearch.flint.core.FlintClientBuilder
+
+import org.apache.spark.opensearch.catalog.OpenSearchCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.flint.config.FlintSparkConf
+import org.apache.spark.sql.flint.datatype.FlintDataType
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * FlintReadOnlyTable.
+ *
+ * @param conf
+ *   configuration
+ * @param userSpecifiedSchema
+ *   userSpecifiedSchema
+ */
+class FlintReadOnlyTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType])
+    extends Table
+    with SupportsRead {
+
+  lazy val sparkSession = SparkSession.active
+
+  lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)
+
+  lazy val name: String = flintSparkConf.tableName()
+
+  // TODO: currently we use the first index schema when multiple indices match;
+  // we should merge the StructTypes to widen the type.
+  lazy val schema: StructType = {
+    userSpecifiedSchema.getOrElse {
+      FlintClientBuilder
+        .build(flintSparkConf.flintOptions())
+        .getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
+        .values()
+        .asScala
+        .headOption
+        .map(m => FlintDataType.deserialize(m.getContent))
+        .getOrElse(StructType(Nil))
+    }
+  }
+
+  override def capabilities(): util.Set[TableCapability] =
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    FlintScanBuilder(name, schema, flintSparkConf)
+  }
+}
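One nuance worth noting: `capabilities()` above still advertises `BATCH_WRITE`, `STREAMING_WRITE`, and `TRUNCATE`, presumably so that `FlintTable` below can inherit them unchanged, yet writes through the catalog still fail. What Spark checks when planning an INSERT or DELETE is whether the resolved table mixes in `SupportsWrite`, roughly as sketched here; this is an illustration, not project code.

```scala
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}

// Simplified form of the analyzer-side rule: only tables that implement SupportsWrite
// are eligible for write plans. FlintTable qualifies; FlintReadOnlyTable does not,
// which yields the "Table does not support writes: <name>" AnalysisException
// asserted in the integration tests further down.
def supportsWrites(table: Table): Boolean = table.isInstanceOf[SupportsWrite]
```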
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
index c99eb0c7c..2226eea4b 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
@@ -7,20 +7,9 @@ package org.apache.spark.sql.flint
 
 import java.util
 
-import scala.collection.JavaConverters._
-
-import org.opensearch.flint.core.FlintClientBuilder
-
-import org.apache.spark.opensearch.catalog.OpenSearchCatalog
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
-import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.flint.config.FlintSparkConf
-import org.apache.spark.sql.flint.datatype.FlintDataType
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * FlintTable.
@@ -30,38 +19,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  *   userSpecifiedSchema
  */
 case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType])
-    extends Table
-    with SupportsRead
+    extends FlintReadOnlyTable(conf, userSpecifiedSchema)
     with SupportsWrite {
 
-  lazy val sparkSession = SparkSession.active
-
-  lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)
-
-  lazy val name: String = flintSparkConf.tableName()
-
-  // todo. currently, we use first index schema in multiple indices. we should merge StructType
-  // to widen type
-  lazy val schema: StructType = {
-    userSpecifiedSchema.getOrElse {
-      FlintClientBuilder
-        .build(flintSparkConf.flintOptions())
-        .getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
-        .values()
-        .asScala
-        .headOption
-        .map(m => FlintDataType.deserialize(m.getContent))
-        .getOrElse(StructType(Nil))
-    }
-  }
-
-  override def capabilities(): util.Set[TableCapability] =
-    util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)
-
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    FlintScanBuilder(name, schema, flintSparkConf)
-  }
-
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     FlintWriteBuilder(name, info, flintSparkConf)
   }
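For reference before the test changes, this is roughly how a session is pointed at the catalog. The catalog name `dev` and the connection values are placeholders; `OpenSearchCatalogITSuite` below sets the same properties programmatically.

```scala
// Register the OpenSearch catalog under a placeholder name "dev".
spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set("spark.sql.catalog.dev.opensearch.host", "localhost") // placeholder host
spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")      // placeholder port
// The test suite also sets a write refresh policy, mirrored here for completeness.
spark.conf.set("spark.sql.catalog.dev.opensearch.write.refresh_policy", "wait_for")
```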
diff --git a/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
index f90817ff6..ed331fa0d 100644
--- a/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
@@ -8,7 +8,7 @@ package org.apache.spark.opensearch.catalog
 import org.opensearch.flint.OpenSearchSuite
 
 import org.apache.spark.FlintSuite
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.streaming.StreamTest
 
 class OpenSearchCatalogITSuite
@@ -27,6 +27,9 @@ class OpenSearchCatalogITSuite
       "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
     spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.port", s"$openSearchPort")
     spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.host", openSearchHost)
+    spark.conf.set(
+      s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy",
+      "wait_for")
   }
 
   test("Load single index as table") {
@@ -35,13 +38,68 @@ class OpenSearchCatalogITSuite
       simpleIndex(indexName)
       val df = spark.sql(s"""
         SELECT accountId, eventName, eventSource
-        FROM ${catalogName}.default.${indexName}""")
+        FROM ${catalogName}.default.$indexName""")
 
       assert(df.count() == 1)
       checkAnswer(df, Row("123", "event", "source"))
     }
   }
 
+  test("Describe single index as table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val df = spark.sql(s"""
+        DESC ${catalogName}.default.$indexName""")
+
+      assert(df.count() == 6)
+      checkAnswer(
+        df,
+        Seq(
+          Row("# Partitioning", "", ""),
+          Row("", "", ""),
+          Row("Not partitioned", "", ""),
+          Row("accountId", "string", ""),
+          Row("eventName", "string", ""),
+          Row("eventSource", "string", "")))
+    }
+  }
+
+  test("Failed to write value to readonly table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val exception = intercept[AnalysisException] {
+        spark.sql(s"""
+             INSERT INTO ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""")
+      }
+      assert(exception.getMessage.contains(s"Table does not support writes: $indexName"))
+    }
+  }
+
+  test("Failed to delete value from readonly table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val exception = intercept[AnalysisException] {
+        spark.sql(s"DELETE FROM ${catalogName}.default.$indexName WHERE accountId = '234'")
+      }
+      assert(exception.getMessage.contains(s"Table does not support deletes: $indexName"))
+    }
+  }
+
+  test("Failed to override value of readonly table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val exception = intercept[AnalysisException] {
+        spark.sql(s"""
+             INSERT OVERWRITE TABLE ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""")
+      }
+      assert(exception.getMessage.contains(s"Table does not support writes: $indexName"))
+    }
+  }
+
   test("Load index wildcard expression as table") {
     val indexName = "t0001"
     withIndexName(indexName) {