Need advice on how to design and build a query engine using datafusion over custom storage structure on S3 #12674
Unanswered
nitishpRedbus
asked this question in
Q&A
Replies: 1 comment 1 reply
-
DataFusion is built as a library with many extension points, plenty to make it a powerful tool for your use case IMO. Here are a couple of pointers I can think of:
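For illustration, here is a minimal sketch of one common starting point: registering an S3 object store with a `SessionContext` and querying the Parquet files under a prefix with SQL. The bucket and prefix names are placeholders, and the exact registration API differs slightly between DataFusion versions.

```rust
// Sketch only: wire an S3 object store into DataFusion and query Parquet
// files under a prefix. Uses the `datafusion`, `object_store` (with the
// "aws" feature), `url`, and `tokio` crates. Bucket/prefix are placeholders.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Credentials and region typically come from the environment on Fargate/Lambda.
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("s3_bucket") // placeholder bucket name
        .build()?;

    // Teach DataFusion how to resolve `s3://s3_bucket/...` URLs.
    let s3_url = Url::parse("s3://s3_bucket").expect("valid URL");
    ctx.register_object_store(&s3_url, Arc::new(s3));

    // Register all Parquet files under one prefix as a table
    // (the files under a single bucketId share a compatible schema).
    ctx.register_parquet(
        "raw_events",
        "s3://s3_bucket/raw_data/src1/IN/2024-09-01-10/evt42/bucket7/",
        ParquetReadOptions::default(),
    )
    .await?;

    let df = ctx.sql("SELECT count(*) FROM raw_events").await?;
    df.show().await?;
    Ok(())
}
```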
-
Hi all,
I'm a software engineer from India and I am new to DataFusion, so thank you all for building such a great product.
I have a problem which I believe can be solved using DataFusion and Apache Arrow, so I need your valuable advice. Please forgive me, as it's a bit of a lengthy post.
Context:
In my company, we ingest a large volume of JSON data generated as part of the API request-response traffic of every deployed service. We extract only a small part of it for analytics, and we had a requirement to store the raw data in S3 for later processing or debugging (data is oil).
Initially, we researched ways to store this data, and the most commonly recommended solutions were as follows:
1. Compress the JSON into JSON-L text files and store them in S3, then extract them using Flink or Spark. However, this approach felt inefficient, since every read required a resource-intensive pipeline that is cumbersome to implement (schema on read) and could be costly.
2. Store the entire JSON in a NoSQL document database like MongoDB. However, the data we generate averages around 3 to 4 TiB per day (and growing), so storing that much in MongoDB would be very costly.
1. Storage
From the beginning we wanted to leverage AWS S3 for storage, as it is a cost-effective solution, but we wanted the data stored in a more efficient file format like Parquet so that 'schema on read' could be avoided, which in turn reduces 'extraction' cost. The main problem, however, is that Parquet has a fixed schema, while the JSON data schema was unknown and continuously evolving.
Since most of the API data follows a flexible schema that stays within the bounds of the API contract (which no one knows precisely), the only real issue was tracking schema evolution.
The API contract may evolve with every release, but across the org we have a global contract for these APIs which stipulates that a few common parameters are always included in order to drive analytics (e.g., sourceId, eventTypeId, countryId, eventTime).
So the idea was: if we could build a system that can 'track' the evolution of the JSON data, then we could map the JSON schema to a Parquet schema and store the JSON data in Parquet.
Following this idea, we built a simple library on top of Jackson that can detect schema evolution, plus a Flink-based pipeline that ingests data from Kafka, uses the library to detect the incoming JSON schema, groups similar events, converts the common schema into a Parquet schema, and finally writes the data out as Parquet using that schema.
The schema follows a 'merge' logic: either a new schema is created, or the incoming schema is merged into (and thereby evolves) an existing one. This idea is similar to how Spark does JSON processing.
We call each finalised schema a 'bucket' (kind of like a table), assigning it a unique bucketId and tracking schema evolution as versions (via timestamp).
Here’s the simplified flow (a sketch of this matching step follows the list):
1. If no match: create a new bucket with the schema.
2. If matched: use the existing bucket schema.
3. If schema evolution is needed: evolve the existing bucket schema (which in turn evolves the Parquet schema).
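Here is a minimal sketch of what that match/evolve step could look like if each tracked schema were held as an Arrow schema. The `Bucket` struct and the candidate-matching rule are assumptions made for illustration; `Schema::try_merge` is the arrow-rs primitive doing the actual merge.

```rust
// Illustrative sketch of the bucket match/evolve step, assuming each tracked
// schema is kept as an Arrow schema. `Bucket` and the candidate-matching rule
// are hypothetical; `Schema::try_merge` is the real arrow-rs merge primitive.
use arrow::datatypes::Schema;

struct Bucket {
    bucket_id: u64,
    version_time: i64, // schema versions are tracked via timestamp
    schema: Schema,
}

fn match_or_evolve(buckets: &mut Vec<Bucket>, incoming: Schema, now: i64) -> u64 {
    // A bucket is a candidate if its schema and the incoming one can be merged
    // without a type conflict (the real library would additionally apply the
    // up-cast rules described below).
    if let Some(bucket) = buckets.iter_mut().find(|b| {
        Schema::try_merge(vec![b.schema.clone(), incoming.clone()]).is_ok()
    }) {
        // Safe to unwrap: the candidate check above already merged successfully.
        let merged = Schema::try_merge(vec![bucket.schema.clone(), incoming]).unwrap();
        if merged != bucket.schema {
            // Schema evolution needed: evolve the bucket (and its Parquet schema).
            bucket.schema = merged;
            bucket.version_time = now;
        }
        // Otherwise it is an exact match and the existing schema is reused.
        bucket.bucket_id
    } else {
        // No match: create a new bucket with the incoming schema.
        let bucket_id = buckets.len() as u64 + 1;
        buckets.push(Bucket { bucket_id, version_time: now, schema: incoming });
        bucket_id
    }
}
```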
We never delete columns; instead we 'merge' columns (or structs). During the matching process, we also 'up-cast' data types according to predefined rules:
For primitive data types, the merge rules are:
For structs, the merge rules are:
For arrays, we follow the same merge rules, except that if an array contains more than one type (this usually happens with arrays of structs whose overall types differ), we store the entire array as 'string'.
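Since the rule tables themselves are not reproduced above, the following is a purely illustrative sketch of what such an up-cast function could look like: a wider integer wins, mixing integers and floats promotes to float, and anything irreconcilable falls back to string (mirroring the array fallback). The specific promotions shown are assumptions, not the actual rules.

```rust
// Purely illustrative up-cast rules; the real rule tables are not shown in the
// post, so these promotions are assumptions for the sake of example.
use arrow::datatypes::DataType;

fn promote(a: &DataType, b: &DataType) -> DataType {
    use DataType::*;
    match (a, b) {
        // Identical types need no promotion.
        (x, y) if x == y => x.clone(),
        // Narrower integers widen to the larger integer type.
        (Int32, Int64) | (Int64, Int32) => Int64,
        // Mixing integers and floats promotes to a float.
        (Int32 | Int64, Float64) | (Float64, Int32 | Int64) => Float64,
        // Anything that cannot be reconciled falls back to string,
        // mirroring the array fallback described above.
        _ => Utf8,
    }
}
```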
This approach has worked really well: even in the worst case, I've observed at most 14 different 'buckets' (bucket schemas) for a single event type. This variation likely arises because APIs may be designed for different countries or versions.
We were able to store the raw data in S3 and achieve a 30–50% compression ratio with this method (which can be increased further using Parquet compaction), and since the data is streamed it is available for extraction in under an hour.
The file organization structure in S3 is as follows: s3://s3_bucket/raw_data/${sourceId}/${countryId}/${partitionHour}/${eventTypeId}/${bucketId}/${versionTime}_${uuid}.parquet
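Given that layout, one way to wire it into DataFusion (again a sketch with placeholder values, and assuming the S3 object store is registered as in the earlier example) is to build the prefix for the partition of interest and register it as a table. Since the layout is not Hive-style (`key=value`), the partition values come from the path you construct rather than from DataFusion's partition-column inference.

```rust
// Sketch: build an S3 prefix from the layout above and register it as a
// DataFusion table. All parameter values are placeholders, and the object
// store for `s3://s3_bucket` is assumed to be registered already.
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

fn raw_data_prefix(
    source_id: &str,
    country_id: &str,
    partition_hour: &str,
    event_type_id: &str,
    bucket_id: &str,
) -> String {
    format!(
        "s3://s3_bucket/raw_data/{source_id}/{country_id}/{partition_hour}/{event_type_id}/{bucket_id}/"
    )
}

async fn register_bucket(ctx: &SessionContext) -> Result<()> {
    // One registered table per (hour, event type, bucket), since a bucket is
    // the unit that shares a single (versioned) schema.
    let path = raw_data_prefix("src1", "IN", "2024-09-01-10", "evt42", "bucket7");
    ctx.register_parquet("evt42_bucket7", &path, ParquetReadOptions::default())
        .await
}
```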
2. Extraction
As a starter, we implemented a simple, lightweight Python application deployed on AWS Fargate (2 cores, 4 GB RAM) that uses Apache Arrow (we tried DuckDB but had trouble handling multiple schemas within a bucket). It allows us to filter and extract raw data from multiple Parquet files within 2–3 minutes for any hourly time interval. The final result is streamed into SQLite (using JSONB) and then shared with the end user.
We also built a simple UI on top of this, which displays the data to end users in a simple and intuitive way.
This has worked very well: people can now actually view the 'raw' data, and it is currently used across our company for debugging.
Now, with the success of this idea, we wish to take it to the next level by building a serverless 'query engine' on top of it.
I initially thought of leveraging Iceberg and checked with the Apache Iceberg community, but they said that this kind of 'schema evolution' would be difficult to track within Iceberg, so I feel that incorporating the current design into Iceberg may be very difficult or impossible.
So Apache DataFusion looks like a good fit: if I can leverage DataFusion with serverless components (e.g., AWS Lambda/Fargate) to build a lightweight query engine over this raw data storage, then I can open up possibilities for end users to leverage the raw data using simple SQL syntax, which to me is a very powerful idea.
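As a rough sketch (ignoring the Lambda/Fargate wiring, authentication, and result serialization, and assuming tables are registered as in the earlier examples), the core of such a lightweight query engine can be quite small:

```rust
// Sketch of the core "query engine" entry point: take a SQL string from the
// end user, run it with DataFusion, and return Arrow record batches. The
// surrounding Lambda/Fargate handler, auth, and serialization are omitted.
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn run_user_query(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
    // Tables (e.g. the per-bucket Parquet prefixes) are assumed to already be
    // registered on `ctx` before this is called.
    let df = ctx.sql(sql).await?;
    // Collect results in memory; for large results you could instead stream
    // with `execute_stream()` and serialize to JSON or Arrow IPC.
    df.collect().await
}
```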
However, I am completely new to query engines and currently do not have the technical know-how to build a query engine at this scale, so it would be great if any of you could provide some advice or materials on how I can leverage DataFusion and Apache Arrow to build this.
Thank you.