From bd9fba1bf38e0e5189064940f0af9679b8bfa30e Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 13 Apr 2024 22:49:19 +0200 Subject: [PATCH] refactor fork/exec logic for the command runners --- .../process_execution/src/cache_tests.rs | 3 + .../engine/process_execution/src/fork_exec.rs | 75 +++++++++++++++++++ src/rust/engine/process_execution/src/lib.rs | 2 + .../engine/process_execution/src/local.rs | 72 +++--------------- .../process_execution/src/local_tests.rs | 3 + src/rust/engine/process_executor/src/main.rs | 2 + src/rust/engine/src/context.rs | 6 ++ 7 files changed, 100 insertions(+), 63 deletions(-) create mode 100644 src/rust/engine/process_execution/src/fork_exec.rs diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index dd0cabbb44c..7d839ab52a0 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -3,6 +3,7 @@ use std::convert::TryInto; use std::io::Write; use std::path::PathBuf; +use std::sync::Arc; use cache::PersistentCache; use sharded_lmdb::DEFAULT_LEASE_TIME; @@ -10,6 +11,7 @@ use store::{ImmutableInputs, Store}; use tempfile::TempDir; use testutil::data::TestData; use testutil::relative_paths; +use tokio::sync::RwLock; use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ @@ -35,6 +37,7 @@ fn create_local_runner() -> (Box, Store, TempDir) { NamedCaches::new_local(named_cache_dir), ImmutableInputs::new(store.clone(), base_dir.path()).unwrap(), KeepSandboxes::Never, + Arc::new(RwLock::new(())), )); (runner, store, base_dir) } diff --git a/src/rust/engine/process_execution/src/fork_exec.rs b/src/rust/engine/process_execution/src/fork_exec.rs new file mode 100644 index 00000000000..3c356d5cc10 --- /dev/null +++ b/src/rust/engine/process_execution/src/fork_exec.rs @@ -0,0 +1,75 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). 
+// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::io; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::RwLock; + +use crate::ManagedChild; + +/// Spawn a subprocess safely given that binaries may be written by this process. +pub async fn spawn_process( + spawn_lock: Arc<RwLock<()>>, + exclusive: bool, + mut fork_exec: impl FnMut() -> io::Result<ManagedChild>, +) -> Result<ManagedChild, String> { + // See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn` + // indicates the binary we're spawning was written out by the current thread, and, as such, + // there may be open file handles against it. This will occur whenever a concurrent call of this + // method proceeds through its fork point + // (https://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html) while the current + // thread is in the middle of writing the binary and thus captures a clone of the open file + // handle, but that concurrent call has not yet gotten to its exec point + // (https://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html) where the operating + // system will close the cloned file handle (via O_CLOEXEC being set on all files opened by + // Rust). To prevent a race like this holding this thread's binary open leading to an ETXTBSY + // (https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html) error, we + // maintain RwLock that allows non-`exclusive_spawn` binaries to spawn concurrently but ensures + // all such concurrent spawns have completed (and thus closed any cloned file handles) before + // proceeding to spawn the `exclusive_spawn` binary this thread has written. + // + // See: https://github.com/golang/go/issues/22315 for an excellent description of this generic + // unix problem. + + if exclusive { + let _write_locked = spawn_lock.write().await; + + // Despite the mitigations taken against racing our own forks, forks can happen in our + // process but outside of our control (in libraries). 
As such, we back-stop by sleeping and + // trying again for a while if we do hit one of these fork races we do not control. + const MAX_ETXTBSY_WAIT: Duration = Duration::from_millis(100); + let mut retries: u32 = 0; + let mut sleep_millis = 1; + + let start_time = std::time::Instant::now(); + loop { + match fork_exec() { + Err(e) => { + if e.raw_os_error() == Some(libc::ETXTBSY) + && start_time.elapsed() < MAX_ETXTBSY_WAIT + { + tokio::time::sleep(std::time::Duration::from_millis(sleep_millis)).await; + retries += 1; + sleep_millis *= 2; + continue; + } else if retries > 0 { + break Err(format!( + "Error launching process after {} {} for ETXTBSY. Final error was: {:?}", + retries, + if retries == 1 { "retry" } else { "retries" }, + e + )); + } else { + break Err(format!("Error launching process: {e:?}")); + } + } + Ok(child) => break Ok(child), + } + } + } else { + let _read_locked = spawn_lock.read().await; + fork_exec().map_err(|e| format!("Error launching process: {e:?}")) + } +} diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index efbdb47af38..c7e05fee7bb 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -59,6 +59,8 @@ pub mod named_caches; #[cfg(test)] pub mod named_caches_tests; +pub(crate) mod fork_exec; + extern crate uname; pub use crate::children::ManagedChild; diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 92ab5533b6e..075e4d5dc4f 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -31,10 +31,11 @@ use task_executor::Executor; use tempfile::TempDir; use tokio::process::Command; use tokio::sync::RwLock; -use tokio::time::{timeout, Duration}; +use tokio::time::timeout; use tokio_util::codec::{BytesCodec, FramedRead}; use workunit_store::{in_workunit, Level, Metric, RunningWorkunit}; +use 
crate::fork_exec::spawn_process; use crate::{ Context, FallibleProcessResultWithPlatform, ManagedChild, NamedCaches, Process, ProcessError, ProcessResultMetadata, ProcessResultSource, @@ -57,7 +58,7 @@ pub struct CommandRunner { named_caches: NamedCaches, immutable_inputs: ImmutableInputs, keep_sandboxes: KeepSandboxes, - spawn_lock: RwLock<()>, + spawn_lock: Arc<RwLock<()>>, } impl CommandRunner { @@ -68,6 +69,7 @@ impl CommandRunner { named_caches: NamedCaches, immutable_inputs: ImmutableInputs, keep_sandboxes: KeepSandboxes, + spawn_lock: Arc<RwLock<()>>, ) -> CommandRunner { CommandRunner { store, @@ -76,7 +78,7 @@ impl CommandRunner { named_caches, immutable_inputs, keep_sandboxes, - spawn_lock: RwLock::new(()), + spawn_lock, } } @@ -307,66 +309,10 @@ impl CapturedWorkdir for CommandRunner { .stdout(Stdio::piped()) .stderr(Stdio::piped()); - // See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn` - // indicates the binary we're spawning was written out by the current thread, and, as such, - // there may be open file handles against it. This will occur whenever a concurrent call of this - // method proceeds through its fork point - // (https://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html) while the current - // thread is in the middle of writing the binary and thus captures a clone of the open file - // handle, but that concurrent call has not yet gotten to its exec point - // (https://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html) where the operating - // system will close the cloned file handle (via O_CLOEXEC being set on all files opened by - // Rust). 
To prevent a race like this holding this thread's binary open leading to an ETXTBSY - // (https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html) error, we - // maintain RwLock that allows non-`exclusive_spawn` binaries to spawn concurrently but ensures - // all such concurrent spawns have completed (and thus closed any cloned file handles) before - // proceeding to spawn the `exclusive_spawn` binary this thread has written. - // - // See: https://github.com/golang/go/issues/22315 for an excellent description of this generic - // unix problem. - let mut fork_exec = move || ManagedChild::spawn(&mut command, None); - let mut child = { - if exclusive_spawn { - let _write_locked = self.spawn_lock.write().await; - - // Despite the mitigations taken against racing our own forks, forks can happen in our - // process but outside of our control (in libraries). As such, we back-stop by sleeping and - // trying again for a while if we do hit one of these fork races we do not control. - const MAX_ETXTBSY_WAIT: Duration = Duration::from_millis(100); - let mut retries: u32 = 0; - let mut sleep_millis = 1; - - let start_time = std::time::Instant::now(); - loop { - match fork_exec() { - Err(e) => { - if e.raw_os_error() == Some(libc::ETXTBSY) - && start_time.elapsed() < MAX_ETXTBSY_WAIT - { - tokio::time::sleep(std::time::Duration::from_millis(sleep_millis)) - .await; - retries += 1; - sleep_millis *= 2; - continue; - } else if retries > 0 { - break Err(format!( - "Error launching process after {} {} for ETXTBSY. 
Final error was: {:?}", - retries, - if retries == 1 { "retry" } else { "retries" }, - e - )); - } else { - break Err(format!("Error launching process: {e:?}")); - } - } - Ok(child) => break Ok(child), - } - } - } else { - let _read_locked = self.spawn_lock.read().await; - fork_exec().map_err(|e| format!("Error launching process: {e:?}")) - } - }?; + let mut child = spawn_process(self.spawn_lock.clone(), exclusive_spawn, move || { + ManagedChild::spawn(&mut command, None) + }) + .await?; debug!("spawned local process as {:?} for {:?}", child.id(), req); let stdout_stream = FramedRead::new(child.stdout.take().unwrap(), BytesCodec::new()) diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index a05c63803e1..38bf4cd8f8e 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::path::PathBuf; use std::str; +use std::sync::Arc; use std::time::Duration; use maplit::hashset; @@ -14,6 +15,7 @@ use store::{ImmutableInputs, Store}; use testutil::data::{TestData, TestDirectory}; use testutil::path::{find_bash, which}; use testutil::{owned_string_vec, relative_paths}; +use tokio::sync::RwLock; use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ @@ -794,6 +796,7 @@ async fn run_command_locally_in_dir( named_caches, immutable_inputs, cleanup, + Arc::new(RwLock::new(())), ); let original = runner.run(Context::default(), workunit, req).await?; let stdout_bytes = store diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 24967b0c8ed..148344e1e0d 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -23,6 +23,7 @@ use protos::gen::buildbarn::cas::UncachedActionResult; use protos::require_digest; use 
remote::remote_cache::RemoteCacheRunnerOptions; use store::{ImmutableInputs, RemoteProvider, RemoteStoreOptions, Store}; +use tokio::sync::RwLock; use workunit_store::{in_workunit, Level, WorkunitStore}; #[derive(Clone, Debug, Default)] @@ -393,6 +394,7 @@ async fn main() { ), ImmutableInputs::new(store.clone(), &workdir).unwrap(), KeepSandboxes::Never, + Arc::new(RwLock::new(())), )) as Box, }; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 7fbddac08b4..3a7423211ba 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -37,6 +37,7 @@ use remote::{self, remote_cache}; use rule_graph::RuleGraph; use store::{self, ImmutableInputs, RemoteProvider, RemoteStoreOptions, Store}; use task_executor::Executor; +use tokio::sync::RwLock; use watch::{Invalidatable, InvalidateCaller, InvalidationWatcher}; use workunit_store::{Metric, RunningWorkunit}; @@ -210,6 +211,10 @@ impl Core { exec_strategy_opts: &ExecutionStrategyOptions, remoting_opts: &RemotingOptions, ) -> Result, String> { + // Lock shared between local command runner (and in future work) other "local" command runners + // for spawning processes. + let spawn_lock = Arc::new(RwLock::new(())); + let local_command_runner = local::CommandRunner::new( local_runner_store.clone(), executor.clone(), @@ -217,6 +222,7 @@ impl Core { named_caches.clone(), immutable_inputs.clone(), exec_strategy_opts.local_keep_sandboxes, + spawn_lock.clone(), ); let runner: Box = if exec_strategy_opts.local_enable_nailgun {