Skip to content

Commit

Permalink
remove pages_by_host from webgraph as it isn't used anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Oct 9, 2024
1 parent f494a11 commit 2519bef
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 136 deletions.
19 changes: 0 additions & 19 deletions crates/core/src/entrypoint/webgraph_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use std::net::SocketAddr;
use std::sync::Arc;

use itertools::Itertools;
use tracing::info;
use utoipa::ToSchema;

Expand Down Expand Up @@ -58,7 +57,6 @@ sonic_service!(
RawOutgoingEdges,
RawIngoingEdgesWithLabels,
RawOutgoingEdgesWithLabels,
PagesByHosts,
GetNodeIDs
]
);
Expand Down Expand Up @@ -164,23 +162,6 @@ impl Message<WebGraphService> for RawOutgoingEdgesWithLabels {
}
}

#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
pub struct PagesByHosts {
pub hosts: Vec<NodeID>,
}

impl Message<WebGraphService> for PagesByHosts {
type Response = Vec<NodeID>;

async fn handle(self, server: &WebGraphService) -> Self::Response {
self.hosts
.iter()
.flat_map(|host| server.graph.pages_by_host(host))
.unique()
.collect()
}
}

#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
pub struct InDegreeUpperBound {
pub node: NodeID,
Expand Down
18 changes: 0 additions & 18 deletions crates/core/src/webgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,6 @@ impl Webgraph {
.collect()
}

pub fn pages_by_host(&self, host_node: &NodeID) -> Vec<NodeID> {
let mut pages: Vec<_> = self
.executor
.map(
|segment| segment.pages_by_host(host_node),
self.segments.iter(),
)
.unwrap()
.into_iter()
.flatten()
.collect();

pages.sort();
pages.dedup();

pages
}

pub fn raw_ingoing_edges(&self, node: &NodeID, limit: EdgeLimit) -> Vec<Edge<()>> {
let edges = self
.segments
Expand Down
25 changes: 1 addition & 24 deletions crates/core/src/webgraph/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
streaming_response::StreamingResponse,
},
entrypoint::webgraph_server::{
GetNode, GetNodeIDs, IngoingEdges, OutgoingEdges, PagesByHosts, RawIngoingEdges,
GetNode, GetNodeIDs, IngoingEdges, OutgoingEdges, RawIngoingEdges,
RawIngoingEdgesWithLabels, RawOutgoingEdges, RawOutgoingEdgesWithLabels, WebGraphService,
},
Result,
Expand Down Expand Up @@ -437,29 +437,6 @@ impl<G: WebgraphGranularity> RemoteWebgraph<G> {
Ok(edges)
}

pub async fn pages_by_hosts(&self, hosts: &[NodeID]) -> Result<Vec<NodeID>> {
let res = self
.conn()
.await
.send(
PagesByHosts {
hosts: hosts.to_vec(),
},
&AllShardsSelector,
&RandomReplicaSelector,
)
.await?;

Ok(res
.into_iter()
.flat_map(|(_, reps)| {
debug_assert!(reps.len() <= 1);
reps.into_iter().flat_map(|(_, rep)| rep)
})
.unique()
.collect())
}

pub async fn stream_node_ids(&self) -> impl futures::Stream<Item = NodeID> {
StreamNodeIDs::new(self.conn().await).stream()
}
Expand Down
4 changes: 0 additions & 4 deletions crates/core/src/webgraph/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ impl Segment {
self.reversed_adjacency.degree(node)
}

pub fn pages_by_host(&self, host_node: &NodeID) -> Vec<NodeID> {
self.reversed_adjacency.nodes_by_host(host_node)
}

pub fn id(&self) -> String {
self.id.clone()
}
Expand Down
67 changes: 2 additions & 65 deletions crates/core/src/webgraph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,64 +29,13 @@ use file_store::{
},
ConstSerializable,
};
use fst::Automaton;
use itertools::Itertools;

use super::{
merge::{MergeNode, MergeSegmentOrd},
Compression, FullNodeID, NodeDatum, NodeID, SegmentEdge, StoredEdge,
Compression, NodeDatum, NodeID, SegmentEdge, StoredEdge,
};

pub struct HostDb {
db: speedy_kv::Db<Vec<u8>, ()>,
}

