Skip to content

Commit

Permalink
refs
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Aug 1, 2024
1 parent eca4443 commit 02e656d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 37 deletions.
6 changes: 3 additions & 3 deletions vortex-serde/src/layout/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ impl Footer {
(self.schema_offset - self.leftovers_offset) as usize
}

pub fn layout<'a>(
pub fn layout(
&self,
message_cache: &'a MessageCache,
message_cache: Arc<MessageCache>,
scan: Scan,
) -> VortexResult<Box<dyn LayoutT + 'a>> {
) -> VortexResult<Box<dyn LayoutT>> {
let start_offset = self.leftovers_footer_offset();
let end_offset = self.leftovers.len() - FULL_FOOTER_SIZE;
let layout_bytes = self.leftovers.slice(start_offset..end_offset);
Expand Down
50 changes: 25 additions & 25 deletions vortex-serde/src/layout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ pub struct LayoutId(u16);
pub trait LayoutDefinition: Debug + Send + Sync {
fn id(&self) -> LayoutId;

fn layout<'a>(
fn layout(
&self,
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
scan: Scan,
) -> Box<dyn LayoutT + 'a>;
) -> Box<dyn LayoutT>;
}

pub type LayoutRef = &'static dyn LayoutDefinition;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl MessageCache {
}
}

pub fn view(&self, path: Vec<MessageId>) -> RelativeMessageCache {
pub fn view(self: Arc<Self>, path: Vec<MessageId>) -> RelativeMessageCache {
RelativeMessageCache::new(self, path)
}

Expand All @@ -113,13 +113,13 @@ impl MessageCache {
}

#[derive(Debug)]
pub struct RelativeMessageCache<'a> {
cache_ref: &'a MessageCache,
pub struct RelativeMessageCache {
cache_ref: Arc<MessageCache>,
path: Vec<MessageId>,
}

impl<'a> RelativeMessageCache<'a> {
pub fn new(cache_ref: &'a MessageCache, path: Vec<MessageId>) -> Self {
impl RelativeMessageCache {
pub fn new(cache_ref: Arc<MessageCache>, path: Vec<MessageId>) -> Self {
Self { cache_ref, path }
}

Expand Down Expand Up @@ -150,32 +150,32 @@ impl LayoutDefinition for ColumnLayoutDefinition {
Self::ID
}

fn layout<'a>(
fn layout(
&self,
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
scan: Scan,
) -> Box<dyn LayoutT + 'a> {
) -> Box<dyn LayoutT> {
Box::new(ColumnLayout::new(flatbuffer, ctx, scan, message_cache))
}
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct ColumnLayout<'a> {
pub struct ColumnLayout {
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
scan: Scan,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
}

impl<'a> ColumnLayout<'a> {
impl ColumnLayout {
pub fn new(
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
scan: Scan,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
) -> Self {
Self {
flatbuffer,
Expand All @@ -186,7 +186,7 @@ impl<'a> ColumnLayout<'a> {
}
}

impl<'a> LayoutT for ColumnLayout<'a> {
impl LayoutT for ColumnLayout {
fn read(&mut self) -> VortexResult<ReadResult> {
let mut column_info = Vec::default();

Expand All @@ -204,7 +204,7 @@ impl<'a> LayoutT for ColumnLayout<'a> {
}
}

impl<'a> Future for ColumnLayout<'a> {
impl Future for ColumnLayout {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -224,32 +224,32 @@ impl LayoutDefinition for ChunkedLayoutDefinition {
Self::ID
}

fn layout<'a>(
fn layout(
&self,
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
scan: Scan,
) -> Box<dyn LayoutT + 'a> {
) -> Box<dyn LayoutT> {
Box::new(ChunkedLayout::new(flatbuffer, ctx, scan, message_cache))
}
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct ChunkedLayout<'a> {
pub struct ChunkedLayout {
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
scan: Scan,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
}

impl<'a> ChunkedLayout<'a> {
impl ChunkedLayout {
pub fn new(
flatbuffer: Bytes,
ctx: Arc<LayoutContext>,
scan: Scan,
message_cache: RelativeMessageCache<'a>,
message_cache: RelativeMessageCache,
) -> Self {
Self {
flatbuffer,
Expand All @@ -260,7 +260,7 @@ impl<'a> ChunkedLayout<'a> {
}
}

impl LayoutT for ChunkedLayout<'_> {
impl LayoutT for ChunkedLayout {
fn read(&mut self) -> VortexResult<ReadResult> {
todo!()
}
Expand Down
12 changes: 3 additions & 9 deletions vortex-serde/src/layout/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
};

let message_cache = Arc::new(MessageCache::new());
let layout = footer.layout(&message_cache, scan)?;
let layout = footer.layout(message_cache.clone(), scan)?;

let dtype = if let DType::Struct(s, _) = footer.dtype()? {
s
Expand All @@ -110,14 +110,7 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {

let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

let stream = VortexLayoutBatchStream::try_new(
self.reader,
layout,
message_cache.clone(),
dtype,
batch_size,
);
stream
VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, dtype, batch_size)
}

async fn len(&self) -> usize {
Expand Down Expand Up @@ -170,6 +163,7 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
footer_offset,
leftovers: buf.freeze(),
leftovers_offset: read_offset,
ctx: self.layout_context.clone(),
})
}
}
Expand Down

0 comments on commit 02e656d

Please sign in to comment.