Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add covering index based query rewriter rule #318

Merged

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented Apr 23, 2024

Description

This PR introduces the feature to rewrite applicable queries using covering indexes, enhancing the performance of SELECT queries by allowing faster data retrieval using these indexes.

PR Planned

  • Add covering index based query rewriter rule #318
  • Support query rewrite for Iceberg table (IT disabled temporarily)
  • Add logging or whyNot API to explain why/whyNot index applied
  • Support SQL hint for Spark conf, ex. CV rewrite, hybrid scan [TBD]
  • Support partial covering index rewrite [TBD]

Changes

  • Implemented query rewrite logic that detects when a covering index can be utilized for a query.
  • Added new SQL test cases to ensure proper functionality and to validate that queries are only rewritten when appropriate indexes are available and enabled.
  • Provided a new option to enable or disable query rewrites using covering indexes.

Testing

Added several unit and integration tests to cover:

  • UT: The basic functionality of query rewrite when a suitable covering index exists.
  • IT: Ensuring no rewrite occurs when covering indexes are disabled via Spark configuration.
  • IT: Correct interaction of query rewrites in presence of both covering and skipping indexes.

Examples:

# Create covering index as before
spark-sql> CREATE INDEX all ON ds_tables.http_logs
         > (`@timestamp`, clientip, request, status, size);
spark-sql> REFRESH INDEX all ON ds_tables.http_logs;
Time taken: 9593.567 seconds

# Disable covering index acceleration
spark-sql> set spark.flint.optimizer.covering.enabled=false;

spark-sql> EXPLAIN SELECT clientip, request AS cnt FROM ds_tables.http_logs WHERE status = 400;
== Physical Plan ==
*(1) Project [clientip#17, request#18 AS cnt#10]
+- *(1) Filter (isnotnull(status#19) AND (status#19 = 400))
   +- FileScan json ds_tables.http_logs[clientip#17,request#18,status#19,year#21,month#22,day#23]
Batched: false, DataFilters: [isnotnull(status#19), (status#19 = 400)], Format: JSON,
Location: FlintSparkSkippingFileIndex(1 paths)[s3://.../httplogs/http_logs_partitioned_json_bz2],
PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,400)],
ReadSchema: struct

# Enable covering index acceleration
spark-sql> set spark.flint.optimizer.covering.enabled=true;

spark-sql> EXPLAIN SELECT clientip, request AS cnt FROM ds_tables.http_logs WHERE status = 400;
== Physical Plan ==
*(1) Project [clientip#17, request#18 AS cnt#39]
+- BatchScan[@timestamp#16, request#18, size#20, clientip#17, status#19]
class org.apache.spark.sql.flint.FlintScan,
PushedPredicates: [status IS NOT NULL, status = 400]
 RuntimeFilters: []

# Join query
spark-sql> EXPLAIN
         > SELECT l.clientip, l.status, s.description
         > FROM ds_tables.http_logs l
         > JOIN ds_tables.http_status s ON l.status = s.status
         > WHERE clientip = '157.12.11.0';
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [clientip#17, status#19, description#51]
   +- BroadcastHashJoin [status#19], [status#50], Inner, BuildRight, false
      :- Project [clientip#17, status#19]
      :  +- BatchScan[@timestamp#16, request#18, size#20, clientip#17, status#19]
class org.apache.spark.sql.flint.FlintScan,
PushedPredicates: [clientip IS NOT NULL, clientip = '157.12.11.0', status IS NOT NULL]
RuntimeFilters: []
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=71]
         +- Filter isnotnull(status#50)
            +- FileScan parquet ds_tables.http_status[status#50,description#51] Batched: true,
DataFilters: [isnotnull(status#50)], Format: Parquet,
Location: InMemoryFileIndex(1 paths)[s3://.../httpstatus], PartitionFilters: [],
PushedFilters: [IsNotNull(status)], ReadSchema: struct

Issues Resolved

#298

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added feature New feature 0.4 labels Apr 23, 2024
@dai-chen dai-chen self-assigned this Apr 23, 2024
@dai-chen dai-chen changed the title Add covering index query rewriter rule Add covering index based query rewriter rule Apr 26, 2024
@dai-chen dai-chen marked this pull request as ready for review April 26, 2024 16:39
Signed-off-by: Chen Dai <[email protected]>
@dai-chen dai-chen requested a review from seankao-az April 29, 2024 19:43
@dai-chen dai-chen merged commit a8a376f into opensearch-project:main Apr 30, 2024
4 checks passed
@dai-chen dai-chen deleted the add-covering-index-query-rewriter branch April 30, 2024 18:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
0.4 feature New feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants