
[EPIC] Zero-ETL - Apache Iceberg Table Support #372

Open

dai-chen opened this issue Jun 7, 2024 · 4 comments


dai-chen commented Jun 7, 2024

Is your feature request related to a problem?

Apache Iceberg is designed for managing large analytic tables in a scalable and performant way, using features like schema evolution, partitioning, and metadata management to optimize query performance. Despite these optimizations, the inherent latency of querying large datasets directly from S3 can be a pain point, especially in real-time analytics and interactive querying scenarios where complex or frequently accessed queries run against large Iceberg tables.

TODO: the current problem statement is fairly technical; more feedback from real Iceberg customers is needed.

What solution would you like?

Integrate Flint's existing query acceleration features with Iceberg to enhance performance (a sketch of the corresponding Flint statements follows this list):

  • Skipping Index: Implement Flint's fine-grained skipping index to complement Iceberg's metadata-based skipping, reducing the amount of data read and processed.
  • Covering Index: Allow Flint to create covering indexes for frequently queried columns, decreasing the need to scan large S3 files and improving query response times.
  • Materialized Views: Support Flint's materialized views backed by OpenSearch, enabling pre-computed results for complex queries, providing near-instant access and significantly reducing computation time.
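A minimal sketch using Flint's existing SQL DDL, assuming it carries over to Iceberg tables unchanged (table, column, and index names follow the VPC flow logs example used later in this issue):

-- Skipping index complementing Iceberg's metadata-based skipping
CREATE SKIPPING INDEX ON glue.iceberg.vpc_flow_logs (
  action VALUE_SET
)
WITH (auto_refresh = true);

-- Covering index on frequently queried columns
CREATE INDEX src_dst_action ON glue.iceberg.vpc_flow_logs (
  src_endpoint,
  dst_endpoint,
  action
)
WITH (auto_refresh = true);

-- Materialized view with pre-computed aggregation results
CREATE MATERIALIZED VIEW glue.iceberg.vpc_flow_metrics
AS
SELECT
  window.start AS start_time,
  COUNT(*) AS count
FROM glue.iceberg.vpc_flow_logs
GROUP BY TUMBLE(time_dt, '10 Minutes')
WITH (auto_refresh = true);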

TODO: evaluate missing features in #367

What alternatives have you considered?

N/A

Do you have any additional context?

Known issues related:


dai-chen commented Jun 11, 2024

Proof of Concept 1: Basic Integration of Flint Core Functionality with Iceberg Tables

Objectives

This PoC aims to evaluate the integration of Flint's core functionalities, including skipping indexes, covering indexes, and materialized views, with Apache Iceberg tables.

The key questions to answer:

  1. Is a Flint skipping index helpful for Iceberg tables in certain cases?
  2. How can Flint track changes in Iceberg tables on the fly?

Demo Query

This query demonstrates finding the top IP address pairs (source -> target) that have had their connections rejected in the past hour:

-- Identify the top IP address pairs with rejected connections in the last hour
SELECT
  src_endpoint.ip || '->' || dst_endpoint.ip AS ip_pair,
  action,
  COUNT(*) AS count
FROM vpc_flow_logs
WHERE action = 'REJECT'
  AND time_dt > (current_timestamp - interval '1' hour)
GROUP BY 1, 2
ORDER BY count DESC
LIMIT 25;

Indexing Support [2d]

Index Maintenance [2d]

  1. How does Flint track versions in Iceberg tables?
  2. How does Flint handle data expiration or compaction in Iceberg tables?
  3. How does Flint handle schema evolution in Iceberg tables?

Performance Benchmark [1d]

  1. Direct queries on Iceberg tables vs. queries accelerated by Flint's skipping index
  2. Direct queries on Iceberg tables vs. queries accelerated by Flint's covering index


dai-chen commented Jun 11, 2024

Proof of Concept 2: Advanced Integration of Flint with Iceberg Tables

Enabling Advanced Search Capability

This part of the PoC explores the implementation of advanced search capabilities in Flint, integrated with Iceberg tables. Taking full-text search as an example, below is a demonstration query:

-- Identify the number of HTTP status occurrences with requests containing 'Chrome' in the past hour
SELECT
  status,
  COUNT(*) AS count
FROM http_logs
WHERE MATCH(request, 'Chrome')
  AND timestamp > (current_timestamp - interval '1' hour)
GROUP BY status;

Changes required (a sketch of the supporting covering index follows this list):

  1. User-Defined Function (UDF) for Full-Text Search:
    • Implement a UDF that supports full-text search queries directly on Iceberg tables.
  2. Query Rewrite with Covering Index:
    • Add query rewriting mechanisms in Flint to utilize the covering index for enhanced performance in text search queries.
  3. Search Pushdown to Covering Index Scan:
    • Enable search query conditions to be pushed down directly to an OpenSearch DSL query on the index.
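A hedged sketch of the covering index that items 2 and 3 would build on, using Flint's existing DDL (index and table names are illustrative); the MATCH() predicate would then be rewritten into an OpenSearch DSL match query against this index:

CREATE INDEX request_status ON glue.iceberg.http_logs (
  request,
  status,
  timestamp
)
WITH (auto_refresh = true);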

Is a skipping index helpful in this case?

Depending on latency requirements, it is possible to build skipping indexes such as (see the sketch after this list):

  1. Value set of text
  2. Bloom filter of each token
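A minimal sketch with Flint's existing skipping index types; note the per-token bloom filter is an assumption, since Flint's current BLOOM_FILTER operates on whole column values rather than tokens:

-- Option 1: value set of the text column
CREATE SKIPPING INDEX ON glue.iceberg.http_logs (
  request VALUE_SET
);

-- Option 2 (alternative; a table has a single skipping index): bloom filter on
-- whole values. A per-token bloom filter would require a new skipping index type.
CREATE SKIPPING INDEX ON glue.iceberg.http_logs (
  request BLOOM_FILTER
);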

Is there an alternative to the current enhanced covering index solution?

Covering indexes provide full search and dashboard capabilities, but they require indexing all involved columns. An alternative is a non-covering (filtering) index, which indexes only the columns used in filters, with each index entry pointing back to the row ID in the source table.

To implement this, we need to answer the following questions (a purely hypothetical syntax sketch follows this list):

  1. Is it feasible for users to create a non-covering index for the entire dataset?
  2. How can SQL syntax be differentiated to specify the use of non-covering versus covering indexes?
  3. If universal indexing isn’t viable, how can we enable on-demand index creation specific to individual queries?
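For question 2, a purely hypothetical syntax sketch, not existing Flint DDL, showing one way the distinction could be surfaced:

-- Hypothetical: a FILTERING keyword marks a non-covering index whose entries
-- point back to source row IDs instead of covering all queried columns
CREATE FILTERING INDEX request_filter ON glue.iceberg.http_logs (
  request
)
WITH (auto_refresh = true);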

Accelerating Iceberg Metadata Queries

The following SQL examples illustrate how Flint can be leveraged to accelerate queries against Iceberg's metadata, which is essential for schema management and data governance:

-- Example query to fetch historical metadata from an Iceberg table
TODO
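While the exact queries are TBD, Iceberg already exposes metadata tables in Spark SQL (per the Iceberg documentation) that such acceleration would target, for example:

SELECT * FROM glue.iceberg.vpc_flow_logs.history;

SELECT committed_at, snapshot_id, operation
FROM glue.iceberg.vpc_flow_logs.snapshots;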


dai-chen commented Jun 17, 2024

Proof of Concept Conclusion [WIP]

Conclusion

  • Skipping Index

    • Iceberg's current code structure does not support an external skipping index [Potential solution: replace the entire Iceberg relation operator]
    • Flint's skipping index is invalidated after Iceberg table data compaction [Potential solution: assume append-only log data and scan old data at that version]
    • Iceberg's column statistics (partition, min-max, value count, etc.) are sufficient for analytical workloads without highly selective filtering conditions
  • Covering Index

  • Materialized View

  • Index Maintenance

    • Version tracking: progress is available as the committed offset in the streaming job checkpoint location
    • Snapshot expiration: Iceberg only supports append mode, so no micro-batch is generated
    • Data compaction: Iceberg generates an empty micro-batch after compaction, which should have no impact
    • Schema evolution: schema changes are metadata-only and have no impact if the new column is not involved


dai-chen commented Jun 18, 2024

High-Level Design and Task Breakdown

  1. User Experience
  2. Architecture
  3. Data Exploration
  4. Zero-ETL Support
  5. SparkSQL Query Acceleration
  6. Index Maintenance
  7. Testing

User Experience

Here we outline the end-to-end user experience, demonstrating the steps involved from initial data exploration through advanced query optimization and table management.

# Step 1: Data exploration
SELECT
  src_endpoint,
  dst_endpoint,
  action
FROM glue.iceberg.vpc_flow_logs -- limit size or sampling
LIMIT 10;

# Step 2: Zero-ETL by Flint index
CREATE INDEX src_dst_action ON glue.iceberg.vpc_flow_logs (
  src_endpoint,
  dst_endpoint,
  action
)
WHERE timestamp > (current_timestamp - interval '1' hour) -- partial indexing
WITH (
  auto_refresh = true
);

# Step 3a: Dashboard / DSL query Flint index directly
POST flint_glue_iceberg_vpc_flow_logs_src_dst_action_index
{
  ...
}

# Step 3b: SparkSQL query acceleration
# Identify the top IP address pairs with rejected connections in the last hour
SELECT
  src_endpoint.ip || '->' || dst_endpoint.ip AS ip_pair,
  action,
  COUNT(*) AS count
FROM glue.iceberg.vpc_flow_logs
WHERE action = 'REJECT'
  AND time_dt > (current_timestamp - interval '1' hour)
GROUP BY 1, 2
ORDER BY count DESC
LIMIT 25;

# Step 4: Iceberg table management
# Data compaction on a regular basis triggered manually or by Glue
CALL local.system.rewrite_data_files(
  table => 'glue.iceberg.vpc_flow_logs',
  options => map('rewrite-all', true)
);

# Step 5: Clean up
# User drops the unused covering index after analytics
# (DROP deletes the index logically; VACUUM removes its data physically)
DROP INDEX src_dst_action ON glue.iceberg.vpc_flow_logs;
VACUUM INDEX src_dst_action ON glue.iceberg.vpc_flow_logs;

Architecture

Here is the architecture diagram that provides a comprehensive overview of the high-level design and key components:

[Architecture diagram]


Task Breakdown

Below is the high-level task breakdown, with a description of each task and its respective components. More detailed task descriptions can be found in the following sections:

| Feature | Component | Priority | Task | GitHub Issue | Comment |
|---|---|---|---|---|---|
| Data Exploration | Catalog | High | Add Iceberg catalog config in Spark job params | todo | |
| Data Exploration | Data Types | High | Support all Iceberg data types in direct query | todo | |
| Zero-ETL | Covering Index | Med | Map source column to OpenSearch field type | #384 | OpenSearch table design related |
| Zero-ETL | Covering Index | Med | Fix single OS index capacity issue | #339 | |
| Zero-ETL | Covering Index | High | Improve Flint data source reader performance | #334 | |
| Zero-ETL | Materialized View | Low | Support event time ordering when cold start | #90 | |
| SparkSQL Query Acceleration | Skipping Index | Med | Disable skipping index create on Iceberg table | todo | |
| SparkSQL Query Acceleration | Covering Index | High | Query rewrite with partial covering index | #298 | |
| SparkSQL Query Acceleration | Covering Index | Med | Add more filtering condition pushdown | #148 | OpenSearch table design related |
| SparkSQL Query Acceleration | Materialized View | Low | Query rewrite with materialized view | todo | |
| SparkSQL Query Acceleration | Index Advisor | Low | Disable skipping index advisor on Iceberg table | todo | |
| Index Maintenance | Index Data Freshness | Low | Index refresh idempotency | #88 | |
| Index Maintenance | Index Data Freshness | Med | Include refresh status in show Flint index statement | #385 | |
| Index Maintenance | Index Data Freshness | Low | Support hybrid scan for covering index | #386 | |
| Index Maintenance | Index Management | Low | Support schema change in alter index statement | #387 | |

[TBD] Features in Data Exploration, Zero-ETL, and Query Acceleration above should also consider supporting Iceberg's current and historical snapshots, as well as branches and tags.


Data Exploration

Users can execute common DDL statements and direct SQL queries on Iceberg tables for ad-hoc data analytics. Flint must support the Iceberg catalog and fully accommodate Iceberg data types, ensuring seamless integration and comprehensive data analysis capabilities.

Iceberg Catalog

Configure Spark job parameters to activate the Iceberg catalog, ensuring compatibility with FlintDelegatingSessionCatalog. Ref: https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-iceberg.html
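As a quick sanity check, once the catalog (assumed here to be named glue, matching the examples above) is configured in the Spark job parameters, it should be directly usable from Spark SQL:

SHOW TABLES IN glue.iceberg;
DESCRIBE TABLE EXTENDED glue.iceberg.vpc_flow_logs;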

Iceberg Data Types

Flint must fully support all Iceberg data types, including complex structures like Struct, List, and Map, to ensure comprehensive data handling capabilities. Ref: https://iceberg.apache.org/docs/latest/spark-getting-started/#type-compatibility

| Spark | Iceberg | Notes |
|---|---|---|
| boolean | boolean | |
| short | integer | |
| byte | integer | |
| integer | integer | |
| long | long | |
| float | float | |
| double | double | |
| date | date | |
| timestamp | timestamp with timezone | |
| timestamp_ntz | timestamp without timezone | |
| char | string | |
| varchar | string | |
| string | string | |
| binary | binary | |
| decimal | decimal | |
| struct | struct | |
| array | list | |
| map | map | |
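For example, direct queries should handle nested types such as the struct columns in the VPC flow logs sample without special handling (a minimal sketch):

SELECT
  src_endpoint.ip AS src_ip,  -- struct field access
  dst_endpoint.ip AS dst_ip
FROM glue.iceberg.vpc_flow_logs
LIMIT 10;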

Zero-ETL Support

Users can load raw or aggregated data directly into OpenSearch via covering indexes and materialized views, enabling full-text search and dashboard capabilities without the need for an Extract, Transform, Load (ETL) process.

Covering Index Enhancement

Address limitations and improve the performance of covering indexes in the issues below:

Materialized View Enhancement

Address limitations of materialized views in the issues below:


SparkSQL Query Acceleration

Users continue to use the familiar SparkSQL interface and leverage OpenSearch's indexing capabilities to accelerate SparkSQL queries.

Data Skipping

  1. Disable Flint skipping index creation: Since Flint skipping indexes are not supported on Iceberg tables, rely on Iceberg's metadata indexing instead (see the sketch after this list).
  2. Disable Skipping Index Advisor: Prevent users from using the ANALYZE index statement for skipping index recommendations.
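A sketch of the statements that would be rejected on Iceberg tables under this plan (expected behavior, not yet implemented; ANALYZE SKIPPING INDEX is Flint's index advisor statement):

-- Both statements would fail with an unsupported-table error on Iceberg tables
CREATE SKIPPING INDEX ON glue.iceberg.vpc_flow_logs ( action VALUE_SET );
ANALYZE SKIPPING INDEX ON glue.iceberg.vpc_flow_logs;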

Query Optimization

[Covering index acceleration diagram]

Support query rewriting for covering index (full or partial) and materialized view:
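As a sketch of the intended behavior: once the covering index from Step 2 exists, a query touching only indexed columns becomes a candidate for rewrite into a scan of the OpenSearch index, which can be checked via the query plan (proposed behavior, not a guarantee):

EXPLAIN
SELECT src_endpoint, dst_endpoint, action
FROM glue.iceberg.vpc_flow_logs
WHERE action = 'REJECT';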


Index Maintenance

Index Data Freshness

Provide tools for users to inspect index data freshness and ensure up-to-date query results (see the sketch below):
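For instance, using the existing statement (the refresh status column is what #385 proposes to add):

SHOW FLINT INDEX IN glue.iceberg;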

Index Management

  • Snapshot expiration: Index data will not be updated accordingly. This needs to be highlighted in the documentation.
  • Data compaction: The index refresh job automatically handles the empty micro-batches generated.
  • Schema evolution: As Iceberg data schemas evolve, the index must adapt to accommodate new fields, types, and structures without causing disruptions (see the sketch below).
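A hypothetical sketch around #387: today Flint's ALTER INDEX updates index settings, while schema change support is the proposed extension:

-- Existing capability: update index settings
ALTER INDEX src_dst_action ON glue.iceberg.vpc_flow_logs
WITH (auto_refresh = false);

-- Proposed for #387 (hypothetical syntax): pick up a newly added source column
-- ALTER INDEX src_dst_action ON glue.iceberg.vpc_flow_logs ADD COLUMNS (new_col);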

Testing

Functional Testing

Functional testing ensures that Iceberg support works with all existing components and features, and that newly added features perform correctly and meet the specified requirements.


Performance Benchmark

Benchmarking performance for data exploration queries, zero-ETL ingestion, and SparkSQL query acceleration:

  • Direct query on Iceberg table
    • Performance
    • Load test in concurrent environment
  • Flint index building
    • Covering index building
    • Materialized view building
  • Flint query acceleration
    • Covering index acceleration
    • Materialized view acceleration

Issues related:
