Skip to content

Commit

Permalink
add OpenSearchCatalog to enable direct access OpenSearch index as Table
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Jun 25, 2024
1 parent 0f53448 commit 3db9136
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 19 deletions.
66 changes: 66 additions & 0 deletions docs/img/opensearch-table.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# OpenSearch Table

## Overview

The `OpenSearchCatalog` class integrates Apache Spark with OpenSearch, allowing Spark to interact with OpenSearch indices as tables. This integration supports read and write operations, enabling seamless data processing and querying across Spark and OpenSearch.

## Configuration Parameters

To configure the `OpenSearchCatalog`, set the following parameters in your Spark session:

- **`opensearch.port`**: The port to connect to OpenSearch. Default is `9200`.
- **`opensearch.scheme`**: The scheme to use for the connection. Default is `http`. Valid values are `[http, https]`.
- **`opensearch.auth`**: The authentication method to use. Default is `noauth`. Valid values are `[noauth, sigv4, basic]`.
- **`opensearch.auth.username`**: The username for basic authentication.
- **`opensearch.auth.password`**: The password for basic authentication.
- **`opensearch.region`**: The AWS region to use for SigV4 authentication. Default is `us-west-2`. Used only when `auth` is `sigv4`.

## Usage

### Initializing the Catalog

To configure and initialize the catalog in your Spark session, set the following configurations:

```scala
spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "http")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "noauth")
```

### Querying Data

Once the catalog is configured, you can use Spark SQL to query OpenSearch indices as tables:

- The namespace **MUST** be `default`.
- When using a wildcard index name or a comma-separated index name as the table, it **MUST** be wrapped in backticks.

Example:

```scala
val df = spark.sql("SELECT * FROM dev.default.my_index")
df.show()
```

Using a wildcard index name:
```scala
val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```

Using a comma-separated index name:
```scala
val df = spark.sql("SELECT * FROM dev.default.`index1,index2`")
df.show()
```




## Limitation
### Unsupported catalog operation
- List Tables: Not supported.
- Create Table: Not supported.
- Alter Table: Not supported.
- Drop Table: Not supported.
- Rename Table: Not supported.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface FlintClient {
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Retrieve metadata in a Flint index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class FlintOpenSearchClient implements FlintClient {
* excluding '*' because it's reserved for pattern matching.
*/
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
Set.of(' ', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private final FlintOptions options;

Expand Down Expand Up @@ -101,11 +101,12 @@ public boolean exists(String indexName) {
}

@Override
public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
public Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern));
String[] indexNames =
Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexRequest request = new GetIndexRequest(indexNames);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
Expand All @@ -117,7 +118,8 @@ public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
)
));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
throw new IllegalStateException("Failed to get Flint index metadata for " +
String.join(",", indexNames), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.catalog

import org.apache.spark.internal.Logging
import org.apache.spark.opensearch.catalog.OpenSearchCatalog.{OPENSEARCH_PREFIX}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.flint.FlintTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A Spark TableCatalog implementation wrap OpenSearch domain as Catalog.
*
* <p> Configuration parameters for OpenSearchCatalog:
*
* <ul> <li><code>opensearch.port</code>: Default is 9200.</li>
* <li><code>opensearch.scheme</code>: Default is http. Valid values are [http, https].</li>
* <li><code>opensearch.auth</code>: Default is noauth. Valid values are [noauth, sigv4,
* basic].</li> <li><code>opensearch.auth.username</code>: Basic auth username.</li>
* <li><code>opensearch.auth.password</code>: Basic auth password.</li>
* <li><code>opensearch.region</code>: Default is us-west-2. Only used when auth is sigv4.</li>
* </ul>
*/
class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {

private var catalogName: String = _
private var options: CaseInsensitiveStringMap = _

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
this.catalogName = name
this.options = options
}

override def name(): String = catalogName

@throws[NoSuchNamespaceException]
override def listTables(namespace: Array[String]): Array[Identifier] = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support listTables")
}

@throws[NoSuchTableException]
override def loadTable(ident: Identifier): Table = {
logInfo(s"Loading table ${ident.name()}")
if (!ident.namespace().exists(n => OpenSearchCatalog.isDefaultNamespace(n))) {
throw new NoSuchTableException(ident.namespace().mkString("."), ident.name())
}

val conf = new java.util.HashMap[String, String](
removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX))
conf.put("path", ident.name())

FlintTable(conf, Option.empty)
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support createTable")
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support alterTable")
}

override def dropTable(ident: Identifier): Boolean = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support dropTable")
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support renameTable")
}

private def removePrefixFromMap(
map: java.util.Map[String, String],
prefix: String): java.util.Map[String, String] = {
val result = new java.util.HashMap[String, String]()
map.forEach { (key, value) =>
if (key.startsWith(prefix)) {
val newKey = key.substring(prefix.length)
result.put(newKey, value)
} else {
result.put(key, value)
}
}
result
}
}

object OpenSearchCatalog {

/**
* The reserved namespace.
*/
val RESERVED_DEFAULT_NAMESPACE: String = "default"

/**
* The prefix for OpenSearch-related configuration keys.
*/
val OPENSEARCH_PREFIX: String = "opensearch."

/**
* Checks if the given namespace is the reserved default namespace.
*
* @param namespace
* The namespace to check.
* @return
* True if the namespace is the reserved default namespace, false otherwise.
*/
def isDefaultNamespace(namespace: String): Boolean = {
RESERVED_DEFAULT_NAMESPACE.equalsIgnoreCase(namespace)
}

/**
* Splits the table name into index names.
*
* @param tableName
* The name of the table, potentially containing comma-separated index names.
* @return
* An array of index names.
*/
def indexNames(tableName: String): Array[String] = {
tableName.split(",")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package org.apache.spark.sql.flint

import java.util

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.opensearch.catalog.OpenSearchCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
Expand Down Expand Up @@ -37,19 +40,19 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio

lazy val name: String = flintSparkConf.tableName()

var schema: StructType = {
if (schema == null) {
schema = if (userSpecifiedSchema.isDefined) {
userSpecifiedSchema.get
} else {
FlintDataType.deserialize(
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getIndexMetadata(name)
.getContent)
}
// todo. currently, we use first index schema in multiple indices. we should merge StructType
// to widen type
lazy val schema: StructType = {
userSpecifiedSchema.getOrElse {
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
.values()
.asScala
.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}
schema
}

override def capabilities(): util.Set[TableCapability] =
Expand Down
Loading

0 comments on commit 3db9136

Please sign in to comment.