From d6c769dc4733aef0158874b6541acb0695c49ce2 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Mon, 25 Sep 2023 17:26:39 -0700
Subject: [PATCH] Add FlintDelegatingSessionCatalog

Signed-off-by: Peng Huo
---
 .../sql/FlintDelegatingSessionCatalog.java    | 226 ++++++++++++++++++
 .../FlintDelegatingSessionCatalogTest.scala   |  51 ++++
 2 files changed, 277 insertions(+)
 create mode 100644 spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java
 create mode 100644 spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala

diff --git a/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java
new file mode 100644
index 000000000..86c4caba8
--- /dev/null
+++ b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link CatalogExtension} that delegates every call to the V2SessionCatalog of the current
+ * SparkSession.
+ *
+ * {@code spark_catalog} is the default catalog name. With FlintDelegatingSessionCatalog, users
+ * can choose a custom catalog name; the implementation still delegates to the V2SessionCatalog
+ * of the current SparkSession.
+ *
+ * Usage:
+ *   --conf spark.sql.catalog.mycatalog=org.opensearch.sql.FlintDelegatingSessionCatalog
+ */
+public class FlintDelegatingSessionCatalog implements CatalogExtension {
+  /** Session catalog resolved in {@link #initialize}; all operations are forwarded to it. */
+  private CatalogPlugin delegate;
+
+  /**
+   * Intentionally ignores the supplied catalog: the delegate is always the V2SessionCatalog
+   * looked up from the active SparkSession in {@link #initialize}.
+   *
+   * @param delegate ignored
+   */
+  @Override
+  public final void setDelegateCatalog(CatalogPlugin delegate) {
+    // do nothing
+  }
+
+  /**
+   * Returns the name reported by the delegate catalog, not the name this catalog was
+   * registered under.
+   */
+  @Override
+  public String name() {
+    return delegate.name();
+  }
+
+  /**
+   * Binds this catalog to the V2SessionCatalog of the active SparkSession.
+   *
+   * @param name the name this catalog was registered under (only used in the error message)
+   * @param options catalog options (unused)
+   * @throws NoSuchElementException if SparkSession.getActiveSession() is empty
+   */
+  @Override
+  public final void initialize(String name, CaseInsensitiveStringMap options) {
+    if (SparkSession.getActiveSession().isEmpty()) {
+      // Same exception type Option.get() would raise, but with an actionable message.
+      throw new NoSuchElementException(
+          "No active SparkSession found while initializing catalog '" + name + "'");
+    }
+    delegate = SparkSession.getActiveSession()
+        .get()
+        .sessionState()
+        .catalogManager().v2SessionCatalog();
+  }
+
+  @Override
+  public String[] defaultNamespace() {
+    return delegate.defaultNamespace();
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+    return asTableCatalog().listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    return asTableCatalog().loadTable(ident);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+    return asTableCatalog().loadTable(ident, timestamp);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+    return asTableCatalog().loadTable(ident, version);
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    asTableCatalog().invalidateTable(ident);
+  }
+
+  @Override
+  public boolean tableExists(Identifier ident) {
+    return asTableCatalog().tableExists(ident);
+  }
+
+  @Override
+  public Table createTable(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return asTableCatalog().createTable(ident, schema, partitions, properties);
+  }
+
+  @Override
+  public Table alterTable(
+      Identifier ident,
+      TableChange... changes) throws NoSuchTableException {
+    return asTableCatalog().alterTable(ident, changes);
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    return asTableCatalog().dropTable(ident);
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    return asTableCatalog().purgeTable(ident);
+  }
+
+  @Override
+  public void renameTable(
+      Identifier oldIdent,
+      Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
+    asTableCatalog().renameTable(oldIdent, newIdent);
+  }
+
+  @Override
+  public String[][] listNamespaces() throws NoSuchNamespaceException {
+    return asNamespaceCatalog().listNamespaces();
+  }
+
+  @Override
+  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+    return asNamespaceCatalog().listNamespaces(namespace);
+  }
+
+  @Override
+  public boolean namespaceExists(String[] namespace) {
+    return asNamespaceCatalog().namespaceExists(namespace);
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(
+      String[] namespace) throws NoSuchNamespaceException {
+    return asNamespaceCatalog().loadNamespaceMetadata(namespace);
+  }
+
+  @Override
+  public void createNamespace(
+      String[] namespace,
+      Map<String, String> metadata) throws NamespaceAlreadyExistsException {
+    asNamespaceCatalog().createNamespace(namespace, metadata);
+  }
+
+  @Override
+  public void alterNamespace(
+      String[] namespace,
+      NamespaceChange... changes) throws NoSuchNamespaceException {
+    asNamespaceCatalog().alterNamespace(namespace, changes);
+  }
+
+  @Override
+  public boolean dropNamespace(
+      String[] namespace,
+      boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException {
+    return asNamespaceCatalog().dropNamespace(namespace, cascade);
+  }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    return asFunctionCatalog().loadFunction(ident);
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    return asFunctionCatalog().listFunctions(namespace);
+  }
+
+  @Override
+  public boolean functionExists(Identifier ident) {
+    return asFunctionCatalog().functionExists(ident);
+  }
+
+  /** Views the delegate as a {@link TableCatalog}; throws ClassCastException otherwise. */
+  private TableCatalog asTableCatalog() {
+    return (TableCatalog) delegate;
+  }
+
+  /** Views the delegate as a {@link SupportsNamespaces}; throws ClassCastException otherwise. */
+  private SupportsNamespaces asNamespaceCatalog() {
+    return (SupportsNamespaces) delegate;
+  }
+
+  /** Views the delegate as a {@link FunctionCatalog}; throws ClassCastException otherwise. */
+  private FunctionCatalog asFunctionCatalog() {
+    return (FunctionCatalog) delegate;
+  }
+}
diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala
new file mode 100644
index 000000000..7bd8d0397
--- /dev/null
+++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.test.SharedSparkSessionBase
+
+/**
+ * Verifies that a table created and populated through a catalog registered as
+ * FlintDelegatingSessionCatalog can be read back through the same catalog name.
+ */
+class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessionBase {
+  private val testTable = "mycatalog.default.flint_sql_test"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    spark.conf.set(
+      "spark.sql.catalog.mycatalog",
+      "org.opensearch.sql.FlintDelegatingSessionCatalog")
+
+    // Create test table
+    spark.sql(s"""
+                | CREATE TABLE $testTable
+                | (
+                |     name STRING,
+                |     age INT
+                | )
+                | USING CSV
+                | OPTIONS (
+                |  header 'false',
+                |  delimiter '\t'
+                | )
+                |""".stripMargin)
+
+    spark.sql(s"""
+                | INSERT INTO $testTable
+                | VALUES ('Hello', 30)
+                | """.stripMargin)
+  }
+
+  test("test read from customized catalog") {
+
+    val result = spark.sql(s"SELECT name, age FROM $testTable")
+
+    checkAnswer(result, Seq(Row("Hello", 30)))
+  }
+}