Skip to content

Commit

Permalink
review 1/n
Browse files Browse the repository at this point in the history
  • Loading branch information
jdcasale committed May 7, 2024
1 parent d03316c commit 388343b
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 119 deletions.
180 changes: 88 additions & 92 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use log::info;
use vortex_buffer::Buffer;
use vortex_dtype::flatbuffers::PType;
use vortex_dtype::half::f16;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::flatbuffers::Primitive;
use vortex_scalar::flatbuffers as fbs;
use vortex_scalar::Scalar::List;
use vortex_scalar::{ListScalar, Scalar};

Expand Down Expand Up @@ -147,12 +148,91 @@ impl<'v> ArrayView<'v> {

impl Statistics for ArrayView<'_> {
fn get(&self, stat: Stat) -> Option<Scalar> {
// fb fetch is just a pointer dereference, so we check that first
let from_fb = get_from_flatbuffer_array(self.array, stat);
if from_fb.is_some() {
return from_fb;
match stat {
Stat::IsConstant => {
let is_constant = self.array.stats()?.is_constant();
is_constant
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into())
}
Stat::IsSorted => self
.array
.stats()?
.is_sorted()
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into()),
Stat::IsStrictSorted => self
.array
.stats()?
.is_strict_sorted()
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into()),
Stat::Max => {
let max = self.array.stats()?.max();
max.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::Min => {
let min = self.array.stats()?.min();
min.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::RunCount => {
let rc = self.array.stats()?.run_count();
rc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::TrueCount => {
let tc = self.array.stats()?.true_count();
tc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::NullCount => {
let nc = self.array.stats()?.null_count();
nc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::BitWidthFreq => self
.array
.stats()?
.bit_width_freq()
.map(|v| {
v.iter()
.flat_map(|v| {
primitive_to_scalar(
v.type__as_primitive()
.expect("Should only ever produce primitives"),
)
})
.collect_vec()
})
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
Stat::TrailingZeroFreq => self
.array
.stats()?
.trailing_zero_freq()
.map(|v| {
v.iter()
.flat_map(|v| {
primitive_to_scalar(
v.type__as_primitive()
.expect("Should only ever produce primitives"),
)
})
.collect_vec()
})
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
}
None
}

/// NB: part of the contract for to_set is that it does not do any expensive computation.
Expand All @@ -172,7 +252,7 @@ impl Statistics for ArrayView<'_> {
/// We want to avoid any sort of allocation on instantiation of the ArrayView, so we
/// do not allocate a stats_set to cache values.
fn set(&self, _stat: Stat, _value: Scalar) {
unimplemented!()
info!("Cannot write stats to a view")
}

fn compute(&self, stat: Stat) -> Option<Scalar> {
Expand Down Expand Up @@ -211,92 +291,8 @@ impl Statistics for ArrayView<'_> {
}
}

/// Reads a single precomputed statistic for `stat` out of the array's
/// serialized flatbuffer stats table.
///
/// Returns `None` when the array carries no stats table at all, when the
/// requested statistic was not serialized, or when the stored value is not of
/// the expected flatbuffer scalar type. Deserialization itself is cheap: this
/// is just pointer chasing into the flatbuffer, no allocation except for the
/// frequency-list stats.
fn get_from_flatbuffer_array(array: fb::Array<'_>, stat: Stat) -> Option<Scalar> {
    // Every branch below starts from the same stats table, so fetch it once;
    // a missing table yields None regardless of which stat was requested.
    let stats = array.stats()?;
    match stat {
        // Boolean-valued stats: unwrap the union as a bool and lift into Scalar.
        Stat::IsConstant => Some(stats.is_constant()?.type__as_bool()?.value().into()),
        Stat::IsSorted => Some(stats.is_sorted()?.type__as_bool()?.value().into()),
        Stat::IsStrictSorted => Some(stats.is_strict_sorted()?.type__as_bool()?.value().into()),
        // Primitive-valued stats: unwrap the union as a primitive and convert.
        Stat::Max => primitive_to_scalar(stats.max()?.type__as_primitive()?),
        Stat::Min => primitive_to_scalar(stats.min()?.type__as_primitive()?),
        Stat::RunCount => primitive_to_scalar(stats.run_count()?.type__as_primitive()?),
        Stat::TrueCount => primitive_to_scalar(stats.true_count()?.type__as_primitive()?),
        Stat::NullCount => primitive_to_scalar(stats.null_count()?.type__as_primitive()?),
        // Frequency stats are stored as a vector of primitives and surfaced as
        // a non-nullable u64 list scalar.
        Stat::BitWidthFreq => {
            let freq = stats.bit_width_freq()?;
            let values: Vec<Scalar> = freq
                .iter()
                .flat_map(|v| {
                    // flat_map silently drops entries primitive_to_scalar
                    // cannot convert; a non-primitive entry is a bug upstream.
                    primitive_to_scalar(
                        v.type__as_primitive()
                            .expect("Should only ever produce primitives"),
                    )
                })
                .collect();
            Some(List(ListScalar::new(
                DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
                Some(values),
            )))
        }
        Stat::TrailingZeroFreq => {
            let freq = stats.trailing_zero_freq()?;
            let values: Vec<Scalar> = freq
                .iter()
                .flat_map(|v| {
                    primitive_to_scalar(
                        v.type__as_primitive()
                            .expect("Should only ever produce primitives"),
                    )
                })
                .collect();
            Some(List(ListScalar::new(
                DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
                Some(values),
            )))
        }
    }
}

// TODO(@jcasale): move this to serde and make serde crate public?
fn primitive_to_scalar(v: Primitive) -> Option<Scalar> {
fn primitive_to_scalar(v: fbs::Primitive) -> Option<Scalar> {
let err_msg = "failed to deserialize invalid primitive scalar";
match v.ptype() {
PType::U8 => v
Expand Down
51 changes: 25 additions & 26 deletions vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers::{Array, ArrayArgs, ArrayStats, ArrayStatsArgs};
use vortex::flatbuffers as fb;
use vortex::{ArrayData, Context, ViewContext};
use vortex_dtype::{match_each_native_ptype, DType};
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer};
use vortex_scalar::Scalar::{Bool, Primitive};
use vortex_scalar::{BoolScalar, PrimitiveScalar};

use crate::flatbuffers::ipc as fb;
use crate::flatbuffers::ipc as fbi;
use crate::flatbuffers::ipc::Compression;
use crate::{missing, ALIGNMENT};

Expand All @@ -29,7 +29,7 @@ pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);
impl FlatBufferRoot for IPCMessage<'_> {}

impl WriteFlatBuffer for IPCMessage<'_> {
type Target<'a> = fb::Message<'a>;
type Target<'a> = fbi::Message<'a>;

fn write_flatbuffer<'fb>(
&self,
Expand All @@ -41,20 +41,20 @@ impl WriteFlatBuffer for IPCMessage<'_> {
Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(),
};

let mut msg = fb::MessageBuilder::new(fbb);
let mut msg = fbi::MessageBuilder::new(fbb);
msg.add_version(Default::default());
msg.add_header_type(match self {
Self::Context(_) => fb::MessageHeader::Context,
Self::Schema(_) => fb::MessageHeader::Schema,
Self::Chunk(_) => fb::MessageHeader::Chunk,
Self::Context(_) => fbi::MessageHeader::Context,
Self::Schema(_) => fbi::MessageHeader::Schema,
Self::Chunk(_) => fbi::MessageHeader::Chunk,
});
msg.add_header(header);
msg.finish()
}
}

impl<'a> WriteFlatBuffer for IPCContext<'a> {
type Target<'t> = fb::Context<'t>;
type Target<'t> = fbi::Context<'t>;

fn write_flatbuffer<'fb>(
&self,
Expand All @@ -67,27 +67,27 @@ impl<'a> WriteFlatBuffer for IPCContext<'a> {
.map(|e| e.id())
.map(|id| {
let encoding_id = fbb.create_string(id.as_ref());
fb::Encoding::create(
fbi::Encoding::create(
fbb,
&fb::EncodingArgs {
&fbi::EncodingArgs {
id: Some(encoding_id),
},
)
})
.collect_vec();
let fb_encodings = fbb.create_vector(fb_encodings.as_slice());

fb::Context::create(
fbi::Context::create(
fbb,
&fb::ContextArgs {
&fbi::ContextArgs {
encodings: Some(fb_encodings),
},
)
}
}

pub struct SerdeContextDeserializer<'a> {
pub(crate) fb: fb::Context<'a>,
pub(crate) fb: fbi::Context<'a>,
pub(crate) ctx: &'a Context,
}

Expand All @@ -111,19 +111,19 @@ impl<'a> TryFrom<SerdeContextDeserializer<'a>> for ViewContext {
}

impl<'a> WriteFlatBuffer for IPCSchema<'a> {
type Target<'t> = fb::Schema<'t>;
type Target<'t> = fbi::Schema<'t>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let dtype = Some(self.0.write_flatbuffer(fbb));
fb::Schema::create(fbb, &fb::SchemaArgs { dtype })
fbi::Schema::create(fbb, &fbi::SchemaArgs { dtype })
}
}

impl<'a> WriteFlatBuffer for IPCChunk<'a> {
type Target<'t> = fb::Chunk<'t>;
type Target<'t> = fbi::Chunk<'t>;

fn write_flatbuffer<'fb>(
&self,
Expand All @@ -137,7 +137,7 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
let mut offset = 0;
for array_data in array_data.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
buffers.push(fb::Buffer::new(
buffers.push(fbi::Buffer::new(
offset as u64,
buffer.len() as u64,
Compression::None,
Expand All @@ -148,9 +148,9 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
}
let buffers = Some(fbb.create_vector(&buffers));

fb::Chunk::create(
fbi::Chunk::create(
fbb,
&fb::ChunkArgs {
&fbi::ChunkArgs {
array,
buffers,
buffer_size: offset as u64,
Expand All @@ -160,7 +160,7 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
}

impl<'a> WriteFlatBuffer for IPCArray<'a> {
type Target<'t> = Array<'t>;
type Target<'t> = fb::Array<'t>;

fn write_flatbuffer<'fb>(
&self,
Expand Down Expand Up @@ -194,9 +194,9 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {

let stats = compute_and_build_stats(fbb, self.1);

Array::create(
fb::Array::create(
fbb,
&ArrayArgs {
&fb::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
encoding,
Expand All @@ -212,7 +212,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
fn compute_and_build_stats<'a>(
fbb: &'_ mut FlatBufferBuilder<'a>,
array: &'_ ArrayData,
) -> WIPOffset<ArrayStats<'a>> {
) -> WIPOffset<fb::ArrayStats<'a>> {
let primitive_ptype = match array.dtype() {
DType::Primitive(ptype, _) => Some(ptype),
_ => None,
Expand Down Expand Up @@ -293,7 +293,7 @@ fn compute_and_build_stats<'a>(
})
.map(|v| fbb.create_vector(v.as_slice()));

let stat_args = &ArrayStatsArgs {
let stat_args = &fb::ArrayStatsArgs {
min,
max,
is_sorted,
Expand All @@ -306,6 +306,5 @@ fn compute_and_build_stats<'a>(
trailing_zero_freq,
};

let stats = ArrayStats::create(fbb, stat_args);
stats
fb::ArrayStats::create(fbb, stat_args)
}
1 change: 0 additions & 1 deletion vortex-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ mod tests {
use vortex_scalar::ListScalarVec;

use crate::iter::FallibleLendingIterator;
// use crate::messages::IPCMessage::Context;
use crate::reader::StreamReader;
use crate::writer::StreamWriter;

Expand Down

0 comments on commit 388343b

Please sign in to comment.