[FEATURE] Tumble function doesn't support expression #626

Closed
dai-chen opened this issue Sep 6, 2024 · 1 comment
Labels: bug (Something isn't working), Core:MV

Comments

dai-chen commented Sep 6, 2024

Is your feature request related to a problem?

A ClassCastException is thrown when the TUMBLE function is given an expression, rather than a plain column, in a CREATE MATERIALIZED VIEW statement.

For example:

CREATE MATERIALIZED VIEW test_day AS
SELECT
  COUNT(1),
  window.start
FROM
  test
GROUP BY
  TUMBLE(CAST(FROM_UNIXTIME(time) AS TIMESTAMP), '1 Hour')
ORDER BY
  window.start;
...

java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.Cast cannot be cast to class org.apache.spark.sql.catalyst.expressions.Attribute (org.apache.spark.sql.catalyst.expressions.Cast and org.apache.spark.sql.catalyst.expressions.Attribute are in unnamed module of loader 'app')
    at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$WindowingAggregate$.unapply(FlintSparkMaterializedView.scala:132)
    at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:87)
    at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:86)
...
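
To illustrate the failure mode in the stack trace, here is a minimal sketch using toy stand-ins for Catalyst's expression classes. The types and the `TumbleArg` helper below are hypothetical and are not the actual Flint or Spark code; they only show why an unchecked downcast to `Attribute` fails when the argument is a `Cast`, and how a pattern match could detect that case instead of throwing.

```scala
// Toy stand-ins for Catalyst's expression hierarchy (assumption: NOT the real Spark classes).
sealed trait Expression
final case class Attribute(name: String) extends Expression
final case class Cast(child: Expression, dataType: String) extends Expression

// Hypothetical helper, named here for illustration only.
object TumbleArg {
  // Unchecked downcast: throws ClassCastException for any argument that is not an Attribute.
  def unsafeTimeColumn(arg: Expression): Attribute =
    arg.asInstanceOf[Attribute]

  // Pattern match: returns None instead of throwing when the argument is not a plain column.
  def safeTimeColumn(arg: Expression): Option[Attribute] = arg match {
    case a: Attribute => Some(a)
    case _            => None
  }
}
```

With this shape, a caller could turn the `None` case into a clear "expression not supported" error rather than surfacing a raw ClassCastException.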

What solution would you like?

Support expressions in the TUMBLE function. This is especially useful when the time column in the source dataset is not of timestamp type.

What alternatives have you considered?

Alternatively, a subquery can be used as a workaround:

CREATE MATERIALIZED VIEW test_day AS
SELECT
  COUNT(1),
  window.start
FROM (
    SELECT CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS startTime
    FROM test
)
GROUP BY
  TUMBLE(startTime, '1 Hour')
ORDER BY
  window.start;
...

Do you have any additional context?

The first step is to confirm whether Spark supports an event time defined by an expression.

@dai-chen dai-chen added enhancement New feature or request untriaged and removed untriaged labels Sep 6, 2024
@dai-chen dai-chen added bug Something isn't working and removed enhancement New feature or request labels Oct 29, 2024

dai-chen commented Oct 30, 2024

Actually, the EventTimeWatermark operator only accepts a column, not an arbitrary expression. In this case, the subquery workaround above seems to be the right way to do this. I verified its correctness by inspecting the query plan:

Aggregate [window#132-T1000ms], [window#132-T1000ms.start AS startTime#107, count(1) AS count#108L]
+- Project [named_struct(...) AS window#132-T1000ms]
   +- Filter isnotnull(timestamp2#106-T1000ms)
      +- EventTimeWatermark timestamp2#106: timestamp, 1 seconds
         +- Project [cast(timestamp#130 as timestamp) AS timestamp2#106]
            +- StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession@4bf9f44b,CSV,List(),
Some(StructType(StructField(id,IntegerType,true),StructField(status_code,IntegerType,true),
StructField(request_path,StringType,true),StructField(timestamp,StringType,true))),List(),None,
Map(header -> false, delimiter -> 	, path -> file:/...),Some(CatalogTable(...
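
The same project-then-watermark shape can be sketched with the DataFrame API. This is only an illustration under stated assumptions: it needs Spark SQL on the classpath, uses the built-in `rate` streaming source in place of the real dataset, and the names `eventTime`, `startTime`, and `count` are chosen here rather than taken from Flint. Note that `withWatermark` takes a column name as a string, which is why the cast has to be projected to a named column first.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, window}

// Local session for the sketch only (assumption: Spark SQL is available).
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("tumble-expression-sketch")
  .getOrCreate()

// The "rate" source emits (timestamp, value); the long `value` column stands in
// for a non-timestamp time column and is cast in a projection first.
val events = spark.readStream
  .format("rate")
  .load()
  .select(col("value").cast("timestamp").as("eventTime"))

// The watermark is declared on the projected column by name, then the
// tumbling window groups on that same column.
val counts = events
  .withWatermark("eventTime", "1 second")
  .groupBy(window(col("eventTime"), "1 hour"))
  .agg(count(lit(1)).as("count"))
  .select(col("window.start").as("startTime"), col("count"))

// Prints the plans; the logical plan should place EventTimeWatermark above the
// Project that performs the cast, similar to the plan shown above.
counts.explain(true)
```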
