Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a MergeDeferredDataSource which combines multiple data sources #35

Merged
merged 8 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ client = ["dep:reqwest", "dep:url"]
server = ["dep:actix-cors", "dep:actix-web"]

[dependencies]
percentage = "0.1.0"
egui = "0.22.0"
egui_extras = "0.22.0"
eframe = { version = "0.22.0", default-features = false, features = [
Expand All @@ -32,6 +31,7 @@ rand = { version = "0.8" }
# transitive depedency, required for rand to support wasm
getrandom = { version = "0.2", features = ["js"] }

percentage = "0.1.0"
regex = "1.10.0"


Expand Down
16 changes: 14 additions & 2 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct ItemLink {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct FieldID(usize);

#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct FieldSchema {
// Field names that may potentially exist on a given item. They are not
// necessarily all present on any given item
Expand Down Expand Up @@ -165,7 +165,7 @@ pub struct ItemMeta {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct TileID(pub Interval);

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct TileSet {
pub tiles: Vec<Vec<TileID>>,
}
Expand Down Expand Up @@ -443,3 +443,15 @@ impl fmt::Display for TileIDSlug {
write!(f, "{}", self.0 .0.stop.0)
}
}

// Private helpers for EntryID
impl EntryID {
pub(crate) fn shift_level0(&self, level0_offset: i64) -> EntryID {
assert!(!self.0.is_empty());
assert_ne!(self.0[0], -1);
let mut result = self.clone();
result.0[0] += level0_offset;
assert!(result.0[0] >= 0);
result
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod deferred_data;
#[cfg(not(target_arch = "wasm32"))]
pub mod file_data;
pub mod http;
pub mod merge_data;
#[cfg(not(target_arch = "wasm32"))]
pub mod parallel_data;
pub mod timestamp;
293 changes: 293 additions & 0 deletions src/merge_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
use std::collections::VecDeque;

use crate::data::{
DataSourceInfo, EntryID, EntryIndex, EntryInfo, Field, ItemLink, ItemUID, SlotMetaTile,
SlotTile, SummaryTile, TileID,
};
use crate::deferred_data::DeferredDataSource;
use crate::timestamp::Interval;

pub struct MergeDeferredDataSource {
data_sources: Vec<Box<dyn DeferredDataSource>>,
infos: Vec<VecDeque<DataSourceInfo>>,
mapping: Vec<u64>,
}

impl MergeDeferredDataSource {
pub fn new(data_sources: Vec<Box<dyn DeferredDataSource>>) -> Self {
assert!(!data_sources.is_empty());
let mut infos = Vec::new();
infos.resize_with(data_sources.len(), VecDeque::new);
Self {
data_sources,
infos,
mapping: Vec::new(),
}
}

fn merge_entry(first: EntryInfo, second: EntryInfo) -> EntryInfo {
let EntryInfo::Panel {
short_name,
long_name,
summary: first_summary,
mut slots,
} = first
else {
unreachable!();
};

let EntryInfo::Panel {
summary: second_summary,
slots: second_slots,
..
} = second
else {
unreachable!();
};

assert!(first_summary.is_none());
assert!(second_summary.is_none());

slots.extend(second_slots);

EntryInfo::Panel {
short_name,
long_name,
summary: None,
slots,
}
}

fn compute_mapping(source_infos: &[DataSourceInfo]) -> Vec<u64> {
// Compute the mapping from old to new entries (basically the offset
// of each initial slot)
let slot_lens: Vec<_> = source_infos
.iter()
.map(|info| {
let EntryInfo::Panel { ref slots, .. } = info.entry_info else {
unreachable!();
};
slots.len() as u64
})
.collect();

let mut mapping = Vec::new();
let mut sum = 0;
for slot_len in slot_lens {
mapping.push(sum);
sum += slot_len;
}

mapping
}

fn merge_infos(source_infos: Vec<DataSourceInfo>) -> DataSourceInfo {
assert!(!source_infos.is_empty());

// Some fields can't be meaningfully merged, so just assert they're
// all equivalent.
let first_info = source_infos.first().unwrap();
let tile_set = first_info.tile_set.clone();
let field_schema = first_info.field_schema.clone();

for info in &source_infos {
assert_eq!(tile_set, info.tile_set);
assert_eq!(field_schema, info.field_schema);
}

// Merge remaining fields
// IMPORTANT: entry_info must be kept consistent with compute_mapping
let interval = source_infos
.iter()
.map(|info| info.interval)
.reduce(Interval::union)
.unwrap();
let entry_info = source_infos
.iter()
.map(|info| info.entry_info.clone())
.reduce(Self::merge_entry)
.unwrap();

DataSourceInfo {
entry_info,
interval,
tile_set,
field_schema,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on basic unit tests for functions like this? They seem amenable to unit testing but maybe relying on the compiler for assurances is more common in Rust-land?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the most recent push and see if it seems reasonable.

In general, Rust users do rely a lot on the type system to make sure things work. And Rust's type system is pretty good, so I have a lot of confidence when things compile. But there are still classes of logic errors we hit, so it would be better to add more unit testing in general. So thanks for the reminder.


fn map_src_to_dst_entry(&self, idx: usize, src_entry: &EntryID) -> EntryID {
src_entry.shift_level0(self.mapping[idx] as i64)
}

fn map_dst_to_src_entry(&self, dst_entry: &EntryID) -> (usize, EntryID) {
let Some(EntryIndex::Slot(level0)) = dst_entry.index(0) else {
unreachable!();
};

let idx = self.mapping.partition_point(|&offset| offset < level0);
(idx, dst_entry.shift_level0(-(self.mapping[idx] as i64)))
}

fn map_src_to_dst_item_uid(&self, idx: usize, item_uid: ItemUID) -> ItemUID {
ItemUID(item_uid.0 * (self.mapping.len() as u64) + (idx as u64))
}

fn map_src_to_dst_summary(&self, idx: usize, tile: SummaryTile) -> SummaryTile {
SummaryTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}

fn map_src_to_dst_slot(&self, idx: usize, mut tile: SlotTile) -> SlotTile {
for items in &mut tile.data.items {
for item in items {
item.item_uid = self.map_src_to_dst_item_uid(idx, item.item_uid);
}
}

SlotTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}

fn map_src_to_dst_field(&self, idx: usize, field: &mut Field) {
match field {
Field::ItemLink(ItemLink {
ref mut item_uid,
ref mut entry_id,
..
}) => {
*item_uid = self.map_src_to_dst_item_uid(idx, *item_uid);
*entry_id = self.map_src_to_dst_entry(idx, entry_id);
}
Field::Vec(elts) => {
for elt in elts {
self.map_src_to_dst_field(idx, elt);
}
}
_ => (),
}
}

fn map_src_to_dst_slot_meta(&self, idx: usize, mut tile: SlotMetaTile) -> SlotMetaTile {
for items in &mut tile.data.items {
for item in items {
item.item_uid = self.map_src_to_dst_item_uid(idx, item.item_uid);
for (_, field) in &mut item.fields {
self.map_src_to_dst_field(idx, field);
}
}
}

SlotMetaTile {
entry_id: self.map_src_to_dst_entry(idx, &tile.entry_id),
tile_id: tile.tile_id,
data: tile.data,
}
}
}

impl DeferredDataSource for MergeDeferredDataSource {
fn fetch_info(&mut self) {
for data_source in &mut self.data_sources {
data_source.fetch_info();
}
}

fn get_infos(&mut self) -> Vec<DataSourceInfo> {
for (data_source, infos) in self.data_sources.iter_mut().zip(self.infos.iter_mut()) {
infos.extend(data_source.get_infos());
}

let max_available = self.infos.iter().map(|infos| infos.len()).max().unwrap();

let mut result = Vec::new();
for _ in 0..max_available {
let source_infos: Vec<_> = self
.infos
.iter_mut()
.map(|infos| infos.pop_front().unwrap())
.collect();
self.mapping = Self::compute_mapping(&source_infos);
result.push(Self::merge_infos(source_infos));
}
result
}

fn fetch_summary_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_summary_tile(&src_entry, tile_id, full);
}

fn get_summary_tiles(&mut self) -> Vec<SummaryTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_summary_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_summary(idx, tile))
.collect()
}

fn fetch_slot_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_slot_tile(&src_entry, tile_id, full);
}

fn get_slot_tiles(&mut self) -> Vec<SlotTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_slot_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_slot(idx, tile))
.collect()
}

fn fetch_slot_meta_tile(&mut self, entry_id: &EntryID, tile_id: TileID, full: bool) {
let (idx, src_entry) = self.map_dst_to_src_entry(entry_id);

self.data_sources[idx].fetch_slot_meta_tile(&src_entry, tile_id, full);
}

fn get_slot_meta_tiles(&mut self) -> Vec<SlotMetaTile> {
let mut tiles = Vec::new();
for (idx, data_source) in self.data_sources.iter_mut().enumerate() {
tiles.extend(
data_source
.get_slot_meta_tiles()
.into_iter()
.map(|tile| (idx, tile)),
);
}

// Hack: doing this in two stages to avoid mutability conflict
tiles
.into_iter()
.map(|(idx, tile)| self.map_src_to_dst_slot_meta(idx, tile))
.collect()
}
}
Loading