Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recover index job statement #119

Merged

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented Oct 31, 2023

Description

  1. Added RECOVER INDEX JOB command which restarts incremental refresh job regardless of previous index state. In particular, the index state will transit from any state to RECOVERING (new index state) and then to REFRESHING once recovery complete.
  2. Improved scheduled update task to check streaming job status when updating heartbeat timestamp: if streaming job is not active, transit index state to FAILED directly and cancel update task.

Documentation

Updated user manual https://github.com/dai-chen/opensearch-spark/blob/add-recover-index-job-command/docs/index.md#index-job-management

Example

# Pass datasource parameter to enable transaction
$ spark-shell  ... --conf spark.flint.datasource.name=myglue

# Enable info logging
scala> spark.sparkContext.setLogLevel("INFO")

# Create auto refreshed streaming job
scala> spark.sql("""
     | CREATE SKIPPING INDEX ON stream.lineitem_tiny
     | (l_shipdate VALUE_SET)
     | WITH (
     |   auto_refresh = true,
     |   checkpoint_location = "s3://chen-emr-test/checkpoints/job-5",
     |   extra_options = '{"myglue.ds_tables.http_logs": {"maxFilesPerTrigger": "1"}}'
     | )
     | """)

# Stop streaming job (or terminate spark-shell directly but won't get failed state update)
scala> spark.streams.active.head.stop 

# At next scheduled run, update task will be cancelled
INFO FlintSpark: Scheduler triggers index log entry update for flint_myglue_stream_lineitem_tiny_skipping_index
ERROR FlintSpark: Streaming job is not active. Cancelling update task
INFO FlintOpenSearchClient: Starting transaction on index flint_myglue_stream_lineitem_tiny_skipping_index and data source myglue
INFO FlintOpenSearchClient: Found metadata log index .query_execution_request_myglue
INFO FlintOpenSearchMetadataLog: Fetching latest log entry with id ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4
INFO FlintOpenSearchMetadataLog: Found latest log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,6,1,refreshing,myglue,)
INFO FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,6,1,failed,myglue,)
INFO FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,7,1,failed,myglue,)
INFO FlintSpark: Update task is cancelled

# Index state is moved to FAILED
      {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "failed",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698859813319",
          "error": ""
        }
      }

# Run recover command
scala> spark.sql("RECOVER INDEX JOB `flint_myglue_stream_lineitem_tiny_skipping_index`")

# Verified index state update scheduler started
INFO FlintSpark: Scheduler triggers index log entry update
INFO FlintOpenSearchClient: Starting transaction on index flint_myglue_stream_lineitem_tiny_skipping_index and data source myglue
INFO FlintOpenSearchClient: Found metadata log index .query_execution_request_myglue
INFO FlintOpenSearchMetadataLog: Fetching latest log entry with id ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4
INFO FlintOpenSearchMetadataLog: Found latest log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,7,1,refreshing,myglue,)
INFO FlintSpark: Updating log entry to FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,7,1,refreshing,myglue,)
INFO FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,7,1,refreshing,myglue,)
INFO FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,8,1,refreshing,myglue,)

POST .query_execution_request_myglue/_search
      {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "refreshing",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698792576309",
          "error": ""
        }
      }

POST .query_execution_request_myglue/_search
        {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "refreshing",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698792714273",
          "error": ""
        }
      }

Issues Resolved

#57

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added the enhancement New feature or request label Oct 31, 2023
@dai-chen dai-chen self-assigned this Oct 31, 2023
@dai-chen dai-chen marked this pull request as ready for review October 31, 2023 23:02
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true) // bypass state check and recover anyway
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the index has been deleted?

Copy link
Collaborator Author

@dai-chen dai-chen Nov 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change precondition check to stable index state ACTIVE, REFRESHING and FAILED. Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address in 92f567e.

Signed-off-by: Chen Dai <[email protected]>
@dai-chen dai-chen merged commit fda5dad into opensearch-project:main Nov 6, 2023
4 checks passed
@dai-chen dai-chen deleted the add-recover-index-job-command branch November 6, 2023 20:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants