Skip to content

Commit

Permalink
Add a MergeDeferredDataSource which combines multiple data sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottslaughter committed Oct 20, 2023
1 parent 9d587fb commit 93fb3d5
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct ItemLink {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct FieldID(usize);

#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct FieldSchema {
// Field names that may potentially exist on a given item. They are not
// necessarily all present on any given item
Expand Down Expand Up @@ -165,7 +165,7 @@ pub struct ItemMeta {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct TileID(pub Interval);

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct TileSet {
pub tiles: Vec<Vec<TileID>>,
}
Expand Down Expand Up @@ -443,3 +443,15 @@ impl fmt::Display for TileIDSlug {
write!(f, "{}", self.0 .0.stop.0)
}
}

// Private helpers for EntryID
impl EntryID {
pub(crate) fn shift_level0(&self, level0_offset: i64) -> EntryID {
assert!(!self.0.is_empty());
assert_ne!(self.0[0], -1);
let mut result = self.clone();
result.0[0] += level0_offset;
assert!(result.0[0] >= 0);
result
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod deferred_data;
#[cfg(not(target_arch = "wasm32"))]
pub mod file_data;
pub mod http;
pub mod merge_data;
#[cfg(not(target_arch = "wasm32"))]
pub mod parallel_data;
pub mod timestamp;
292 changes: 292 additions & 0 deletions src/merge_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
use std::collections::VecDeque;

use crate::data::{
DataSourceInfo, EntryID, EntryIndex, EntryInfo, Field, ItemLink, ItemUID, SlotMetaTile,
SlotTile, SummaryTile, TileID,
};
use crate::deferred_data::DeferredDataSource;
use crate::timestamp::Interval;

pub struct MergeDeferredDataSource {
data_sources: Vec<Box<dyn DeferredDataSource>>,
infos: Vec<VecDeque<DataSourceInfo>>,
mapping: Vec<u64>,
}

impl MergeDeferredDataSource {
pub fn new(data_sources: Vec<Box<dyn DeferredDataSource>>) -> Self {
assert!(!data_sources.is_empty());
let infos = (0..data_sources.len()).map(|_| VecDeque::new()).collect();
Self {
data_sources,
infos,
mapping: Vec::new(),
}
}

fn merge_entry(first: EntryInfo, second: EntryInfo) -> EntryInfo {
let EntryInfo::Panel {
short_name,
long_name,
summary: first_summary,
mut slots,
} = first
else {
unreachable!();
};

let EntryInfo::Panel {
summary: second_summary,
slots: second_slots,
..
} = second
else {
unreachable!();
};

assert!(first_summary.is_none());
assert!(second_summary.is_none());

slots.extend(second_slots);

EntryInfo::Panel {
short_name,
long_name,
summary: None,
slots,
}
}

fn compute_mapping(source_infos: &[DataSourceInfo]) -> Vec<u64> {
// Compute the mapping from old to new entries (basically the offset
// of each initial slot)
let slot_lens: Vec<_> = source_infos
.iter()
.map(|info| {
let EntryInfo::Panel { ref slots, .. } = info.entry_info else {
unreachable!();
};
slots.len() as u64
})
.collect();

let mut mapping = Vec::new();
let mut sum = 0;
for slot_len in slot_lens {
mapping.push(sum);
sum += slot_len;
}

mapping
}

fn merge_infos(source_infos: Vec<DataSourceInfo>) -> DataSourceInfo {
assert!(!source_infos.is_empty());

// Some fields can't be meaningfully merged, so just assert they're
// all equivalent.
let first_info = source_infos.first().unwrap();
let tile_set = first_info.tile_set.clone();
let field_schema = first_info.field_schema.clone();

for info in &source_infos {
assert_eq!(tile_set, info.tile_set);
assert_eq!(field_schema, info.field_schema);
}

// Merge remaining fields
// IMPORTANT: entry_info must be kept consistent with compute_mapping
let interval = source_infos
.iter()
.map(|info| info.interval)
.reduce(Interval::union)
.unwrap();
let entry_info = source_infos
.iter()
.map(|info| info.entry_info.clone())
.reduce(Self::merge_entry)
.unwrap();

DataSourceInfo {
entry_info,
interval,
tile_set,
field_schema,
}
}

fn map_src_to_dst_entry(&self, idx: usize, src_entry: &EntryID) -> EntryID {
src_entry.shift_level0(self.mapping[idx] as i64)
}

fn map_dst_to_src_entry(&self, dst_entry: &EntryID) -> (usize, EntryID) {
let Some(EntryIndex::Slot(level0)) = dst_entry.index(0) else {
unreachable!();
};

let idx = self.mapping.partition_point(|&offset| offset < level0);
(idx, dst_entry.shift_level0(-(self.mapping[idx] as i64)))
}

fn map_src_to_dst_item_uid(&self, idx: usize, item_uid: ItemUID) -> ItemUID {
ItemUID(item_uid.0 * (self.mapping.len() as u64) + (idx as u64))
}

fn map_src_to_dst_summary(&self, idx: usize, tile: SummaryTile) -> SummaryTile {
SummaryTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}

fn map_src_to_dst_slot(&self, idx: usize, mut tile: SlotTile) -> SlotTile {
for items in &mut tile.data.items {
for item in items {
item.item_uid = self.map_src_to_dst_item_uid(idx, item.item_uid);
}
}

SlotTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}

fn map_src_to_dst_field(&self, idx: usize, field: &mut Field) {
match field {
Field::ItemLink(ItemLink {
ref mut item_uid,
ref mut entry_id,
..
}) => {
*item_uid = self.map_src_to_dst_item_uid(idx, *item_uid);
*entry_id = self.map_src_to_dst_entry(idx, entry_id);
}
Field::Vec(elts) => {
for elt in elts {
self.map_src_to_dst_field(idx, elt);
}
}
_ => (),
}
}

fn map_src_to_dst_slot_meta(&self, idx: usize, mut tile: SlotMetaTile) -> SlotMetaTile {
for items in &mut tile.data.items {
for item in items {
item.item_uid = self.map_src_to_dst_item_uid(idx, item.item_uid);
for (_, field) in &mut item.fields {
self.map_src_to_dst_field(idx, field);
}
}
}

SlotMetaTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}
}

impl DeferredDataSource for MergeDeferredDataSource {
fn fetch_info(&mut self) {
for data_source in &mut self.data_sources {
data_source.fetch_info();
}
}

fn get_infos(&mut self) -> Vec<DataSourceInfo> {
for (data_source, infos) in self.data_sources.iter_mut().zip(self.infos.iter_mut()) {
infos.extend(data_source.get_infos());
}

let max_available = self.infos.iter().map(|infos| infos.len()).max().unwrap();

let mut result = Vec::new();
for _ in 0..max_available {
let source_infos: Vec<_> = self
.infos
.iter_mut()
.map(|infos| infos.pop_front().unwrap())
.collect();
self.mapping = Self::compute_mapping(&source_infos);
result.push(Self::merge_infos(source_infos));
}
result
}

fn fetch_summary_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_summary_tile(&src_entry, tile_id, full);
}

fn get_summary_tiles(&mut self) -> Vec<SummaryTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_summary_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_summary(idx, tile))
.collect()
}

fn fetch_slot_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_slot_tile(&src_entry, tile_id, full);
}

fn get_slot_tiles(&mut self) -> Vec<SlotTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_slot_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_slot(idx, tile))
.collect()
}

fn fetch_slot_meta_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_slot_meta_tile(&src_entry, tile_id, full);
}

fn get_slot_meta_tiles(&mut self) -> Vec<SlotMetaTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_slot_meta_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_slot_meta(idx, tile))
.collect()
}
}

0 comments on commit 93fb3d5

Please sign in to comment.