Support on-demand incremental refresh #234

dai-chen commented Jan 23, 2024

Description

Support on-demand incremental index refresh, a new index refresh mode. Please check the updated user manual for more details:

  1. Added a new section explaining the 3 index refresh modes: https://github.com/dai-chen/opensearch-spark/blob/support-on-demand-incremental-refresh-rebased/docs/index.md#flint-index-refresh
  2. Added the new incremental refresh option: https://github.com/dai-chen/opensearch-spark/blob/support-on-demand-incremental-refresh-rebased/docs/index.md#create-index-options

Code Changes

Most of the changed files are unit tests (UT) and integration tests (IT). Reviewers can focus on the following changes:

  1. FlintSparkIndexOptions: added a new incremental_refresh option that defaults to false

  2. FlintSpark: refactored the complex doRefresh() method using the new FlintSparkIndexRefresh abstraction and its subclasses AutoIndexRefresh and FullIndexRefresh

  3. IncrementalIndexRefresh: added this subclass for the new incremental mode
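
As a rough illustration of the refactoring described above, the sketch below models how a refresh implementation might be picked from the index options. The actual code is in Scala; this Python version only borrows the class names from the list above. The `IndexOptions` and `IndexRefresh` stand-ins, the `create_refresh` helper, and the precedence (auto first, then incremental, then full) are assumptions for illustration, not the project's confirmed logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IndexOptions:
    """Hypothetical stand-in for FlintSparkIndexOptions."""
    auto_refresh: bool = False
    incremental_refresh: bool = False  # new option in this PR, off by default


class IndexRefresh:
    """Hypothetical stand-in for the FlintSparkIndexRefresh abstraction."""
    mode = "abstract"

    def __init__(self, index_name: str, options: IndexOptions):
        self.index_name = index_name
        self.options = options


class AutoIndexRefresh(IndexRefresh):
    mode = "auto"


class FullIndexRefresh(IndexRefresh):
    mode = "full"


class IncrementalIndexRefresh(IndexRefresh):
    mode = "incremental"


def create_refresh(index_name: str, options: IndexOptions) -> IndexRefresh:
    """Pick a refresh implementation from the index options.

    Assumed precedence: auto, then incremental, then full as the fallback.
    """
    if options.auto_refresh:
        return AutoIndexRefresh(index_name, options)
    if options.incremental_refresh:
        return IncrementalIndexRefresh(index_name, options)
    return FullIndexRefresh(index_name, options)
```

With the options used in the example in this description (auto_refresh = false, incremental_refresh = true), this sketch selects the incremental mode; with neither option set it falls back to full refresh.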

Class Diagram

[Figure: FlintSparkIndexRefresh class diagram]

Example

Create a test table and a covering index with incremental_refresh = true:

spark-sql>
CREATE TABLE stream.refresh_test
(name STRING)
USING CSV
LOCATION 's3://refresh_test/';

INSERT INTO stream.refresh_test VALUES ('hello'), ('world');

CREATE INDEX test_idx ON stream.refresh_test (name)
WITH (
  incremental_refresh = true,
  checkpoint_location = 's3://checkpoints/refresh_test/'
);

At this point, the Flint index is empty and the incremental refresh option is stored in its metadata:

GET flint_myglue_stream_refresh_test_test_idx_index/_mapping
{
  "flint_myglue_stream_refresh_test_test_idx_index": {
    "mappings": {
      "_meta": {
        "latestId": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9yZWZyZXNoX3Rlc3RfdGVzdF9pZHhfaW5kZXg=",
        "kind": "covering",
        "indexedColumns": [
          {
            "columnType": "string",
            "columnName": "name"
          }
        ],
        "name": "test_idx",
        "options": {
          "auto_refresh": "false",
          "incremental_refresh": "true",
          "checkpoint_location": "s3://checkpoints/refresh_test/"
        },
        "source": "myglue.stream.refresh_test",
        ...
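
The metadata above can be cross-checked with a few lines of Python. In this example the latestId value decodes, as Base64, to the Flint index name itself, and the index name looks like it is built from the source table and the index name. Both helpers below are hypothetical and the naming pattern is inferred from this one example only, so treat it as an observation rather than a documented contract.

```python
import base64


def flint_index_name(source_table: str, index_name: str) -> str:
    """Build the OpenSearch index name.

    Assumed pattern, inferred from the example only:
    flint_<catalog>_<database>_<table>_<index name>_index
    """
    return "flint_" + source_table.replace(".", "_") + "_" + index_name + "_index"


def decode_latest_id(latest_id: str) -> str:
    """Decode a latestId value as Base64-encoded UTF-8 text."""
    return base64.b64decode(latest_id).decode("utf-8")


# Values copied from the _mapping response above
latest_id = "ZmxpbnRfbXlnbHVlX3N0cmVhbV9yZWZyZXNoX3Rlc3RfdGVzdF9pZHhfaW5kZXg="
name = flint_index_name("myglue.stream.refresh_test", "test_idx")

print(name)
print(decode_latest_id(latest_id) == name)
```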

Trigger incremental refresh:

# As with full refresh, the incremental refresh statement exits once it completes
spark-sql> REFRESH INDEX test_idx ON stream.refresh_test;

POST flint_myglue_stream_refresh_test_test_idx_index/_search
    ...
    "hits": [
      {
        "_index": "flint_myglue_stream_refresh_test_test_idx_index",
        "_id": "1Ih0PY0BZG4KSy0Om1AJ",
        "_score": 1,
        "_source": {
          "name": "world"
        }
      },
      {
        "_index": "flint_myglue_stream_refresh_test_test_idx_index",
        "_id": "GTV0PY0BWdGHpYUCm64S",
        "_score": 1,
        "_source": {
          "name": "hello"
        }
      }
    ]

# The index state is back to active, with the timestamp updated
POST .query_execution_request_myglue/_search
    ...
    "_source": {
          "version": "1.0",
          "latestId": "ZmxpbnRfbXlnbHVlX3N0cmVhbV9yZWZyZXNoX3Rlc3RfdGVzdF9pZHhfaW5kZXg=",
          "type": "flintindexstate",
          "state": "active",
          "applicationId": "unknown",
          "jobId": "unknown",
          "dataSourceName": "myglue",
          "jobStartTime": 1706133480199,
          "lastUpdateTime": 1706133493409,
          "error": ""
        }
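
To read the timestamps in the state document, the short sketch below converts them to UTC and computes the gap between them. It assumes jobStartTime and lastUpdateTime are epoch milliseconds, which the magnitude of the values suggests but the response does not state; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_utc(ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Integer arithmetic avoids float rounding of the millisecond part.
    """
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


# Values copied from the index state document above
job_start_time = 1706133480199
last_update_time = 1706133493409

start = epoch_millis_to_utc(job_start_time)
updated = epoch_millis_to_utc(last_update_time)
elapsed_ms = last_update_time - job_start_time

print(start.isoformat())    # 2024-01-24T21:58:00.199000+00:00
print(updated.isoformat())  # 2024-01-24T21:58:13.409000+00:00
print(elapsed_ms)           # 13210
```

Under that assumption, the state was last updated about 13 seconds after the job start time.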

Insert more source data and trigger refresh again:

spark-sql> INSERT INTO stream.refresh_test VALUES ('test');

spark-sql> REFRESH INDEX test_idx ON stream.refresh_test;

# Only the newly inserted data is refreshed, without duplicates
POST flint_myglue_stream_refresh_test_test_idx_index/_search
    ...
    "hits": [
      {
        "_index": "flint_myglue_stream_refresh_test_test_idx_index",
        "_id": "1Yh7PY0BZG4KSy0OFFAw",
        "_score": 1,
        "_source": {
          "name": "test"
        }
      },
      {
        "_index": "flint_myglue_stream_refresh_test_test_idx_index",
        "_id": "1Ih0PY0BZG4KSy0Om1AJ",
        "_score": 1,
        "_source": {
          "name": "world"
        }
      },
      {
        "_index": "flint_myglue_stream_refresh_test_test_idx_index",
        "_id": "GTV0PY0BWdGHpYUCm64S",
        "_score": 1,
        "_source": {
          "name": "hello"
        }
      }
    ]
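
The behavior shown above (a second refresh picks up only the new row, and the earlier documents keep their _id values) can be pictured with a small simulation. This is a toy model, not the Flint implementation: it assumes the checkpoint records which source files were already processed, and the class name and file names are hypothetical.

```python
from typing import Dict, List, Set


class IncrementalRefreshSim:
    """Toy model of checkpoint-based incremental refresh."""

    def __init__(self) -> None:
        self.checkpoint: Set[str] = set()       # source files already processed
        self.index: List[Dict[str, str]] = []   # documents written to the index

    def refresh(self, source_files: Dict[str, List[str]]) -> int:
        """Index rows from files not yet in the checkpoint; return rows written."""
        written = 0
        for file_name in sorted(source_files):
            if file_name in self.checkpoint:
                continue  # already indexed by an earlier refresh
            for row in source_files[file_name]:
                self.index.append({"name": row})
                written += 1
            self.checkpoint.add(file_name)
        return written


# Mirrors the example: two rows, refresh, one more row, refresh again
source = {"part-0.csv": ["hello", "world"]}
sim = IncrementalRefreshSim()
first = sim.refresh(source)

source["part-1.csv"] = ["test"]
second = sim.refresh(source)

print(first, second, sorted(doc["name"] for doc in sim.index))
```

In this model a full refresh would correspond to clearing both the checkpoint and the index before reprocessing everything, which is the extra work the incremental mode avoids.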

Issues Resolved

#195

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Chen Dai <[email protected]>
dai-chen added the enhancement and 0.2 labels Jan 23, 2024
dai-chen self-assigned this Jan 23, 2024
dai-chen changed the title from "Support on-demand incremental manual refresh" to "Support on-demand incremental refresh" Jan 24, 2024
dai-chen marked this pull request as ready for review January 26, 2024 07:29
dai-chen requested a review from penghuo February 7, 2024 19:36
dai-chen merged commit f446de0 into opensearch-project:main Feb 8, 2024
4 checks passed
dai-chen deleted the support-on-demand-incremental-refresh-rebased branch February 8, 2024 20:41