Skip to content

Commit

Permalink
Deduplicate filter projection with result projection (#668)
Browse files Browse the repository at this point in the history
This was missing in #651
  • Loading branch information
robert3005 authored Aug 21, 2024
1 parent 1f6e81c commit a604fa8
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
8 changes: 7 additions & 1 deletion vortex-serde/src/layouts/read/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

use bytes::BytesMut;
Expand Down Expand Up @@ -84,7 +85,12 @@ impl<R: VortexReadAt> LayoutReaderBuilder<R> {
Projection::All => (Projection::All, Projection::All),
Projection::Flat(mut v) => {
let original_len = v.len();
v.extend(filter_columns.into_iter());
let existing_fields: HashSet<Field> = v.iter().cloned().collect();
v.extend(
filter_columns
.into_iter()
.filter(|f| !existing_fields.contains(f)),
);
(
Projection::Flat(v),
Projection::Flat((0..original_len).map(Field::from).collect()),
Expand Down
6 changes: 5 additions & 1 deletion vortex-serde/src/layouts/read/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ impl Layout for FlatLayout {
}
FlatLayoutState::ReadBatch => {
let mut buf = self.cache.remove(&[]).ok_or_else(|| {
vortex_err!("Wrong state transition, message should have been fetched")
vortex_err!(
"Wrong state transition, message {:?} (with range {}) should have been fetched",
self.cache.absolute_id(&[]),
self.range
)
})?;

let mut array_reader = ArrayBufferReader::new();
Expand Down
8 changes: 8 additions & 0 deletions vortex-serde/src/stream_writer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};

use futures_util::{Stream, TryStreamExt};
use vortex::array::ChunkedArray;
use vortex::stream::ArrayStream;
Expand Down Expand Up @@ -98,6 +100,12 @@ pub struct ByteRange {
pub end: u64,
}

impl Display for ByteRange {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "[{}, {})", self.begin, self.end)
}
}

#[allow(clippy::len_without_is_empty)]
impl ByteRange {
pub fn new(begin: u64, end: u64) -> Self {
Expand Down

0 comments on commit a604fa8

Please sign in to comment.