From d6c769dc4733aef0158874b6541acb0695c49ce2 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 25 Sep 2023 17:26:39 -0700 Subject: [PATCH 1/2] Add FlintDelegatingSessionCatalog Signed-off-by: Peng Huo --- .../sql/FlintDelegatingSessionCatalog.java | 191 ++++++++++++++++++ .../FlintDelegatingSessionCatalogTest.scala | 47 +++++ 2 files changed, 238 insertions(+) create mode 100644 spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java create mode 100644 spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala diff --git a/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java new file mode 100644 index 000000000..86c4caba8 --- /dev/null +++ b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java @@ -0,0 +1,191 @@ +package org.opensearch.sql; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.CatalogExtension; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.NamespaceChange; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import 
org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import java.util.Map; + +/** + * Delegates to the V2SessionCatalog of the current SparkSession. + * + * spark_catalog is the default catalog name. With FlintDelegatingSessionCatalog, users can customize + * the catalog name; the implementation delegates to the V2SessionCatalog of the current SparkSession. + * + * Usage: + * --conf spark.sql.catalog.mycatalog=org.opensearch.sql.FlintDelegatingSessionCatalog + */ +public class FlintDelegatingSessionCatalog implements CatalogExtension { + private CatalogPlugin delegate; + + public final void setDelegateCatalog(CatalogPlugin delegate) { + // do nothing + } + + @Override + public String name() { + return delegate.name(); + } + + @Override + public final void initialize(String name, CaseInsensitiveStringMap options) { + delegate = SparkSession.getActiveSession() + .get() + .sessionState() + .catalogManager().v2SessionCatalog(); + } + + @Override + public String[] defaultNamespace() { + return delegate.defaultNamespace(); + } + + @Override + public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { + return asTableCatalog().listTables(namespace); + } + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + return asTableCatalog().loadTable(ident); + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + return asTableCatalog().loadTable(ident, timestamp); + } + + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + return asTableCatalog().loadTable(ident, version); + } + + @Override + public void invalidateTable(Identifier ident) { + asTableCatalog().invalidateTable(ident); + } + + @Override + public
boolean tableExists(Identifier ident) { + return asTableCatalog().tableExists(ident); + } + + @Override + public Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return asTableCatalog().createTable(ident, schema, partitions, properties); + } + + @Override + public Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException { + return asTableCatalog().alterTable(ident, changes); + } + + @Override + public boolean dropTable(Identifier ident) { + return asTableCatalog().dropTable(ident); + } + + @Override + public boolean purgeTable(Identifier ident) { + return asTableCatalog().purgeTable(ident); + } + + @Override + public void renameTable( + Identifier oldIdent, + Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { + asTableCatalog().renameTable(oldIdent, newIdent); + } + + @Override + public String[][] listNamespaces() throws NoSuchNamespaceException { + return asNamespaceCatalog().listNamespaces(); + } + + @Override + public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { + return asNamespaceCatalog().listNamespaces(namespace); + } + + @Override + public boolean namespaceExists(String[] namespace) { + return asNamespaceCatalog().namespaceExists(namespace); + } + + @Override + public Map<String, String> loadNamespaceMetadata( + String[] namespace) throws NoSuchNamespaceException { + return asNamespaceCatalog().loadNamespaceMetadata(namespace); + } + + @Override + public void createNamespace( + String[] namespace, + Map<String, String> metadata) throws NamespaceAlreadyExistsException { + asNamespaceCatalog().createNamespace(namespace, metadata); + } + + @Override + public void alterNamespace( + String[] namespace, + NamespaceChange...
changes) throws NoSuchNamespaceException { + asNamespaceCatalog().alterNamespace(namespace, changes); + } + + @Override + public boolean dropNamespace( + String[] namespace, + boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { + return asNamespaceCatalog().dropNamespace(namespace, cascade); + } + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + return asFunctionCatalog().loadFunction(ident); + } + + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + return asFunctionCatalog().listFunctions(namespace); + } + + @Override + public boolean functionExists(Identifier ident) { + return asFunctionCatalog().functionExists(ident); + } + + private TableCatalog asTableCatalog() { + return (TableCatalog) delegate; + } + + private SupportsNamespaces asNamespaceCatalog() { + return (SupportsNamespaces) delegate; + } + + private FunctionCatalog asFunctionCatalog() { + return (FunctionCatalog) delegate; + } +} diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala new file mode 100644 index 000000000..7bd8d0397 --- /dev/null +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.test.SharedSparkSessionBase + +class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessionBase { + private val testTable = "mycatalog.default.flint_sql_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + spark.conf.set( + "spark.sql.catalog.mycatalog", + "org.opensearch.sql.FlintDelegatingSessionCatalog") + + // Create test table + 
spark.sql(s""" + | CREATE TABLE $testTable + | ( + | name STRING, + | age INT + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + |""".stripMargin) + + spark.sql(s""" + | INSERT INTO $testTable + | VALUES ('Hello', 30) + | """.stripMargin) + } + + test("test read from customized catalog") { + + val result = spark.sql(s"SELECT name, age FROM $testTable") + + checkAnswer(result, Seq(Row("Hello", 30))) + } +} From 2f22bf28953d804454df1724a459483ddb15a4f0 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 26 Sep 2023 14:27:46 -0700 Subject: [PATCH 2/2] address comments Signed-off-by: Peng Huo --- .../sql/FlintDelegatingSessionCatalog.java | 9 ++-- .../FlintDelegatingSessionCatalogTest.scala | 47 ++++++++++--------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java index 86c4caba8..6ed9fa980 100644 --- a/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java +++ b/spark-sql-application/src/main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java @@ -39,6 +39,9 @@ public final void setDelegateCatalog(CatalogPlugin delegate) { // do nothing } + /** + * Using V2SessionCatalog name: spark_catalog. 
+ */ @Override public String name() { return delegate.name(); @@ -46,10 +49,8 @@ public String name() { @Override public final void initialize(String name, CaseInsensitiveStringMap options) { - delegate = SparkSession.getActiveSession() - .get() - .sessionState() - .catalogManager().v2SessionCatalog(); + this.delegate = + SparkSession.getActiveSession().get().sessionState().catalogManager().v2SessionCatalog(); } @Override diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala index 7bd8d0397..f6be0b1c3 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala @@ -10,6 +10,7 @@ import org.apache.spark.sql.test.SharedSparkSessionBase class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessionBase { private val testTable = "mycatalog.default.flint_sql_test" + private val testTableWithoutCatalog = "default.flint_sql_test" override def beforeAll(): Unit = { super.beforeAll() @@ -18,30 +19,34 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio "spark.sql.catalog.mycatalog", "org.opensearch.sql.FlintDelegatingSessionCatalog") - // Create test table - spark.sql(s""" - | CREATE TABLE $testTable - | ( - | name STRING, - | age INT - | ) - | USING CSV - | OPTIONS ( - | header 'false', - | delimiter '\t' - | ) - |""".stripMargin) - - spark.sql(s""" - | INSERT INTO $testTable - | VALUES ('Hello', 30) - | """.stripMargin) + sql(s""" + | CREATE TABLE $testTable + | ( + | name STRING, + | age INT + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | VALUES ('Hello', 30) + | """.stripMargin) } - test("test read from customized catalog") { + 
test("query with customized catalog name") { + var result = sql(s"SELECT name, age FROM $testTable") + checkAnswer(result, Seq(Row("Hello", 30))) + } - val result = spark.sql(s"SELECT name, age FROM $testTable") + test("query without catalog name") { + sql("use mycatalog") + assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog"))) - checkAnswer(result, Seq(Row("Hello", 30))) + checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30))) } }