update
Signed-off-by: Peng Huo <[email protected]>
penghuo committed Jun 27, 2024
1 parent 12473d3 commit b4c7baa
Showing 5 changed files with 131 additions and 47 deletions.
5 changes: 4 additions & 1 deletion docs/opensearch-table.md
@@ -49,9 +49,12 @@ df.show()
```

## Limitation
### Unsupported catalog operation
### Catalog operations
- List Tables: Not supported.
- Create Table: Not supported.
- Alter Table: Not supported.
- Drop Table: Not supported.
- Rename Table: Not supported.

### Table operations
- Tables only support read operations, for instance, SELECT and DESCRIBE (see the example below).
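
For illustration, a minimal sketch of the read-only behavior, assuming a catalog registered under the name `dev` (any registered catalog name works) and an existing index `t0001`, matching the integration tests further down:

```scala
// Reads work: the index is exposed as a read-only table.
val df = spark.sql("SELECT accountId, eventName, eventSource FROM dev.default.t0001")
df.show()

// Writes fail at analysis time because the table does not implement SupportsWrite, e.g.
//   org.apache.spark.sql.AnalysisException: Table does not support writes: t0001
spark.sql("INSERT INTO dev.default.t0001 VALUES ('234', 'event-1', 'source-1')")
```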
OpenSearchCatalog.scala
@@ -10,7 +10,7 @@ import org.apache.spark.opensearch.catalog.OpenSearchCatalog.OPENSEARCH_PREFIX
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.flint.FlintTable
import org.apache.spark.sql.flint.FlintReadOnlyTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -55,7 +55,7 @@ class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {
removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX))
conf.put("path", ident.name())

FlintTable(conf, Option.empty)
new FlintReadOnlyTable(conf, Option.empty)
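    // Read-only: without SupportsWrite, Spark's analyzer rejects INSERT/DELETE on
    // catalog tables with "Table does not support writes" (see the integration tests below).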
}

override def createTable(
FlintReadOnlyTable.scala
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql.flint

import java.util

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.opensearch.catalog.OpenSearchCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * FlintReadOnlyTable represents an OpenSearch index (or index pattern) as a read-only Spark
 * table.
 *
 * @param conf
 *   table configuration, including OpenSearch connection options and the index name ("path")
 * @param userSpecifiedSchema
 *   optional user-specified schema; when absent, the schema is derived from the index mapping
 */
class FlintReadOnlyTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType])
extends Table
with SupportsRead {

lazy val sparkSession = SparkSession.active

lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)

lazy val name: String = flintSparkConf.tableName()

  // TODO: when multiple indices match, we currently use the schema of the first index;
  // we should merge the StructTypes and widen conflicting field types.
lazy val schema: StructType = {
userSpecifiedSchema.getOrElse {
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
.values()
.asScala
.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}
}

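  // Note: write-related capabilities (BATCH_WRITE, TRUNCATE, STREAMING_WRITE) are declared,
  // but this class does not implement SupportsWrite, so Spark's analyzer still rejects writes
  // (see the integration tests below). FlintTable extends this class and adds SupportsWrite.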
override def capabilities(): util.Set[TableCapability] =
util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, flintSparkConf)
}
}
FlintTable.scala
@@ -7,20 +7,9 @@ package org.apache.spark.sql.flint

import java.util

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.opensearch.catalog.OpenSearchCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* FlintTable.
@@ -30,38 +19,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* userSpecifiedSchema
*/
case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType])
extends Table
with SupportsRead
extends FlintReadOnlyTable(conf, userSpecifiedSchema)
with SupportsWrite {

lazy val sparkSession = SparkSession.active

lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)

lazy val name: String = flintSparkConf.tableName()

// todo. currently, we use first index schema in multiple indices. we should merge StructType
// to widen type
lazy val schema: StructType = {
userSpecifiedSchema.getOrElse {
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
.values()
.asScala
.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}
}

override def capabilities(): util.Set[TableCapability] =
util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, flintSparkConf)
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
FlintWriteBuilder(name, info, flintSparkConf)
}
OpenSearchCatalogITSuite.scala
@@ -8,7 +8,7 @@ package org.apache.spark.opensearch.catalog
import org.opensearch.flint.OpenSearchSuite

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.streaming.StreamTest

class OpenSearchCatalogITSuite
@@ -27,6 +27,9 @@ class OpenSearchCatalogITSuite
"org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.port", s"$openSearchPort")
spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.host", openSearchHost)
spark.conf.set(
s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy",
"wait_for")
}

test("Load single index as table") {
@@ -35,13 +38,68 @@ class OpenSearchCatalogITSuite
simpleIndex(indexName)
val df = spark.sql(s"""
SELECT accountId, eventName, eventSource
FROM ${catalogName}.default.${indexName}""")
FROM ${catalogName}.default.$indexName""")

assert(df.count() == 1)
checkAnswer(df, Row("123", "event", "source"))
}
}

test("Describe single index as table") {
val indexName = "t0001"
withIndexName(indexName) {
simpleIndex(indexName)
val df = spark.sql(s"""
DESC ${catalogName}.default.$indexName""")

assert(df.count() == 6)
checkAnswer(
df,
Seq(
Row("# Partitioning", "", ""),
Row("", "", ""),
Row("Not partitioned", "", ""),
Row("accountId", "string", ""),
Row("eventName", "string", ""),
Row("eventSource", "string", "")))
}
}

test("Failed to write value to readonly table") {
val indexName = "t0001"
withIndexName(indexName) {
simpleIndex(indexName)
val exception = intercept[AnalysisException] {
spark.sql(s"""
INSERT INTO ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""")
}
assert(exception.getMessage.contains(s"Table does not support writes: $indexName"))
}
}

test("Failed to delete value from readonly table") {
val indexName = "t0001"
withIndexName(indexName) {
simpleIndex(indexName)
val exception = intercept[AnalysisException] {
spark.sql(s"DELETE FROM ${catalogName}.default.$indexName WHERE accountId = '234'")
}
assert(exception.getMessage.contains(s"Table does not support deletes: $indexName"))
}
}

test("Failed to override value of readonly table") {
val indexName = "t0001"
withIndexName(indexName) {
simpleIndex(indexName)
val exception = intercept[AnalysisException] {
spark.sql(s"""
INSERT OVERWRITE TABLE ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""")
}
assert(exception.getMessage.contains(s"Table does not support writes: $indexName"))
}
}

test("Load index wildcard expression as table") {
val indexName = "t0001"
withIndexName(indexName) {
