Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support shard level split on read path #402

Merged
merged 5 commits into from
Jul 9, 2024

Conversation

penghuo
Copy link
Collaborator

@penghuo penghuo commented Jun 27, 2024

Description

Test

  1. Test with EMR-S. The task is scheduled based on split count. For instance, index has 5 shards, then 5 tasks are scheduled.
24/06/29 00:24:43 INFO DAGScheduler: Got map stage job 0 (main at NativeMethodAccessorImpl.java:0) with 5 output partitions
24/06/29 00:24:43 INFO DAGScheduler: Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4))
  1. Test with AWS OpenSearch Multi-AZ with Standby domain. it support preference:_shard: paramaters.

Benchmark

Single Index with 5 shards

The table below presents the p90 query latency results for both the partitioned and non-partitioned test cases. Across all queries, the results with partitioning show significantly lower times compared to the non-partitioned results.

Query p90 ms (without partition) p90 ms (with partition)
SELECT COUNT(*) FROM dev.default.logs-181998 42296 17031
SELECT COUNT(*) FROM dev.default.logs-181998 WHERE status <> 0; 44142 16667
SELECT COUNT(*), AVG(size) FROM dev.default.logs-181998; 44584 18775
SELECT AVG(CAST(size AS BIGINT)) FROM dev.default.logs-181998; 43575 19474
SELECT MIN(@timestamp), MAX(@timestamp) FROM dev.default.logs-181998; 43533 19130
SELECT status, COUNT() FROM dev.default.logs-181998 WHERE status <> 0 GROUP BY status ORDER BY COUNT() DESC; 43952 19661

1

Multiple Indices, each index has 5 shards

The table below presents the p90 query latency results for both the partitioned and non-partitioned test cases when query index wildcard. Across all queries, the results with partitioning show significantly lower times compared to the non-partitioned results.

Query p90 ms (without partition) p90 ms (with partition)
SELECT COUNT() FROM dev.default.`logs-1` 228049 52123
SELECT COUNT() FROM dev.default.`logs-1` WHERE status <> 0; 215548 43631
SELECT COUNT(), AVG(size) FROM dev.default.`logs-1`; 222332 51583
SELECT AVG(CAST(size AS BIGINT)) FROM dev.default.`logs-1*`; 215273 51805
SELECT MIN(`@timestamp`), MAX(`@timestamp`) FROM dev.default.`logs-1*`; 224576 45914
SELECT status, COUNT() FROM dev.default.`logs-1` WHERE status <> 0 GROUP BY status ORDER BY COUNT(*) DESC; 189594 46758

2

Issues Resolved

#396

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@penghuo penghuo added enhancement New feature or request 0.5 labels Jun 27, 2024
@penghuo penghuo self-assigned this Jun 27, 2024
penghuo added 4 commits June 28, 2024 07:25
Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: Peng Huo <[email protected]>
@penghuo penghuo marked this pull request as ready for review July 2, 2024 17:25
* @param query DSL query. DSL query is null means match_all
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String shardId, String query);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is shardId concept binding to FlintOpenSearchClient implementation or generic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bind to OpenSearch

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can abstract this task/split info later.

* @param metadata
* Metadata of the table.
*/
case class OpenSearchTable(tableName: String, metadata: Map[String, FlintMetadata]) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is using FlintMetadata temporary for only fetching index setting? Or is there hard dependency between OS table and Flint index in future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only for index setting and mapping, not binding to real index

@penghuo penghuo merged commit 087a9df into opensearch-project:main Jul 9, 2024
4 checks passed
@penghuo penghuo mentioned this pull request Jul 15, 2024
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
0.5 enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants