Skip to content

Commit

Permalink
Merge pull request #4278 from systeminit/incident-1/rebase-batches
Browse files (browse the repository at this point in the history)
feat: switch rebaser to use operational transforms
  • Loading branch information
fnichol authored Aug 5, 2024
2 parents c5f15ab + b15536d commit d80ad04
Show file tree
Hide file tree
Showing 54 changed files with 1,222 additions and 3,828 deletions.
37 changes: 2 additions & 35 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::{env, fs::File, io::prelude::*};

use si_layer_cache::db::serialize;

use dal::{
workspace_snapshot::node_weight::NodeWeight, NodeWeightDiscriminants, WorkspaceSnapshotGraphV1,
};
use dal::WorkspaceSnapshotGraphV1;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;

Expand Down Expand Up @@ -39,41 +37,10 @@ fn main() -> Result<()> {
onto_vector_clock_id,
)?;

let mut last_ordering_node = None;
for update in &conflicts_and_updates.updates {
match update {
dal::workspace_snapshot::update::Update::NewEdge {
source,
destination,
..
} => {
if matches!(source.node_weight_kind, NodeWeightDiscriminants::Ordering) {
if let Some(ordering_node) = &last_ordering_node {
if let NodeWeight::Ordering(ordering) = ordering_node {
dbg!(destination, ordering.order());
}
}
}
}
dal::workspace_snapshot::update::Update::RemoveEdge { .. } => {}
dal::workspace_snapshot::update::Update::ReplaceSubgraph { onto, .. } => {
if matches!(onto.node_weight_kind, NodeWeightDiscriminants::Ordering) {
last_ordering_node = onto_graph
.get_node_weight_opt(onto.index)
.expect("couldn't get node")
.map(ToOwned::to_owned);
}
}
dal::workspace_snapshot::update::Update::MergeCategoryNodes { .. } => {}
}
dbg!(update);
}

dbg!(to_rebase_graph.perform_updates(
to_rebase_vector_clock_id,
&onto_graph,
&conflicts_and_updates.updates,
))?;

Ok(())
}

Expand Down
108 changes: 3 additions & 105 deletions lib/dal/examples/snapshot-analyzer/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
use std::{collections::HashMap, env, fs::File, io::prelude::*};
use std::{env, fs::File, io::Read as _};

use si_layer_cache::db::serialize;

