diff --git a/docker/dev-full/rivet-client/config.yaml b/docker/dev-full/rivet-client/config.yaml index 9c6a3afa2..9d66fd6f0 100644 --- a/docker/dev-full/rivet-client/config.yaml +++ b/docker/dev-full/rivet-client/config.yaml @@ -25,8 +25,7 @@ client: logs: redirect_logs: true foundationdb: - username: docker - password: docker + cluster_id: docker address: !static - "foundationdb:4500" diff --git a/packages/infra/client/config/src/manager.rs b/packages/infra/client/config/src/manager.rs index d823626fc..2a92969f3 100644 --- a/packages/infra/client/config/src/manager.rs +++ b/packages/infra/client/config/src/manager.rs @@ -224,8 +224,7 @@ impl Metrics { #[derive(Clone, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct FoundationDb { - pub username: String, - pub password: String, + pub cluster_id: String, pub address: FoundationDbAddress, } diff --git a/packages/infra/client/container-runner/Cargo.toml b/packages/infra/client/container-runner/Cargo.toml index e02ecd4a8..93ccadb4d 100644 --- a/packages/infra/client/container-runner/Cargo.toml +++ b/packages/infra/client/container-runner/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "pegboard-container-runner" version.workspace = true +edition.workspace = true authors.workspace = true license.workspace = true -edition.workspace = true [[bin]] name = "rivet-container-runner" diff --git a/packages/infra/client/container-runner/Dockerfile b/packages/infra/client/container-runner/Dockerfile index 3d80fcadb..06dba8706 100644 --- a/packages/infra/client/container-runner/Dockerfile +++ b/packages/infra/client/container-runner/Dockerfile @@ -1,6 +1,27 @@ -FROM clux/muslrust:1.81.0-stable +# This version is required for GLIBC 2.31 (used by edge servers on Linode) +FROM rust:1.82.0-bullseye AS rust WORKDIR /app -COPY Cargo.toml . -COPY src/ src/ -RUN cargo build --release --bin rivet-container-runner +COPY . . + +# Installs shared libs +# +# The FDB version should match `cluster::workflows::server::install::install_scripts::components::fdb::FDB_VERSION` +RUN apt-get update && apt-get install -y libclang-dev protobuf-compiler && \ + curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so" + +RUN \ + --mount=type=cache,target=/root/.cargo/git \ + --mount=type=cache,target=/root/.cargo/registry \ + --mount=type=cache,target=/app/packages/infra/client/target \ + cd packages/infra/client && \ + RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin rivet-container-runner && \ + mkdir -p /app/dist && \ + mv /app/packages/infra/client/target/release/rivet-container-runner /app/dist/rivet-container-runner + +# Create an empty image and copy binaries into it to minimize the size of the image +FROM scratch +COPY --from=rust /app/dist/ / + +# Allows `docker create` to work even though this fails +CMD [""] diff --git a/packages/infra/client/container-runner/Dockerfile.dockerignore b/packages/infra/client/container-runner/Dockerfile.dockerignore index 03e816fa2..cd737a51e 100644 --- a/packages/infra/client/container-runner/Dockerfile.dockerignore +++ b/packages/infra/client/container-runner/Dockerfile.dockerignore @@ -1,4 +1,6 @@ * + !Cargo.lock !Cargo.toml -!src +!packages +!resources/legacy/proto diff --git a/packages/infra/client/echo/Cargo.toml b/packages/infra/client/echo/Cargo.toml index dadf7d2d9..ffe4a2e3b 100644 --- a/packages/infra/client/echo/Cargo.toml +++ b/packages/infra/client/echo/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "pegboard-echo-server" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true +version = "0.0.1" +edition = "2021" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" [dependencies] tiny_http = "0.12" diff --git a/packages/infra/client/echo/Dockerfile.dockerignore b/packages/infra/client/echo/Dockerfile.dockerignore index 78a813a10..03e816fa2 100644 --- a/packages/infra/client/echo/Dockerfile.dockerignore +++ b/packages/infra/client/echo/Dockerfile.dockerignore @@ -2,4 +2,3 @@ !Cargo.lock !Cargo.toml !src - diff --git a/packages/infra/client/isolate-v8-runner/Dockerfile b/packages/infra/client/isolate-v8-runner/Dockerfile index 3221fd58c..254d2198a 100644 --- a/packages/infra/client/isolate-v8-runner/Dockerfile +++ b/packages/infra/client/isolate-v8-runner/Dockerfile @@ -14,7 +14,7 @@ RUN \ --mount=type=cache,target=/root/.cargo/git \ --mount=type=cache,target=/root/.cargo/registry \ --mount=type=cache,target=/app/packages/infra/client/target \ - cd packages/infra/client/isolate-v8-runner && \ + cd packages/infra/client && \ RUSTFLAGS="--cfg tokio_unstable" cargo build --release --target x86_64-unknown-linux-gnu --bin rivet-isolate-v8-runner && \ mkdir -p /app/dist && \ mv /app/packages/infra/client/target/x86_64-unknown-linux-gnu/release/rivet-isolate-v8-runner /app/dist/rivet-isolate-v8-runner diff --git a/packages/infra/client/isolate-v8-runner/Dockerfile.dockerignore b/packages/infra/client/isolate-v8-runner/Dockerfile.dockerignore index 808881f1c..cd737a51e 100644 --- a/packages/infra/client/isolate-v8-runner/Dockerfile.dockerignore +++ b/packages/infra/client/isolate-v8-runner/Dockerfile.dockerignore @@ -1,8 +1,6 @@ * -!packages/infra/client/actor-kv -!packages/infra/client/config -!packages/infra/client/isolate-v8-runner/Cargo.toml -!packages/infra/client/isolate-v8-runner/src -!packages/infra/client/isolate-v8-runner/js - +!Cargo.lock +!Cargo.toml +!packages +!resources/legacy/proto diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index 49f2e680c..1a334258f 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -28,6 +28,7 @@ use uuid::Uuid; use crate::{ actor::Actor, + event_sender::EventSender, metrics, runner, utils::{self, sql::SqliteConnectionExt}, }; @@ -61,10 +62,12 @@ struct ActorRow { pub struct Ctx { config: Config, + system: SystemInfo, + pool: SqlitePool, tx: Mutex>, Message>>, + event_sender: EventSender, - system: SystemInfo, pub(crate) actors: RwLock>>, isolate_runner: RwLock>, } @@ -78,10 +81,12 @@ impl Ctx { ) -> Arc { Arc::new(Ctx { config, + system, + pool, tx: Mutex::new(tx), + event_sender: EventSender::new(), - system, actors: RwLock::new(HashMap::new()), isolate_runner: RwLock::new(None), }) @@ -97,7 +102,7 @@ impl Ctx { } } - async fn send_packet(&self, packet: protocol::ToServer) -> Result<()> { + pub(crate) async fn send_packet(&self, packet: protocol::ToServer) -> Result<()> { let buf = packet.serialize()?; self.tx.lock().await.send(Message::Binary(buf)).await?; @@ -153,13 +158,7 @@ impl Ctx { pub async fn event(&self, event: protocol::Event) -> Result<()> { let index = self.write_event(&event).await?; - let wrapped_event = protocol::EventWrapper { - index, - inner: protocol::Raw::new(&event)?, - }; - - self.send_packet(protocol::ToServer::Events(vec![wrapped_event])) - .await + self.event_sender.send(self, event, index).await } pub async fn run( @@ -386,18 +385,32 @@ impl Ctx { /// Rebuilds state from DB upon restart. async fn rebuild(self: &Arc) -> Result<()> { - let actor_rows = utils::query(|| async { - sqlx::query_as::<_, ActorRow>(indoc!( - " - SELECT actor_id, config, pid, stop_ts - FROM actors - WHERE exit_ts IS NULL - ", - )) - .fetch_all(&mut *self.sql().await?) - .await - }) - .await?; + let ((last_event_idx,), actor_rows) = tokio::try_join!( + // There should not be any database operations going on at this point so it is safe to read this + // value + utils::query(|| async { + sqlx::query_as::<_, (i64,)>(indoc!( + " + SELECT last_event_idx FROM state + ", + )) + .fetch_one(&mut *self.sql().await?) + .await + }), + utils::query(|| async { + sqlx::query_as::<_, ActorRow>(indoc!( + " + SELECT actor_id, config, pid, stop_ts + FROM actors + WHERE exit_ts IS NULL + ", + )) + .fetch_all(&mut *self.sql().await?) + .await + }) + )?; + + self.event_sender.set_idx(last_event_idx + 1); let isolate_runner = { self.isolate_runner.read().await.clone() }; diff --git a/packages/infra/client/manager/src/event_sender.rs b/packages/infra/client/manager/src/event_sender.rs new file mode 100644 index 000000000..3cebb18b8 --- /dev/null +++ b/packages/infra/client/manager/src/event_sender.rs @@ -0,0 +1,60 @@ +use std::sync::atomic::{AtomicI64, Ordering}; + +use anyhow::*; +use pegboard::protocol; +use tokio::sync::broadcast; + +use crate::Ctx; + +/// Handles sending events in a sequentially consistent order. +pub struct EventSender { + awaiting_event_idx: AtomicI64, + tx: broadcast::Sender, +} + +impl EventSender { + pub fn new() -> Self { + EventSender { + awaiting_event_idx: AtomicI64::new(0), + tx: broadcast::channel(4).0, + } + } + + pub fn set_idx(&self, idx: i64) { + self.awaiting_event_idx.store(idx, Ordering::SeqCst); + } + + pub async fn send(&self, ctx: &Ctx, event: protocol::Event, idx: i64) -> Result<()> { + // Subscribe before checking the idx + let mut rx = self.tx.subscribe(); + + // Read source of truth + if idx != self.awaiting_event_idx.load(Ordering::SeqCst) { + // Wait for idx from channel + loop { + if rx.recv().await? == idx { + break; + } + } + } + + // Drop receiver so it does not become a "slow receiver" + drop(rx); + + let wrapped_event = protocol::EventWrapper { + index: idx, + inner: protocol::Raw::new(&event)?, + }; + + ctx.send_packet(protocol::ToServer::Events(vec![wrapped_event])) + .await?; + + // Increment idx only after sending. We don't use `fetch_add` because we need the next value to be + // exactly `idx + 1`. This should always be the case anyway. + self.awaiting_event_idx.store(idx + 1, Ordering::SeqCst); + // An error means there are currently no receivers + let _ = self.tx.send(idx + 1); + + Ok(()) + } +} diff --git a/packages/infra/client/manager/src/lib.rs b/packages/infra/client/manager/src/lib.rs index 497bd57cb..3d03a054a 100644 --- a/packages/infra/client/manager/src/lib.rs +++ b/packages/infra/client/manager/src/lib.rs @@ -1,3 +1,5 @@ +// TODO: Make tests work without this + // Test exports #[cfg(feature = "test")] @@ -5,6 +7,8 @@ mod actor; #[cfg(feature = "test")] mod ctx; #[cfg(feature = "test")] +pub mod event_sender; +#[cfg(feature = "test")] mod metrics; #[cfg(feature = "test")] mod runner; diff --git a/packages/infra/client/manager/src/main.rs b/packages/infra/client/manager/src/main.rs index 4d08e53e5..8dd375ea7 100644 --- a/packages/infra/client/manager/src/main.rs +++ b/packages/infra/client/manager/src/main.rs @@ -19,6 +19,7 @@ use url::Url; mod actor; mod ctx; +mod event_sender; mod metrics; mod runner; mod system_info; diff --git a/packages/infra/client/manager/src/utils/mod.rs b/packages/infra/client/manager/src/utils/mod.rs index 70a3f4262..9315817cc 100644 --- a/packages/infra/client/manager/src/utils/mod.rs +++ b/packages/infra/client/manager/src/utils/mod.rs @@ -1,5 +1,5 @@ use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + net::Ipv4Addr, path::Path, time::{self, Duration}, }; @@ -269,9 +269,8 @@ pub async fn init_fdb_config(config: &Config) -> Result<()> { fs::write( config.client.data_dir().join("fdb.cluster"), format!( - "{username}:{password}@{joined}", - username = config.client.foundationdb.username, - password = config.client.foundationdb.password, + "fdb:{cluster_id}@{joined}", + cluster_id = config.client.foundationdb.cluster_id, ), ) .await?; diff --git a/packages/infra/client/manager/tests/common.rs b/packages/infra/client/manager/tests/common.rs index e7f9cf36f..7ab48b752 100644 --- a/packages/infra/client/manager/tests/common.rs +++ b/packages/infra/client/manager/tests/common.rs @@ -196,37 +196,40 @@ pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config { let config = Config { client: Client { + data_dir: Some(working_path.to_path_buf()), cluster: Cluster { client_id: Uuid::new_v4(), datacenter_id: Uuid::new_v4(), pegboard_endpoint: Url::parse("ws://127.0.0.1:5030").unwrap(), // Not necessary for the test api_endpoint: Url::parse("http://127.0.0.1").unwrap(), - foundationdb: FoundationDb::Addresses(vec!["127.0.0.1:4500".parse().unwrap()]), }, - runtime: Runtime { + runner: Runner { // Not necessary for the test flavor: protocol::ClientFlavor::Container, - data_dir: Some(working_path.to_path_buf()), + port: None, container_runner_binary_path: Some(container_runner_binary_path), isolate_runner_binary_path: Some(isolate_runner_binary_path), }, - actor: Actor { - network: ActorNetwork { - bind_ip: "127.0.0.1".parse().unwrap(), - lan_ip: "127.0.0.1".parse().unwrap(), - wan_ip: "127.0.0.1".parse().unwrap(), - lan_port_range_min: None, - lan_port_range_max: None, - wan_port_range_min: None, - wan_port_range_max: None, - }, + network: Network { + bind_ip: "127.0.0.1".parse().unwrap(), + lan_ip: "127.0.0.1".parse().unwrap(), + wan_ip: "127.0.0.1".parse().unwrap(), + lan_port_range_min: None, + lan_port_range_max: None, + wan_port_range_min: None, + wan_port_range_max: None, }, cni: Default::default(), reserved_resources: Default::default(), logs: Logs { redirect_logs: Some(false), }, + metrics: Default::default(), + foundationdb: FoundationDb { + cluster_id: "fdb".into(), + address: FoundationDbAddress::Static(vec!["127.0.0.1:4500".parse().unwrap()]), + }, vector: Some(Vector { address: "127.0.0.1:5021".into(), }), @@ -251,7 +254,6 @@ pub async fn start_client( "sqlite://{}", config .client - .runtime .data_dir() .join("db") .join("database.db") @@ -342,7 +344,7 @@ pub async fn build_binaries(gen_path: &Path) { // Js image let status = Command::new("tar") - .arg("-czf") + .arg("-cf") .arg(js_image_path(gen_path)) .arg("-C") .arg(Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"))