Exactly-once guarantee for covering index and MV incremental refresh #143

@dai-chen dai-chen commented Nov 9, 2023

$${\color{red} DON'T \space MERGE \space DUE \space TO \space IMPACT \space ON \space INTEGRATION \space FEATURE }$$

Description

As proposed in the issue below, generate the ID column in FlintSparkIndex.generateIdColumn() (a sketch of this decision flow follows the list):

  1. Add a new id_expression option to the create index statement: when provided, this expression is used as the ID column
  2. If not provided and the query is aggregated (applies to MV), generate the ID from all result columns (concat_ws and sha1)
  3. If the refresh is manual, or auto refresh without a checkpoint location, continue without an ID column
  4. Otherwise, fail and ask the user to provide an ID expression
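
A minimal sketch of this decision flow in Scala (illustrative only: the method signature, flag names, and option handling here are assumptions, not the actual FlintSparkIndex implementation):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{concat_ws, expr, sha1}

// Sketch of the four cases above; real option parsing lives in the index options.
def generateIdColumn(
    df: DataFrame,
    idExpression: Option[String],        // case 1: user-provided id_expression
    isAggregated: Boolean,               // case 2: MV with aggregation
    autoRefreshWithCheckpoint: Boolean   // cases 3 and 4
): DataFrame =
  idExpression match {
    case Some(idExpr) =>
      df.withColumn("__id__", expr(idExpr))                            // case 1
    case None if isAggregated =>
      val allCols = df.columns.map(df.col)
      df.withColumn("__id__", sha1(concat_ws("\u0000", allCols: _*)))  // case 2
    case None if !autoRefreshWithCheckpoint =>
      df                                                               // case 3: no ID column
    case None =>
      throw new IllegalStateException(                                 // case 4
        "ID expression is required to avoid duplicate data when index refresh job restart")
  }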

Documentation

Documentation: https://github.com/dai-chen/opensearch-spark/blob/support-covering-index-and-mv-idempotency/docs/index.md#create-index-options

Algorithm

Specifically, the approach in case 2 is to first concat the columns using the ASCII NUL character as the separator (distinct from an empty string or whitespace) and then pass the result to SHA1:

spark-sql> SELECT concat_ws('\0', 'hello', 123, 4.56, true, TIMESTAMP '2023-11-01 10:01:00', null, DATE '2023-11-02', array(7, 8));
hello1234.56true2023-11-01 10:01:002023-11-0278

spark-sql> SELECT sha1(concat_ws('\0', 'hello', 123, 4.56, true, TIMESTAMP '2023-11-01 10:01:00', null, DATE '2023-11-02', array(7, 8)));
ce785d2ecbd1039c5028dc3478b2f5e579f3a880

spark-sql> SELECT sha1(concat_ws(' ', 'hello', 123, 4.56, true, TIMESTAMP '2023-11-01 10:01:00', null, DATE '2023-11-02', array(7, 8)));
2e44ab3226a7d8cf0f4190489d62ea34b5628d9f

spark-sql> SELECT sha1(concat_ws('', 'hello', 123, 4.56, true, TIMESTAMP '2023-11-01 10:01:00', null, DATE '2023-11-02', array(7, 8)));
06aed815b4654249db4994946c5133e372ad42ab

Testing

Case 1: Use the ID expression provided in the index options:

scala> spark.sql("""
CREATE INDEX clientip_and_status ON ds_tables.http_logs
(clientip, status)
WITH (
  auto_refresh = true,
  id_expression = 'uuid()'
)
""")

23/11/10 19:58:23 INFO FlintSparkIndex: Generated ID column based on expression Some(uuid())
23/11/10 19:58:23 INFO FlintSparkCoveringIndex: Building covering index by == Physical Plan ==
*(1) Project [clientip#86, status#88, uuid(Some(4863749536212552132)) AS __id__#101]
+- *(1) Scan ExistingRDD[@timestamp#85,clientip#86,request#87,status#88,size#89,year#90,month#91,day#92]

      {
        "_index": "flint_myglue_ds_tables_http_logs_clientip_and_status_index",
        "_id": "aae1a580-ee85-4b9b-b2cd-8f2b80255d9d",
        "_score": 1,
        "_source": {
          "clientip": "138.64.16.0",
          "status": 304
        }
      }

Case 2: Generate the ID column from all output columns for an MV with aggregation:

spark.sql("""
CREATE MATERIALIZED VIEW http_logs_metrics
AS
SELECT
  window.start AS startTime,
  COUNT(*) AS count
FROM ds_tables.http_logs
WHERE year = 1998 AND month = 6 AND day = 11 
  AND status BETWEEN 400 AND 599
GROUP BY TUMBLE(`@timestamp`, '1 Hour')
WITH (
  auto_refresh = true,
  checkpoint_location = "s3://checkpoints/",
  watermark_delay = '1 Minute',
  extra_options = '{"myglue.ds_tables.http_logs": {"maxFilesPerTrigger": "10"}}'
)
""")

23/11/10 22:54:44 INFO FlintSparkIndex:
  Generated ID column based on expression Some(sha1(concat_ws(, startTime, count)))
(The seemingly empty first argument to concat_ws in the log is the non-printable ASCII NUL separator.)

    "hits": [
            {
        "_index": "flint_myglue_default_http_logs_metrics",
        "_id": "3aad6637a22adc431b032c826df367fea1ed424c",
        "_score": 1,
        "_source": {
          "startTime": "1998-06-11T12:00:00.000000+0000",
          "count": 2
        }
      },

Case 3: An ID expression is not mandatory for manual refresh, or for auto refresh without a checkpoint location:

scala> spark.sql("CREATE INDEX clientip_and_status ON ds_tables.http_logs (clientip, status)")
scala> spark.sql("REFRESH INDEX clientip_and_status ON ds_tables.http_logs")

23/11/10 19:55:35 INFO FlintSparkIndex: Generated ID column based on expression None

    "hits": [
      {
        "_index": "flint_myglue_ds_tables_http_logs_clientip_and_status_index",
        "_id": "S4XOuosBZG4KSy0OKmxu",
        "_score": 1,
        "_source": {
          "clientip": "212.167.12.0",
          "status": 304
        }

Case 4: Otherwise, throw an exception if none of the cases above applies:

spark.sql("""
CREATE INDEX clientip_and_status ON ds_tables.http_logs
(clientip, status)
WITH (
  auto_refresh = true,
  checkpoint_location = "s3://checkpoints"
)
""")

23/11/10 20:11:44 ERROR MicroBatchExecution:
Query flint_myglue_ds_tables_http_logs_clientip_and_status_index 
  [id = 3d20e1d2-dbc6-4c93-9811-cd94ae98d184, runId = 8872253a-bdb9-47a1-88e3-f2a8169192f5]
  terminated with error
 java.lang.IllegalStateException: 
  ID expression is required to avoid duplicate data when index refresh job restart 

Issues Resolved

#88

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added the enhancement New feature or request label Nov 9, 2023
@dai-chen dai-chen self-assigned this Nov 9, 2023

dai-chen commented Dec 1, 2023

Will reopen if we still want to go with this implementation.

@dai-chen dai-chen closed this Dec 1, 2023