Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Jul 17, 2024
1 parent e09d897 commit 6d0c352
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 24 deletions.
10 changes: 5 additions & 5 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::SystemTime;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
use futures::future::try_join_all;
use indicatif::ProgressBar;
// use indicatif::ProgressBar;
use prettytable::{Cell, Row, Table};

#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
Expand Down Expand Up @@ -38,14 +38,14 @@ async fn main() {
}

// Setup a progress bar
let progress = ProgressBar::new(21 * formats.len() as u64);
// let progress = ProgressBar::new(21 * formats.len() as u64);

// Send back a channel with the results of Row.
let (rows_tx, rows_rx) = sync::mpsc::channel();
for (q, query) in tpch_queries() {
let _ctxs = ctxs.clone();
let _tx = rows_tx.clone();
let _progress = progress.clone();
// let _progress = progress.clone();
rayon::spawn_fifo(move || {
let mut cells = Vec::with_capacity(formats.len());
cells.push(Cell::new(&format!("Q{}", q)));
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn main() {
let fastest = measure.iter().cloned().min().unwrap();
elapsed_us.push(fastest);

_progress.inc(1);
// _progress.inc(1);
}

let baseline = elapsed_us.first().unwrap();
Expand Down Expand Up @@ -131,6 +131,6 @@ async fn main() {
table.add_row(row);
}

progress.finish();
// progress.finish();
table.printstd();
}
4 changes: 2 additions & 2 deletions encodings/dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where
let mut lookup_dict: HashMap<u64, (), ()> = HashMap::with_hasher(());
let mut codes: Vec<u64> = Vec::with_capacity(lower);
let mut bytes: Vec<u8> = Vec::new();
let mut offsets: Vec<u64> = Vec::new();
let mut offsets: Vec<u32> = Vec::new();
offsets.push(0);

if dtype.is_nullable() {
Expand All @@ -133,7 +133,7 @@ where
RawEntryMut::Vacant(vac) => {
let next_code = offsets.len() as u64 - 1;
bytes.extend_from_slice(byte_ref);
offsets.push(bytes.len() as u64);
offsets.push(bytes.len() as u32);
vac.insert_with_hasher(value_hash, next_code, (), |idx| {
hasher.hash_one(lookup_bytes(
offsets.as_slice(),
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn take_nullable<I: NativePType, O: NativePType>(
indices: &[I],
null_buffer: NullBuffer,
) -> VarBinArray {
let mut builder = VarBinBuilder::<I>::with_capacity(indices.len());
let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
for &idx in indices {
let idx = idx.to_usize().unwrap();
if null_buffer.is_valid(idx) {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl VarBinArray {
dtype: DType,
) -> Self {
let iter = iter.into_iter();
let mut builder = VarBinBuilder::<u64>::with_capacity(iter.size_hint().0);
let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
for v in iter {
builder.push(v.as_ref().map(|o| o.as_ref()));
}
Expand Down
14 changes: 5 additions & 9 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl TableProvider for VortexMemTable {
filters
.iter()
.map(|expr| {
if can_be_pushed_down(expr)? {
if can_be_pushed_down(expr) {
Ok(TableProviderFilterPushDown::Exact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
Expand Down Expand Up @@ -289,8 +289,8 @@ fn make_filter_then_take_plan(
}

/// Check if the given expression tree can be pushed down into the scan.
fn can_be_pushed_down(expr: &Expr) -> DFResult<bool> {
let r = match expr {
fn can_be_pushed_down(expr: &Expr) -> bool {
match expr {
Expr::BinaryExpr(expr) if expr.op == Operator::Eq => {
let lhs = expr.left.as_ref();
let rhs = expr.right.as_ref();
Expand All @@ -314,9 +314,7 @@ fn can_be_pushed_down(expr: &Expr) -> DFResult<bool> {
}

_ => false,
};

Ok(r)
}
}

/// Extract out the columns from our table referenced by the expression.
Expand Down Expand Up @@ -586,9 +584,7 @@ mod test {
right: Box::new(lit("F")),
};
let e = Expr::BinaryExpr(e);
println!("{e:?}");

let r = can_be_pushed_down(&e).unwrap();
assert!(r);
assert!(can_be_pushed_down(&e));
}
}
7 changes: 1 addition & 6 deletions vortex-datafusion/src/plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,7 @@ impl Stream for RowIndicesStream {

// TODO(adamg): Filter on vortex arrays
let array = ExpressionEvaluator::eval(vortex_struct, &this.conjunction_expr).unwrap();
let selection = array
.into_bool()
.unwrap()
.into_canonical()
.unwrap()
.into_arrow();
let selection = array.into_canonical().unwrap().into_arrow();

// Convert the `selection` BooleanArray into a UInt64Array of indices.
let selection_indices = selection
Expand Down

0 comments on commit 6d0c352

Please sign in to comment.