
Add partial indexing support for skipping index #124

Conversation

@dai-chen dai-chen commented Nov 2, 2023

Description

  1. Enable WHERE clause support for skipping index
    a. Remove old UT, add more logging
    b. Restrict filtering condition columns to partition columns
  2. Skipping index query rewrite
    a. Enforce hybrid scan per query if the skipping index is partial
    b. Fix hybrid scan join bug: change from LEFT OUTER to ANTI SEMI join between the source file list and index data
  3. Restrict the filtering condition on covering index to conjunctions as well (in case we want to support query rewrite in the future)

Documentation: https://github.com/dai-chen/opensearch-spark/blob/add-where-clause-for-skipping-index/docs/index.md#skipping-index
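The two restrictions above (partition columns only, conjunction only) can be sketched as follows. This is an illustrative Python model of the checks, not the actual Flint/Scala implementation; the `Comparison`/`And`/`Or` classes and the exact error wording are hypothetical.

```python
# Illustrative model of the filtering-condition validation (not the actual
# Flint code): the condition must reference only partition columns, and
# must be a pure conjunction (AND only, no OR).
from dataclasses import dataclass


@dataclass
class Comparison:
    column: str
    op: str
    value: object


@dataclass
class And:
    left: object
    right: object


@dataclass
class Or:
    left: object
    right: object


def validate(cond, partition_columns):
    """Raise ValueError if the condition violates either rule."""
    if isinstance(cond, Comparison):
        if cond.column not in partition_columns:
            raise ValueError(
                f"{cond.column} is not partitioned column and cannot "
                "be used in index filtering condition")
    elif isinstance(cond, And):
        validate(cond.left, partition_columns)
        validate(cond.right, partition_columns)
    elif isinstance(cond, Or):
        raise ValueError("filtering condition must be conjunction")
```

Both failing test cases below follow from this: `status = 200` fails the partition-column rule, and `year=1998 OR month=6` fails the conjunction rule.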

Create Partial Skipping Index Test

Creating a skipping index with a filtering condition on a non-partitioned column should fail:

spark-sql> CREATE SKIPPING INDEX ON ds_tables.http_logs
         > (status VALUE_SET)
         > WHERE status = 200
         > WITH (
         >   auto_refresh=true
         > );

java.lang.IllegalArgumentException:
    requirement failed: status is not partitioned column and cannot be used in index filtering condition

Creating a skipping index with a disjunctive filtering condition (OR) should fail:

spark-sql> CREATE SKIPPING INDEX ON ds_tables.http_logs
         > (status VALUE_SET)
         > WHERE year=1998 OR month=6
         > WITH (
         >   auto_refresh=true
         > );

java.lang.IllegalArgumentException:
    requirement failed: filtering condition Some(year=1998 OR month=6) must be conjunction

Creating a skipping index with a filtering condition on partitioned columns should succeed:

CREATE SKIPPING INDEX ON ds_tables.http_logs
(status VALUE_SET)
WHERE year=1998 AND month=6 AND day >= 10
WITH (
  auto_refresh=true,
  extra_options = '{"myglue.ds_tables.http_logs": {"maxFilesPerTrigger": "10"}}'
);

POST flint_myglue_ds_tables_http_logs_skipping_index/_search
      {
        "_index": "flint_myglue_ds_tables_http_logs_skipping_index",
        "_id": "a4245e7651b50fe6e274e2e27f9c9808b0ce3d05",
        "_score": 1,
        "_source": {
          "file_path": "s3://daichen-benchmark/httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=10/part-00383-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
          "status": [
            206,
            200
          ]
        }
      },
      {
        "_index": "flint_myglue_ds_tables_http_logs_skipping_index",
        "_id": "199e16b74b231ad41156da61c95608c1232a9ee6",
        "_score": 1,
        "_source": {
          "file_path": "s3://daichen-benchmark/httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=10/part-00385-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
          "status": [
            206,
            400,
            200
          ]
        }
      },
      ......

Skipping Index Query Rewrite Test

Query rewrite should enforce hybrid scan and thus include files unknown to the skipping index:

spark.sql("SELECT `@timestamp`, clientip, status FROM ds_tables.http_logs WHERE status = 500").explain

23/11/10 19:16:22 INFO ApplyFlintSparkSkippingIndex: Applying skipping index rewrite rule on filter condition Filter (status#4511 = 500)
+- Relation ds_tables.http_logs[@timestamp#4508,clientip#4509,request#4510,status#4511,size#4512,year#4513,month#4514,day#4515] json

23/11/10 19:16:22 INFO FlintSpark: Describing index name flint_myglue_ds_tables_http_logs_skipping_index
23/11/10 19:16:22 INFO FlintOpenSearchClient: Checking if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
23/11/10 19:16:22 INFO FlintOpenSearchClient: Fetching Flint index metadata for flint_myglue_ds_tables_http_logs_skipping_index

23/11/10 19:16:22 INFO ApplyFlintSparkSkippingIndex: Found skipping index Some(FlintSparkSkippingIndex(myglue.ds_tables.http_logs,WrappedArray(ValueSetSkippingStrategy(VALUE_SET,status,int)),Some(year=1998 AND month=6 AND day >= 10),FlintSparkIndexOptions(Map(auto_refresh -> true, extra_options -> {"myglue.ds_tables.http_logs": {"maxFilesPerTrigger": "10"}}))))
23/11/10 19:16:22 INFO ApplyFlintSparkSkippingIndex: Found filter condition can be pushed down to skipping index: Some(('status = 500))

== Physical Plan ==
*(1) Project [@timestamp#4508, clientip#4509, status#4511]
+- *(1) Filter (isnotnull(status#4511) AND (status#4511 = 500))
   +- FileScan json ds_tables.http_logs[@timestamp#4508,clientip#4509,status#4511,year#4513,month#4514,day#4515] Batched: false, DataFilters: [isnotnull(status#4511), (status#4511 = 500)], Format: JSON, 
Location: FlintSparkSkippingFileIndex(1 paths)[s3://daichen-benchmark/httplogs/http_logs_partitioned_json_bz2], PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,500)], 
ReadSchema: struct<@timestamp:timestamp,clientip:string,status:int>

Verified that a partial skipping index can hybrid-scan source files not included in the index data (day=9):

spark-sql> SELECT `@timestamp`, clientip, status FROM ds_tables.http_logs WHERE year = 1998 AND month = 6 AND day >= 9 AND status = 404 LIMIT 5;
1998-06-10 01:04:01	32.95.0.0	404
1998-06-10 04:48:48	90.135.1.0	404
1998-06-10 16:07:35	39.202.2.0	404
1998-06-10 19:27:09	20.223.12.0	404
1998-06-11 00:42:28	72.71.11.0	404
Time taken: 31.411 seconds, Fetched 5 row(s)

spark-sql> SELECT `@timestamp`, clientip, status FROM ds_tables.http_logs WHERE year = 1998 AND month = 6 AND day >= 9 AND status = 404 ORDER BY `@timestamp` LIMIT 5;
1998-06-09 00:00:00	179.57.0.0	404
1998-06-09 00:00:00	59.27.0.0	404
1998-06-09 00:00:03	218.6.0.0	404
1998-06-09 00:00:03	41.169.9.0	404
1998-06-09 00:00:04	80.250.8.0	404
Time taken: 125.703 seconds, Fetched 5 row(s)
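The hybrid-scan behavior verified above can be modeled in a few lines. This is a Python sketch with made-up file names, not the actual `FlintSparkSkippingFileIndex` code: a file is scanned if it is unknown to the partial index (like the day=9 partitions here), or if its index entry may match the pushed-down predicate.

```python
def select_files(source_files, index, predicate):
    """Hybrid-scan file selection for a partial skipping index.

    `index` maps file_path -> value set of the indexed column; files
    absent from the index must always be scanned, since a partial index
    cannot prove they contain no matches.
    """
    selected = []
    for path in source_files:
        if path not in index:
            selected.append(path)       # unknown file: cannot be skipped
        elif predicate(index[path]):
            selected.append(path)       # indexed and may contain matches
    return selected


files = ["day=9/x.json", "day=10/a.json", "day=10/b.json"]
index = {"day=10/a.json": {200, 404}, "day=10/b.json": {200}}
# Query: status = 404 -> keep the unindexed day=9 file and day=10/a.json
print(select_files(files, index, lambda values: 404 in values))
# -> ['day=9/x.json', 'day=10/a.json']
```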

Issues Resolved

#89

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added the enhancement New feature or request label Nov 2, 2023
@dai-chen dai-chen self-assigned this Nov 2, 2023
@dai-chen dai-chen marked this pull request as ready for review November 10, 2023 00:46
partitions
.flatMap(_.files.map(f => f.getPath.toUri.toString))
.toDF(FILE_PATH_COLUMN)
.join(indexScan, Seq(FILE_PATH_COLUMN), "left")
.filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter))
.join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "anti")
  • The left join would still work if flint-core supported array types, right?
  • The reason we use a left anti join is performance, right? If so, could we add a test to guard it?

@dai-chen dai-chen Nov 14, 2023


Based on my current understanding, the ANTI SEMI join seems required. Even if we supported push-down optimization for arrays, the OR condition in the previous LEFT OUTER join cannot be pushed down to the skipping index.

    SELECT left.file_path
    FROM partitions AS left
    LEFT JOIN indexScan AS right
      ON left.file_path = right.file_path
    WHERE right.file_path IS NULL
      OR [indexFilter]
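The equivalence of the two formulations can be checked on toy data. This is a Python sketch with hypothetical file names, not the Spark code itself; the point is that both select the same files, but the ANTI form avoids the OR condition that cannot be pushed down.

```python
# f4 is a source file with no row in the skipping index (partial index).
source_files = {"f1", "f2", "f3", "f4"}
# For indexed files: does the index row pass [indexFilter]?
index_passes = {"f1": True, "f2": False, "f3": True}

# LEFT OUTER form: keep a file if it is unindexed (right side IS NULL)
# OR its index row passes the filter.
left_outer = {f for f in source_files
              if f not in index_passes or index_passes[f]}

# ANTI form: subtract only the index rows that fail the filter.
anti = source_files - {f for f, ok in index_passes.items() if not ok}

assert left_outer == anti == {"f1", "f3", "f4"}
```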

@dai-chen

Will resolve the conflicts and reopen for review later.

@dai-chen dai-chen closed this Jan 22, 2024