From 3db9136431320f69bf95898ff6c1bcc71e3f820f Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Tue, 25 Jun 2024 16:39:34 -0700
Subject: [PATCH] add OpenSearchCatalog to enable direct access to OpenSearch
 indexes as tables

Signed-off-by: Peng Huo
---
 docs/img/opensearch-table.md                  |  66 +++++++++
 .../opensearch/flint/core/FlintClient.java    |   2 +-
 .../core/storage/FlintOpenSearchClient.java   |  14 +-
 .../catalog/OpenSearchCatalog.scala           | 132 +++++++++++++++++
 .../apache/spark/sql/flint/FlintTable.scala   |  27 ++--
 .../catalog/OpenSearchCatalogTest.scala       | 134 ++++++++++++++++++
 .../catalog/OpenSearchCatalogITSuite.scala    |  74 ++++++++++
 7 files changed, 430 insertions(+), 19 deletions(-)
 create mode 100644 docs/img/opensearch-table.md
 create mode 100644 flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
 create mode 100644 flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala
 create mode 100644 integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala

diff --git a/docs/img/opensearch-table.md b/docs/img/opensearch-table.md
new file mode 100644
index 000000000..71cd5c6f0
--- /dev/null
+++ b/docs/img/opensearch-table.md
@@ -0,0 +1,66 @@
+# OpenSearch Table
+
+## Overview
+
+The `OpenSearchCatalog` class integrates Apache Spark with OpenSearch, allowing Spark to interact with OpenSearch indices as tables. This integration supports read and write operations, enabling seamless data processing and querying across Spark and OpenSearch.
+
+## Configuration Parameters
+
+To configure the `OpenSearchCatalog`, set the following parameters in your Spark session:
+
+- **`opensearch.host`**: The host to connect to OpenSearch. Default is `localhost`.
+- **`opensearch.port`**: The port to connect to OpenSearch. Default is `9200`.
+- **`opensearch.scheme`**: The scheme to use for the connection. Default is `http`. Valid values are `[http, https]`.
+- **`opensearch.auth`**: The authentication method to use. Default is `noauth`. Valid values are `[noauth, sigv4, basic]`.
+- **`opensearch.auth.username`**: The username for basic authentication.
+- **`opensearch.auth.password`**: The password for basic authentication.
+- **`opensearch.region`**: The AWS region to use for SigV4 authentication. Default is `us-west-2`. Used only when `opensearch.auth` is `sigv4`.
+
+## Usage
+
+### Initializing the Catalog
+
+To configure and initialize the catalog in your Spark session, set the following configurations:
+
+```scala
+spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
+spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
+spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "http")
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "noauth")
+```
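+
+If the cluster requires authentication, the same pattern applies with the parameters listed above. The sketch below is illustrative only: the endpoint, credentials, and region are placeholder values, not shipped defaults.
+
+```scala
+// Basic authentication (placeholder endpoint and credentials).
+spark.conf.set("spark.sql.catalog.dev.opensearch.host", "opensearch.example.com")
+spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "https")
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "basic")
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth.username", "my-user")
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth.password", "my-password")
+
+// SigV4 authentication for an AWS-managed domain; opensearch.region selects
+// the signing region.
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "sigv4")
+spark.conf.set("spark.sql.catalog.dev.opensearch.region", "us-east-1")
+```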
+
+### Querying Data
+
+Once the catalog is configured, you can use Spark SQL to query OpenSearch indices as tables:
+
+- The namespace **MUST** be `default`.
+- When using a wildcard index name or a comma-separated index name as the table, it **MUST** be wrapped in backticks.
+
+Example:
+
+```scala
+val df = spark.sql("SELECT * FROM dev.default.my_index")
+df.show()
+```
+
+Using a wildcard index name:
+
+```scala
+val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
+df.show()
+```
+
+Using a comma-separated index name:
+
+```scala
+val df = spark.sql("SELECT * FROM dev.default.`index1,index2`")
+df.show()
+```
+
+## Limitations
+
+### Unsupported catalog operations
+
+- List Tables: Not supported.
+- Create Table: Not supported.
+- Alter Table: Not supported.
+- Drop Table: Not supported.
+- Rename Table: Not supported.
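+
+### Schema resolution across multiple indices
+
+When a table name matches several indices (wildcard or comma-separated), the schema is currently taken from the first matched index only; mappings are not merged or widened across indices (see the TODO in `FlintTable`). A sketch of the consequence, using hypothetical indices `logs-1` and `logs-2` with different mappings:
+
+```scala
+// Both logs-1 and logs-2 match the wildcard, but the table schema comes from
+// the first matched index; fields present only in the other index are absent.
+val df = spark.sql("SELECT * FROM dev.default.`logs-*`")
+df.printSchema()
+```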
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index b9ef05851..e5e18f126 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -40,7 +40,7 @@ public interface FlintClient {
    * @return map where the keys are the matched index names, and the values are
    *         corresponding index metadata
    */
-  Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);
+  Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);
 
   /**
    * Retrieve metadata in a Flint index.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 36db4a040..7d56e3011 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -60,7 +60,7 @@ public class FlintOpenSearchClient implements FlintClient {
    * excluding '*' because it's reserved for pattern matching.
    */
   private final static Set<Character> INVALID_INDEX_NAME_CHARS =
-      Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
+      Set.of(' ', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
 
   private final FlintOptions options;
 
@@ -101,11 +101,12 @@ public boolean exists(String indexName) {
   }
 
   @Override
-  public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
-    LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
-    String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
+  public Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern) {
+    LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern));
+    String[] indexNames =
+        Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new);
     try (IRestHighLevelClient client = createClient()) {
-      GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
+      GetIndexRequest request = new GetIndexRequest(indexNames);
       GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);
       return Arrays.stream(response.getIndices())
@@ -117,7 +118,8 @@ public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
           )
         ));
     } catch (Exception e) {
-      throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
+      throw new IllegalStateException("Failed to get Flint index metadata for "
+          + String.join(",", indexNames), e);
     }
   }
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
new file mode 100644
index 000000000..99e86e9ff
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.opensearch.catalog
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.opensearch.catalog.OpenSearchCatalog.OPENSEARCH_PREFIX
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.flint.FlintTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A Spark TableCatalog implementation that wraps an OpenSearch domain as a catalog.
+ *
+ * Configuration parameters for OpenSearchCatalog:
+ *
+ *   - opensearch.port: Default is 9200.
+ *   - opensearch.scheme: Default is http. Valid values are [http, https].
+ *   - opensearch.auth: Default is noauth. Valid values are [noauth, sigv4, basic].
+ *   - opensearch.auth.username: Basic auth username.
+ *   - opensearch.auth.password: Basic auth password.
+ *   - opensearch.region: Default is us-west-2. Only used when auth is sigv4.
+ */
+class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {
+
+  private var catalogName: String = _
+  private var options: CaseInsensitiveStringMap = _
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    this.catalogName = name
+    this.options = options
+  }
+
+  override def name(): String = catalogName
+
+  @throws[NoSuchNamespaceException]
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    throw new UnsupportedOperationException("OpenSearchCatalog does not support listTables")
+  }
+
+  @throws[NoSuchTableException]
+  override def loadTable(ident: Identifier): Table = {
+    logInfo(s"Loading table ${ident.name()}")
+    if (!ident.namespace().exists(n => OpenSearchCatalog.isDefaultNamespace(n))) {
+      throw new NoSuchTableException(ident.namespace().mkString("."), ident.name())
+    }
+
+    val conf = new java.util.HashMap[String, String](
+      removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX))
+    conf.put("path", ident.name())
+
+    FlintTable(conf, Option.empty)
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    throw new UnsupportedOperationException("OpenSearchCatalog does not support createTable")
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    throw new UnsupportedOperationException("OpenSearchCatalog does not support alterTable")
+  }
+
+  override def dropTable(ident: Identifier): Boolean = {
+    throw new UnsupportedOperationException("OpenSearchCatalog does not support dropTable")
+  }
+
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+    throw new UnsupportedOperationException("OpenSearchCatalog does not support renameTable")
+  }
+
+  private def removePrefixFromMap(
+      map: java.util.Map[String, String],
+      prefix: String): java.util.Map[String, String] = {
+    val result = new java.util.HashMap[String, String]()
+    map.forEach { (key, value) =>
+      if (key.startsWith(prefix)) {
+        val newKey = key.substring(prefix.length)
+        result.put(newKey, value)
+      } else {
+        result.put(key, value)
+      }
+    }
+    result
+  }
+}
+
+object OpenSearchCatalog {
+
+  /**
+   * The reserved namespace.
+   */
+  val RESERVED_DEFAULT_NAMESPACE: String = "default"
+
+  /**
+   * The prefix for OpenSearch-related configuration keys.
+   */
+  val OPENSEARCH_PREFIX: String = "opensearch."
+
+  /**
+   * Checks if the given namespace is the reserved default namespace.
+   *
+   * @param namespace
+   *   The namespace to check.
+   * @return
+   *   True if the namespace is the reserved default namespace, false otherwise.
+   */
+  def isDefaultNamespace(namespace: String): Boolean = {
+    RESERVED_DEFAULT_NAMESPACE.equalsIgnoreCase(namespace)
+  }
+
+  /**
+   * Splits the table name into index names.
+   *
+   * @param tableName
+   *   The name of the table, potentially containing comma-separated index names.
+   * @return
+   *   An array of index names.
+   */
+  def indexNames(tableName: String): Array[String] = {
+    tableName.split(",")
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
index c078f7fb6..c99eb0c7c 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
@@ -7,8 +7,11 @@ package org.apache.spark.sql.flint
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 import org.opensearch.flint.core.FlintClientBuilder
 
+import org.apache.spark.opensearch.catalog.OpenSearchCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
@@ -37,19 +40,19 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio
 
   lazy val name: String = flintSparkConf.tableName()
 
-  var schema: StructType = {
-    if (schema == null) {
-      schema = if (userSpecifiedSchema.isDefined) {
-        userSpecifiedSchema.get
-      } else {
-        FlintDataType.deserialize(
-          FlintClientBuilder
-            .build(flintSparkConf.flintOptions())
-            .getIndexMetadata(name)
-            .getContent)
-      }
+  // TODO: currently we use the schema of the first index when multiple indices match;
+  // we should merge the StructTypes and widen types instead.
+  lazy val schema: StructType = {
+    userSpecifiedSchema.getOrElse {
+      FlintClientBuilder
+        .build(flintSparkConf.flintOptions())
+        .getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
+        .values()
+        .asScala
+        .headOption
+        .map(m => FlintDataType.deserialize(m.getContent))
+        .getOrElse(StructType(Nil))
     }
-    schema
   }
 
   override def capabilities(): util.Set[TableCapability] =
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala b/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala
new file mode 100644
index 000000000..196cd6b01
--- /dev/null
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala
@@ -0,0 +1,134 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.opensearch.catalog
+
+import scala.collection.JavaConverters._
+
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.flint.FlintTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class OpenSearchCatalogTest
+    extends FlintSuite
+    with Matchers
+    with BeforeAndAfterEach
+    with MockitoSugar {
+
+  private var catalog: OpenSearchCatalog = _
+  private val catalogName = "dev"
+  private val optionsMap = Map(
+    "opensearch.port" -> "9200",
+    "opensearch.scheme" -> "http",
+    "opensearch.auth" -> "noauth",
+    "some.other.config" -> "value")
+  private val options = new CaseInsensitiveStringMap(optionsMap.asJava)
+
+  override def beforeEach(): Unit = {
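+    // Build a fresh catalog instance for every test so that options and
+    // state from one case cannot leak into the next.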
+    catalog = new OpenSearchCatalog()
+    catalog.initialize(catalogName, options)
+  }
+
+  test("Catalog should initialize with given name and options") {
+    catalog.name() should be(catalogName)
+  }
+
+  test("listTables should throw UnsupportedOperationException") {
+    val namespace = Array("default")
+
+    intercept[UnsupportedOperationException] {
+      catalog.listTables(namespace)
+    }
+  }
+
+  test("loadTable should throw NoSuchTableException if namespace is not default") {
+    val identifier = mock[Identifier]
+    when(identifier.namespace()).thenReturn(Array("non-default"))
+    when(identifier.name()).thenReturn("table1")
+
+    intercept[NoSuchTableException] {
+      catalog.loadTable(identifier)
+    }
+  }
+
+  test("loadTable should load table if namespace is default") {
+    val identifier = mock[Identifier]
+    when(identifier.namespace()).thenReturn(Array("default"))
+    when(identifier.name()).thenReturn("table1")
+
+    val table = catalog.loadTable(identifier)
+    table should not be null
+    table.name() should be("table1")
+
+    val flintTableConf = FlintTable.unapply(table.asInstanceOf[FlintTable]).get._1
+    flintTableConf.get("port") should be("9200")
+    flintTableConf.get("scheme") should be("http")
+    flintTableConf.get("auth") should be("noauth")
+    flintTableConf.get("some.other.config") should be("value")
+    flintTableConf.containsKey("opensearch.port") should be(false)
+    flintTableConf.containsKey("opensearch.scheme") should be(false)
+    flintTableConf.containsKey("opensearch.auth") should be(false)
+  }
+
+  test("createTable should throw UnsupportedOperationException") {
+    val identifier = mock[Identifier]
+    val schema = mock[StructType]
+    val partitions = Array.empty[Transform]
+    val properties = Map.empty[String, String].asJava
+
+    intercept[UnsupportedOperationException] {
+      catalog.createTable(identifier, schema, partitions, properties)
+    }
+  }
+
+  test("alterTable should throw UnsupportedOperationException") {
+    val identifier = mock[Identifier]
+    val changes = Array.empty[TableChange]
+
+    intercept[UnsupportedOperationException] {
+      catalog.alterTable(identifier, changes: _*)
+    }
+  }
+
+  test("dropTable should throw UnsupportedOperationException") {
+    val identifier = mock[Identifier]
+
+    intercept[UnsupportedOperationException] {
+      catalog.dropTable(identifier)
+    }
+  }
+
+  test("renameTable should throw UnsupportedOperationException") {
+    val oldIdentifier = mock[Identifier]
+    val newIdentifier = mock[Identifier]
+
+    intercept[UnsupportedOperationException] {
+      catalog.renameTable(oldIdentifier, newIdentifier)
+    }
+  }
+
+  test("isDefaultNamespace should return true for default namespace") {
+    OpenSearchCatalog.isDefaultNamespace("default") should be(true)
+  }
+
+  test("isDefaultNamespace should return false for non-default namespace") {
+    OpenSearchCatalog.isDefaultNamespace("non-default") should be(false)
+  }
+
+  test("indexNames should split table name by comma") {
+    val tableName = "index-1,index-2"
+    val result = OpenSearchCatalog.indexNames(tableName)
+    result should contain theSameElementsAs Array("index-1", "index-2")
+  }
+}
diff --git a/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
new file mode 100644
index 000000000..fa3bd3726
--- /dev/null
+++ b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.opensearch.catalog
+
+import org.opensearch.flint.OpenSearchSuite
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.streaming.StreamTest
+
+class OpenSearchCatalogITSuite
+    extends QueryTest
+    with StreamTest
+    with FlintSuite
+    with OpenSearchSuite {
+
+  private val catalogName = "dev"
+
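+  // OpenSearchSuite supplies the host and port of the test OpenSearch
+  // instance; point the catalog at that endpoint before any test runs.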
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    spark.conf.set(
+      s"spark.sql.catalog.${catalogName}",
+      "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
+    spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.port", s"$openSearchPort")
+    spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.host", openSearchHost)
+  }
+
+  test("Load single index as table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val df = spark.sql(s"""
+          SELECT accountId, eventName, eventSource
+          FROM ${catalogName}.default.${indexName}""")
+
+      assert(df.count() == 1)
+      checkAnswer(df, Row("123", "event", "source"))
+    }
+  }
+
+  test("Load index wildcard expression as table") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val df = spark.sql(s"""
+          SELECT accountId, eventName, eventSource
+          FROM ${catalogName}.default.`t*`""")
+
+      assert(df.count() == 1)
+      checkAnswer(df, Row("123", "event", "source"))
+    }
+  }
+
+  test("Load comma separated index expression as table") {
+    val indexName1 = "t0001"
+    val indexName2 = "t0002"
+    withIndexName(indexName1) {
+      withIndexName(indexName2) {
+        simpleIndex(indexName1)
+        simpleIndex(indexName2)
+        val df = spark.sql(s"""
+            SELECT accountId, eventName, eventSource
+            FROM ${catalogName}.default.`t0001,t0002`""")
+
+        assert(df.count() == 2)
+        checkAnswer(df, Seq(Row("123", "event", "source"), Row("123", "event", "source")))
+      }
+    }
+  }
+}