Skip to content

Commit

Permalink
fix: fix event sending race condition for pb manager (#1455)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Nov 23, 2024
1 parent 9a89891 commit 0e2642a
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 63 deletions.
3 changes: 1 addition & 2 deletions docker/dev-full/rivet-client/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ client:
logs:
redirect_logs: true
foundationdb:
username: docker
password: docker
cluster_id: docker
address:
!static
- "foundationdb:4500"
Expand Down
3 changes: 1 addition & 2 deletions packages/infra/client/config/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ impl Metrics {
#[derive(Clone, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct FoundationDb {
pub username: String,
pub password: String,
pub cluster_id: String,
pub address: FoundationDbAddress,
}

Expand Down
2 changes: 1 addition & 1 deletion packages/infra/client/container-runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "pegboard-container-runner"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true

[[bin]]
name = "rivet-container-runner"
Expand Down
29 changes: 25 additions & 4 deletions packages/infra/client/container-runner/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
FROM clux/muslrust:1.81.0-stable
# This version is required for GLIBC 2.31 (used by edge servers on Linode)
FROM rust:1.82.0-bullseye AS rust

WORKDIR /app
COPY Cargo.toml .
COPY src/ src/
RUN cargo build --release --bin rivet-container-runner
COPY . .

# Installs shared libs
#
# The FDB version should match `cluster::workflows::server::install::install_scripts::components::fdb::FDB_VERSION`
RUN apt-get update && apt-get install -y libclang-dev protobuf-compiler && \
curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so"

RUN \
--mount=type=cache,target=/root/.cargo/git \
--mount=type=cache,target=/root/.cargo/registry \
--mount=type=cache,target=/app/packages/infra/client/target \
cd packages/infra/client && \
RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin rivet-container-runner && \
mkdir -p /app/dist && \
mv /app/packages/infra/client/target/release/rivet-container-runner /app/dist/rivet-container-runner

# Create an empty image and copy binaries into it to minimize the size of the image
FROM scratch
COPY --from=rust /app/dist/ /

# Allows `docker create` to work even though this fails
CMD [""]
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
*

!Cargo.lock
!Cargo.toml
!src
!packages
!resources/legacy/proto
8 changes: 4 additions & 4 deletions packages/infra/client/echo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "pegboard-echo-server"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true
version = "0.0.1"
edition = "2021"
authors = ["Rivet Gaming, LLC <[email protected]>"]
license = "Apache-2.0"

[dependencies]
tiny_http = "0.12"
1 change: 0 additions & 1 deletion packages/infra/client/echo/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@
!Cargo.lock
!Cargo.toml
!src

2 changes: 1 addition & 1 deletion packages/infra/client/isolate-v8-runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN \
--mount=type=cache,target=/root/.cargo/git \
--mount=type=cache,target=/root/.cargo/registry \
--mount=type=cache,target=/app/packages/infra/client/target \
cd packages/infra/client/isolate-v8-runner && \
cd packages/infra/client && \
RUSTFLAGS="--cfg tokio_unstable" cargo build --release --target x86_64-unknown-linux-gnu --bin rivet-isolate-v8-runner && \
mkdir -p /app/dist && \
mv /app/packages/infra/client/target/x86_64-unknown-linux-gnu/release/rivet-isolate-v8-runner /app/dist/rivet-isolate-v8-runner
Expand Down
10 changes: 4 additions & 6 deletions packages/infra/client/isolate-v8-runner/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
*

!packages/infra/client/actor-kv
!packages/infra/client/config
!packages/infra/client/isolate-v8-runner/Cargo.toml
!packages/infra/client/isolate-v8-runner/src
!packages/infra/client/isolate-v8-runner/js

!Cargo.lock
!Cargo.toml
!packages
!resources/legacy/proto
57 changes: 35 additions & 22 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use uuid::Uuid;

use crate::{
actor::Actor,
event_sender::EventSender,
metrics, runner,
utils::{self, sql::SqliteConnectionExt},
};
Expand Down Expand Up @@ -61,10 +62,12 @@ struct ActorRow {

pub struct Ctx {
config: Config,
system: SystemInfo,

pool: SqlitePool,
tx: Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
event_sender: EventSender,

system: SystemInfo,
pub(crate) actors: RwLock<HashMap<Uuid, Arc<Actor>>>,
isolate_runner: RwLock<Option<runner::Handle>>,
}
Expand All @@ -78,10 +81,12 @@ impl Ctx {
) -> Arc<Self> {
Arc::new(Ctx {
config,
system,

pool,
tx: Mutex::new(tx),
event_sender: EventSender::new(),

system,
actors: RwLock::new(HashMap::new()),
isolate_runner: RwLock::new(None),
})
Expand All @@ -97,7 +102,7 @@ impl Ctx {
}
}

async fn send_packet(&self, packet: protocol::ToServer) -> Result<()> {
pub(crate) async fn send_packet(&self, packet: protocol::ToServer) -> Result<()> {
let buf = packet.serialize()?;
self.tx.lock().await.send(Message::Binary(buf)).await?;

Expand Down Expand Up @@ -153,13 +158,7 @@ impl Ctx {
pub async fn event(&self, event: protocol::Event) -> Result<()> {
let index = self.write_event(&event).await?;

let wrapped_event = protocol::EventWrapper {
index,
inner: protocol::Raw::new(&event)?,
};

self.send_packet(protocol::ToServer::Events(vec![wrapped_event]))
.await
self.event_sender.send(self, event, index).await
}

pub async fn run(
Expand Down Expand Up @@ -386,18 +385,32 @@ impl Ctx {

/// Rebuilds state from DB upon restart.
async fn rebuild(self: &Arc<Self>) -> Result<()> {
let actor_rows = utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
})
.await?;
let ((last_event_idx,), actor_rows) = tokio::try_join!(
// There should not be any database operations going on at this point so it is safe to read this
// value
utils::query(|| async {
sqlx::query_as::<_, (i64,)>(indoc!(
"
SELECT last_event_idx FROM state
",
))
.fetch_one(&mut *self.sql().await?)
.await
}),
utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
})
)?;

self.event_sender.set_idx(last_event_idx + 1);

let isolate_runner = { self.isolate_runner.read().await.clone() };

Expand Down
60 changes: 60 additions & 0 deletions packages/infra/client/manager/src/event_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::atomic::{AtomicI64, Ordering};

use anyhow::*;
use pegboard::protocol;
use tokio::sync::broadcast;

use crate::Ctx;

/// Handles sending events in a sequentially consistent order.
///
/// Each event carries an index; `send` only transmits an event once its index equals the awaited
/// index, and then advances the awaited index by one.
pub struct EventSender {
/// Index of the next event that is allowed to be sent. This is the source of truth that `send`
/// checks before it falls back to waiting on the channel.
awaiting_event_idx: AtomicI64,
/// Broadcasts the new awaited index (`idx + 1`) after each successful send so that tasks waiting
/// in `send` can check whether it is their turn.
tx: broadcast::Sender<i64>,
}

impl EventSender {
pub fn new() -> Self {
EventSender {
awaiting_event_idx: AtomicI64::new(0),
tx: broadcast::channel(4).0,
}
}

pub fn set_idx(&self, idx: i64) {
self.awaiting_event_idx.store(idx, Ordering::SeqCst);
}

pub async fn send(&self, ctx: &Ctx, event: protocol::Event, idx: i64) -> Result<()> {
// Subscribe before checking the idx
let mut rx = self.tx.subscribe();

// Read source of truth
if idx != self.awaiting_event_idx.load(Ordering::SeqCst) {
// Wait for idx from channel
loop {
if rx.recv().await? == idx {
break;
}
}
}

// Drop receiver so it does not become a "slow receiver"
drop(rx);

let wrapped_event = protocol::EventWrapper {
index: idx,
inner: protocol::Raw::new(&event)?,
};

ctx.send_packet(protocol::ToServer::Events(vec![wrapped_event]))
.await?;

// Increment idx only after sending. We don't use `fetch_add` because we need the next value to be
// exactly `idx + 1`. This should always be the case anyway.
self.awaiting_event_idx.store(idx + 1, Ordering::SeqCst);
// An error means there are currently no receivers
let _ = self.tx.send(idx + 1);

Ok(())
}
}
4 changes: 4 additions & 0 deletions packages/infra/client/manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// TODO: Make tests work without this

// Test exports

#[cfg(feature = "test")]
mod actor;
#[cfg(feature = "test")]
mod ctx;
#[cfg(feature = "test")]
pub mod event_sender;
#[cfg(feature = "test")]
mod metrics;
#[cfg(feature = "test")]
mod runner;
Expand Down
1 change: 1 addition & 0 deletions packages/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use url::Url;

mod actor;
mod ctx;
mod event_sender;
mod metrics;
mod runner;
mod system_info;
Expand Down
7 changes: 3 additions & 4 deletions packages/infra/client/manager/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
net::Ipv4Addr,
path::Path,
time::{self, Duration},
};
Expand Down Expand Up @@ -269,9 +269,8 @@ pub async fn init_fdb_config(config: &Config) -> Result<()> {
fs::write(
config.client.data_dir().join("fdb.cluster"),
format!(
"{username}:{password}@{joined}",
username = config.client.foundationdb.username,
password = config.client.foundationdb.password,
"fdb:{cluster_id}@{joined}",
cluster_id = config.client.foundationdb.cluster_id,
),
)
.await?;
Expand Down
32 changes: 17 additions & 15 deletions packages/infra/client/manager/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,37 +196,40 @@ pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config {

let config = Config {
client: Client {
data_dir: Some(working_path.to_path_buf()),
cluster: Cluster {
client_id: Uuid::new_v4(),
datacenter_id: Uuid::new_v4(),
pegboard_endpoint: Url::parse("ws://127.0.0.1:5030").unwrap(),
// Not necessary for the test
api_endpoint: Url::parse("http://127.0.0.1").unwrap(),
foundationdb: FoundationDb::Addresses(vec!["127.0.0.1:4500".parse().unwrap()]),
},
runtime: Runtime {
runner: Runner {
// Not necessary for the test
flavor: protocol::ClientFlavor::Container,
data_dir: Some(working_path.to_path_buf()),
port: None,
container_runner_binary_path: Some(container_runner_binary_path),
isolate_runner_binary_path: Some(isolate_runner_binary_path),
},
actor: Actor {
network: ActorNetwork {
bind_ip: "127.0.0.1".parse().unwrap(),
lan_ip: "127.0.0.1".parse().unwrap(),
wan_ip: "127.0.0.1".parse().unwrap(),
lan_port_range_min: None,
lan_port_range_max: None,
wan_port_range_min: None,
wan_port_range_max: None,
},
network: Network {
bind_ip: "127.0.0.1".parse().unwrap(),
lan_ip: "127.0.0.1".parse().unwrap(),
wan_ip: "127.0.0.1".parse().unwrap(),
lan_port_range_min: None,
lan_port_range_max: None,
wan_port_range_min: None,
wan_port_range_max: None,
},
cni: Default::default(),
reserved_resources: Default::default(),
logs: Logs {
redirect_logs: Some(false),
},
metrics: Default::default(),
foundationdb: FoundationDb {
cluster_id: "fdb".into(),
address: FoundationDbAddress::Static(vec!["127.0.0.1:4500".parse().unwrap()]),
},
vector: Some(Vector {
address: "127.0.0.1:5021".into(),
}),
Expand All @@ -251,7 +254,6 @@ pub async fn start_client(
"sqlite://{}",
config
.client
.runtime
.data_dir()
.join("db")
.join("database.db")
Expand Down Expand Up @@ -342,7 +344,7 @@ pub async fn build_binaries(gen_path: &Path) {

// Js image
let status = Command::new("tar")
.arg("-czf")
.arg("-cf")
.arg(js_image_path(gen_path))
.arg("-C")
.arg(Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"))
Expand Down

0 comments on commit 0e2642a

Please sign in to comment.