
Fix index state stuck in refreshing when streaming job exits early #370

Conversation

@dai-chen (Collaborator) commented Jun 6, 2024

Description

Addresses the issue where the streaming job has already exited before the awaitMonitor API is called; in that case the Flint index state is now updated to FAILED instead of being left stuck in refreshing.

Note that I'm unable to reproduce this issue in an integration test, and I suspect the delay between executing the SQL query and awaiting the monitor is caused by the result index write. For DDL statements, although the result is empty, we write it via DataFrame with the Flint data source instead of using the OpenSearch client directly. This needs to be confirmed, and I will create a separate issue for it.
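To illustrate the shape of the fix, here is a minimal sketch of the early-exit handling. This is not the actual Flint implementation: the names `awaitMonitor` and `updateIndexState` follow this PR's description and log output, but the method bodies and helpers below are hypothetical.

```scala
// Illustrative sketch only, not the real FlintSparkIndexMonitor code.
// Idea: if the streaming query has already terminated by the time the
// caller awaits it, transition the index state to FAILED rather than
// leaving it stuck in the refreshing state.
def awaitMonitor(indexName: String): Unit = {
  spark.streams.active.find(_.name == indexName) match {
    case Some(query) =>
      // Normal path: the streaming job is still running, wait for it.
      query.awaitTermination()
    case None =>
      // Early-exit path: the streaming job exited before await was called
      // (e.g. the write job aborted), so mark the index FAILED explicitly.
      logInfo(s"Updating index state to failed for $indexName")
      updateIndexState(indexName, "failed") // hypothetical helper
  }
}
```

The Spark job log in the verification comment below follows this pattern: after the query terminates with an error, the monitor finds the index name in its task list and writes the failed state to the metadata log.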

Issues Resolved

#368

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added bug Something isn't working 0.5 backport 0.4 labels Jun 6, 2024
@dai-chen dai-chen self-assigned this Jun 6, 2024
Signed-off-by: Chen Dai <[email protected]>
@penghuo (Collaborator) left a comment:

Thx!

Signed-off-by: Chen Dai <[email protected]>
@dai-chen dai-chen marked this pull request as ready for review June 7, 2024 20:51
Signed-off-by: Chen Dai <[email protected]>
@dai-chen dai-chen force-pushed the fix-index-state-stuck-when-streaming-job-exit-early branch from a1e809f to 14df017 Compare June 7, 2024 23:48
@noCharger (Collaborator) left a comment:

Thanks for the change!

@dai-chen dai-chen merged commit 5460acc into opensearch-project:main Jun 10, 2024
4 checks passed
@dai-chen dai-chen deleted the fix-index-state-stuck-when-streaming-job-exit-early branch June 10, 2024 16:19
opensearch-trigger-bot bot pushed a commit that referenced this pull request Jun 10, 2024

* Handle streaming job exit early case

Signed-off-by: Chen Dai <[email protected]>

* Modify IT to simplify

Signed-off-by: Chen Dai <[email protected]>

* Address PR comments

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit 5460acc)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
dai-chen pushed a commit that referenced this pull request Jun 10, 2024
(#374)

* Handle streaming job exit early case

* Modify IT to simplify

* Address PR comments

* Add more IT

---------

(cherry picked from commit 5460acc)

Signed-off-by: Chen Dai <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@dai-chen (Collaborator, Author) commented:

Verified the changes with an additional test:

# Generate Parquet file with INT column
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val data = Seq(1, 2, 3, 4, 5)
val df = data.toDF("protocol").withColumn("protocol", $"protocol".cast(IntegerType))
df.write.mode("overwrite").parquet("s3://parquet_mismatch_test/")

# Create Spark table with BIGINT column
sql("""
    CREATE TABLE parquet_mismatch_test (
        protocol BIGINT
    )
    USING parquet
    LOCATION 's3://parquet_mismatch_test/'
""")

# Submit Spark job to create covering index
CREATE INDEX await_early_test
ON glue.default.parquet_mismatch_test (
  protocol
)
WITH (
  auto_refresh = true
);

# Spark job log
24/06/20 21:44:39 ERROR MicroBatchExecution: Query flint_glue_default_parquet_mismatch_test_await_early_test_index [id = 72705241-bafd-4b02-8f2c-328ec3528aa7,
runId = 8b978f47-a816-4ab0-b9b6-b7e2657b52e6] terminated with error
org.apache.spark.SparkException: Writing job aborted
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 1.0 (TID 11) ([2600:1f14:3ed2:a401:2d1f:8db2:ece6:45cc] executor 1): org.apache.spark.sql.execution.QueryExecutionException: 
Parquet column cannot be converted in file s3://parquet_mismatch_test/part-00001-cbde737c-1014-4d9a-b4fd-6aea3aa91206-c000.snappy.parquet.
Column: [protocol], Expected: bigint, Found: INT32
	at ...
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
	at ...

24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Index monitor for [None] not found.
24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Found index name in index monitor task list: flint_glue_default_parquet_mismatch_test_await_early_test_index
24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Updating index state to failed for flint_glue_default_parquet_mismatch_test_await_early_test_index
24/06/20 21:44:41 INFO FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(
ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4,
14,1,1718919871609,failed,glue,)

# Double check index state is Failed
      {
        "_index": ".query_execution_request_glue",
        "_id": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "latestId": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4",
          "type": "flintindexstate",
          "state": "failed",
          "applicationId": "00fj56e4cs0ghe0l",
          "jobId": "00fk8m0kqlj1180n",
          "dataSourceName": "glue",
          "jobStartTime": 1718919871609,
          "lastUpdateTime": 1718919880581,
          "error": ""
        }
      }
3 participants