impl HostDb {
pub fn open<P: AsRef<Path>>(path: P) -> Self {
let db = speedy_kv::Db::open_or_create(path).unwrap();

Self { db }
}

fn optimize_read(&mut self) {
self.db.merge_all_segments().unwrap();
}

pub fn insert(&mut self, node: &FullNodeID) {
let key = [
node.host.as_u64().to_be_bytes(),
node.id.as_u64().to_be_bytes(),
]
.concat();

self.db.insert_raw(key, vec![]);
}

fn get(&self, host: &NodeID) -> Vec<NodeID> {
let host = host.as_u64().to_be_bytes().to_vec();

let query = speedy_kv::automaton::ExactMatch(&host).starts_with();

self.db
.search_raw(query)
.map(|(key, _)| {
let id = u64::from_be_bytes(
key.as_bytes()[u64::BITS as usize / 8..].try_into().unwrap(),
);
NodeID::from(id)
})
.collect()
}

pub fn flush(&mut self) {
self.db.commit().unwrap();
}

fn merge(&mut self, other: HostDb) {
self.db.merge(other.db).unwrap();
}
}

#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
pub struct EdgeRange {
range: std::ops::Range<u64>,
Expand Down Expand Up @@ -289,7 +238,6 @@ impl CompressedLabelBlock {
pub struct EdgeStore {
reversed: bool,
ranges: RangesDb,
hosts: HostDb,

edge_labels: IterableStoreReader<CompressedLabelBlock>,
edges: ConstIterableStoreReader<StoredEdge>,
Expand All @@ -305,7 +253,6 @@ impl EdgeStore {

Self {
ranges,
hosts: HostDb::open(path.as_ref().join("hosts")),
edge_labels,
edges,
reversed,
Expand All @@ -314,7 +261,6 @@ impl EdgeStore {

pub fn optimize_read(&mut self) {
self.ranges.optimize_read();
self.hosts.optimize_read();
}

fn merge_postings_for_node<'a>(
Expand Down Expand Up @@ -476,11 +422,6 @@ impl EdgeStore {
}

let mut res = Self::merge_postings(&stores, label_compression, path)?;

for store in stores {
res.hosts.merge(store.hosts);
}

res.optimize_read();

Ok(())
Expand Down Expand Up @@ -576,10 +517,6 @@ impl EdgeStore {
}
}

pub fn nodes_by_host(&self, host: &NodeID) -> Vec<NodeID> {
self.hosts.get(host)
}

pub fn iter_without_label(&self) -> impl Iterator<Item = SegmentEdge<()>> + '_ + Send + Sync {
self.ranges.edges.iter_raw().flat_map(move |(key, val)| {
let node = u64::from_be_bytes((key.as_bytes()).try_into().unwrap());
Expand Down Expand Up @@ -617,7 +554,7 @@ impl EdgeStore {
mod tests {
use std::sync::Arc;

use crate::webgraph::{store_writer::EdgeStoreWriter, Edge, InsertableEdge};
use crate::webgraph::{store_writer::EdgeStoreWriter, Edge, FullNodeID, InsertableEdge};

use super::*;

Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/webgraph/store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use file_store::{
};

use super::{
store::{CompressedLabelBlock, EdgeStore, HostDb, LabelBlock, RangesDb},
store::{CompressedLabelBlock, EdgeStore, LabelBlock, RangesDb},
Compression, EdgeLabel, InsertableEdge, NodeDatum, NodeID, StoredEdge,
};

Expand Down Expand Up @@ -219,7 +219,6 @@ impl Drop for EdgeStoreWriter {

struct FinalEdgeStoreWriter {
ranges: RangesDb,
hosts: HostDb,

labels: IterableStoreWriter<CompressedLabelBlock, File>,
edges: ConstIterableStoreWriter<StoredEdge, File>,
Expand Down Expand Up @@ -261,7 +260,6 @@ impl FinalEdgeStoreWriter {

Self {
ranges,
hosts: HostDb::open(path.as_ref().join("hosts")),
labels,
edges,
reversed,
Expand Down Expand Up @@ -296,7 +294,6 @@ impl FinalEdgeStoreWriter {
});
}

self.hosts.insert(&node);
let node_bytes = node.id.as_u64().to_be_bytes().to_vec();

debug_assert!(self.ranges.nodes_get_raw(&node_bytes).is_none());
Expand Down Expand Up @@ -441,8 +438,6 @@ impl FinalEdgeStoreWriter {
}

fn flush(&mut self) {
self.hosts.flush();

self.ranges.commit();

self.edges.flush().unwrap();
Expand Down

0 comments on commit 2519bef

Please sign in to comment.