Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flint index metadata log and transaction support #110

Merged
merged 23 commits into from
Oct 31, 2023

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented Oct 27, 2023

Description

  1. Added new abstraction OptimisticTransaction and FlintMetadataLog. Please see Javadoc for details.
  2. Added detailed logging on each layer instead of throwing underlying exception all the way up.

TODO

  1. Remove appId and jobId in Flint metadata
  2. Detect streaming job failure (currently just keep updating metadata log without any check)
  3. Avoid data source info in Flint core in future [TBD]

Example

$ spark-shell ... --conf spark.flint.index.checkpoint.mandatory=false --conf spark.flint.datasource.name=myglue

scala> spark.sql("CREATE SKIPPING INDEX ON stream.lineitem_tiny (l_shipdate VALUE_SET) WITH (auto_refresh=true)")

scala> spark.sql("DROP SKIPPING INDEX ON stream.lineitem_tiny")

Flint Metadata Changes

Added new latestId field pointing to the latest metadata log entry:

{
  "flint_myglue_stream_lineitem_tiny_skipping_index": {
    "mappings": {
      "_meta": {
        "latestId": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "kind": "skipping",
        "indexedColumns": [
          ...
      },
      ...
    }
  }
}

Flint Metadata Log Format

Add new Flint metadata log in OpenSearch which is changed over time (appId and jobId is unknown because of local test)

    # Created and start refreshing
  {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "refreshing",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698713346468",
          "error": ""
        }
      }
     # Timestamp updated
     {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "refreshing",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698713361525",
          "error": ""
        }
      }
     # After deleted
      {
        "_index": ".query_execution_request_myglue",
        "_id": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "type": "flintindexstate",
          "state": "deleted",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "lastUpdateTime": "1698713440274",
          "error": ""
        }
      }

Logging

### Create Index Transaction ###
# Start transaction
FlintSpark: Creating Flint index FlintSparkSkippingIndex(myglue.stream.lineitem_tiny,List(ValueSetSkippingStrategy(VALUE_SET,l_shipdate,date)),FlintSparkIndexOptions(Map(auto_refresh -> true))) with ignoreIfExists false
FlintOpenSearchClient: Checking if Flint index exists flint_myglue_stream_lineitem_tiny_skipping_index
FlintOpenSearchClient: Starting transaction on index flint_myglue_stream_lineitem_tiny_skipping_index and metadata log index .query_execution_request_myglue

# Creating initial log entry with EMPTY state
FlintOpenSearchMetadataLog: Fetching latest log entry with id ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4
FlintOpenSearchMetadataLog: Latest log entry not found
FlintOpenSearchMetadataLog: Creating log entry FlintMetadataLogEntry(,-2,0,empty,myglue,)
FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,0,1,empty,myglue,)

# Updating transient log with CREATING state
FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,0,1,creating,myglue,)
FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,1,1,creating,myglue,)

# Perform create index operation
FlintOpenSearchClient: Creating Flint index flint_myglue_stream_lineitem_tiny_skipping_index with metadata FlintMetadata(0.1.0,flint_myglue_stream_lineitem_tiny_skipping_index,skipping,myglue.stream.lineitem_tiny,[Ljava.util.Map;@357afae1,{auto_refresh=true},{},{file_path={type=keyword}, l_shipdate={format=strict_date, type=date}},Some(hbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4),None)

# Updating final log entry with ACTIVE state
FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,1,1,active,myglue,)
FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,2,1,active,myglue,)

### Refresh Index Transaction ###
FlintSpark: Refreshing Flint index flint_myglue_stream_lineitem_tiny_skipping_index with mode INCREMENTAL
FlintSpark: Describing index name flint_myglue_stream_lineitem_tiny_skipping_index
FlintOpenSearchClient: Checking if Flint index exists flint_myglue_stream_lineitem_tiny_skipping_index
FlintOpenSearchClient: Fetching Flint index metadata for flint_myglue_stream_lineitem_tiny_skipping_index

# Start transaction and update from ACTIVE to REFRESHING
FlintOpenSearchClient: Starting transaction on index flint_myglue_stream_lineitem_tiny_skipping_index and metadata log index .query_execution_request_myglue
FlintOpenSearchMetadataLog: Fetching latest log entry with id ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4
FlintOpenSearchMetadataLog: Found latest log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,2,1,active,myglue,)
FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,2,1,refreshing,myglue,)
FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,3,1,refreshing,myglue,)

### Scheduler Transaction ###
FlintSpark: Scheduler triggers index log entry update
FlintOpenSearchClient: Starting transaction on index flint_myglue_stream_lineitem_tiny_skipping_index and metadata log index .query_execution_request_myglue
FlintOpenSearchMetadataLog: Fetching latest log entry with id ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4
FlintOpenSearchMetadataLog: Found latest log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,5,1,refreshing,myglue,)
FlintSpark: Updating log entry to FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,5,1,refreshing,myglue,)
FlintOpenSearchMetadataLog: Updating log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,5,1,refreshing,myglue,)
FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX3N0cmVhbV9saW5laXRlbV90aW55X3NraXBwaW5nX2luZGV4,6,1,refreshing,myglue,)

# Scheduler transaction failed due to state changed to DELETED and thus exit automatically
ERROR FlintSpark: Failed to update index log entry
java.lang.IllegalStateException: Transaction failed due to initial log precondition not satisfied
	at org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction.commit(DefaultOptimisticTransaction.java:94) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.opensearch.flint.spark.FlintSpark.$anonfun$scheduleIndexStateUpdate$1(FlintSpark.scala:260) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]

Issues Resolved

#95

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added the enhancement New feature or request label Oct 27, 2023
@dai-chen dai-chen self-assigned this Oct 27, 2023
Signed-off-by: Chen Dai <[email protected]>
Signed-off-by: Chen Dai <[email protected]>
Signed-off-by: Chen Dai <[email protected]>
Signed-off-by: Chen Dai <[email protected]>
@dai-chen dai-chen marked this pull request as ready for review October 31, 2023 01:42
@dai-chen dai-chen merged commit 3e93f77 into opensearch-project:main Oct 31, 2023
4 checks passed
@dai-chen dai-chen deleted the add-index-state-transition branch October 31, 2023 16:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants