From 2519bef34c6c38064565b95525b1a79f57cddf81 Mon Sep 17 00:00:00 2001 From: Mikkel Denker Date: Wed, 9 Oct 2024 11:55:05 +0200 Subject: [PATCH] remove pages_by_host from webgraph as it isn't used anymore --- crates/core/src/entrypoint/webgraph_server.rs | 19 ------ crates/core/src/webgraph/mod.rs | 18 ----- crates/core/src/webgraph/remote.rs | 25 +------ crates/core/src/webgraph/segment.rs | 4 -- crates/core/src/webgraph/store.rs | 67 +------------------ crates/core/src/webgraph/store_writer.rs | 7 +- 6 files changed, 4 insertions(+), 136 deletions(-) diff --git a/crates/core/src/entrypoint/webgraph_server.rs b/crates/core/src/entrypoint/webgraph_server.rs index 96cb3bab..dbe60278 100644 --- a/crates/core/src/entrypoint/webgraph_server.rs +++ b/crates/core/src/entrypoint/webgraph_server.rs @@ -17,7 +17,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use itertools::Itertools; use tracing::info; use utoipa::ToSchema; @@ -58,7 +57,6 @@ sonic_service!( RawOutgoingEdges, RawIngoingEdgesWithLabels, RawOutgoingEdgesWithLabels, - PagesByHosts, GetNodeIDs ] ); @@ -164,23 +162,6 @@ impl Message for RawOutgoingEdgesWithLabels { } } -#[derive(Debug, Clone, bincode::Encode, bincode::Decode)] -pub struct PagesByHosts { - pub hosts: Vec, -} - -impl Message for PagesByHosts { - type Response = Vec; - - async fn handle(self, server: &WebGraphService) -> Self::Response { - self.hosts - .iter() - .flat_map(|host| server.graph.pages_by_host(host)) - .unique() - .collect() - } -} - #[derive(Debug, Clone, bincode::Encode, bincode::Decode)] pub struct InDegreeUpperBound { pub node: NodeID, diff --git a/crates/core/src/webgraph/mod.rs b/crates/core/src/webgraph/mod.rs index 4b3f7e7e..8be8563a 100644 --- a/crates/core/src/webgraph/mod.rs +++ b/crates/core/src/webgraph/mod.rs @@ -229,24 +229,6 @@ impl Webgraph { .collect() } - pub fn pages_by_host(&self, host_node: &NodeID) -> Vec { - let mut pages: Vec<_> = self - .executor - .map( - |segment| segment.pages_by_host(host_node), - self.segments.iter(), - ) - .unwrap() - .into_iter() - .flatten() - .collect(); - - pages.sort(); - pages.dedup(); - - pages - } - pub fn raw_ingoing_edges(&self, node: &NodeID, limit: EdgeLimit) -> Vec> { let edges = self .segments diff --git a/crates/core/src/webgraph/remote.rs b/crates/core/src/webgraph/remote.rs index ab34501a..7c11c927 100644 --- a/crates/core/src/webgraph/remote.rs +++ b/crates/core/src/webgraph/remote.rs @@ -34,7 +34,7 @@ use crate::{ streaming_response::StreamingResponse, }, entrypoint::webgraph_server::{ - GetNode, GetNodeIDs, IngoingEdges, OutgoingEdges, PagesByHosts, RawIngoingEdges, + GetNode, GetNodeIDs, IngoingEdges, OutgoingEdges, RawIngoingEdges, RawIngoingEdgesWithLabels, RawOutgoingEdges, RawOutgoingEdgesWithLabels, WebGraphService, }, Result, @@ -437,29 +437,6 @@ impl RemoteWebgraph { Ok(edges) } - pub async fn pages_by_hosts(&self, hosts: &[NodeID]) -> Result> { - let res = self - .conn() - .await - .send( - PagesByHosts { - hosts: hosts.to_vec(), - }, - &AllShardsSelector, - &RandomReplicaSelector, - ) - .await?; - - Ok(res - .into_iter() - .flat_map(|(_, reps)| { - debug_assert!(reps.len() <= 1); - reps.into_iter().flat_map(|(_, rep)| rep) - }) - .unique() - .collect()) - } - pub async fn stream_node_ids(&self) -> impl futures::Stream { StreamNodeIDs::new(self.conn().await).stream() } diff --git a/crates/core/src/webgraph/segment.rs b/crates/core/src/webgraph/segment.rs index 6c312dbd..28e43305 100644 --- a/crates/core/src/webgraph/segment.rs +++ b/crates/core/src/webgraph/segment.rs @@ -196,10 +196,6 @@ impl Segment { self.reversed_adjacency.degree(node) } - pub fn pages_by_host(&self, host_node: &NodeID) -> Vec { - self.reversed_adjacency.nodes_by_host(host_node) - } - pub fn id(&self) -> String { self.id.clone() } diff --git a/crates/core/src/webgraph/store.rs b/crates/core/src/webgraph/store.rs index 5428f05f..cca6e41e 100644 --- a/crates/core/src/webgraph/store.rs +++ b/crates/core/src/webgraph/store.rs @@ -29,64 +29,13 @@ use file_store::{ }, ConstSerializable, }; -use fst::Automaton; use itertools::Itertools; use super::{ merge::{MergeNode, MergeSegmentOrd}, - Compression, FullNodeID, NodeDatum, NodeID, SegmentEdge, StoredEdge, + Compression, NodeDatum, NodeID, SegmentEdge, StoredEdge, }; -pub struct HostDb { - db: speedy_kv::Db, ()>, -} - -impl HostDb { - pub fn open>(path: P) -> Self { - let db = speedy_kv::Db::open_or_create(path).unwrap(); - - Self { db } - } - - fn optimize_read(&mut self) { - self.db.merge_all_segments().unwrap(); - } - - pub fn insert(&mut self, node: &FullNodeID) { - let key = [ - node.host.as_u64().to_be_bytes(), - node.id.as_u64().to_be_bytes(), - ] - .concat(); - - self.db.insert_raw(key, vec![]); - } - - fn get(&self, host: &NodeID) -> Vec { - let host = host.as_u64().to_be_bytes().to_vec(); - - let query = speedy_kv::automaton::ExactMatch(&host).starts_with(); - - self.db - .search_raw(query) - .map(|(key, _)| { - let id = u64::from_be_bytes( - key.as_bytes()[u64::BITS as usize / 8..].try_into().unwrap(), - ); - NodeID::from(id) - }) - .collect() - } - - pub fn flush(&mut self) { - self.db.commit().unwrap(); - } - - fn merge(&mut self, other: HostDb) { - self.db.merge(other.db).unwrap(); - } -} - #[derive(Debug, Clone, bincode::Encode, bincode::Decode)] pub struct EdgeRange { range: std::ops::Range, @@ -289,7 +238,6 @@ impl CompressedLabelBlock { pub struct EdgeStore { reversed: bool, ranges: RangesDb, - hosts: HostDb, edge_labels: IterableStoreReader, edges: ConstIterableStoreReader, @@ -305,7 +253,6 @@ impl EdgeStore { Self { ranges, - hosts: HostDb::open(path.as_ref().join("hosts")), edge_labels, edges, reversed, @@ -314,7 +261,6 @@ impl EdgeStore { pub fn optimize_read(&mut self) { self.ranges.optimize_read(); - self.hosts.optimize_read(); } fn merge_postings_for_node<'a>( @@ -476,11 +422,6 @@ impl EdgeStore { } let mut res = Self::merge_postings(&stores, label_compression, path)?; - - for store in stores { - res.hosts.merge(store.hosts); - } - res.optimize_read(); Ok(()) @@ -576,10 +517,6 @@ impl EdgeStore { } } - pub fn nodes_by_host(&self, host: &NodeID) -> Vec { - self.hosts.get(host) - } - pub fn iter_without_label(&self) -> impl Iterator> + '_ + Send + Sync { self.ranges.edges.iter_raw().flat_map(move |(key, val)| { let node = u64::from_be_bytes((key.as_bytes()).try_into().unwrap()); @@ -617,7 +554,7 @@ impl EdgeStore { mod tests { use std::sync::Arc; - use crate::webgraph::{store_writer::EdgeStoreWriter, Edge, InsertableEdge}; + use crate::webgraph::{store_writer::EdgeStoreWriter, Edge, FullNodeID, InsertableEdge}; use super::*; diff --git a/crates/core/src/webgraph/store_writer.rs b/crates/core/src/webgraph/store_writer.rs index 91e2a546..71fefc60 100644 --- a/crates/core/src/webgraph/store_writer.rs +++ b/crates/core/src/webgraph/store_writer.rs @@ -39,7 +39,7 @@ use file_store::{ }; use super::{ - store::{CompressedLabelBlock, EdgeStore, HostDb, LabelBlock, RangesDb}, + store::{CompressedLabelBlock, EdgeStore, LabelBlock, RangesDb}, Compression, EdgeLabel, InsertableEdge, NodeDatum, NodeID, StoredEdge, }; @@ -219,7 +219,6 @@ impl Drop for EdgeStoreWriter { struct FinalEdgeStoreWriter { ranges: RangesDb, - hosts: HostDb, labels: IterableStoreWriter, edges: ConstIterableStoreWriter, @@ -261,7 +260,6 @@ impl FinalEdgeStoreWriter { Self { ranges, - hosts: HostDb::open(path.as_ref().join("hosts")), labels, edges, reversed, @@ -296,7 +294,6 @@ impl FinalEdgeStoreWriter { }); } - self.hosts.insert(&node); let node_bytes = node.id.as_u64().to_be_bytes().to_vec(); debug_assert!(self.ranges.nodes_get_raw(&node_bytes).is_none()); @@ -441,8 +438,6 @@ impl FinalEdgeStoreWriter { } fn flush(&mut self) { - self.hosts.flush(); - self.ranges.commit(); self.edges.flush().unwrap();