Skip to content

Commit

Permalink
More things and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Jul 29, 2024
1 parent 8175c2c commit 8349398
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 49 deletions.
8 changes: 3 additions & 5 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::chunked_reader::ChunkedArrayReader;
use vortex_serde::file::file_reader::FileReaderBuilder;
use vortex_serde::file::reader::VortexStreamBuilder;
use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite};
use vortex_serde::writer::ArrayWriter;
use vortex_serde::MessageReader;
Expand Down Expand Up @@ -180,15 +180,13 @@ pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Arr
let indices_array = indices.to_vec().into_array();

let file = TokioAdapter(tokio::fs::File::open(path).await?);
let reader = FileReaderBuilder::new(file)
let stream = VortexStreamBuilder::new(file)
.with_length(len)
.build()
.await
.unwrap();

let data = reader
.into_stream()
.await?
let data = stream
.collect::<Vec<_>>()
.await
.into_iter()
Expand Down
17 changes: 17 additions & 0 deletions vortex-dtype/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::hash::Hash;
use std::sync::Arc;

use itertools::Itertools;
use vortex_error::{vortex_bail, VortexResult};
use DType::*;

use crate::nullability::Nullability;
Expand Down Expand Up @@ -154,6 +155,22 @@ impl StructDType {
pub fn dtypes(&self) -> &Arc<[DType]> {
&self.dtypes
}

/// Builds a new `StructDType` containing only the columns at `indices`,
/// in the order the indices are given.
///
/// # Errors
///
/// Returns an error if any index does not refer to an existing column.
pub fn project(&self, indices: &[usize]) -> VortexResult<Self> {
    let mut names = Vec::with_capacity(indices.len());
    let mut dtypes = Vec::with_capacity(indices.len());

    for &idx in indices {
        // Valid positions are `0..len`, so `idx == len` is already out of
        // bounds and must be rejected here rather than panic on the index
        // expressions below.
        if idx >= self.names.len() {
            vortex_bail!("Projection column is out of bounds");
        }

        names.push(self.names[idx].clone());
        dtypes.push(self.dtypes[idx].clone());
    }

    Ok(StructDType::new(names.into(), dtypes))
}
}

#[cfg(test)]
Expand Down
9 changes: 5 additions & 4 deletions vortex-serde/src/file/file_writer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::mem;

use flatbuffers::{FlatBufferBuilder, WIPOffset};
Expand Down Expand Up @@ -158,10 +159,10 @@ impl<W: VortexWrite> FileWriter<W> {
unreachable!("Values are a structarray")
};

let mut column_layouts = Vec::with_capacity(self.column_chunks.len());
let mut column_layouts = VecDeque::with_capacity(self.column_chunks.len());

