Skip to content

Commit

Permalink
Static ArrayView (#310)
Browse files Browse the repository at this point in the history
Refactor ArrayView to use owned Arc'd vortex buffers.

There is a huge and largely compiler-based FLUP PR for this to actually
remove the lifetimes. It may or may not be possible to split that one
into smaller pieces. But this PR captures the logical changes.
  • Loading branch information
gatesn authored May 12, 2024
1 parent 57b8581 commit 7d223b0
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 72 deletions.
108 changes: 69 additions & 39 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::sync::Arc;

use enum_iterator::all;
use itertools::Itertools;
Expand All @@ -17,20 +19,25 @@ use crate::{Array, IntoArray, ToArray};
#[derive(Clone)]
pub struct ArrayView<'v> {
encoding: EncodingRef,
dtype: &'v DType,
array: fb::Array<'v>,
buffers: &'v [Buffer],
ctx: &'v ViewContext,
dtype: DType,
flatbuffer: Buffer,
flatbuffer_loc: usize,
// TODO(ngates): create an RC'd vector that can be lazily sliced.
buffers: Vec<Buffer>,
ctx: Arc<ViewContext>,
// TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array
// metadata, but only the buffers from the selected columns. Therefore we need to know
// which fb:Array children to skip when calculating how to slice into buffers.

// FIXME(ngates): while we refactor, leave the lifetime parameter in place.
_phantom: PhantomData<&'v ()>,
}

impl<'a> Debug for ArrayView<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ArrayView")
.field("encoding", &self.encoding)
.field("dtype", self.dtype)
.field("dtype", &self.dtype)
// .field("array", &self.array)
.field("buffers", &self.buffers)
.field("ctx", &self.ctx)
Expand All @@ -39,12 +46,19 @@ impl<'a> Debug for ArrayView<'a> {
}

