
[BUG] Incorrect flint index name during query rewrite #319

Closed · Tracked by #331
seankao-az opened this issue Apr 26, 2024 · 8 comments

Labels: bug (Something isn't working)

@seankao-az (Collaborator)

What is the bug?
After upgrading Spark to 3.4.1, the skipping index is no longer applied to rewrite applicable queries, because during query rewrite an incorrect Flint index name is constructed for the queried table.

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Spin up EMR Serverless with the emr-6.13.0 release
  2. Create a skipping index for a table and run an applicable SELECT query
  3. Check the Spark driver log for
    INFO FlintSpark: Describing index name flint_spark_catalog_default_{table}_skipping_index
  4. Explaining the applicable query shows a Physical Plan using InMemoryFileIndex / CatalogFileIndex

What is the expected behavior?

  1. The Spark driver log should show
    INFO FlintSpark: Describing index name flint_{datasource}_default_{table}_skipping_index
  2. Explaining the applicable query should show a Physical Plan using FlintSparkSkippingFileIndex

What is your host/environment?

  • OS (OpenSearch): 2.13
  • EMR-S: 6.13 (which uses Spark 3.4.1)

Do you have any screenshots?
If applicable, add screenshots to help explain your problem.

Do you have any additional context?

Using the emr-6.13.0 release, EXPLAIN EXTENDED for the query shows +- Relation spark_catalog.default.{table} in the Analyzed Logical Plan, while for the emr-6.11.0 release it reads +- Relation default.{table}

Change in TableIdentifiers interface:

@seankao-az seankao-az added bug Something isn't working untriaged labels Apr 26, 2024
@asuresh8 (Contributor) commented May 6, 2024

Root cause of the issue is https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-applica[…]main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java. We should be returning the actual name of the datasource rather than spark_catalog. This will be a breaking change.

@penghuo (Collaborator) commented May 7, 2024

> Root cause of the issue is https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-applica[…]main/java/org/opensearch/sql/FlintDelegatingSessionCatalog.java. We should be returning the actual name of the datasource rather than spark_catalog. This will be a breaking change.

We did this intentionally. This is because Spark compares the catalog name against the static string (spark_catalog) and takes a different action based on the result. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala#L386C1-L388C4
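
For reference, a rough paraphrase of that check (see the linked CatalogV2Util source for the authoritative version; this is not copied verbatim):

import org.apache.spark.sql.connector.catalog.CatalogPlugin

// Paraphrased from memory, not the exact Spark source: the session catalog is
// identified purely by comparing the plugin's name() against the constant
// string "spark_catalog", so a catalog reporting any other name is treated as
// a non-session (v2) catalog.
def isSessionCatalog(catalog: CatalogPlugin): Boolean =
  catalog.name().equalsIgnoreCase("spark_catalog")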

@asuresh8 (Contributor) commented May 7, 2024

You shouldn't need to do that. Because we are delegating to the default Spark catalog, the session will still work correctly if you set the catalog name properly. Take a look at how Iceberg does it: https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java

Let's assume we fix name() to return the configured name. The general flow would be that I create a FlintDelegatingSessionCatalog with the name my_s3. Whenever I run SQL queries on a table in catalog my_s3, it will use spark_catalog since all actions are delegated to that catalog, but the table identifier will be my_s3.${database}.${table}. Let's say I set spark.sql.catalog.spark_catalog to Iceberg's SparkSessionCatalog; then all actions would delegate to Iceberg's SparkSessionCatalog, which in turn delegates to the default Spark session catalog if the table is not an Iceberg table. But the table identifier is still my_s3.${database}.${table}
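
A minimal sketch of that idea (illustrative only; this is neither the Flint nor the Iceberg implementation, and the class name is made up): the catalog remembers the name it was registered under and returns it from name(), while table operations would still be forwarded to the built-in session catalog.

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical delegating catalog that keeps its configured name.
class NamedDelegatingCatalog extends CatalogPlugin {
  private var registeredName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    // name is the identifier from spark.sql.catalog.<name>, e.g. "my_s3".
    registeredName = name
    // A real implementation would also wire up the delegate session catalog here.
  }

  // Table identifiers would then render as my_s3.${database}.${table}.
  override def name(): String = registeredName
}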

@penghuo (Collaborator) commented May 7, 2024

Tested creating a CSV table with the Iceberg catalog:

./bin/spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.1\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
    --conf spark.sql.defaultCatalog=local

spark-sql ()> CREATE TABLE local.default.test (name STRING, age INT) USING CSV;
24/05/06 21:16:38 ERROR SparkSQLDriver: Failed in [CREATE TABLE local.default.test (name STRING, age INT) USING CSV]
java.lang.IllegalArgumentException: Unsupported format in USING: CSV
	at org.apache.iceberg.spark.Spark3Util.rebuildCreateProperties(Spark3Util.java:129)
	at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:246)
	at org.apache.spark.sql.connector.catalog.TableCatalog.createTable(TableCatalog.java:199)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:415)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:533)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:527)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:527)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:307)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

@penghuo (Collaborator) commented May 8, 2024

Why the FlintDelegatingSessionCatalog name must be spark_catalog

1. Introduction

  • If FlintDelegatingSessionCatalog returns mycatalog as its name, the following query will fail.
  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.conf.set(
      "spark.sql.catalog.mycatalog",
      "org.opensearch.sql.FlintDelegatingSessionCatalog")

    sql(s"""
           | CREATE TABLE $testTable
           | (
           |   name STRING,
           |   age INT
           | )
           | USING CSV
           | OPTIONS (
           |  header 'false',
           |  delimiter '\t'
           | )
           |""".stripMargin)

    sql(s"""
           | INSERT INTO $testTable
           | VALUES ('Hello', 30)
           | """.stripMargin)
  }
An exception or error caused a run to abort: Table default.flint_sql_test does not support append in batch mode.;
AppendData RelationV2[name#4, age#5] mycatalog.default.flint_sql_test default.flint_sql_test, false
+- Project [cast(col1#2 as string) AS name#6, cast(col2#3 as int) AS age#7]
   +- LocalRelation [col1#2, col2#3] 
org.apache.spark.sql.AnalysisException: Table default.flint_sql_test does not support append in batch mode.;
AppendData RelationV2[name#4, age#5] mycatalog.default.flint_sql_test default.flint_sql_test, false
+- Project [cast(col1#2 as string) AS name#6, cast(col2#3 as int) AS age#7]
   +- LocalRelation [col1#2, col2#3]

2. Root cause analysis

  • In the Analyzer, resolveRelation first calls loadTable(); internally this calls the V2SessionCatalog createTable/loadTable, which returns a V1Table instance.
  • Second, createRelation() is called. CatalogV2Util.isSessionCatalog performs a static check that catalog.name() is spark_catalog; if it is not, DataSourceV2Relation.create is called, which returns a DataSourceV2Relation.
  • In CheckAnalysis, TableCapabilityCheck is invoked. V1Table does not support the capability check, so an exception is thrown.
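
To make the branching explicit, here is a self-contained toy model of that decision; the types and names below are illustrative only and are not Spark's actual internals.

// Toy model: the catalog name comparison decides which relation (and write path) is used.
object ResolutionSketch {
  sealed trait Relation
  case object V1Relation extends Relation // session-catalog path, no capability check
  case object V2Relation extends Relation // must pass TableCapabilityCheck

  def resolve(catalogName: String): Relation =
    if (catalogName.equalsIgnoreCase("spark_catalog")) V1Relation
    else V2Relation // a V1Table on this path later fails TableCapabilityCheck -> AnalysisException

  def main(args: Array[String]): Unit = {
    println(resolve("spark_catalog")) // V1Relation
    println(resolve("mycatalog"))     // V2Relation
  }
}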

@asuresh8 (Contributor) commented May 8, 2024

Spent a few hours researching this as well today and had the same findings.

Per your question on Iceberg: only the default Spark session catalog can use CSV. Iceberg doesn't have that logic.

So I think it should work if you remove this logic:

    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
    --conf spark.sql.defaultCatalog=local

The easiest path forward will be to leave the existing behavior of using spark_catalog for everything. We should be able to plumb in the data source name via a Spark property and use that property value, since there should only be one data source at a time.
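
As a sketch of that idea (the property key spark.flint.datasource.name below is hypothetical, chosen only for illustration; the actual key would be whatever the project settles on):

import org.apache.spark.sql.SparkSession

// Hypothetical helper: read the data source name from a Spark property and fall
// back to spark_catalog when the property is not set.
def dataSourceName(spark: SparkSession): String =
  spark.conf.getOption("spark.flint.datasource.name").getOrElse("spark_catalog")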

@penghuo (Collaborator) commented May 8, 2024

How to resolve catalog name in Flint

1. Current Status

The Flint index name is composed as flint_{catalog_name}_{database_name}_{table_name}_{index_type}. The Flint index builder and the Flint optimizer resolve catalog_name from table.qualifiedName.
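
For illustration, a minimal sketch of that naming convention (not the actual Flint helper; the function name and example values are made up):

// Composes a skipping index name following flint_{catalog}_{database}_{table}_{index_type}.
def skippingIndexName(catalogName: String, databaseName: String, tableName: String): String =
  s"flint_${catalogName}_${databaseName}_${tableName}_skipping_index"

// e.g. skippingIndexName("glue", "default", "http_logs")
//      returns "flint_glue_default_http_logs_skipping_index"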
Currently, Flint resolves the catalog with the following steps:

  1. Call the CatalogManager currentCatalog method to retrieve the CatalogPlugin object.
  2. Resolve the catalog from table.qualifiedName. This includes two cases:
    1. Prior to Spark 3.4, the catalog resolves to the CatalogManager current catalog object.
    2. Since Spark 3.4, table.qualifiedName includes the catalog name, and the catalog resolves to the spark_catalog object.
  3. Fetch all registered catalog names from the CatalogManager. The reason is that catalog names are registered in the CatalogManager through spark.sql.catalog.{catalog-name}, rather than by calling the Catalog.name() method.
  4. Call CatalogManager.catalog(catalog-name) and compare with the current catalog object. If it does not match, call Catalog.name().

Why the solution does not work since Spark 3.4
Since 3.4, the catalog always resolves to spark_catalog instead of the catalog name configured or specified in the query.

2. Proposed solution

The proposed solution is to change the catalog resolution logic of steps 3 and 4 as below. The idea is to reuse Spark's current logic to resolve the CatalogPlugin, but customize the catalogName resolution logic: spark.sql.defaultCatalog is used if Catalog.name() is spark_catalog. This is a workaround for #319 (comment).

def resolveCatalogName(catalog: CatalogPlugin): String = {
  // FlintDelegatingSessionCatalog always returns spark_catalog as its name,
  // so we fall back to the spark.sql.defaultCatalog value in that case.
  if (catalog.name() == "spark_catalog") {
    val defaultCatalog = conf.get("spark.sql.defaultCatalog")
    if (catalogManager.isCatalogRegistered(defaultCatalog)) {
      defaultCatalog
    } else {
      // This may happen when spark.sql.defaultCatalog is configured but no
      // implementation is registered, for instance spark.sql.defaultCatalog = unknown.
      throw new RuntimeException(s"unknown catalog name: $defaultCatalog")
    }
  } else {
    // Works for customized catalogs, for instance an OpenSearch catalog.
    catalog.name()
  }
}

Preconditions
The solution requires the following configurations for the Glue catalog:

  • spark.sql.defaultCatalog = catalog-name
  • spark.sql.catalog.catalog-name = org.opensearch.sql.FlintDelegatingSessionCatalog

Limitations

  • Only one Glue catalog can be supported.

@asuresh8 (Contributor) commented May 8, 2024

This approach looks good to me. Even today, I think only one Glue catalog can be supported at a time.

There may be some weird behavior from changing defaultCatalog to point to $catalogName instead of spark_catalog. Maybe we should use a new Spark property?