for mut chunk in mem::take(&mut self.column_chunks) {
let mut chunks = Vec::new();
let mut chunks = VecDeque::new();

let len = chunk.byte_offsets.len() - 1;
let byte_counts = chunk
Expand Down Expand Up @@ -210,11 +211,11 @@ impl<W: VortexWrite> FileWriter<W> {
let metadata_table_begin = self.msgs.tell();
self.msgs.write_dtype(metadata_array.dtype()).await?;
self.msgs.write_batch(metadata_array.into_array()).await?;
chunks.push(Layout::Flat(FlatLayout::new(
chunks.push_back(Layout::Flat(FlatLayout::new(
metadata_table_begin,
self.msgs.tell(),
)));
column_layouts.push(Layout::Chunked(ChunkedLayout::new(chunks)));
column_layouts.push_back(Layout::Chunked(ChunkedLayout::new(chunks)));
}

Ok(StructLayout::new(column_layouts))
Expand Down
12 changes: 7 additions & 5 deletions vortex-serde/src/file/layouts.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

use flatbuffers::{FlatBufferBuilder, WIPOffset};
use vortex_error::{VortexError, VortexResult};
use vortex_flatbuffers::WriteFlatBuffer;
Expand Down Expand Up @@ -112,7 +114,7 @@ impl FlatLayout {

#[derive(Debug, Clone)]
pub struct ChunkedLayout {
pub(crate) children: Vec<Layout>,
pub(crate) children: VecDeque<Layout>,
}

impl WriteFlatBuffer for ChunkedLayout {
Expand Down Expand Up @@ -140,7 +142,7 @@ impl WriteFlatBuffer for ChunkedLayout {
}

impl ChunkedLayout {
pub fn new(child_ranges: Vec<Layout>) -> Self {
pub fn new(child_ranges: VecDeque<Layout>) -> Self {
Self {
children: child_ranges,
}
Expand All @@ -154,7 +156,7 @@ impl ChunkedLayout {
// TODO(robert): Should struct layout store a schema? How do you pick a child by name
#[derive(Debug, Clone)]
pub struct StructLayout {
pub(crate) children: Vec<Layout>,
pub(crate) children: VecDeque<Layout>,
}

impl WriteFlatBuffer for StructLayout {
Expand Down Expand Up @@ -182,7 +184,7 @@ impl WriteFlatBuffer for StructLayout {
}

impl StructLayout {
pub fn new(child_ranges: Vec<Layout>) -> Self {
pub fn new(child_ranges: VecDeque<Layout>) -> Self {
Self {
children: child_ranges,
}
Expand All @@ -198,7 +200,7 @@ impl TryFrom<fb::NestedLayout<'_>> for Layout {
.unwrap()
.iter()
.map(Layout::try_from)
.collect::<VortexResult<Vec<_>>>()?;
.collect::<VortexResult<VecDeque<_>>>()?;
match value.encoding() {
1 => Ok(Layout::Chunked(ChunkedLayout::new(children))),
2 => Ok(Layout::Struct(StructLayout::new(children))),
Expand Down
2 changes: 1 addition & 1 deletion vortex-serde/src/file/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod file_reader;
pub mod file_writer;
mod footer;
mod layouts;
pub mod reader;

pub const FULL_FOOTER_SIZE: usize = 20;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use projections::Projection;
use schema::Schema;
use vortex::array::struct_::StructArray;
use vortex::{Array, IntoArray};
use vortex_dtype::DType;
Expand All @@ -16,35 +18,50 @@ use crate::file::footer::Footer;
use crate::io::VortexReadAt;
use crate::{ArrayBufferReader, ReadResult};

pub struct FileReader<R> {
inner: R,
footer: Footer,
}
pub mod projections;
pub mod schema;

pub struct FileReaderBuilder<R> {
pub struct VortexStreamBuilder<R> {
reader: R,
projection: Option<Projection>,
len: Option<u64>,
}

impl<R: VortexReadAt> FileReaderBuilder<R> {
impl<R: VortexReadAt> VortexStreamBuilder<R> {
const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024;
const FOOTER_TRAILER_SIZE: usize = 20;

pub fn new(reader: R) -> Self {
Self { reader, len: None }
Self {
reader,
projection: None,
len: None,
}
}

pub fn with_length(mut self, len: u64) -> Self {
self.len = Some(len);
self
}

pub async fn build(mut self) -> VortexResult<FileReader<R>> {
pub fn with_projection(mut self, projection: Projection) -> Self {
self.projection = Some(projection);
self
}

pub async fn build(mut self) -> VortexResult<VortexStream<R>> {
let footer = self.read_footer().await?;

Ok(FileReader {
footer,
inner: self.reader,
let layout = footer.layout()?;
let dtype = footer.dtype()?;

Ok(VortexStream {
layout,
dtype,
projection: self.projection,
reader: Some(self.reader),
state: StreamingState::default(),
context: Default::default(),
})
}

Expand Down Expand Up @@ -102,30 +119,26 @@ impl<R: VortexReadAt> FileReaderBuilder<R> {
}
}

impl<R: VortexReadAt> FileReader<R> {
pub async fn into_stream(self) -> VortexResult<FileReaderStream<R>> {
let footer = self.footer;
let layout = footer.layout()?;
let dtype = footer.dtype()?;

Ok(FileReaderStream {
layout,
dtype,
reader: Some(self.inner),
state: StreamingState::default(),
context: Default::default(),
})
}
}

pub struct FileReaderStream<R> {
/// Stream over the contents of a file, produced by `VortexStreamBuilder::build`.
pub struct VortexStream<R> {
// Layout obtained from the file footer (`footer.layout()`) at build time.
layout: Layout,
// DType obtained from the file footer (`footer.dtype()`) at build time.
dtype: DType,
// Underlying reader; initialised to `Some(reader)` by the builder.
// NOTE(review): presumably taken out while a read is in flight — confirm in `poll_next`.
reader: Option<R>,
// Optional column projection supplied through `with_projection`; `None` means no projection.
projection: Option<Projection>,
// Streaming state machine; starts at `StreamingState::default()`.
state: StreamingState<R>,
// Shared `vortex::Context`; the builder fills it with `Default::default()`.
context: Arc<vortex::Context>,
}

impl<R> VortexStream<R> {
    /// Returns the stream's struct dtype wrapped in a [`Schema`], narrowed to
    /// the configured projection when one is set.
    ///
    /// # Errors
    ///
    /// Propagates the error from `StructDType::project` when the projection
    /// cannot be applied.
    ///
    /// # Panics
    ///
    /// Panics if `self.dtype.as_struct()` returns `None`.
    pub fn schema(&self) -> VortexResult<Schema> {
        let full = self.dtype.as_struct().cloned().unwrap();

        // Without a projection the full struct dtype is the schema.
        let Some(projection) = self.projection.as_ref() else {
            return Ok(Schema(full));
        };

        let projected = full.project(projection.indices())?;
        Ok(Schema(projected))
    }
}

type StreamStateFuture<R> = BoxFuture<'static, VortexResult<(Vec<(Arc<str>, BytesMut, DType)>, R)>>;

#[derive(Default)]
Expand All @@ -152,7 +165,7 @@ impl ColumnInfo {
}
}

impl<R: VortexReadAt + Unpin + Send + 'static> Stream for FileReaderStream<R> {
impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexStream<R> {
type Item = VortexResult<Array>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -169,7 +182,7 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for FileReaderStream<R> {
if layout.children.len() == 1 {
return Poll::Ready(None);
} else {
layouts.push(layout.children.remove(0));
layouts.push(layout.children.pop_front().unwrap());
}
}

Expand Down Expand Up @@ -281,12 +294,11 @@ mod tests {
writer = writer.write_array_columns(st.into_array()).await.unwrap();
let written = writer.finalize().await.unwrap();

let reader = FileReaderBuilder::new(written).build().await.unwrap();
let layout = reader.footer.layout().unwrap();
let mut builder = VortexStreamBuilder::new(written);
let layout = builder.read_footer().await.unwrap().layout().unwrap();
dbg!(layout);

let mut stream = reader.into_stream().await.unwrap();

let mut stream = builder.build().await.unwrap();
let mut cnt = 0;

while let Some(array) = stream.next().await {
Expand Down
21 changes: 21 additions & 0 deletions vortex-serde/src/file/reader/projections.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/// An ordered list of column positions to select.
///
/// The indices are stored exactly as given: no sorting, de-duplication or
/// bounds checking happens here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Projection {
    indices: Vec<usize>,
}

impl Projection {
    /// Creates a projection by copying the indices out of anything that can
    /// be viewed as a `[usize]` slice (array, slice, `Vec`, ...).
    pub fn new(indices: impl AsRef<[usize]>) -> Self {
        Self {
            indices: indices.as_ref().to_vec(),
        }
    }

    /// Returns the selected indices in the order they were supplied.
    pub fn indices(&self) -> &[usize] {
        &self.indices
    }
}

impl From<Vec<usize>> for Projection {
    /// Wraps an owned vector of indices without copying it.
    fn from(indices: Vec<usize>) -> Self {
        Self { indices }
    }
}
16 changes: 16 additions & 0 deletions vortex-serde/src/file/reader/schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use vortex_dtype::{FieldNames, StructDType};
use vortex_error::VortexResult;

use super::projections::Projection;

/// Newtype wrapper around a `StructDType`.
pub struct Schema(pub(crate) StructDType);

impl Schema {
    /// Borrows the field names of the wrapped struct dtype.
    pub fn fields(&self) -> &FieldNames {
        let Self(inner) = self;
        inner.names()
    }

    /// Returns a new `Schema` restricted to the columns named by `projection`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `StructDType::project`.
    pub fn project(&self, projection: Projection) -> VortexResult<Self> {
        let projected = self.0.project(projection.indices())?;
        Ok(Self(projected))
    }
}

0 comments on commit 8349398

Please sign in to comment.