Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance CV and MV idempotency via deterministic ID generation #946

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented Nov 22, 2024

Description

This PR enhances the Covering Index (CV) and Materialized View (MV) write operations by introducing deterministic ID generation. Deterministic IDs uniquely identify each document in a CV/MV based on its data, enabling idempotency. This ensures consistent results during retries or restarts of index refreshes. By eliminating duplicates, this approach maintains data integrity and operational consistency, even in failure scenarios.

Algorithm

The ID generation logic follows the precedence below:

  1. User-Provided ID Expression
    • Enables users to define custom ID generation logic based on specific columns or expressions.
    • If empty, no ID column is generated, which is useful for testing or disabling the feature.
  2. Aggregated MV
    • If MV queries involves aggregation, IDs are generated using SHA-1 on concatenated output columns.
    • SHA-1 is chosen for balancing collision resistance, performance, and space efficiency, compared to other options in Spark such as hash, xxhash64, md5 and sha-2.
  3. Default Behavior
    • Otherwise, no ID column is generated and idempotency is not guaranteed.

Side Effects

This deterministic document ID approach introduces these impacts and requires further exploration to determine if a better solution is viable for long-term scalability and performance.

  1. Spark Computation Overhead
    • The SHA-1 hash and string concat operations adds computation overhead on Spark side.
  2. OpenSearch Ingestion Overhead
    • Each document write involves a document ID lookup to identify and deduplicate existing documents.
  3. OpenSearch Document Size
    • The SHA-1 hash used for document IDs (160 bits) consumes more space compared to OpenSearch's default UUID-based IDs (128 bits).
  4. Possibility of Collision
    • While the likelihood of collisions with SHA-1 is extremely low (e.g., negligible even at PB-scale data), a collision could result in data loss by overwriting an existing document.

Documentation

https://github.com/dai-chen/opensearch-spark/blob/support-covering-index-and-mv-idempotency-rework/docs/index.md#create-index-options

Related Issues

#88

Check List

  • Updated documentation (docs/ppl-lang/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • New added source code should include a copyright header
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added enhancement New feature or request 0.7 labels Nov 22, 2024
@dai-chen dai-chen self-assigned this Nov 22, 2024
@dai-chen dai-chen marked this pull request as ready for review November 25, 2024 16:53
Copy link
Collaborator

@noCharger noCharger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. A couple of points to emphasize in the documentation:

  1. The id_expression generates a unique _id to be used during OpenSearch write operations with upsert semantics, guaranteeing idempotency.
  2. It would be beneficial to provide more examples about the common and available id_expression options supported.

@dai-chen
Copy link
Collaborator Author

dai-chen commented Dec 6, 2024

LGTM. A couple of points to emphasize in the documentation:

  1. The id_expression generates a unique _id to be used during OpenSearch write operations with upsert semantics, guaranteeing idempotency.
  2. It would be beneficial to provide more examples about the common and available id_expression options supported.

Sure, will update the documentation. Meanwhile will block this PR merge until the follow-up PR for testing dashboard query. Thanks!

@dai-chen
Copy link
Collaborator Author

I'm testing with AOSS and found time-series collection seems not support create/index request with doc ID specified. If this is true, that means the approach in this PR as well as Flint skipping index (both relies on doc ID) won't work. Will double confirm.

@dai-chen
Copy link
Collaborator Author

dai-chen commented Dec 20, 2024

Using the AOSS time series collection, it was confirmed that the following exception is thrown when attempting to create or update a document with ID:

24/12/19 23:58:46 WARN TaskSetManager: Lost task 39.0 in stage 3.0 (TID 3447) ([2600:1f14:38a0:a801:1a95:8524:74ed:54a8] executor 4): java.lang.RuntimeException: failure in bulk execution:
[0]: index [flint_glue_default_mv_idempotent_test_1], id [51fd2d0929ebb39c79ee9492ba064f654e56709c], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[1]: index [flint_glue_default_mv_idempotent_test_1], id [53b97544fb9831e0714a12ca243930539af06c3d], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[2]: index [flint_glue_default_mv_idempotent_test_1], id [5896fb03aa01e3962215b19eddefa07a8764fc55], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[3]: index [flint_glue_default_mv_idempotent_test_1], id [1ec78f9c430d1e7dfc682b76b0f8e27eb7e3c32d], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[4]: index [flint_glue_default_mv_idempotent_test_1], id [fcd5f0c77810f6382ff00ec942e7f1fcb529d61f], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
	at org.opensearch.flint.core.storage.OpenSearchWriter.flush(OpenSearchWriter.java:64)
	at shaded.flint.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.flush(WriterBasedJsonGenerator.java:983)
	at org.apache.spark.sql.flint.json.FlintJacksonGenerator.flush(FlintJacksonGenerator.scala:257)
	at org.apache.spark.sql.flint.FlintPartitionWriter.write(FlintPartitionWriter.scala:64)
	at org.apache.spark.sql.flint.FlintPartitionWriter.write(FlintPartitionWriter.scala:24)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1410)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
	at org.apache.spark.scheduler.Task.run(Task.scala:152)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

The implications of these findings are as follows:

  1. Flint's skipping index creation will fail because it automatically uses source file path as the document ID.
  2. The deduplication approach proposed for APPEND mode in this PR will not function as intended.
  3. A similar deduplication approach (upsert with document ID) for future UPDATE mode support will also be ineffective.

For this PR, it remains functional with both AOS and AOSS search collections. However, since I couldn't find a way to determine the type of an AOSS collection, I will temporarily remove the auto-generated ID column logic for aggregate MVs. A separate PR will be submitted once an alternative approach is identified.

@dai-chen dai-chen marked this pull request as draft December 20, 2024 18:07
@dai-chen dai-chen force-pushed the support-covering-index-and-mv-idempotency-rework branch from 796b45c to 15ed31b Compare December 20, 2024 21:42
@dai-chen dai-chen marked this pull request as ready for review December 20, 2024 23:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
0.7 enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants