From bf01e28aad6d813d39220cb73a6a257ac6aa892a Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 18:26:17 -0400 Subject: [PATCH 1/9] move intrinics setup to a mod.rs in subdirectory --- src/rust/engine/src/{intrinsics.rs => intrinsics/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/rust/engine/src/{intrinsics.rs => intrinsics/mod.rs} (100%) diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics/mod.rs similarity index 100% rename from src/rust/engine/src/intrinsics.rs rename to src/rust/engine/src/intrinsics/mod.rs From 91093b05b8f0bd3374e539eac5306488cee0df88 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 19:03:22 -0400 Subject: [PATCH 2/9] refactor docker intrinsic --- src/rust/engine/src/intrinsics/docker.rs | 67 ++++++++++++++++++++++++ src/rust/engine/src/intrinsics/mod.rs | 62 ++-------------------- 2 files changed, 72 insertions(+), 57 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/docker.rs diff --git a/src/rust/engine/src/intrinsics/docker.rs b/src/rust/engine/src/intrinsics/docker.rs new file mode 100644 index 00000000000..4074ad650ba --- /dev/null +++ b/src/rust/engine/src/intrinsics/docker.rs @@ -0,0 +1,67 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use docker::docker::{ImagePullPolicy, ImagePullScope, DOCKER, IMAGE_PULL_CACHE}; +use futures::future::{BoxFuture, FutureExt}; +use process_execution::Platform; +use pyo3::types::PyString; +use pyo3::{Python, ToPyObject}; + +use crate::context::Context; +use crate::externs; +use crate::nodes::NodeResult; +use crate::python::Value; + +pub(crate) fn docker_resolve_image( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let types = &context.core.types; + let docker_resolve_image_result = types.docker_resolve_image_result; + + let (image_name, platform) = Python::with_gil(|py| { + let py_docker_request = (*args[0]).as_ref(py); + let image_name: String = externs::getattr(py_docker_request, "image_name").unwrap(); + let platform: String = externs::getattr(py_docker_request, "platform").unwrap(); + (image_name, platform) + }); + + let platform = Platform::try_from(platform)?; + + let docker = DOCKER.get().await?; + let image_pull_scope = ImagePullScope::new(context.session.build_id()); + + // Ensure that the image has been pulled. + IMAGE_PULL_CACHE + .pull_image( + docker, + &context.core.executor, + &image_name, + &platform, + image_pull_scope, + ImagePullPolicy::OnlyIfLatestOrMissing, + ) + .await + .map_err(|err| format!("Failed to pull image `{image_name}`: {err}"))?; + + let image_metadata = docker.inspect_image(&image_name).await.map_err(|err| { + format!( + "Failed to resolve image ID for image `{}`: {:?}", + &image_name, err + ) + })?; + let image_id = image_metadata + .id + .ok_or_else(|| format!("Image does not exist: `{}`", &image_name))?; + + Ok(Python::with_gil(|py| { + externs::unsafe_call( + py, + docker_resolve_image_result, + &[Value::from(PyString::new(py, &image_id).to_object(py))], + ) + })) + } + .boxed() +} diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index f1bcac79b2e..d495ba32217 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -31,11 +31,9 @@ use dep_inference::python::ParsedPythonDependencies; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; -use pyo3::types::PyString; use pyo3::{IntoPy, PyAny, PyRef, Python, ToPyObject}; use tokio::process; -use docker::docker::{ImagePullPolicy, ImagePullScope, DOCKER, IMAGE_PULL_CACHE}; use fs::{ DigestTrie, DirectoryDigest, Entry, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, @@ -44,7 +42,7 @@ use hashing::{Digest, EMPTY_DIGEST}; use process_execution::local::{ apply_chroot, create_sandbox, prepare_workdir, setup_run_sh_script, KeepSandboxes, }; -use process_execution::{ManagedChild, Platform, ProcessExecutionStrategy}; +use process_execution::{ManagedChild, ProcessExecutionStrategy}; use rule_graph::{DependencyKey, RuleId}; use stdio::TryCloneAsFile; use store::{SnapshotOps, Store, SubsetParams}; @@ -54,6 +52,10 @@ use workunit_store::{in_workunit, Level}; use grpc_util::prost::MessageExt; +mod docker; + +use self::docker::docker_resolve_image; + type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; @@ -793,60 +795,6 @@ fn interactive_process( ).boxed() } -fn docker_resolve_image( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let types = &context.core.types; - let docker_resolve_image_result = types.docker_resolve_image_result; - - let (image_name, platform) = Python::with_gil(|py| { - let py_docker_request = (*args[0]).as_ref(py); - let image_name: String = externs::getattr(py_docker_request, "image_name").unwrap(); - let platform: String = externs::getattr(py_docker_request, "platform").unwrap(); - (image_name, platform) - }); - - let platform = Platform::try_from(platform)?; - - let docker = DOCKER.get().await?; - let image_pull_scope = ImagePullScope::new(context.session.build_id()); - - // Ensure that the image has been pulled. - IMAGE_PULL_CACHE - .pull_image( - docker, - &context.core.executor, - &image_name, - &platform, - image_pull_scope, - ImagePullPolicy::OnlyIfLatestOrMissing, - ) - .await - .map_err(|err| format!("Failed to pull image `{image_name}`: {err}"))?; - - let image_metadata = docker.inspect_image(&image_name).await.map_err(|err| { - format!( - "Failed to resolve image ID for image `{}`: {:?}", - &image_name, err - ) - })?; - let image_id = image_metadata - .id - .ok_or_else(|| format!("Image does not exist: `{}`", &image_name))?; - - Ok(Python::with_gil(|py| { - externs::unsafe_call( - py, - docker_resolve_image_result, - &[Value::from(PyString::new(py, &image_id).to_object(py))], - ) - })) - } - .boxed() -} - struct PreparedInferenceRequest { digest: Digest, /// The request that's guaranteed to have been constructed via ::prepare(). From b43678f7697d7d5d4990413c64490b3f9cb5ee1a Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 21:09:13 -0400 Subject: [PATCH 3/9] refactor dep inference intrinsics --- .../engine/src/intrinsics/dep_inference.rs | 251 ++++++++++++++++++ src/rust/engine/src/intrinsics/mod.rs | 243 +---------------- 2 files changed, 257 insertions(+), 237 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/dep_inference.rs diff --git a/src/rust/engine/src/intrinsics/dep_inference.rs b/src/rust/engine/src/intrinsics/dep_inference.rs new file mode 100644 index 00000000000..402792ed74a --- /dev/null +++ b/src/rust/engine/src/intrinsics/dep_inference.rs @@ -0,0 +1,251 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::path::PathBuf; +use std::sync::Arc; + +use bytes::Bytes; +use dep_inference::javascript::ParsedJavascriptDependencies; +use dep_inference::python::ParsedPythonDependencies; +use dep_inference::{javascript, python}; +use fs::{DirectoryDigest, Entry, SymlinkBehavior}; +use futures::future::{BoxFuture, FutureExt}; +use grpc_util::prost::MessageExt; +use hashing::Digest; +use protos::gen::pants::cache::{ + dependency_inference_request, CacheKey, CacheKeyType, DependencyInferenceRequest, +}; +use pyo3::{Python, ToPyObject}; +use store::Store; +use workunit_store::{in_workunit, Level}; + +use crate::context::Context; +use crate::externs::dep_inference::PyNativeDependenciesRequest; +use crate::nodes::NodeResult; +use crate::python::Value; +use crate::{externs, Core}; + +pub(crate) struct PreparedInferenceRequest { + digest: Digest, + /// The request that's guaranteed to have been constructed via ::prepare(). + /// + /// NB. this `inner` value is used as the cache key, so anything that can influence the dep + /// inference should (also) be inside it, not just a key on the outer struct + inner: DependencyInferenceRequest, +} + +impl PreparedInferenceRequest { + pub async fn prepare( + args: Vec, + store: &Store, + backend: &str, + impl_hash: &str, + ) -> NodeResult { + let PyNativeDependenciesRequest { + directory_digest, + metadata, + } = Python::with_gil(|py| (*args[0]).as_ref(py).extract())?; + + let (path, digest) = Self::find_one_file(directory_digest, store, backend).await?; + let str_path = path.display().to_string(); + + Ok(Self { + digest, + inner: DependencyInferenceRequest { + input_file_path: str_path, + input_file_digest: Some(digest.into()), + metadata, + impl_hash: impl_hash.to_string(), + }, + }) + } + + pub async fn read_digest(&self, store: &Store) -> NodeResult { + let bytes = store + .load_file_bytes_with(self.digest, |bytes| Vec::from(bytes)) + .await?; + + Ok(String::from_utf8(bytes) + .map_err(|err| format!("Failed to convert digest bytes to utf-8: {err}"))?) + } + + async fn find_one_file( + directory_digest: DirectoryDigest, + store: &Store, + backend: &str, + ) -> NodeResult<(PathBuf, Digest)> { + let mut path = None; + let mut digest = None; + store + .load_digest_trie(directory_digest.clone()) + .await? + .walk(SymlinkBehavior::Oblivious, &mut |node_path, entry| { + if let Entry::File(file) = entry { + path = Some(node_path.to_owned()); + digest = Some(file.digest()); + } + }); + if digest.is_none() || path.is_none() { + Err(format!( + "Couldn't find a file in digest for {backend} inference: {directory_digest:?}" + ))? + } + let path = path.unwrap(); + let digest = digest.unwrap(); + Ok((path, digest)) + } + + fn cache_key(&self) -> CacheKey { + CacheKey { + key_type: CacheKeyType::DepInferenceRequest.into(), + digest: Some(Digest::of_bytes(&self.inner.to_bytes()).into()), + } + } +} + +pub(crate) fn parse_python_deps( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let core = &context.core; + let store = core.store(); + let prepared_inference_request = + PreparedInferenceRequest::prepare(args, &store, "Python", python::IMPL_HASH).await?; + in_workunit!( + "parse_python_dependencies", + Level::Debug, + desc = Some(format!( + "Determine Python dependencies for {:?}", + &prepared_inference_request.inner.input_file_path + )), + |_workunit| async move { + let result: ParsedPythonDependencies = get_or_create_inferred_dependencies( + core, + &store, + prepared_inference_request, + |content, request| { + python::get_dependencies(content, request.inner.input_file_path.into()) + }, + ) + .await?; + + let result = Python::with_gil(|py| { + externs::unsafe_call( + py, + core.types.parsed_python_deps_result, + &[ + result.imports.to_object(py).into(), + result.string_candidates.to_object(py).into(), + ], + ) + }); + + Ok(result) + } + ) + .await + } + .boxed() +} + +pub(crate) fn parse_javascript_deps( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let core = &context.core; + let store = core.store(); + let prepared_inference_request = + PreparedInferenceRequest::prepare(args, &store, "Javascript", javascript::IMPL_HASH) + .await?; + + in_workunit!( + "parse_javascript_dependencies", + Level::Debug, + desc = Some(format!( + "Determine Javascript dependencies for {:?}", + prepared_inference_request.inner.input_file_path + )), + |_workunit| async move { + let result: ParsedJavascriptDependencies = get_or_create_inferred_dependencies( + core, + &store, + prepared_inference_request, + |content, request| { + if let Some(dependency_inference_request::Metadata::Js(metadata)) = + request.inner.metadata + { + javascript::get_dependencies( + content, + request.inner.input_file_path.into(), + metadata, + ) + } else { + Err(format!( + "{:?} is not valid metadata for Javascript dependency inference", + request.inner.metadata + )) + } + }, + ) + .await?; + + let result = Python::with_gil(|py| { + externs::unsafe_call( + py, + core.types.parsed_javascript_deps_result, + &[ + result.file_imports.to_object(py).into(), + result.package_imports.to_object(py).into(), + ], + ) + }); + + Ok(result) + } + ) + .await + } + .boxed() +} + +pub(crate) async fn get_or_create_inferred_dependencies( + core: &Arc, + store: &Store, + request: PreparedInferenceRequest, + dependencies_parser: F, +) -> NodeResult +where + T: serde::de::DeserializeOwned + serde::Serialize, + F: Fn(&str, PreparedInferenceRequest) -> Result, +{ + let cache_key = request.cache_key(); + let result = + if let Some(result) = lookup_inferred_dependencies(&cache_key, core).await? { + result + } else { + let contents = request.read_digest(store).await?; + let result = dependencies_parser(&contents, request)?; + core.local_cache + .store( + &cache_key, + Bytes::from(serde_json::to_string(&result).map_err(|e| { + format!("Failed to serialize dep inference cache result: {e}") + })?), + ) + .await?; + result + }; + Ok(result) +} + +pub(crate) async fn lookup_inferred_dependencies( + key: &CacheKey, + core: &Arc, +) -> NodeResult> { + let cached_result = core.local_cache.load(key).await?; + Ok(cached_result + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .flatten()) +} diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index d495ba32217..1f867341f36 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -6,10 +6,10 @@ use std::env::current_dir; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use crate::context::Context; +use crate::externs; use crate::externs::fs::{PyAddPrefix, PyFileDigest, PyMergeDigests, PyRemovePrefix}; use crate::nodes::{ lift_directory_digest, task_side_effected, unmatched_globs_additional_context, DownloadedFile, @@ -19,15 +19,7 @@ use crate::python::{throw, Key, Value}; use crate::tasks::Intrinsic; use crate::types::Types; use crate::Failure; -use crate::{externs, Core}; -use dep_inference::{javascript, python}; -use protos::gen::pants::cache::{ - dependency_inference_request, CacheKey, CacheKeyType, DependencyInferenceRequest, -}; -use bytes::Bytes; -use dep_inference::javascript::ParsedJavascriptDependencies; -use dep_inference::python::ParsedPythonDependencies; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; @@ -35,8 +27,7 @@ use pyo3::{IntoPy, PyAny, PyRef, Python, ToPyObject}; use tokio::process; use fs::{ - DigestTrie, DirectoryDigest, Entry, GlobMatching, PathStat, RelativePath, SymlinkBehavior, - TypedPath, + DigestTrie, DirectoryDigest, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, }; use hashing::{Digest, EMPTY_DIGEST}; use process_execution::local::{ @@ -45,15 +36,15 @@ use process_execution::local::{ use process_execution::{ManagedChild, ProcessExecutionStrategy}; use rule_graph::{DependencyKey, RuleId}; use stdio::TryCloneAsFile; -use store::{SnapshotOps, Store, SubsetParams}; +use store::{SnapshotOps, SubsetParams}; -use crate::externs::dep_inference::PyNativeDependenciesRequest; use workunit_store::{in_workunit, Level}; -use grpc_util::prost::MessageExt; - +// Sub-modules with intrinsic implementations. +mod dep_inference; mod docker; +use self::dep_inference::{parse_javascript_deps, parse_python_deps}; use self::docker::docker_resolve_image; type IntrinsicFn = @@ -794,225 +785,3 @@ fn interactive_process( } ).boxed() } - -struct PreparedInferenceRequest { - digest: Digest, - /// The request that's guaranteed to have been constructed via ::prepare(). - /// - /// NB. this `inner` value is used as the cache key, so anything that can influence the dep - /// inference should (also) be inside it, not just a key on the outer struct - inner: DependencyInferenceRequest, -} - -impl PreparedInferenceRequest { - pub async fn prepare( - args: Vec, - store: &Store, - backend: &str, - impl_hash: &str, - ) -> NodeResult { - let PyNativeDependenciesRequest { - directory_digest, - metadata, - } = Python::with_gil(|py| (*args[0]).as_ref(py).extract())?; - - let (path, digest) = Self::find_one_file(directory_digest, store, backend).await?; - let str_path = path.display().to_string(); - - Ok(Self { - digest, - inner: DependencyInferenceRequest { - input_file_path: str_path, - input_file_digest: Some(digest.into()), - metadata, - impl_hash: impl_hash.to_string(), - }, - }) - } - - pub async fn read_digest(&self, store: &Store) -> NodeResult { - let bytes = store - .load_file_bytes_with(self.digest, |bytes| Vec::from(bytes)) - .await?; - - Ok(String::from_utf8(bytes) - .map_err(|err| format!("Failed to convert digest bytes to utf-8: {err}"))?) - } - - async fn find_one_file( - directory_digest: DirectoryDigest, - store: &Store, - backend: &str, - ) -> NodeResult<(PathBuf, Digest)> { - let mut path = None; - let mut digest = None; - store - .load_digest_trie(directory_digest.clone()) - .await? - .walk(SymlinkBehavior::Oblivious, &mut |node_path, entry| { - if let Entry::File(file) = entry { - path = Some(node_path.to_owned()); - digest = Some(file.digest()); - } - }); - if digest.is_none() || path.is_none() { - Err(format!( - "Couldn't find a file in digest for {backend} inference: {directory_digest:?}" - ))? - } - let path = path.unwrap(); - let digest = digest.unwrap(); - Ok((path, digest)) - } - - fn cache_key(&self) -> CacheKey { - CacheKey { - key_type: CacheKeyType::DepInferenceRequest.into(), - digest: Some(Digest::of_bytes(&self.inner.to_bytes()).into()), - } - } -} - -fn parse_python_deps(context: Context, args: Vec) -> BoxFuture<'static, NodeResult> { - async move { - let core = &context.core; - let store = core.store(); - let prepared_inference_request = - PreparedInferenceRequest::prepare(args, &store, "Python", python::IMPL_HASH).await?; - in_workunit!( - "parse_python_dependencies", - Level::Debug, - desc = Some(format!( - "Determine Python dependencies for {:?}", - &prepared_inference_request.inner.input_file_path - )), - |_workunit| async move { - let result: ParsedPythonDependencies = get_or_create_inferred_dependencies( - core, - &store, - prepared_inference_request, - |content, request| { - python::get_dependencies(content, request.inner.input_file_path.into()) - }, - ) - .await?; - - let result = Python::with_gil(|py| { - externs::unsafe_call( - py, - core.types.parsed_python_deps_result, - &[ - result.imports.to_object(py).into(), - result.string_candidates.to_object(py).into(), - ], - ) - }); - - Ok(result) - } - ) - .await - } - .boxed() -} - -fn parse_javascript_deps( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let core = &context.core; - let store = core.store(); - let prepared_inference_request = - PreparedInferenceRequest::prepare(args, &store, "Javascript", javascript::IMPL_HASH) - .await?; - - in_workunit!( - "parse_javascript_dependencies", - Level::Debug, - desc = Some(format!( - "Determine Javascript dependencies for {:?}", - prepared_inference_request.inner.input_file_path - )), - |_workunit| async move { - let result: ParsedJavascriptDependencies = get_or_create_inferred_dependencies( - core, - &store, - prepared_inference_request, - |content, request| { - if let Some(dependency_inference_request::Metadata::Js(metadata)) = - request.inner.metadata - { - javascript::get_dependencies( - content, - request.inner.input_file_path.into(), - metadata, - ) - } else { - Err(format!( - "{:?} is not valid metadata for Javascript dependency inference", - request.inner.metadata - )) - } - }, - ) - .await?; - - let result = Python::with_gil(|py| { - externs::unsafe_call( - py, - core.types.parsed_javascript_deps_result, - &[ - result.file_imports.to_object(py).into(), - result.package_imports.to_object(py).into(), - ], - ) - }); - - Ok(result) - } - ) - .await - } - .boxed() -} - -async fn get_or_create_inferred_dependencies( - core: &Arc, - store: &Store, - request: PreparedInferenceRequest, - dependencies_parser: F, -) -> NodeResult -where - T: serde::de::DeserializeOwned + serde::Serialize, - F: Fn(&str, PreparedInferenceRequest) -> Result, -{ - let cache_key = request.cache_key(); - let result = - if let Some(result) = lookup_inferred_dependencies(&cache_key, core).await? { - result - } else { - let contents = request.read_digest(store).await?; - let result = dependencies_parser(&contents, request)?; - core.local_cache - .store( - &cache_key, - Bytes::from(serde_json::to_string(&result).map_err(|e| { - format!("Failed to serialize dep inference cache result: {e}") - })?), - ) - .await?; - result - }; - Ok(result) -} - -async fn lookup_inferred_dependencies( - key: &CacheKey, - core: &Arc, -) -> NodeResult> { - let cached_result = core.local_cache.load(key).await?; - Ok(cached_result - .and_then(|bytes| serde_json::from_slice(&bytes).ok()) - .flatten()) -} From 29d8af3c6590929d75b9bcd688c571dbe758e734 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 21:21:12 -0400 Subject: [PATCH 4/9] refactor interactive_process intrinsic --- .../src/intrinsics/interactive_process.rs | 195 ++++++++++++++++++ src/rust/engine/src/intrinsics/mod.rs | 194 +---------------- 2 files changed, 201 insertions(+), 188 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/interactive_process.rs diff --git a/src/rust/engine/src/intrinsics/interactive_process.rs b/src/rust/engine/src/intrinsics/interactive_process.rs new file mode 100644 index 00000000000..449a1fc4863 --- /dev/null +++ b/src/rust/engine/src/intrinsics/interactive_process.rs @@ -0,0 +1,195 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::env::current_dir; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::str::FromStr; + +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use process_execution::local::{ + apply_chroot, create_sandbox, prepare_workdir, setup_run_sh_script, KeepSandboxes, +}; +use process_execution::{ManagedChild, ProcessExecutionStrategy}; +use pyo3::{PyAny, Python, ToPyObject}; +use stdio::TryCloneAsFile; +use tokio::process; +use workunit_store::{in_workunit, Level}; + +use crate::context::Context; +use crate::externs; +use crate::nodes::{task_side_effected, ExecuteProcess, NodeResult}; +use crate::python::Value; + +pub(crate) fn interactive_process( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + in_workunit!( + "interactive_process", + Level::Debug, + |_workunit| async move { + let types = &context.core.types; + let interactive_process_result = types.interactive_process_result; + + let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessExecutionEnvironment) = Python::with_gil(|py| { + let py_interactive_process = (*args[0]).as_ref(py); + let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap(); + let process_config = (*args[1]) + .as_ref(py) + .extract() + .unwrap(); + (py_interactive_process.extract().unwrap(), py_process, process_config) + }); + match process_config.environment.strategy { + ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => { + // TODO: #17182 covers adding support for running processes interactively in Docker. + Err( + format!( + "Only local environments support running processes \ + interactively, but a {} environment was used.", + process_config.environment.strategy.strategy_type(), + ) + ) + }, + _ => Ok(()) + }?; + let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process; + let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| { + let py_interactive_process_obj = py_interactive_process.to_object(py); + let py_interactive_process = py_interactive_process_obj.as_ref(py); + let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); + let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); + let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap(); + let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap(); + (run_in_workspace, restartable, keep_sandboxes) + }); + + let session = context.session.clone(); + + let mut tempdir = create_sandbox( + context.core.executor.clone(), + &context.core.local_execution_root_dir, + "interactive process", + keep_sandboxes, + )?; + prepare_workdir( + tempdir.path().to_owned(), + &context.core.local_execution_root_dir, + &process, + process.input_digests.inputs.clone(), + &context.core.store(), + &context.core.named_caches, + &context.core.immutable_inputs, + None, + None, + ) + .await?; + apply_chroot(tempdir.path().to_str().unwrap(), &mut process); + + let p = Path::new(&process.argv[0]); + // TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args + // instead. + let program_name = if !run_in_workspace && p.is_relative() { + let mut buf = PathBuf::new(); + buf.push(tempdir.path()); + buf.push(p); + buf + } else { + p.to_path_buf() + }; + + let mut command = process::Command::new(program_name); + if !run_in_workspace { + command.current_dir(tempdir.path()); + } + for arg in process.argv[1..].iter() { + command.arg(arg); + } + + command.env_clear(); + command.envs(&process.env); + + if !restartable { + task_side_effected()?; + } + + let exit_status = session.clone() + .with_console_ui_disabled(async move { + // Once any UI is torn down, grab exclusive access to the console. + let (term_stdin, term_stdout, term_stderr) = + stdio::get_destination().exclusive_start(Box::new(|_| { + // A stdio handler that will immediately trigger logging. + Err(()) + }))?; + // NB: Command's stdio methods take ownership of a file-like to use, so we use + // `TryCloneAsFile` here to `dup` our thread-local stdio. + command + .stdin(Stdio::from( + term_stdin + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdin: {e}"))?, + )) + .stdout(Stdio::from( + term_stdout + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdout: {e}"))?, + )) + .stderr(Stdio::from( + term_stderr + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stderr: {e}"))?, + )); + let mut subprocess = + ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout)) + .map_err(|e| format!("Error executing interactive process: {e}"))?; + tokio::select! { + _ = session.cancelled() => { + // The Session was cancelled: attempt to kill the process group / process, and + // then wait for it to exit (to avoid zombies). + if let Err(e) = subprocess.attempt_shutdown_sync() { + // Failed to kill the PGID: try the non-group form. + log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ + This is unexpected: please file an issue about this problem at \ + [https://github.com/pantsbuild/pants/issues/new]", e); + subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?; + }; + subprocess.wait().await.map_err(|e| e.to_string()) + } + exit_status = subprocess.wait() => { + // The process exited. + exit_status.map_err(|e| e.to_string()) + } + } + }) + .await?; + + let code = exit_status.code().unwrap_or(-1); + if keep_sandboxes == KeepSandboxes::Always + || keep_sandboxes == KeepSandboxes::OnFailure && code != 0 { + tempdir.keep("interactive process"); + let do_setup_run_sh_script = |workdir_path| -> Result<(), String> { + setup_run_sh_script(tempdir.path(), &process.env, &process.working_directory, &process.argv, workdir_path) + }; + if run_in_workspace { + let cwd = current_dir() + .map_err(|e| format!("Could not detect current working directory: {e}"))?; + do_setup_run_sh_script(cwd.as_path())?; + } else { + do_setup_run_sh_script(tempdir.path())?; + } + } + + Ok( + Python::with_gil(|py| { + externs::unsafe_call( + py, + interactive_process_result, + &[externs::store_i64(py, i64::from(code))], + ) + + }) + ) + } + ).boxed() +} diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index 1f867341f36..00b93201a43 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -2,18 +2,15 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::HashMap; -use std::env::current_dir; -use std::path::{Path, PathBuf}; -use std::process::Stdio; -use std::str::FromStr; +use std::path::PathBuf; use std::time::Duration; use crate::context::Context; use crate::externs; use crate::externs::fs::{PyAddPrefix, PyFileDigest, PyMergeDigests, PyRemovePrefix}; use crate::nodes::{ - lift_directory_digest, task_side_effected, unmatched_globs_additional_context, DownloadedFile, - ExecuteProcess, NodeResult, RunId, SessionValues, Snapshot, + lift_directory_digest, unmatched_globs_additional_context, DownloadedFile, ExecuteProcess, + NodeResult, RunId, SessionValues, Snapshot, }; use crate::python::{throw, Key, Value}; use crate::tasks::Intrinsic; @@ -23,29 +20,23 @@ use crate::Failure; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; -use pyo3::{IntoPy, PyAny, PyRef, Python, ToPyObject}; -use tokio::process; +use pyo3::{IntoPy, PyRef, Python}; use fs::{ DigestTrie, DirectoryDigest, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, }; use hashing::{Digest, EMPTY_DIGEST}; -use process_execution::local::{ - apply_chroot, create_sandbox, prepare_workdir, setup_run_sh_script, KeepSandboxes, -}; -use process_execution::{ManagedChild, ProcessExecutionStrategy}; use rule_graph::{DependencyKey, RuleId}; -use stdio::TryCloneAsFile; use store::{SnapshotOps, SubsetParams}; -use workunit_store::{in_workunit, Level}; - // Sub-modules with intrinsic implementations. mod dep_inference; mod docker; +mod interactive_process; use self::dep_inference::{parse_javascript_deps, parse_python_deps}; use self::docker::docker_resolve_image; +use self::interactive_process::interactive_process; type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; @@ -612,176 +603,3 @@ fn session_values(context: Context, _args: Vec) -> BoxFuture<'static, Nod fn run_id(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { async move { context.get(RunId).await }.boxed() } - -fn interactive_process( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - in_workunit!( - "interactive_process", - Level::Debug, - |_workunit| async move { - let types = &context.core.types; - let interactive_process_result = types.interactive_process_result; - - let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessExecutionEnvironment) = Python::with_gil(|py| { - let py_interactive_process = (*args[0]).as_ref(py); - let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap(); - let process_config = (*args[1]) - .as_ref(py) - .extract() - .unwrap(); - (py_interactive_process.extract().unwrap(), py_process, process_config) - }); - match process_config.environment.strategy { - ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => { - // TODO: #17182 covers adding support for running processes interactively in Docker. - Err( - format!( - "Only local environments support running processes \ - interactively, but a {} environment was used.", - process_config.environment.strategy.strategy_type(), - ) - ) - }, - _ => Ok(()) - }?; - let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process; - let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| { - let py_interactive_process_obj = py_interactive_process.to_object(py); - let py_interactive_process = py_interactive_process_obj.as_ref(py); - let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); - let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); - let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap(); - let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap(); - (run_in_workspace, restartable, keep_sandboxes) - }); - - let session = context.session.clone(); - - let mut tempdir = create_sandbox( - context.core.executor.clone(), - &context.core.local_execution_root_dir, - "interactive process", - keep_sandboxes, - )?; - prepare_workdir( - tempdir.path().to_owned(), - &context.core.local_execution_root_dir, - &process, - process.input_digests.inputs.clone(), - &context.core.store(), - &context.core.named_caches, - &context.core.immutable_inputs, - None, - None, - ) - .await?; - apply_chroot(tempdir.path().to_str().unwrap(), &mut process); - - let p = Path::new(&process.argv[0]); - // TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args - // instead. - let program_name = if !run_in_workspace && p.is_relative() { - let mut buf = PathBuf::new(); - buf.push(tempdir.path()); - buf.push(p); - buf - } else { - p.to_path_buf() - }; - - let mut command = process::Command::new(program_name); - if !run_in_workspace { - command.current_dir(tempdir.path()); - } - for arg in process.argv[1..].iter() { - command.arg(arg); - } - - command.env_clear(); - command.envs(&process.env); - - if !restartable { - task_side_effected()?; - } - - let exit_status = session.clone() - .with_console_ui_disabled(async move { - // Once any UI is torn down, grab exclusive access to the console. - let (term_stdin, term_stdout, term_stderr) = - stdio::get_destination().exclusive_start(Box::new(|_| { - // A stdio handler that will immediately trigger logging. - Err(()) - }))?; - // NB: Command's stdio methods take ownership of a file-like to use, so we use - // `TryCloneAsFile` here to `dup` our thread-local stdio. - command - .stdin(Stdio::from( - term_stdin - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdin: {e}"))?, - )) - .stdout(Stdio::from( - term_stdout - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdout: {e}"))?, - )) - .stderr(Stdio::from( - term_stderr - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stderr: {e}"))?, - )); - let mut subprocess = - ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout)) - .map_err(|e| format!("Error executing interactive process: {e}"))?; - tokio::select! { - _ = session.cancelled() => { - // The Session was cancelled: attempt to kill the process group / process, and - // then wait for it to exit (to avoid zombies). - if let Err(e) = subprocess.attempt_shutdown_sync() { - // Failed to kill the PGID: try the non-group form. - log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ - This is unexpected: please file an issue about this problem at \ - [https://github.com/pantsbuild/pants/issues/new]", e); - subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?; - }; - subprocess.wait().await.map_err(|e| e.to_string()) - } - exit_status = subprocess.wait() => { - // The process exited. - exit_status.map_err(|e| e.to_string()) - } - } - }) - .await?; - - let code = exit_status.code().unwrap_or(-1); - if keep_sandboxes == KeepSandboxes::Always - || keep_sandboxes == KeepSandboxes::OnFailure && code != 0 { - tempdir.keep("interactive process"); - let do_setup_run_sh_script = |workdir_path| -> Result<(), String> { - setup_run_sh_script(tempdir.path(), &process.env, &process.working_directory, &process.argv, workdir_path) - }; - if run_in_workspace { - let cwd = current_dir() - .map_err(|e| format!("Could not detect current working directory: {e}"))?; - do_setup_run_sh_script(cwd.as_path())?; - } else { - do_setup_run_sh_script(tempdir.path())?; - } - } - - Ok( - Python::with_gil(|py| { - externs::unsafe_call( - py, - interactive_process_result, - &[externs::store_i64(py, i64::from(code))], - ) - - }) - ) - } - ).boxed() -} From 0e97b3d4c32d394c15d554215db6893dd427451b Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 21:46:40 -0400 Subject: [PATCH 5/9] refactor digest-related intrinstics --- src/rust/engine/src/intrinsics/digests.rs | 341 ++++++++++++++++++++++ src/rust/engine/src/intrinsics/mod.rs | 341 +--------------------- 2 files changed, 351 insertions(+), 331 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/digests.rs diff --git a/src/rust/engine/src/intrinsics/digests.rs b/src/rust/engine/src/intrinsics/digests.rs new file mode 100644 index 00000000000..9cac1401da9 --- /dev/null +++ b/src/rust/engine/src/intrinsics/digests.rs @@ -0,0 +1,341 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::collections::HashMap; +use std::path::PathBuf; + +use fs::{ + DigestTrie, DirectoryDigest, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, +}; +use futures::future::{BoxFuture, FutureExt}; +use hashing::{Digest, EMPTY_DIGEST}; +use pyo3::{PyRef, Python}; +use store::{SnapshotOps, SubsetParams}; + +use crate::context::Context; +use crate::externs; +use crate::externs::fs::{PyAddPrefix, PyFileDigest, PyMergeDigests, PyRemovePrefix}; +use crate::nodes::{ + lift_directory_digest, unmatched_globs_additional_context, DownloadedFile, NodeResult, Snapshot, +}; +use crate::python::{throw, Key, Value}; +use crate::Failure; + +pub(crate) fn directory_digest_to_digest_contents( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + })?; + + let digest_contents = context.core.store().contents_for_directory(digest).await?; + + Ok(Python::with_gil(|py| { + Snapshot::store_digest_contents(py, &context, &digest_contents) + })?) + } + .boxed() +} + +pub(crate) fn directory_digest_to_digest_entries( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + })?; + let digest_entries = context.core.store().entries_for_directory(digest).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_digest_entries(py, &context, &digest_entries) + })?) + } + .boxed() +} + +pub(crate) fn remove_prefix_request_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let (digest, prefix) = Python::with_gil(|py| { + let py_remove_prefix = (*args[0]) + .as_ref(py) + .extract::>() + .map_err(|e| throw(format!("{e}")))?; + let prefix = RelativePath::new(&py_remove_prefix.prefix) + .map_err(|e| throw(format!("The `prefix` must be relative: {e}")))?; + let res: NodeResult<_> = Ok((py_remove_prefix.digest.clone(), prefix)); + res + })?; + let digest = context.core.store().strip_prefix(digest, &prefix).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, digest) + })?) + } + .boxed() +} + +pub(crate) fn add_prefix_request_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let (digest, prefix) = Python::with_gil(|py| { + let py_add_prefix = (*args[0]) + .as_ref(py) + .extract::>() + .map_err(|e| throw(format!("{e}")))?; + let prefix = RelativePath::new(&py_add_prefix.prefix) + .map_err(|e| throw(format!("The `prefix` must be relative: {e}")))?; + let res: NodeResult<(DirectoryDigest, RelativePath)> = + Ok((py_add_prefix.digest.clone(), prefix)); + res + })?; + let digest = context.core.store().add_prefix(digest, &prefix).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, digest) + })?) + } + .boxed() +} + +pub(crate) fn digest_to_snapshot( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + let store = context.core.store(); + async move { + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + })?; + let snapshot = store::Snapshot::from_digest(store, digest).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_snapshot(py, snapshot) + })?) + } + .boxed() +} + +pub(crate) fn merge_digests_request_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + let core = &context.core; + let store = core.store(); + async move { + let digests = Python::with_gil(|py| { + (*args[0]) + .as_ref(py) + .extract::>() + .map(|py_merge_digests| py_merge_digests.0.clone()) + .map_err(|e| throw(format!("{e}"))) + })?; + let digest = store.merge(digests).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, digest) + })?) + } + .boxed() +} + +pub(crate) fn download_file_to_digest( + context: Context, + mut args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from)?; + let snapshot = context.get(DownloadedFile(key)).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, snapshot.into()) + })?) + } + .boxed() +} + +pub(crate) fn path_globs_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let path_globs = Python::with_gil(|py| { + let py_path_globs = (*args[0]).as_ref(py); + Snapshot::lift_path_globs(py_path_globs) + }) + .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; + let snapshot = context.get(Snapshot::from_path_globs(path_globs)).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, snapshot.into()) + })?) + } + .boxed() +} + +pub(crate) fn path_globs_to_paths( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + let core = context.core.clone(); + async move { + let path_globs = Python::with_gil(|py| { + let py_path_globs = (*args[0]).as_ref(py); + Snapshot::lift_path_globs(py_path_globs) + }) + .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; + + let path_globs = path_globs.parse().map_err(throw)?; + let path_stats = context + .expand_globs( + path_globs, + SymlinkBehavior::Oblivious, + unmatched_globs_additional_context(), + ) + .await?; + + Python::with_gil(|py| { + let mut files = Vec::new(); + let mut dirs = Vec::new(); + for ps in path_stats.iter() { + match ps { + PathStat::File { path, .. } => { + files.push(Snapshot::store_path(py, path)?); + } + PathStat::Link { path, .. } => { + panic!("Paths shouldn't be symlink-aware {path:?}"); + } + PathStat::Dir { path, .. } => { + dirs.push(Snapshot::store_path(py, path)?); + } + } + } + Ok(externs::unsafe_call( + py, + core.types.paths, + &[ + externs::store_tuple(py, files), + externs::store_tuple(py, dirs), + ], + )) + }) + } + .boxed() +} + +enum CreateDigestItem { + FileContent(RelativePath, bytes::Bytes, bool), + FileEntry(RelativePath, Digest, bool), + SymlinkEntry(RelativePath, PathBuf), + Dir(RelativePath), +} + +pub(crate) fn create_digest_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + let mut new_file_count = 0; + + let items: Vec = { + Python::with_gil(|py| { + let py_create_digest = (*args[0]).as_ref(py); + externs::collect_iterable(py_create_digest) + .unwrap() + .into_iter() + .map(|obj| { + let raw_path: String = externs::getattr(obj, "path").unwrap(); + let path = RelativePath::new(PathBuf::from(raw_path)).unwrap(); + if obj.hasattr("content").unwrap() { + let bytes = bytes::Bytes::from( + externs::getattr::>(obj, "content").unwrap(), + ); + let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); + new_file_count += 1; + CreateDigestItem::FileContent(path, bytes, is_executable) + } else if obj.hasattr("file_digest").unwrap() { + let py_file_digest: PyFileDigest = + externs::getattr(obj, "file_digest").unwrap(); + let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); + CreateDigestItem::FileEntry(path, py_file_digest.0, is_executable) + } else if obj.hasattr("target").unwrap() { + let target: String = externs::getattr(obj, "target").unwrap(); + CreateDigestItem::SymlinkEntry(path, PathBuf::from(target)) + } else { + CreateDigestItem::Dir(path) + } + }) + .collect() + }) + }; + + let mut typed_paths: Vec = Vec::with_capacity(items.len()); + let mut file_digests: HashMap = HashMap::with_capacity(items.len()); + let mut items_to_store = Vec::with_capacity(new_file_count); + + for item in &items { + match item { + CreateDigestItem::FileContent(path, bytes, is_executable) => { + let digest = Digest::of_bytes(bytes); + items_to_store.push((digest.hash, bytes.clone())); + typed_paths.push(TypedPath::File { + path, + is_executable: *is_executable, + }); + file_digests.insert(path.to_path_buf(), digest); + } + CreateDigestItem::FileEntry(path, digest, is_executable) => { + typed_paths.push(TypedPath::File { + path, + is_executable: *is_executable, + }); + file_digests.insert(path.to_path_buf(), *digest); + } + CreateDigestItem::SymlinkEntry(path, target) => { + typed_paths.push(TypedPath::Link { path, target }); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } + CreateDigestItem::Dir(path) => { + typed_paths.push(TypedPath::Dir(path)); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } + } + } + + let store = context.core.store(); + let trie = DigestTrie::from_unique_paths(typed_paths, &file_digests).unwrap(); + async move { + store.store_file_bytes_batch(items_to_store, true).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, trie.into()) + })?) + } + .boxed() +} + +pub(crate) fn digest_subset_to_digest( + context: Context, + args: Vec, +) -> BoxFuture<'static, NodeResult> { + let store = context.core.store(); + async move { + let (path_globs, original_digest) = Python::with_gil(|py| { + let py_digest_subset = (*args[0]).as_ref(py); + let py_path_globs = externs::getattr(py_digest_subset, "globs").unwrap(); + let py_digest = externs::getattr(py_digest_subset, "digest").unwrap(); + let res: NodeResult<_> = Ok(( + Snapshot::lift_prepared_path_globs(py_path_globs)?, + lift_directory_digest(py_digest)?, + )); + res + })?; + let subset_params = SubsetParams { globs: path_globs }; + let digest = store.subset(original_digest, subset_params).await?; + Ok(Python::with_gil(|py| { + Snapshot::store_directory_digest(py, digest) + })?) + } + .boxed() +} diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index 00b93201a43..293058c6903 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -1,40 +1,35 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::collections::HashMap; -use std::path::PathBuf; use std::time::Duration; use crate::context::Context; use crate::externs; -use crate::externs::fs::{PyAddPrefix, PyFileDigest, PyMergeDigests, PyRemovePrefix}; -use crate::nodes::{ - lift_directory_digest, unmatched_globs_additional_context, DownloadedFile, ExecuteProcess, - NodeResult, RunId, SessionValues, Snapshot, -}; -use crate::python::{throw, Key, Value}; +use crate::nodes::{ExecuteProcess, NodeResult, RunId, SessionValues, Snapshot}; +use crate::python::Value; use crate::tasks::Intrinsic; use crate::types::Types; -use crate::Failure; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; -use pyo3::{IntoPy, PyRef, Python}; +use pyo3::{IntoPy, Python}; -use fs::{ - DigestTrie, DirectoryDigest, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, -}; -use hashing::{Digest, EMPTY_DIGEST}; use rule_graph::{DependencyKey, RuleId}; -use store::{SnapshotOps, SubsetParams}; // Sub-modules with intrinsic implementations. mod dep_inference; +mod digests; mod docker; mod interactive_process; use self::dep_inference::{parse_javascript_deps, parse_python_deps}; +use self::digests::{ + add_prefix_request_to_digest, create_digest_to_digest, digest_subset_to_digest, + digest_to_snapshot, directory_digest_to_digest_contents, directory_digest_to_digest_entries, + download_file_to_digest, merge_digests_request_to_digest, path_globs_to_digest, + path_globs_to_paths, remove_prefix_request_to_digest, +}; use self::docker::docker_resolve_image; use self::interactive_process::interactive_process; @@ -280,322 +275,6 @@ fn process_request_to_process_result( .boxed() } -fn directory_digest_to_digest_contents( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); - lift_directory_digest(py_digest) - })?; - - let digest_contents = context.core.store().contents_for_directory(digest).await?; - - Ok(Python::with_gil(|py| { - Snapshot::store_digest_contents(py, &context, &digest_contents) - })?) - } - .boxed() -} - -fn directory_digest_to_digest_entries( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); - lift_directory_digest(py_digest) - })?; - let digest_entries = context.core.store().entries_for_directory(digest).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_digest_entries(py, &context, &digest_entries) - })?) - } - .boxed() -} - -fn remove_prefix_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let (digest, prefix) = Python::with_gil(|py| { - let py_remove_prefix = (*args[0]) - .as_ref(py) - .extract::>() - .map_err(|e| throw(format!("{e}")))?; - let prefix = RelativePath::new(&py_remove_prefix.prefix) - .map_err(|e| throw(format!("The `prefix` must be relative: {e}")))?; - let res: NodeResult<_> = Ok((py_remove_prefix.digest.clone(), prefix)); - res - })?; - let digest = context.core.store().strip_prefix(digest, &prefix).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, digest) - })?) - } - .boxed() -} - -fn add_prefix_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let (digest, prefix) = Python::with_gil(|py| { - let py_add_prefix = (*args[0]) - .as_ref(py) - .extract::>() - .map_err(|e| throw(format!("{e}")))?; - let prefix = RelativePath::new(&py_add_prefix.prefix) - .map_err(|e| throw(format!("The `prefix` must be relative: {e}")))?; - let res: NodeResult<(DirectoryDigest, RelativePath)> = - Ok((py_add_prefix.digest.clone(), prefix)); - res - })?; - let digest = context.core.store().add_prefix(digest, &prefix).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, digest) - })?) - } - .boxed() -} - -fn digest_to_snapshot(context: Context, args: Vec) -> BoxFuture<'static, NodeResult> { - let store = context.core.store(); - async move { - let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); - lift_directory_digest(py_digest) - })?; - let snapshot = store::Snapshot::from_digest(store, digest).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_snapshot(py, snapshot) - })?) - } - .boxed() -} - -fn merge_digests_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let core = &context.core; - let store = core.store(); - async move { - let digests = Python::with_gil(|py| { - (*args[0]) - .as_ref(py) - .extract::>() - .map(|py_merge_digests| py_merge_digests.0.clone()) - .map_err(|e| throw(format!("{e}"))) - })?; - let digest = store.merge(digests).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, digest) - })?) - } - .boxed() -} - -fn download_file_to_digest( - context: Context, - mut args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from)?; - let snapshot = context.get(DownloadedFile(key)).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, snapshot.into()) - })?) - } - .boxed() -} - -fn path_globs_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let path_globs = Python::with_gil(|py| { - let py_path_globs = (*args[0]).as_ref(py); - Snapshot::lift_path_globs(py_path_globs) - }) - .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; - let snapshot = context.get(Snapshot::from_path_globs(path_globs)).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, snapshot.into()) - })?) - } - .boxed() -} - -fn path_globs_to_paths( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let core = context.core.clone(); - async move { - let path_globs = Python::with_gil(|py| { - let py_path_globs = (*args[0]).as_ref(py); - Snapshot::lift_path_globs(py_path_globs) - }) - .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; - - let path_globs = path_globs.parse().map_err(throw)?; - let path_stats = context - .expand_globs( - path_globs, - SymlinkBehavior::Oblivious, - unmatched_globs_additional_context(), - ) - .await?; - - Python::with_gil(|py| { - let mut files = Vec::new(); - let mut dirs = Vec::new(); - for ps in path_stats.iter() { - match ps { - PathStat::File { path, .. } => { - files.push(Snapshot::store_path(py, path)?); - } - PathStat::Link { path, .. } => { - panic!("Paths shouldn't be symlink-aware {path:?}"); - } - PathStat::Dir { path, .. } => { - dirs.push(Snapshot::store_path(py, path)?); - } - } - } - Ok(externs::unsafe_call( - py, - core.types.paths, - &[ - externs::store_tuple(py, files), - externs::store_tuple(py, dirs), - ], - )) - }) - } - .boxed() -} - -enum CreateDigestItem { - FileContent(RelativePath, bytes::Bytes, bool), - FileEntry(RelativePath, Digest, bool), - SymlinkEntry(RelativePath, PathBuf), - Dir(RelativePath), -} - -fn create_digest_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let mut new_file_count = 0; - - let items: Vec = { - Python::with_gil(|py| { - let py_create_digest = (*args[0]).as_ref(py); - externs::collect_iterable(py_create_digest) - .unwrap() - .into_iter() - .map(|obj| { - let raw_path: String = externs::getattr(obj, "path").unwrap(); - let path = RelativePath::new(PathBuf::from(raw_path)).unwrap(); - if obj.hasattr("content").unwrap() { - let bytes = bytes::Bytes::from( - externs::getattr::>(obj, "content").unwrap(), - ); - let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); - new_file_count += 1; - CreateDigestItem::FileContent(path, bytes, is_executable) - } else if obj.hasattr("file_digest").unwrap() { - let py_file_digest: PyFileDigest = - externs::getattr(obj, "file_digest").unwrap(); - let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); - CreateDigestItem::FileEntry(path, py_file_digest.0, is_executable) - } else if obj.hasattr("target").unwrap() { - let target: String = externs::getattr(obj, "target").unwrap(); - CreateDigestItem::SymlinkEntry(path, PathBuf::from(target)) - } else { - CreateDigestItem::Dir(path) - } - }) - .collect() - }) - }; - - let mut typed_paths: Vec = Vec::with_capacity(items.len()); - let mut file_digests: HashMap = HashMap::with_capacity(items.len()); - let mut items_to_store = Vec::with_capacity(new_file_count); - - for item in &items { - match item { - CreateDigestItem::FileContent(path, bytes, is_executable) => { - let digest = Digest::of_bytes(bytes); - items_to_store.push((digest.hash, bytes.clone())); - typed_paths.push(TypedPath::File { - path, - is_executable: *is_executable, - }); - file_digests.insert(path.to_path_buf(), digest); - } - CreateDigestItem::FileEntry(path, digest, is_executable) => { - typed_paths.push(TypedPath::File { - path, - is_executable: *is_executable, - }); - file_digests.insert(path.to_path_buf(), *digest); - } - CreateDigestItem::SymlinkEntry(path, target) => { - typed_paths.push(TypedPath::Link { path, target }); - file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); - } - CreateDigestItem::Dir(path) => { - typed_paths.push(TypedPath::Dir(path)); - file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); - } - } - } - - let store = context.core.store(); - let trie = DigestTrie::from_unique_paths(typed_paths, &file_digests).unwrap(); - async move { - store.store_file_bytes_batch(items_to_store, true).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, trie.into()) - })?) - } - .boxed() -} - -fn digest_subset_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let store = context.core.store(); - async move { - let (path_globs, original_digest) = Python::with_gil(|py| { - let py_digest_subset = (*args[0]).as_ref(py); - let py_path_globs = externs::getattr(py_digest_subset, "globs").unwrap(); - let py_digest = externs::getattr(py_digest_subset, "digest").unwrap(); - let res: NodeResult<_> = Ok(( - Snapshot::lift_prepared_path_globs(py_path_globs)?, - lift_directory_digest(py_digest)?, - )); - res - })?; - let subset_params = SubsetParams { globs: path_globs }; - let digest = store.subset(original_digest, subset_params).await?; - Ok(Python::with_gil(|py| { - Snapshot::store_directory_digest(py, digest) - })?) - } - .boxed() -} - fn session_values(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { async move { context.get(SessionValues).await }.boxed() } From 1315ea187d7ed4b6bb2f35ba4102a6d492b91adf Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 21:55:09 -0400 Subject: [PATCH 6/9] refactor process intrinsic --- src/rust/engine/src/intrinsics/mod.rs | 85 ++--------------------- src/rust/engine/src/intrinsics/process.rs | 82 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 79 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/process.rs diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index 293058c6903..6d5c75b6267 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -1,27 +1,22 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::time::Duration; +use futures::future::{BoxFuture, FutureExt}; +use indexmap::IndexMap; +use rule_graph::{DependencyKey, RuleId}; use crate::context::Context; -use crate::externs; -use crate::nodes::{ExecuteProcess, NodeResult, RunId, SessionValues, Snapshot}; +use crate::nodes::{NodeResult, RunId, SessionValues}; use crate::python::Value; use crate::tasks::Intrinsic; use crate::types::Types; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; -use futures::try_join; -use indexmap::IndexMap; -use pyo3::{IntoPy, Python}; - -use rule_graph::{DependencyKey, RuleId}; - // Sub-modules with intrinsic implementations. mod dep_inference; mod digests; mod docker; mod interactive_process; +mod process; use self::dep_inference::{parse_javascript_deps, parse_python_deps}; use self::digests::{ @@ -32,6 +27,7 @@ use self::digests::{ }; use self::docker::docker_resolve_image; use self::interactive_process::interactive_process; +use self::process::process_request_to_process_result; type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; @@ -206,75 +202,6 @@ impl Intrinsics { } } -fn process_request_to_process_result( - context: Context, - mut args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let process_config: externs::process::PyProcessExecutionEnvironment = - Python::with_gil(|py| { - args.pop() - .unwrap() - .as_ref() - .extract(py) - .map_err(|e| format!("{e}")) - })?; - let process_request = - ExecuteProcess::lift(&context.core.store(), args.pop().unwrap(), process_config) - .map_err(|e| e.enrich("Error lifting Process")) - .await?; - - let result = context.get(process_request).await?.result; - - let store = context.core.store(); - let (stdout_bytes, stderr_bytes) = try_join!( - store - .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned()) - .map_err(|e| e.enrich("Bytes from stdout")), - store - .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned()) - .map_err(|e| e.enrich("Bytes from stderr")) - )?; - - Python::with_gil(|py| -> NodeResult { - Ok(externs::unsafe_call( - py, - context.core.types.process_result, - &[ - externs::store_bytes(py, &stdout_bytes), - Snapshot::store_file_digest(py, result.stdout_digest)?, - externs::store_bytes(py, &stderr_bytes), - Snapshot::store_file_digest(py, result.stderr_digest)?, - externs::store_i64(py, result.exit_code.into()), - Snapshot::store_directory_digest(py, result.output_directory)?, - externs::unsafe_call( - py, - context.core.types.process_result_metadata, - &[ - result - .metadata - .total_elapsed - .map(|d| { - externs::store_u64(py, Duration::from(d).as_millis() as u64) - }) - .unwrap_or_else(|| Value::from(py.None())), - Value::from( - externs::process::PyProcessExecutionEnvironment { - environment: result.metadata.environment, - } - .into_py(py), - ), - externs::store_utf8(py, result.metadata.source.into()), - externs::store_u64(py, result.metadata.source_run_id.0.into()), - ], - ), - ], - )) - }) - } - .boxed() -} - fn session_values(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { async move { context.get(SessionValues).await }.boxed() } diff --git a/src/rust/engine/src/intrinsics/process.rs b/src/rust/engine/src/intrinsics/process.rs new file mode 100644 index 00000000000..5105dbcfd30 --- /dev/null +++ b/src/rust/engine/src/intrinsics/process.rs @@ -0,0 +1,82 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::time::Duration; + +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use futures::try_join; +use pyo3::{IntoPy, Python}; + +use crate::context::Context; +use crate::externs; +use crate::nodes::{ExecuteProcess, NodeResult, Snapshot}; +use crate::python::Value; + +pub(crate) fn process_request_to_process_result( + context: Context, + mut args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { + let process_config: externs::process::PyProcessExecutionEnvironment = + Python::with_gil(|py| { + args.pop() + .unwrap() + .as_ref() + .extract(py) + .map_err(|e| format!("{e}")) + })?; + let process_request = + ExecuteProcess::lift(&context.core.store(), args.pop().unwrap(), process_config) + .map_err(|e| e.enrich("Error lifting Process")) + .await?; + + let result = context.get(process_request).await?.result; + + let store = context.core.store(); + let (stdout_bytes, stderr_bytes) = try_join!( + store + .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned()) + .map_err(|e| e.enrich("Bytes from stdout")), + store + .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned()) + .map_err(|e| e.enrich("Bytes from stderr")) + )?; + + Python::with_gil(|py| -> NodeResult { + Ok(externs::unsafe_call( + py, + context.core.types.process_result, + &[ + externs::store_bytes(py, &stdout_bytes), + Snapshot::store_file_digest(py, result.stdout_digest)?, + externs::store_bytes(py, &stderr_bytes), + Snapshot::store_file_digest(py, result.stderr_digest)?, + externs::store_i64(py, result.exit_code.into()), + Snapshot::store_directory_digest(py, result.output_directory)?, + externs::unsafe_call( + py, + context.core.types.process_result_metadata, + &[ + result + .metadata + .total_elapsed + .map(|d| { + externs::store_u64(py, Duration::from(d).as_millis() as u64) + }) + .unwrap_or_else(|| Value::from(py.None())), + Value::from( + externs::process::PyProcessExecutionEnvironment { + environment: result.metadata.environment, + } + .into_py(py), + ), + externs::store_utf8(py, result.metadata.source.into()), + externs::store_u64(py, result.metadata.source_run_id.0.into()), + ], + ), + ], + )) + }) + } + .boxed() +} From 9fa1c5c0eb4162299a720cca9c092fcb26be0742 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 22:00:59 -0400 Subject: [PATCH 7/9] refactor misc values intrinsics --- src/rust/engine/src/intrinsics/mod.rs | 14 ++++---------- src/rust/engine/src/intrinsics/values.rs | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 10 deletions(-) create mode 100644 src/rust/engine/src/intrinsics/values.rs diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index 6d5c75b6267..3154a091f77 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -1,12 +1,12 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use futures::future::{BoxFuture, FutureExt}; +use futures::future::BoxFuture; use indexmap::IndexMap; use rule_graph::{DependencyKey, RuleId}; use crate::context::Context; -use crate::nodes::{NodeResult, RunId, SessionValues}; +use crate::nodes::NodeResult; use crate::python::Value; use crate::tasks::Intrinsic; use crate::types::Types; @@ -17,6 +17,7 @@ mod digests; mod docker; mod interactive_process; mod process; +mod values; use self::dep_inference::{parse_javascript_deps, parse_python_deps}; use self::digests::{ @@ -28,6 +29,7 @@ use self::digests::{ use self::docker::docker_resolve_image; use self::interactive_process::interactive_process; use self::process::process_request_to_process_result; +use self::values::{run_id, session_values}; type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; @@ -201,11 +203,3 @@ impl Intrinsics { function(context, args).await } } - -fn session_values(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { - async move { context.get(SessionValues).await }.boxed() -} - -fn run_id(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { - async move { context.get(RunId).await }.boxed() -} diff --git a/src/rust/engine/src/intrinsics/values.rs b/src/rust/engine/src/intrinsics/values.rs new file mode 100644 index 00000000000..7d58411c80e --- /dev/null +++ b/src/rust/engine/src/intrinsics/values.rs @@ -0,0 +1,19 @@ +// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use futures::future::{BoxFuture, FutureExt}; + +use crate::context::Context; +use crate::nodes::{NodeResult, RunId, SessionValues}; +use crate::python::Value; + +pub(crate) fn session_values( + context: Context, + _args: Vec, +) -> BoxFuture<'static, NodeResult> { + async move { context.get(SessionValues).await }.boxed() +} + +pub(crate) fn run_id(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { + async move { context.get(RunId).await }.boxed() +} From 5ca68cc1b0dedb702338d1adbdf7f4b9a9cadc6d Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 16 Mar 2024 22:05:51 -0400 Subject: [PATCH 8/9] remove imports from sub-modules --- src/rust/engine/src/intrinsics/mod.rs | 48 ++++++++++----------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index 3154a091f77..c4e6e7d4e38 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -19,18 +19,6 @@ mod interactive_process; mod process; mod values; -use self::dep_inference::{parse_javascript_deps, parse_python_deps}; -use self::digests::{ - add_prefix_request_to_digest, create_digest_to_digest, digest_subset_to_digest, - digest_to_snapshot, directory_digest_to_digest_contents, directory_digest_to_digest_entries, - download_file_to_digest, merge_digests_request_to_digest, path_globs_to_digest, - path_globs_to_paths, remove_prefix_request_to_digest, -}; -use self::docker::docker_resolve_image; -use self::interactive_process::interactive_process; -use self::process::process_request_to_process_result; -use self::values::{run_id, session_values}; - type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; @@ -47,7 +35,7 @@ impl Intrinsics { types.directory_digest, types.create_digest, ), - Box::new(create_digest_to_digest), + Box::new(self::digests::create_digest_to_digest), ); intrinsics.insert( Intrinsic::new( @@ -55,11 +43,11 @@ impl Intrinsics { types.directory_digest, types.path_globs, ), - Box::new(path_globs_to_digest), + Box::new(self::digests::path_globs_to_digest), ); intrinsics.insert( Intrinsic::new("path_globs_to_paths", types.paths, types.path_globs), - Box::new(path_globs_to_paths), + Box::new(self::digests::path_globs_to_paths), ); intrinsics.insert( Intrinsic::new( @@ -67,11 +55,11 @@ impl Intrinsics { types.directory_digest, types.native_download_file, ), - Box::new(download_file_to_digest), + Box::new(self::digests::download_file_to_digest), ); intrinsics.insert( Intrinsic::new("digest_to_snapshot", types.snapshot, types.directory_digest), - Box::new(digest_to_snapshot), + Box::new(self::digests::digest_to_snapshot), ); intrinsics.insert( Intrinsic::new( @@ -79,7 +67,7 @@ impl Intrinsics { types.digest_contents, types.directory_digest, ), - Box::new(directory_digest_to_digest_contents), + Box::new(self::digests::directory_digest_to_digest_contents), ); intrinsics.insert( Intrinsic::new( @@ -87,7 +75,7 @@ impl Intrinsics { types.digest_entries, types.directory_digest, ), - Box::new(directory_digest_to_digest_entries), + Box::new(self::digests::directory_digest_to_digest_entries), ); intrinsics.insert( Intrinsic::new( @@ -95,7 +83,7 @@ impl Intrinsics { types.directory_digest, types.merge_digests, ), - Box::new(merge_digests_request_to_digest), + Box::new(self::digests::merge_digests_request_to_digest), ); intrinsics.insert( Intrinsic::new( @@ -103,7 +91,7 @@ impl Intrinsics { types.directory_digest, types.remove_prefix, ), - Box::new(remove_prefix_request_to_digest), + Box::new(self::digests::remove_prefix_request_to_digest), ); intrinsics.insert( Intrinsic::new( @@ -111,7 +99,7 @@ impl Intrinsics { types.directory_digest, types.add_prefix, ), - Box::new(add_prefix_request_to_digest), + Box::new(self::digests::add_prefix_request_to_digest), ); intrinsics.insert( Intrinsic { @@ -122,7 +110,7 @@ impl Intrinsics { DependencyKey::new(types.process_config_from_environment), ], }, - Box::new(process_request_to_process_result), + Box::new(self::process::process_request_to_process_result), ); intrinsics.insert( Intrinsic::new( @@ -130,7 +118,7 @@ impl Intrinsics { types.directory_digest, types.digest_subset, ), - Box::new(digest_subset_to_digest), + Box::new(self::digests::digest_subset_to_digest), ); intrinsics.insert( Intrinsic { @@ -138,7 +126,7 @@ impl Intrinsics { product: types.session_values, inputs: vec![], }, - Box::new(session_values), + Box::new(self::values::session_values), ); intrinsics.insert( Intrinsic { @@ -146,7 +134,7 @@ impl Intrinsics { product: types.run_id, inputs: vec![], }, - Box::new(run_id), + Box::new(self::values::run_id), ); intrinsics.insert( Intrinsic { @@ -157,7 +145,7 @@ impl Intrinsics { DependencyKey::new(types.process_config_from_environment), ], }, - Box::new(interactive_process), + Box::new(self::interactive_process::interactive_process), ); intrinsics.insert( Intrinsic { @@ -165,7 +153,7 @@ impl Intrinsics { product: types.docker_resolve_image_result, inputs: vec![DependencyKey::new(types.docker_resolve_image_request)], }, - Box::new(docker_resolve_image), + Box::new(self::docker::docker_resolve_image), ); intrinsics.insert( Intrinsic { @@ -173,7 +161,7 @@ impl Intrinsics { product: types.parsed_python_deps_result, inputs: vec![DependencyKey::new(types.deps_request)], }, - Box::new(parse_python_deps), + Box::new(self::dep_inference::parse_python_deps), ); intrinsics.insert( Intrinsic { @@ -181,7 +169,7 @@ impl Intrinsics { product: types.parsed_javascript_deps_result, inputs: vec![DependencyKey::new(types.deps_request)], }, - Box::new(parse_javascript_deps), + Box::new(self::dep_inference::parse_javascript_deps), ); Intrinsics { intrinsics } } From 091cbc95f6b0a5269ba0aaa27332ecb9aacd2f40 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sun, 17 Mar 2024 10:19:17 +0100 Subject: [PATCH 9/9] use same year as base file --- src/rust/engine/src/intrinsics/dep_inference.rs | 2 +- src/rust/engine/src/intrinsics/digests.rs | 2 +- src/rust/engine/src/intrinsics/docker.rs | 2 +- src/rust/engine/src/intrinsics/interactive_process.rs | 2 +- src/rust/engine/src/intrinsics/process.rs | 2 +- src/rust/engine/src/intrinsics/values.rs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rust/engine/src/intrinsics/dep_inference.rs b/src/rust/engine/src/intrinsics/dep_inference.rs index 402792ed74a..2a5124ee143 100644 --- a/src/rust/engine/src/intrinsics/dep_inference.rs +++ b/src/rust/engine/src/intrinsics/dep_inference.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::path::PathBuf; diff --git a/src/rust/engine/src/intrinsics/digests.rs b/src/rust/engine/src/intrinsics/digests.rs index 9cac1401da9..97a8871adbb 100644 --- a/src/rust/engine/src/intrinsics/digests.rs +++ b/src/rust/engine/src/intrinsics/digests.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::HashMap; diff --git a/src/rust/engine/src/intrinsics/docker.rs b/src/rust/engine/src/intrinsics/docker.rs index 4074ad650ba..979f798b079 100644 --- a/src/rust/engine/src/intrinsics/docker.rs +++ b/src/rust/engine/src/intrinsics/docker.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use docker::docker::{ImagePullPolicy, ImagePullScope, DOCKER, IMAGE_PULL_CACHE}; diff --git a/src/rust/engine/src/intrinsics/interactive_process.rs b/src/rust/engine/src/intrinsics/interactive_process.rs index 449a1fc4863..105138a18e2 100644 --- a/src/rust/engine/src/intrinsics/interactive_process.rs +++ b/src/rust/engine/src/intrinsics/interactive_process.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::env::current_dir; diff --git a/src/rust/engine/src/intrinsics/process.rs b/src/rust/engine/src/intrinsics/process.rs index 5105dbcfd30..6340a3209ea 100644 --- a/src/rust/engine/src/intrinsics/process.rs +++ b/src/rust/engine/src/intrinsics/process.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::time::Duration; diff --git a/src/rust/engine/src/intrinsics/values.rs b/src/rust/engine/src/intrinsics/values.rs index 7d58411c80e..20cfd0d6ac1 100644 --- a/src/rust/engine/src/intrinsics/values.rs +++ b/src/rust/engine/src/intrinsics/values.rs @@ -1,4 +1,4 @@ -// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use futures::future::{BoxFuture, FutureExt};