From 0942d556c80616cd4ad3cf8b1427d3477f666ae9 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 5 Dec 2024 08:29:15 -0500 Subject: [PATCH] chore: simplify MetadataFetcher to function (#1569) --- vortex-datafusion/src/persistent/format.rs | 5 +- vortex-file/src/read/metadata.rs | 58 +++++++--------------- 2 files changed, 19 insertions(+), 44 deletions(-) diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 42e40cd4f9..e7512f9c83 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -20,7 +20,7 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex_array::array::StructArray; use vortex_array::arrow::infer_schema; use vortex_array::Context; -use vortex_file::metadata::MetadataFetcher; +use vortex_file::metadata::fetch_metadata; use vortex_file::{ read_initial_bytes, read_layout_from_initial, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, Scan, VORTEX_FILE_EXTENSION, @@ -118,8 +118,7 @@ impl FileFormat for VortexFormat { stats.num_rows = Precision::Exact(row_count as usize); let metadata_table = - MetadataFetcher::fetch(os_read_at, io.into(), root_layout, layout_message_cache) - .await?; + fetch_metadata(os_read_at, io.into(), root_layout, layout_message_cache).await?; if let Some(metadata) = metadata_table { let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); diff --git a/vortex-file/src/read/metadata.rs b/vortex-file/src/read/metadata.rs index 66dc9c4c8b..9c3f2d4909 100644 --- a/vortex-file/src/read/metadata.rs +++ b/vortex-file/src/read/metadata.rs @@ -1,9 +1,5 @@ -use std::future::Future; use std::iter; -use std::iter::Once; -use std::pin::Pin; use std::sync::{Arc, RwLock}; -use std::task::{ready, Context, Poll}; use futures_util::{stream, StreamExt}; use vortex_array::ArrayData; @@ -14,17 +10,6 @@ use super::{LayoutMessageCache, LayoutReader}; use crate::read::buffered::{BufferedLayoutReader, ReadMasked}; use crate::{MessageRead, RowMask}; -type MetadataBufferedReader = BufferedLayoutReader< - R, - stream::Iter>>, - Vec>, - MetadataMaskReader, ->; - -pub struct MetadataFetcher { - metadata_reader: MetadataBufferedReader, -} - struct MetadataMaskReader { layout: Box, } @@ -46,30 +31,21 @@ impl ReadMasked for MetadataMaskReader { } } -impl MetadataFetcher { - pub fn fetch( - input: R, - dispatcher: Arc, - root_layout: Box, - layout_cache: Arc>, - ) -> Self { - let metadata_reader = BufferedLayoutReader::new( - input, - dispatcher, - stream::iter(iter::once(Ok(RowMask::new_valid_between(0, 1)))), - MetadataMaskReader::new(root_layout), - layout_cache, - ); - Self { metadata_reader } - } -} - -impl Future for MetadataFetcher { - type Output = VortexResult>>>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(ready!(self.metadata_reader.poll_next_unpin(cx)).transpose()) - } +pub async fn fetch_metadata( + input: R, + dispatcher: Arc, + root_layout: Box, + layout_cache: Arc>, +) -> VortexResult>>> { + let mut metadata_reader = BufferedLayoutReader::new( + input, + dispatcher, + stream::iter(iter::once(Ok(RowMask::new_valid_between(0, 1)))), + MetadataMaskReader::new(root_layout), + layout_cache, + ); + + metadata_reader.next().await.transpose() } #[cfg(test)] @@ -82,7 +58,7 @@ mod test { use vortex_buffer::{Buffer, BufferString}; use vortex_io::IoDispatcher; - use crate::metadata::MetadataFetcher; + use crate::metadata::fetch_metadata; use crate::{ read_initial_bytes, read_layout_from_initial, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, Scan, VortexFileWriter, @@ -142,7 +118,7 @@ mod test { ) .unwrap(); let io = IoDispatcher::default(); - let metadata_table = MetadataFetcher::fetch( + let metadata_table = fetch_metadata( written_bytes, io.into(), layout_reader,