[FEATURE] Prevalidate Flint index creation on Iceberg branches #709

Open
dai-chen opened this issue Sep 27, 2024 · 0 comments
Labels
DataSource:Iceberg enhancement New feature or request


Is your feature request related to a problem?

When working with Flint indexes on Iceberg branches, the refresh job processes the first micro-batch but then stops progressing. This happens because Spark Structured Streaming does not support streaming reads from Iceberg branches: the streaming source only follows the snapshot lineage of the table's main branch. As a result, Flint auto-refresh indexes (which rely on Spark streaming) get stuck after the first batch.

What solution would you like?

To avoid this, Flint should pre-validate whether the user is attempting to create an auto-refresh or incremental-refresh index (both backed by a Spark streaming job) on an Iceberg branch. If an Iceberg branch is detected, the creation should fail with a clear message stating that this is not supported. This notifies users up front, instead of letting them discover the problem only after the streaming job has started but stops progressing.
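As a rough sketch of what this pre-validation could look like (the object and method names below are illustrative, not existing Flint API), the check could rely on Iceberg's `<table>.branch_<name>` identifier convention used in the repro transcript further down:

```scala
// Hypothetical pre-validation sketch: reject auto/incremental-refresh index
// creation when the qualified source table name addresses an Iceberg branch.
// Assumes Iceberg's "catalog.db.table.branch_<name>" identifier convention;
// `FlintPrevalidation` and `validateStreamingSource` are illustrative names.
object FlintPrevalidation {

  // True when the last identifier part uses the "branch_<name>" convention,
  // e.g. "myglue.default.iceberg_test.branch_test"
  def isIcebergBranchIdentifier(qualifiedName: String): Boolean = {
    val parts = qualifiedName.split('.')
    parts.length >= 2 && parts.last.startsWith("branch_")
  }

  // Fail fast at index creation time instead of after the streaming job starts
  def validateStreamingSource(qualifiedName: String, autoRefresh: Boolean): Unit = {
    if (autoRefresh && isIcebergBranchIdentifier(qualifiedName)) {
      throw new IllegalArgumentException(
        s"Auto/incremental refresh is not supported on Iceberg branch: $qualifiedName")
    }
  }
}
```

A real implementation would likely resolve the identifier through the catalog (or the source abstraction mentioned below) rather than pattern-match on the name, since a plain table could legitimately be named `branch_...`.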

What alternatives have you considered?

  1. Alternatively, clear documentation could guide users to configure manual refresh for indexes on Iceberg branches. However, pre-validating and blocking the creation of auto-refresh indexes ensures users are informed early instead of hitting a stalled streaming job later.
  2. Users can perform a full manual refresh on an Iceberg branch; however, they will need to truncate the index first to avoid duplicates when refreshing the data.

Do you have any additional context?

  1. Related to Support Spark micro batch streaming of a specific branch apache/iceberg#3898
  2. Source abstraction is helpful here as mentioned in [FEATURE] Handle Iceberg overwrite and delete snapshots to prevent index refresh failure #708
  3. Example test
# Start streaming job on an Iceberg branch
scala> spark.readStream.table("myglue.default.iceberg_test.branch_test")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .option("checkpointLocation", "s3://checkpoint_1")
  .start()

-------------------------------------------
Batch: 0
-------------------------------------------
+---+----+---+
| id|data|col|
+---+----+---+
|  1|   a|1.0|
|  2|   b|2.0|
|  3|   c|3.0|
|  4|   d|4.0|
+---+----+---+

24/09/26 20:15:43 WARN ProcessingTimeExecutor: Current batch is falling behind.
The trigger interval is 1000 milliseconds, but spent 7920 milliseconds


scala> sql("INSERT INTO myglue.default.iceberg_test.branch_output_mode_test values (7, 'h', 7.0)")
res4: org.apache.spark.sql.DataFrame = []

# No progress in subsequent micro-batches
scala> 24/09/26 20:16:53 WARN ProcessingTimeExecutor: Current batch is falling behind.
The trigger interval is 1000 milliseconds, but spent 2066 milliseconds

# New data is already present in branch though
scala> sql("SELECT * FROM myglue.default.iceberg_test.branch_test").show
+---+----+---+
| id|data|col|
+---+----+---+
|  4|   d|4.0|
|  7|   h|7.0|
|  1|   a|1.0|
|  2|   b|2.0|
|  3|   c|3.0|
+---+----+---+