impl<'v> ArrayView<'v> {
pub fn try_new(
ctx: &'v ViewContext,
dtype: &'v DType,
array: fb::Array<'v>,
buffers: &'v [Buffer],
) -> VortexResult<Self> {
pub fn try_new<F>(
ctx: Arc<ViewContext>,
dtype: DType,
flatbuffer: Buffer,
flatbuffer_init: F,
buffers: Vec<Buffer>,
) -> VortexResult<Self>
where
F: FnOnce(&[u8]) -> VortexResult<fb::Array>,
{
let array = flatbuffer_init(flatbuffer.as_ref())?;
let flatbuffer_loc = array._tab.loc();

let encoding = ctx
.find_encoding(array.encoding())
.ok_or_else(|| vortex_err!(InvalidSerde: "Encoding ID out of bounds"))?;
Expand All @@ -59,9 +73,11 @@ impl<'v> ArrayView<'v> {
let view = Self {
encoding,
dtype,
array,
flatbuffer,
flatbuffer_loc,
buffers,
ctx,
_phantom: Default::default(),
};

// Validate here that the metadata correctly parses, so that an encoding can infallibly
Expand All @@ -72,46 +88,56 @@ impl<'v> ArrayView<'v> {
Ok(view)
}

pub fn flatbuffer(&self) -> fb::Array {
unsafe {
let tab = flatbuffers::Table::new(self.flatbuffer.as_ref(), self.flatbuffer_loc);
fb::Array::init_from_table(tab)
}
}

pub fn encoding(&self) -> EncodingRef {
self.encoding
}

pub fn dtype(&self) -> &DType {
self.dtype
&self.dtype
}

pub fn metadata(&self) -> Option<&'v [u8]> {
self.array.metadata().map(|m| m.bytes())
pub fn metadata(&self) -> Option<&[u8]> {
self.flatbuffer().metadata().map(|m| m.bytes())
}

// TODO(ngates): should we separate self and DType lifetimes? Should DType be cloned?
pub fn child(&'v self, idx: usize, dtype: &'v DType) -> Option<ArrayView<'v>> {
let child = self.array_child(idx)?;
let flatbuffer_loc = child._tab.loc();

let encoding = self.ctx.find_encoding(child.encoding())?;

// Figure out how many buffers to skip...
// We store them depth-first.
let buffer_offset = self
.array
.flatbuffer()
.children()?
.iter()
.take(idx)
.map(|child| Self::cumulative_nbuffers(child))
.sum();
let buffer_count = Self::cumulative_nbuffers(child);

Some(
Self::try_new(
self.ctx,
dtype,
child,
&self.buffers[buffer_offset..][0..buffer_count],
)
.unwrap(),
)
Some(Self {
encoding,
dtype: dtype.clone(),
flatbuffer: self.flatbuffer.clone(),
flatbuffer_loc,
buffers: self.buffers[buffer_offset..][0..buffer_count].to_vec(),
ctx: self.ctx.clone(),
_phantom: Default::default(),
})
}

fn array_child(&self, idx: usize) -> Option<fb::Array<'v>> {
let children = self.array.children()?;
fn array_child(&self, idx: usize) -> Option<fb::Array> {
let children = self.flatbuffer().children()?;
if idx < children.len() {
Some(children.get(idx))
} else {
Expand All @@ -121,7 +147,7 @@ impl<'v> ArrayView<'v> {

/// Whether the current Array makes use of a buffer
pub fn has_buffer(&self) -> bool {
self.array.has_buffer()
self.flatbuffer().has_buffer()
}

/// The number of buffers used by the current Array and all its children.
Expand All @@ -133,7 +159,7 @@ impl<'v> ArrayView<'v> {
nbuffers
}

pub fn buffer(&self) -> Option<&'v Buffer> {
pub fn buffer(&self) -> Option<&Buffer> {
self.has_buffer().then(|| &self.buffers[0])
}

Expand All @@ -146,23 +172,27 @@ impl Statistics for ArrayView<'_> {
fn get(&self, stat: Stat) -> Option<Scalar> {
match stat {
Stat::Max => {
let max = self.array.stats()?.max();
let max = self.flatbuffer().stats()?.max();
max.and_then(|v| ScalarValue::try_from(v).ok())
.map(|v| Scalar::new(self.dtype.clone(), v))
}
Stat::Min => {
let min = self.array.stats()?.min();
let min = self.flatbuffer().stats()?.min();
min.and_then(|v| ScalarValue::try_from(v).ok())
.map(|v| Scalar::new(self.dtype.clone(), v))
}
Stat::IsConstant => self.array.stats()?.is_constant().map(bool::into),
Stat::IsSorted => self.array.stats()?.is_sorted().map(bool::into),
Stat::IsStrictSorted => self.array.stats()?.is_strict_sorted().map(bool::into),
Stat::RunCount => self.array.stats()?.run_count().map(u64::into),
Stat::TrueCount => self.array.stats()?.true_count().map(u64::into),
Stat::NullCount => self.array.stats()?.null_count().map(u64::into),
Stat::IsConstant => self.flatbuffer().stats()?.is_constant().map(bool::into),
Stat::IsSorted => self.flatbuffer().stats()?.is_sorted().map(bool::into),
Stat::IsStrictSorted => self
.flatbuffer()
.stats()?
.is_strict_sorted()
.map(bool::into),
Stat::RunCount => self.flatbuffer().stats()?.run_count().map(u64::into),
Stat::TrueCount => self.flatbuffer().stats()?.true_count().map(u64::into),
Stat::NullCount => self.flatbuffer().stats()?.null_count().map(u64::into),
Stat::BitWidthFreq => self
.array
.flatbuffer()
.stats()?
.bit_width_freq()
.map(|v| {
Expand All @@ -177,7 +207,7 @@ impl Statistics for ArrayView<'_> {
)
}),
Stat::TrailingZeroFreq => self
.array
.flatbuffer()
.stats()?
.trailing_zero_freq()
.map(|v| v.iter().collect_vec())
Expand Down
34 changes: 21 additions & 13 deletions vortex-ipc/src/codecs/array_reader.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

use flatbuffers::root;
use futures_util::Stream;
use pin_project::pin_project;
use vortex::{Array, ArrayView, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_error::{vortex_err, VortexError, VortexResult};

use crate::codecs::message_reader::MessageReader;

Expand Down Expand Up @@ -53,7 +55,7 @@ where
}

pub(crate) struct MessageArrayReader<'a, M: MessageReader> {
ctx: ViewContext,
ctx: Arc<ViewContext>,
dtype: DType,
messages: &'a mut M,

Expand All @@ -64,7 +66,7 @@ pub(crate) struct MessageArrayReader<'a, M: MessageReader> {

impl<'m, M: MessageReader> MessageArrayReader<'m, M> {
/// Construct an ArrayReader with a message stream containing chunk messages.
pub fn new(ctx: ViewContext, dtype: DType, messages: &'m mut M) -> Self {
pub fn new(ctx: Arc<ViewContext>, dtype: DType, messages: &'m mut M) -> Self {
Self {
ctx,
dtype,
Expand Down Expand Up @@ -104,16 +106,22 @@ impl<M: MessageReader> MessageArrayReader<'_, M> {
self.buffers = self.messages.buffers().await?;

// After reading the buffers we're now able to load the next message.
let col_array = self
.messages
.next()
.await?
.header_as_chunk()
.unwrap()
.array()
.unwrap();

let view = ArrayView::try_new(&self.ctx, &self.dtype, col_array, self.buffers.as_slice())?;
let flatbuffer = self.messages.next_raw().await?;

let view = ArrayView::try_new(
self.ctx.clone(),
// TODO(ngates): we should Rc the DType.
self.dtype.clone(),
flatbuffer,
|flatbuffer| {
root::<crate::flatbuffers::ipc::Message>(flatbuffer)
.map_err(VortexError::from)
.map(|msg| msg.header_as_chunk().unwrap())
.and_then(|chunk| chunk.array().ok_or(vortex_err!("Chunk missing Array")))
},
// TODO(ngates): no point storing buffers on self (unless we try and reuse them)
self.buffers.clone(),
)?;

// Validate it
view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?;
Expand Down
13 changes: 9 additions & 4 deletions vortex-ipc/src/codecs/ipc_reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use pin_project::pin_project;
use vortex::{Context, ViewContext};
use vortex_dtype::DType;
Expand All @@ -10,13 +12,13 @@ use crate::messages::SerdeContextDeserializer;
/// An IPC reader is used to emit arrays from a stream of Vortex IPC messages.
#[pin_project]
pub struct IPCReader<'m, M> {
view_ctx: ViewContext,
view_ctx: Arc<ViewContext>,
messages: &'m mut M,
}

impl<'m, M: MessageReader> IPCReader<'m, M> {
/// Construct an IPC reader using an existing ViewContext.
pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self {
pub fn new(view_ctx: Arc<ViewContext>, messages: &'m mut M) -> Self {
Self { view_ctx, messages }
}

Expand All @@ -31,13 +33,16 @@ impl<'m, M: MessageReader> IPCReader<'m, M> {
}
}

let view_ctx: ViewContext = SerdeContextDeserializer {
let view_ctx = SerdeContextDeserializer {
fb: messages.next().await?.header_as_context().unwrap(),
ctx,
}
.try_into()?;

Ok(Self { messages, view_ctx })
Ok(Self {
messages,
view_ctx: Arc::new(view_ctx),
})
}

pub async fn next<'a>(&'a mut self) -> VortexResult<Option<impl ArrayReader + 'a>> {
Expand Down
12 changes: 12 additions & 0 deletions vortex-ipc/src/codecs/message_reader/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io;
use bytes::BytesMut;
use flatbuffers::{root, root_unchecked};
use futures_util::{AsyncRead, AsyncReadExt};
use vortex_buffer::Buffer;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::codecs::message_reader::MessageReader;
Expand Down Expand Up @@ -84,6 +85,17 @@ impl<R: AsyncRead + Unpin> MessageReader for AsyncReadMessageReader<R> {
Ok(unsafe { root_unchecked::<Message>(&self.prev_message) })
}

async fn next_raw(&mut self) -> VortexResult<Buffer> {
if self.finished {
panic!("StreamMessageReader is finished - should've checked peek!");
}
self.prev_message = self.message.split();
if !self.load_next_message().await? {
self.finished = true;
}
Ok(Buffer::from(self.prev_message.clone().freeze()))
}

async fn read_into(&mut self, mut buffers: Vec<Vec<u8>>) -> VortexResult<Vec<Vec<u8>>> {
// TODO(ngates): there is no read_vectored_exact for AsyncRead, so for now we'll
// just read one-by-one
Expand Down
1 change: 1 addition & 0 deletions vortex-ipc/src/codecs/message_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::flatbuffers::ipc::Message;
pub trait MessageReader {
fn peek(&self) -> Option<Message>;
fn next(&mut self) -> impl Future<Output = VortexResult<Message>>;
fn next_raw(&mut self) -> impl Future<Output = VortexResult<Buffer>>;
fn read_into(
&mut self,
buffers: Vec<Vec<u8>>,
Expand Down
12 changes: 12 additions & 0 deletions vortex-ipc/src/codecs/message_reader/monoio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::BytesMut;
use flatbuffers::{root, root_unchecked};
use monoio::buf::{IoBufMut, IoVecBufMut, VecBuf};
use monoio::io::{AsyncReadRent, AsyncReadRentExt};
use vortex_buffer::Buffer;
use vortex_error::VortexResult;

use crate::codecs::message_reader::MessageReader;
Expand Down Expand Up @@ -75,6 +76,17 @@ impl<R: AsyncReadRent + Unpin> MessageReader for MonoIoMessageReader<R> {
Ok(unsafe { root_unchecked::<Message>(&self.prev_message) })
}

async fn next_raw(&mut self) -> VortexResult<Buffer> {
if self.finished {
panic!("StreamMessageReader is finished - should've checked peek!");
}
self.prev_message = self.message.split();
if !self.load_next_message().await? {
self.finished = true;
}
Ok(Buffer::from(self.prev_message.clone().freeze()))
}

async fn read_into(&mut self, buffers: Vec<Vec<u8>>) -> VortexResult<Vec<Vec<u8>>> {
Ok(self
.read
Expand Down
Loading

0 comments on commit 7d223b0

Please sign in to comment.