
[BUG] Tumble Function/Watermark Does Not Work With UNION ALL #964

Open
engechas opened this issue Dec 3, 2024 · 0 comments
Labels
bug Something isn't working Core:MV

Comments


engechas commented Dec 3, 2024

What is the bug?
I am attempting to union multiple Glue tables in a materialized view query. The MV command is:

CREATE MATERIALIZED VIEW `<my catalog>`.`<my db>`.`<my mv name>` AS 
    SELECT 
        TUMBLE(`@timestamp`, '5 Minute').start AS `start_time`, 
        ... remaining fields omitted
    FROM ( 
       ( 
          SELECT 
              ... remaining fields omitted
              start_time_dt AS `@timestamp` 
          FROM <my catalog>.<my db>.<my table 1>
       ) 
          UNION ALL 
       ( 
          SELECT 
              ... remaining fields omitted
              start_time_dt AS `@timestamp` 
          FROM <my catalog>.<my db>.<my table 2>
       ) 
    ) 
    GROUP BY 
        TUMBLE(`@timestamp`, '5 Minute'), 
        ... remaining fields omitted
    WITH ( 
        auto_refresh = true, 
        refresh_interval = '5 minutes', 
        watermark_delay = '1 Minute' 
    )

The refresh fails with an `UnresolvedException`: the watermark logic calls `nullable` on an `UnresolvedAttribute` when computing the output of the `Union` plan:

24/12/03 17:46:09.048 ERROR {} (main) DefaultOptimisticTransaction: Rolling back transient log due to transaction operation failure
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:233)
	at org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$5(basicLogicalOperators.scala:520)
	at org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$5$adapted(basicLogicalOperators.scala:520)
	at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
	at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
	at scala.collection.immutable.List.exists(List.scala:91)
	at org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$4(basicLogicalOperators.scala:520)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:518)
	at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.output(basicLogicalOperators.scala:2046)
	at org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.<init>(EventTimeWatermark.scala:52)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView.org$opensearch$flint$spark$mv$FlintSparkMaterializedView$$watermark(FlintSparkMaterializedView.scala:111)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:97)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:95)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:463)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView.buildStream(FlintSparkMaterializedView.scala:95)
	at org.opensearch.flint.spark.refresh.AutoIndexRefresh.$anonfun$start$2(AutoIndexRefresh.scala:75)
	at org.opensearch.flint.core.metrics.MetricsSparkListener$.withMetrics(MetricsSparkListener.scala:59)
	at org.opensearch.flint.spark.refresh.AutoIndexRefresh.start(AutoIndexRefresh.scala:73)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.$anonfun$start$2(IncrementalIndexRefresh.scala:55)
	at org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect.withMetrics(RefreshMetricsAspect.scala:51)
	at org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect.withMetrics$(RefreshMetricsAspect.scala:40)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.withMetrics(IncrementalIndexRefresh.scala:23)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.start(IncrementalIndexRefresh.scala:51)
	at org.opensearch.flint.spark.FlintSpark.$anonfun$refreshIndexManual$5(FlintSpark.scala:574)
	at org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction.commit(DefaultOptimisticTransaction.java:102)
	at org.opensearch.flint.spark.FlintSpark.refreshIndexManual(FlintSpark.scala:574)
	at org.opensearch.flint.spark.FlintSpark.$anonfun$refreshIndex$1(FlintSpark.scala:166)
	at org.opensearch.flint.spark.FlintSparkTransactionSupport.withTransaction(FlintSparkTransactionSupport.scala:67)
	at org.opensearch.flint.spark.FlintSparkTransactionSupport.withTransaction$(FlintSparkTransactionSupport.scala:52)
	at org.opensearch.flint.spark.FlintSpark.withTransaction(FlintSpark.scala:43)
	at org.opensearch.flint.spark.FlintSpark.refreshIndex(FlintSpark.scala:160)
	at org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder.$anonfun$visitRefreshMaterializedViewStatement$1(FlintSparkMaterializedViewAstBuilder.scala:65)
	at org.opensearch.flint.spark.sql.FlintSparkSqlCommand.run(FlintSparkSqlCommand.scala:27)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:223)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:692)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:683)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:714)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:745)
	at org.apache.spark.sql.JobOperator.start(JobOperator.scala:93)
	at org.apache.spark.sql.FlintJob$.processStreamingJob(FlintJob.scala:388)
	at org.apache.spark.sql.FlintJob$.handleWarmpoolJob(FlintJob.scala:188)
	at org.apache.spark.sql.FlintJob$.main(FlintJob.scala:80)
	at org.apache.spark.sql.FlintJob.main(FlintJob.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
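The failure mode in the trace can be illustrated without Spark. `EventTimeWatermark` reads its child's `output` at construction time, and `Union.output` asks each child attribute whether it is nullable, which an `UnresolvedAttribute` answers by throwing. The sketch below is a plain-Python stand-in, not Spark's actual classes; all names are illustrative.

```python
class UnresolvedException(Exception):
    """Stand-in for org.apache.spark.sql.catalyst.analysis.UnresolvedException."""

class UnresolvedAttribute:
    """An attribute the analyzer has not yet bound to a real column."""
    def __init__(self, name):
        self.name = name

    @property
    def nullable(self):
        # Mirrors UnresolvedAttribute.nullable, which throws rather than guess.
        raise UnresolvedException(
            f"Invalid call to nullable on unresolved object {self.name}")

class ResolvedAttribute:
    def __init__(self, name, nullable):
        self.name = name
        self.nullable = nullable

class Union:
    def __init__(self, children):
        # Each child is that branch's list of output attributes.
        self.children = children

    def output(self):
        # Mirrors Union.output: column i is nullable if it is nullable
        # in ANY child, so every child's attribute gets asked.
        first = self.children[0]
        return [
            ResolvedAttribute(attr.name,
                              any(child[i].nullable for child in self.children))
            for i, attr in enumerate(first)
        ]

# Applying a watermark-like operator to a not-yet-analyzed UNION ALL
# therefore fails as soon as the output schema is requested:
union = Union([[UnresolvedAttribute("@timestamp")],
               [UnresolvedAttribute("@timestamp")]])
try:
    union.output()
except UnresolvedException as e:
    print(e)  # prints: Invalid call to nullable on unresolved object @timestamp
```

This suggests the watermark is being injected into the plan before the `UNION ALL` branches are analyzed; a single-table source never hits this because its output attributes are already resolvable.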

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Create a materialized view with the above query
  2. Verify the refresh fails

What is the expected behavior?
Materialized views using TUMBLE should work with UNION ALL sources; the refresh should succeed.

What is your host/environment?

  • OS: [e.g. iOS]
  • Version [e.g. 22]
  • Plugins

Do you have any screenshots?
If applicable, add screenshots to help explain your problem.

Do you have any additional context?
Add any other context about the problem.

@engechas engechas added bug Something isn't working untriaged labels Dec 3, 2024
@dai-chen dai-chen added Core:MV and removed untriaged labels Dec 4, 2024