From 388343bdea41acf05c57197e2aeb3b15ee93300f Mon Sep 17 00:00:00 2001 From: Josh Casale Date: Tue, 7 May 2024 09:10:26 -0400 Subject: [PATCH] review 1/n --- vortex-array/src/view.rs | 180 ++++++++++++++++++------------------- vortex-ipc/src/messages.rs | 51 ++++++----- vortex-ipc/src/reader.rs | 1 - 3 files changed, 113 insertions(+), 119 deletions(-) diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 2e838c5d7e..b2a694c8de 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -2,12 +2,13 @@ use std::fmt::{Debug, Formatter}; use enum_iterator::all; use itertools::Itertools; +use log::info; use vortex_buffer::Buffer; use vortex_dtype::flatbuffers::PType; use vortex_dtype::half::f16; use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; -use vortex_scalar::flatbuffers::Primitive; +use vortex_scalar::flatbuffers as fbs; use vortex_scalar::Scalar::List; use vortex_scalar::{ListScalar, Scalar}; @@ -147,12 +148,91 @@ impl<'v> ArrayView<'v> { impl Statistics for ArrayView<'_> { fn get(&self, stat: Stat) -> Option { - // fb fetch is just a pointer dereference, so we check that first - let from_fb = get_from_flatbuffer_array(self.array, stat); - if from_fb.is_some() { - return from_fb; + match stat { + Stat::IsConstant => { + let is_constant = self.array.stats()?.is_constant(); + is_constant + .and_then(|v| v.type__as_bool()) + .map(|v| v.value().into()) + } + Stat::IsSorted => self + .array + .stats()? + .is_sorted() + .and_then(|v| v.type__as_bool()) + .map(|v| v.value().into()), + Stat::IsStrictSorted => self + .array + .stats()? + .is_strict_sorted() + .and_then(|v| v.type__as_bool()) + .map(|v| v.value().into()), + Stat::Max => { + let max = self.array.stats()?.max(); + max.and_then(|v| v.type__as_primitive()) + .and_then(primitive_to_scalar) + } + Stat::Min => { + let min = self.array.stats()?.min(); + min.and_then(|v| v.type__as_primitive()) + .and_then(primitive_to_scalar) + } + Stat::RunCount => { + let rc = self.array.stats()?.run_count(); + rc.and_then(|v| v.type__as_primitive()) + .and_then(primitive_to_scalar) + } + Stat::TrueCount => { + let tc = self.array.stats()?.true_count(); + tc.and_then(|v| v.type__as_primitive()) + .and_then(primitive_to_scalar) + } + Stat::NullCount => { + let nc = self.array.stats()?.null_count(); + nc.and_then(|v| v.type__as_primitive()) + .and_then(primitive_to_scalar) + } + Stat::BitWidthFreq => self + .array + .stats()? + .bit_width_freq() + .map(|v| { + v.iter() + .flat_map(|v| { + primitive_to_scalar( + v.type__as_primitive() + .expect("Should only ever produce primitives"), + ) + }) + .collect_vec() + }) + .map(|v| { + List(ListScalar::new( + DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable), + Some(v), + )) + }), + Stat::TrailingZeroFreq => self + .array + .stats()? + .trailing_zero_freq() + .map(|v| { + v.iter() + .flat_map(|v| { + primitive_to_scalar( + v.type__as_primitive() + .expect("Should only ever produce primitives"), + ) + }) + .collect_vec() + }) + .map(|v| { + List(ListScalar::new( + DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable), + Some(v), + )) + }), } - None } /// NB: part of the contract for to_set is that it does not do any expensive computation. @@ -172,7 +252,7 @@ impl Statistics for ArrayView<'_> { /// We want to avoid any sort of allocation on instantiation of the ArrayView, so we /// do not allocate a stats_set to cache values. fn set(&self, _stat: Stat, _value: Scalar) { - unimplemented!() + info!("Cannot write stats to a view") } fn compute(&self, stat: Stat) -> Option { @@ -211,92 +291,8 @@ impl Statistics for ArrayView<'_> { } } -fn get_from_flatbuffer_array(array: fb::Array<'_>, stat: Stat) -> Option { - match stat { - Stat::IsConstant => { - let is_constant = array.stats()?.is_constant(); - is_constant - .and_then(|v| v.type__as_bool()) - .map(|v| v.value().into()) - } - Stat::IsSorted => array - .stats()? - .is_sorted() - .and_then(|v| v.type__as_bool()) - .map(|v| v.value().into()), - Stat::IsStrictSorted => array - .stats()? - .is_strict_sorted() - .and_then(|v| v.type__as_bool()) - .map(|v| v.value().into()), - Stat::Max => { - let max = array.stats()?.max(); - max.and_then(|v| v.type__as_primitive()) - .and_then(primitive_to_scalar) - } - Stat::Min => { - let min = array.stats()?.min(); - min.and_then(|v| v.type__as_primitive()) - .and_then(primitive_to_scalar) - } - Stat::RunCount => { - let rc = array.stats()?.run_count(); - rc.and_then(|v| v.type__as_primitive()) - .and_then(primitive_to_scalar) - } - Stat::TrueCount => { - let tc = array.stats()?.true_count(); - tc.and_then(|v| v.type__as_primitive()) - .and_then(primitive_to_scalar) - } - Stat::NullCount => { - let nc = array.stats()?.null_count(); - nc.and_then(|v| v.type__as_primitive()) - .and_then(primitive_to_scalar) - } - Stat::BitWidthFreq => array - .stats()? - .bit_width_freq() - .map(|v| { - v.iter() - .flat_map(|v| { - primitive_to_scalar( - v.type__as_primitive() - .expect("Should only ever produce primitives"), - ) - }) - .collect_vec() - }) - .map(|v| { - List(ListScalar::new( - DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable), - Some(v), - )) - }), - Stat::TrailingZeroFreq => array - .stats()? - .trailing_zero_freq() - .map(|v| { - v.iter() - .flat_map(|v| { - primitive_to_scalar( - v.type__as_primitive() - .expect("Should only ever produce primitives"), - ) - }) - .collect_vec() - }) - .map(|v| { - List(ListScalar::new( - DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable), - Some(v), - )) - }), - } -} - // TODO(@jcasale): move this to serde and make serde crate public? -fn primitive_to_scalar(v: Primitive) -> Option { +fn primitive_to_scalar(v: fbs::Primitive) -> Option { let err_msg = "failed to deserialize invalid primitive scalar"; match v.ptype() { PType::U8 => v diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 7b5fac5856..2c886a7aeb 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -1,6 +1,6 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; -use vortex::flatbuffers::{Array, ArrayArgs, ArrayStats, ArrayStatsArgs}; +use vortex::flatbuffers as fb; use vortex::{ArrayData, Context, ViewContext}; use vortex_dtype::{match_each_native_ptype, DType}; use vortex_error::{vortex_err, VortexError}; @@ -8,7 +8,7 @@ use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer}; use vortex_scalar::Scalar::{Bool, Primitive}; use vortex_scalar::{BoolScalar, PrimitiveScalar}; -use crate::flatbuffers::ipc as fb; +use crate::flatbuffers::ipc as fbi; use crate::flatbuffers::ipc::Compression; use crate::{missing, ALIGNMENT}; @@ -29,7 +29,7 @@ pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); impl FlatBufferRoot for IPCMessage<'_> {} impl WriteFlatBuffer for IPCMessage<'_> { - type Target<'a> = fb::Message<'a>; + type Target<'a> = fbi::Message<'a>; fn write_flatbuffer<'fb>( &self, @@ -41,12 +41,12 @@ impl WriteFlatBuffer for IPCMessage<'_> { Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(), }; - let mut msg = fb::MessageBuilder::new(fbb); + let mut msg = fbi::MessageBuilder::new(fbb); msg.add_version(Default::default()); msg.add_header_type(match self { - Self::Context(_) => fb::MessageHeader::Context, - Self::Schema(_) => fb::MessageHeader::Schema, - Self::Chunk(_) => fb::MessageHeader::Chunk, + Self::Context(_) => fbi::MessageHeader::Context, + Self::Schema(_) => fbi::MessageHeader::Schema, + Self::Chunk(_) => fbi::MessageHeader::Chunk, }); msg.add_header(header); msg.finish() @@ -54,7 +54,7 @@ impl WriteFlatBuffer for IPCMessage<'_> { } impl<'a> WriteFlatBuffer for IPCContext<'a> { - type Target<'t> = fb::Context<'t>; + type Target<'t> = fbi::Context<'t>; fn write_flatbuffer<'fb>( &self, @@ -67,9 +67,9 @@ impl<'a> WriteFlatBuffer for IPCContext<'a> { .map(|e| e.id()) .map(|id| { let encoding_id = fbb.create_string(id.as_ref()); - fb::Encoding::create( + fbi::Encoding::create( fbb, - &fb::EncodingArgs { + &fbi::EncodingArgs { id: Some(encoding_id), }, ) @@ -77,9 +77,9 @@ impl<'a> WriteFlatBuffer for IPCContext<'a> { .collect_vec(); let fb_encodings = fbb.create_vector(fb_encodings.as_slice()); - fb::Context::create( + fbi::Context::create( fbb, - &fb::ContextArgs { + &fbi::ContextArgs { encodings: Some(fb_encodings), }, ) @@ -87,7 +87,7 @@ impl<'a> WriteFlatBuffer for IPCContext<'a> { } pub struct SerdeContextDeserializer<'a> { - pub(crate) fb: fb::Context<'a>, + pub(crate) fb: fbi::Context<'a>, pub(crate) ctx: &'a Context, } @@ -111,19 +111,19 @@ impl<'a> TryFrom> for ViewContext { } impl<'a> WriteFlatBuffer for IPCSchema<'a> { - type Target<'t> = fb::Schema<'t>; + type Target<'t> = fbi::Schema<'t>; fn write_flatbuffer<'fb>( &self, fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { let dtype = Some(self.0.write_flatbuffer(fbb)); - fb::Schema::create(fbb, &fb::SchemaArgs { dtype }) + fbi::Schema::create(fbb, &fbi::SchemaArgs { dtype }) } } impl<'a> WriteFlatBuffer for IPCChunk<'a> { - type Target<'t> = fb::Chunk<'t>; + type Target<'t> = fbi::Chunk<'t>; fn write_flatbuffer<'fb>( &self, @@ -137,7 +137,7 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { let mut offset = 0; for array_data in array_data.depth_first_traversal() { if let Some(buffer) = array_data.buffer() { - buffers.push(fb::Buffer::new( + buffers.push(fbi::Buffer::new( offset as u64, buffer.len() as u64, Compression::None, @@ -148,9 +148,9 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { } let buffers = Some(fbb.create_vector(&buffers)); - fb::Chunk::create( + fbi::Chunk::create( fbb, - &fb::ChunkArgs { + &fbi::ChunkArgs { array, buffers, buffer_size: offset as u64, @@ -160,7 +160,7 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { } impl<'a> WriteFlatBuffer for IPCArray<'a> { - type Target<'t> = Array<'t>; + type Target<'t> = fb::Array<'t>; fn write_flatbuffer<'fb>( &self, @@ -194,9 +194,9 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { let stats = compute_and_build_stats(fbb, self.1); - Array::create( + fb::Array::create( fbb, - &ArrayArgs { + &fb::ArrayArgs { version: Default::default(), has_buffer: column_data.buffer().is_some(), encoding, @@ -212,7 +212,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { fn compute_and_build_stats<'a>( fbb: &'_ mut FlatBufferBuilder<'a>, array: &'_ ArrayData, -) -> WIPOffset> { +) -> WIPOffset> { let primitive_ptype = match array.dtype() { DType::Primitive(ptype, _) => Some(ptype), _ => None, @@ -293,7 +293,7 @@ fn compute_and_build_stats<'a>( }) .map(|v| fbb.create_vector(v.as_slice())); - let stat_args = &ArrayStatsArgs { + let stat_args = &fb::ArrayStatsArgs { min, max, is_sorted, @@ -306,6 +306,5 @@ fn compute_and_build_stats<'a>( trailing_zero_freq, }; - let stats = ArrayStats::create(fbb, stat_args); - stats + fb::ArrayStats::create(fbb, stat_args) } diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 2c7d30c801..1ef272f763 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -395,7 +395,6 @@ mod tests { use vortex_scalar::ListScalarVec; use crate::iter::FallibleLendingIterator; - // use crate::messages::IPCMessage::Context; use crate::reader::StreamReader; use crate::writer::StreamWriter;