use dal::{
workspace_snapshot::{
content_address::ContentAddressDiscriminants,
node_weight::{NodeWeight, NodeWeightDiscriminants},
vector_clock::HasVectorClocks,
},
EdgeWeightKindDiscriminants, WorkspaceSnapshotGraph,
};
use dal::WorkspaceSnapshotGraph;
use tokio::time::Instant;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;
Expand All @@ -31,102 +24,7 @@ async fn main() -> Result<()> {
let now = Instant::now();
let graph: WorkspaceSnapshotGraph = serialize::from_bytes(&snap_bytes)?;
println!("deserialization took: {:?}", now.elapsed());
let inner_graph = graph.graph();

let mut edge_kind_counts = HashMap::new();
let mut node_kind_counts = HashMap::new();

let mut edge_count = 0;
let mut node_count = 0;
let mut edge_vector_clock_first_seen_entries = 0;
let mut edge_vector_clock_write_entries = 0;

for edge_weight in inner_graph.edge_weights() {
edge_count += 1;
edge_vector_clock_first_seen_entries += edge_weight.vector_clock_first_seen().len();
edge_vector_clock_write_entries += edge_weight.vector_clock_write().len();

let kind: EdgeWeightKindDiscriminants = edge_weight.kind().into();
let kind_string = format!("{:?}", kind);

edge_kind_counts
.entry(kind_string)
.and_modify(|count| *count += 1)
.or_insert(1);
}

let mut node_first_seen_entries = 0;
let mut node_recently_seen_entries = 0;
let mut node_write_entries = 0;

for node_weight in inner_graph.node_weights() {
node_count += 1;

node_first_seen_entries += node_weight.vector_clock_first_seen().len();
node_recently_seen_entries += node_weight.vector_clock_recently_seen().len();
node_write_entries += node_weight.vector_clock_write().len();

let kind_string = {
if let NodeWeight::Content(content_node) = node_weight {
let cad_discrim: ContentAddressDiscriminants =
content_node.content_address().into();
cad_discrim.to_string()
} else {
let kind: NodeWeightDiscriminants = node_weight.into();
kind.to_string()
}
};

node_kind_counts
.entry(kind_string)
.and_modify(|count| *count += 1)
.or_insert(1);
}

println!("edges: {edge_count}, nodes: {node_count}");

println!(
"\nedge vector clock first seen entries: {edge_vector_clock_first_seen_entries}, {} per edge",
edge_vector_clock_first_seen_entries / edge_count
);
println!(
"edge vector clock write entries: {edge_vector_clock_write_entries}, {} per edge",
edge_vector_clock_write_entries / edge_count
);

// 128 bit id, 64 bit timestamp = 24bytes
let rough_bytes = (edge_vector_clock_first_seen_entries + edge_vector_clock_write_entries) * 24;

println!(
"edge vector clocks are ~{} bytes, which is {}% of the total snapshot",
rough_bytes,
(rough_bytes as f64 / decompressed.len() as f64) * 100.0
);

println!("\nEdge kinds:");

for (k, v) in edge_kind_counts {
println!("\t{k}: {v}");
}

println!(
"\nnode vector clock first seen entries: {node_first_seen_entries}, {} per node",
node_first_seen_entries / node_count
);
println!(
"node vector clock recently seen entries: {node_recently_seen_entries}, {} per node",
node_recently_seen_entries / node_count
);
println!(
"node vector clock write entries: {node_write_entries}, {} per node",
node_write_entries / node_count
);

println!("\nNode kinds:");

for (k, v) in node_kind_counts {
println!("\t{k}: {v}");
}
let _inner_graph = graph.graph();

Ok(())
}
2 changes: 1 addition & 1 deletion lib/dal/src/attribute/value/dependent_value_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl DependentValueGraph {
// at the time the job was created.
if workspace_snapshot
.try_get_node_index_by_id(initial_id)
.await?
.await
.is_none()
{
debug!(%initial_id, "missing node, skipping it in DependentValueGraph");
Expand Down
4 changes: 4 additions & 0 deletions lib/dal/src/builtins/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::module::Module;
use crate::{
func::intrinsics::IntrinsicFunc, pkg::import_pkg_from_pkg, BuiltinsResult, DalContext,
};
use telemetry::prelude::*;

/// We want the src/builtins/func/** files to be available at run time inside of the Docker container
/// that we build, but it would be nice to not have to include arbitrary bits of the source tree when
Expand Down Expand Up @@ -36,8 +37,11 @@ pub async fn migrate_intrinsics(ctx: &DalContext) -> BuiltinsResult<()> {
.await?
.is_none()
{
info!("importing");
import_pkg_from_pkg(ctx, &intrinsics_pkg, None).await?;
info!("imported, commiting");
ctx.blocking_commit().await?;
info!("commit finished");
}

Ok(())
Expand Down
33 changes: 25 additions & 8 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
Expand All @@ -14,6 +15,7 @@ use si_events::{ulid::Ulid, WorkspaceSnapshotAddress};
use telemetry::prelude::*;

use crate::context::{Conflicts, RebaseRequest};
use crate::slow_rt::SlowRuntimeError;
use crate::workspace_snapshot::vector_clock::VectorClockId;
use crate::{
action::{ActionError, ActionId},
Expand Down Expand Up @@ -46,6 +48,8 @@ pub enum ChangeSetError {
InvalidActor(UserPk),
#[error("invalid user system init")]
InvalidUserSystemInit,
#[error("tokio join error: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("layer db error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("ulid monotonic error: {0}")]
Expand All @@ -62,6 +66,8 @@ pub enum ChangeSetError {
Pg(#[from] PgError),
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),
#[error("slow runtime error: {0}")]
SlowRuntime(#[from] SlowRuntimeError),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("ulid decode error: {0}")]
Expand Down Expand Up @@ -453,15 +459,26 @@ impl ChangeSet {
let to_rebase_change_set_id = self
.base_change_set_id
.ok_or(ChangeSetError::NoBaseChangeSet(self.id))?;
let onto_workspace_snapshot_address = self
.workspace_snapshot_address
.ok_or(ChangeSetError::NoWorkspaceSnapshot(self.id))?;
let rebase_request = RebaseRequest {
onto_workspace_snapshot_address,
onto_vector_clock_id: ctx.vector_clock_id()?,

let to_rebase_workspace_snapshot = Arc::new(
WorkspaceSnapshot::find_for_change_set(ctx, to_rebase_change_set_id)
.await
.map_err(Box::new)?,
);

if let Some(rebase_batch) = WorkspaceSnapshot::calculate_rebase_batch(
to_rebase_change_set_id,
};
ctx.do_rebase_request(rebase_request).await?;
to_rebase_workspace_snapshot,
ctx.workspace_snapshot().map_err(Box::new)?,
)
.await
.map_err(Box::new)?
{
let rebase_batch_address = ctx.write_rebase_batch(rebase_batch).await?;

let rebase_request = RebaseRequest::new(to_rebase_change_set_id, rebase_batch_address);
ctx.do_rebase_request(rebase_request).await?;
}

self.update_status(ctx, ChangeSetStatus::Applied).await?;
let user = Self::extract_userid_from_context(ctx).await;
Expand Down
8 changes: 2 additions & 6 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,11 +1376,7 @@ impl Component {
component_id: ComponentId,
) -> ComponentResult<Option<(ComponentNodeWeight, ContentHash)>> {
let id: Ulid = component_id.into();
if let Some(node_index) = ctx
.workspace_snapshot()?
.try_get_node_index_by_id(id)
.await?
{
if let Some(node_index) = ctx.workspace_snapshot()?.try_get_node_index_by_id(id).await {
let node_weight = ctx
.workspace_snapshot()?
.get_node_weight(node_index)
Expand Down Expand Up @@ -2465,7 +2461,7 @@ impl Component {
match ctx
.workspace_snapshot()?
.try_get_node_index_by_id(component_id)
.await?
.await
{
Some(component_idx) => {
let component_node_weight = ctx
Expand Down
10 changes: 0 additions & 10 deletions lib/dal/src/component/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ impl Frame {
parent_id: ComponentId,
child_id: ComponentId,
) -> FrameResult<()> {
let total_start = std::time::Instant::now();

// cache current map of input <-> output sockets based on what the parent knows about right now!!!!
let initial_impacted_values: HashSet<SocketAttributeValuePair> =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
Expand All @@ -150,10 +148,6 @@ impl Frame {
if let Some(current_parent_id) = Component::get_parent_by_id(ctx, child_id).await? {
//remove the edge
Component::remove_edge_from_frame(ctx, current_parent_id, child_id).await?;
info!(
"Remove existing edge from frame took: {:?}",
total_start.elapsed()
);
// get the map of input <-> output sockets after the edge was removed. so we can determine if more
// updates need to be made due to the upsert
// note we need to see what the current parent's tree looked like, as there could be nested impacts
Expand All @@ -172,10 +166,6 @@ impl Frame {
Component::add_edge_to_frame(ctx, parent_id, child_id, EdgeWeightKind::FrameContains)
.await?;
drop(cycle_check_guard);
info!(
"Cycle Check Guard dropped, add edge took {:?}",
total_start.elapsed()
);

// now figure out what needs to rerun!
let mut values_to_run: HashSet<SocketAttributeValuePair> = HashSet::new();
Expand Down
Loading

0 comments on commit d80ad04

Please sign in to comment.