From d26ef1d46a0af2c075157d4c7a4a9f9ac4857437 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Sun, 21 Apr 2024 15:31:39 -0700 Subject: [PATCH] Expose intrinsics in `intrinsics.py` via a native call response for generators. --- src/python/pants/engine/intrinsics.py | 126 ++++++ src/python/pants/init/engine_initializer.py | 2 + src/rust/engine/src/context.rs | 4 - src/rust/engine/src/externs/interface.rs | 43 +-- src/rust/engine/src/externs/mod.rs | 50 +++ .../engine/src/intrinsics/dep_inference.rs | 63 +-- src/rust/engine/src/intrinsics/digests.rs | 359 +++++++++--------- src/rust/engine/src/intrinsics/docker.rs | 33 +- .../src/intrinsics/interactive_process.rs | 309 ++++++++------- src/rust/engine/src/intrinsics/mod.rs | 188 +-------- src/rust/engine/src/intrinsics/process.rs | 46 +-- src/rust/engine/src/intrinsics/values.rs | 27 +- src/rust/engine/src/lib.rs | 3 +- src/rust/engine/src/nodes/mod.rs | 92 ++--- src/rust/engine/src/nodes/task.rs | 30 +- src/rust/engine/src/tasks.rs | 162 +++----- 16 files changed, 755 insertions(+), 782 deletions(-) create mode 100644 src/python/pants/engine/intrinsics.py diff --git a/src/python/pants/engine/intrinsics.py b/src/python/pants/engine/intrinsics.py new file mode 100644 index 000000000000..186cb79cfd57 --- /dev/null +++ b/src/python/pants/engine/intrinsics.py @@ -0,0 +1,126 @@ +# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md). +# Licensed under the Apache License, Version 2.0 (see LICENSE). 
+ +from __future__ import annotations + +from pants.engine.internals import native_engine +from pants.engine.rules import collect_rules, rule, _uncacheable_rule +from pants.engine.fs import Digest, Paths, Snapshot, DigestContents, DigestEntries, MergeDigests, RemovePrefix, AddPrefix, DigestSubset, CreateDigest, PathGlobs, NativeDownloadFile +from pants.engine.process import FallibleProcessResult, InteractiveProcessResult, InteractiveProcess +from pants.engine.internals.native_engine import NativeDependenciesRequest +from pants.engine.internals.session import SessionValues, RunId +from pants.engine.process import ( + Process, + ProcessExecutionEnvironment, + ProcessExecutionFailure, + ProcessResultMetadata, +) +from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult +from pants.engine.internals.native_dep_inference import ( + NativeParsedJavascriptDependencies, + NativeParsedPythonDependencies, +) + + +@rule +async def create_digest_to_digest( + create_digest: CreateDigest, +) -> Digest: + return await native_engine.create_digest_to_digest(create_digest) + + +@rule +async def path_globs_to_digest( + path_globs: PathGlobs, +) -> Digest: + return await native_engine.path_globs_to_digest(path_globs) + + +@rule +async def path_globs_to_paths( + path_globs: PathGlobs, +) -> Paths: + return await native_engine.path_globs_to_paths(path_globs) + + +@rule +async def download_file_to_digest( + native_download_file: NativeDownloadFile, +) -> Digest: + return await native_engine.download_file_to_digest(native_download_file) + + +@rule +async def digest_to_snapshot(digest: Digest) -> Snapshot: + return await native_engine.digest_to_snapshot(digest) + + +@rule +async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents: + return await native_engine.directory_digest_to_digest_contents(digest) + + +@rule +async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries: + return await 
native_engine.directory_digest_to_digest_entries(digest) + + +@rule +async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest: + return await native_engine.merge_digests_request_to_digest(merge_digests) + + +@rule +async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest: + return await native_engine.remove_prefix_request_to_digest(remove_prefix) + + +@rule +async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest: + return await native_engine.add_prefix_request_to_digest(add_prefix) + + +@rule +async def process_request_to_process_result(process: Process, process_execution_environment: ProcessExecutionEnvironment) -> FallibleProcessResult: + return await native_engine.process_request_to_process_result(process, process_execution_environment) + + +@rule +async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest: + return await native_engine.digest_subset_to_digest(digest_subset) + + +@rule +async def session_values() -> SessionValues: + return await native_engine.session_values() + + +@rule +async def run_id() -> RunId: + return await native_engine.run_id() + + +@_uncacheable_rule +async def interactive_process(process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment) -> InteractiveProcessResult: + return await native_engine.interactive_process(process, process_execution_environment) + + +@rule +async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult: + return await native_engine.docker_resolve_image(request) + + +@rule +async def parse_python_deps(deps_request: NativeDependenciesRequest) -> NativeParsedPythonDependencies: + return await native_engine.parse_python_deps(deps_request) + + +@rule +async def parse_javascript_deps(deps_request: NativeDependenciesRequest) -> NativeParsedJavascriptDependencies: + return await native_engine.parse_javascript_deps(deps_request) + + +def rules(): + return [ + *collect_rules(), + ] 
diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index f65fc389c15c..7669d477723a 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -21,6 +21,7 @@ from pants.engine.environment import EnvironmentName from pants.engine.fs import PathGlobs, Snapshot, Workspace from pants.engine.goal import CurrentExecutingGoals, Goal +from pants.engine import intrinsics from pants.engine.internals import ( build_files, dep_rules, @@ -276,6 +277,7 @@ def current_executing_goals(session_values: SessionValues) -> CurrentExecutingGo rules = FrozenOrderedSet( ( *collect_rules(locals()), + *intrinsics.rules(), *build_files.rules(), *fs.rules(), *dep_rules.rules(), diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 3edf42450fc4..8bd57c844497 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -10,7 +10,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use crate::intrinsics::Intrinsics; use crate::nodes::{ExecuteProcess, NodeKey, NodeOutput, NodeResult}; use crate::python::{throw, Failure}; use crate::session::{Session, Sessions}; @@ -62,7 +61,6 @@ pub struct Core { pub tasks: Tasks, pub rule_graph: RuleGraph, pub types: Types, - pub intrinsics: Intrinsics, pub executor: Executor, store: Store, /// The CommandRunners to use for execution, in ascending order of reliability (for the purposes @@ -519,7 +517,6 @@ impl Core { executor: Executor, tasks: Tasks, types: Types, - intrinsics: Intrinsics, build_root: PathBuf, ignore_patterns: Vec, use_gitignore: bool, @@ -683,7 +680,6 @@ impl Core { tasks, rule_graph, types, - intrinsics, executor: executor.clone(), store, command_runners, diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 588ce958fdb0..61c9110b58e0 100644 --- a/src/rust/engine/src/externs/interface.rs +++ 
b/src/rust/engine/src/externs/interface.rs @@ -39,7 +39,7 @@ use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType}; use pyo3::{create_exception, IntoPy, PyAny, PyRef}; use regex::Regex; use remote::remote_cache::RemoteCacheWarningsBehavior; -use rule_graph::{self, DependencyKey, RuleGraph, RuleId}; +use rule_graph::{self, RuleGraph}; use store::RemoteProvider; use task_executor::Executor; use workunit_store::{ @@ -49,14 +49,16 @@ use workunit_store::{ use crate::externs::fs::{possible_store_missing_digest, PyFileDigest}; use crate::externs::process::PyProcessExecutionEnvironment; +use crate::intrinsics; use crate::{ externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination, - Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions, - Rule, Scheduler, Session, SessionCore, Tasks, TypeId, Types, Value, + Failure, Function, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session, + SessionCore, Tasks, TypeId, Types, Value, }; #[pymodule] fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> { + intrinsics::register(py, m)?; externs::register(py, m)?; externs::address::register(py, m)?; externs::fs::register(m)?; @@ -701,9 +703,7 @@ fn scheduler_create( .borrow_mut() .take() .ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?; - let intrinsics = Intrinsics::new(&types); - let mut tasks = py_tasks.0.replace(Tasks::new()); - tasks.intrinsics_set(&intrinsics); + let tasks = py_tasks.0.replace(Tasks::new()); // NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to // use `tokio::spawn` since Python does not setup Tokio for the main thread. 
This also @@ -716,7 +716,6 @@ fn scheduler_create( py_executor.0.clone(), tasks, types, - intrinsics, build_root, ignore_patterns, use_gitignore, @@ -1022,22 +1021,13 @@ fn session_run_interactive_process( let interactive_process: Value = interactive_process.into(); let process_config = Value::new(process_config_from_environment.into_py(py)); py.allow_threads(|| { - core.executor.clone().block_on(nodes::maybe_side_effecting( - true, - &Arc::new(std::sync::atomic::AtomicBool::new(true)), - core.intrinsics.run( - &Intrinsic { - id: RuleId::new("interactive_process"), - product: core.types.interactive_process_result, - inputs: vec![ - DependencyKey::new(core.types.interactive_process), - DependencyKey::new(core.types.process_config_from_environment), - ], - }, - context, - vec![interactive_process, process_config], - ), - )) + core.executor + .clone() + .block_on(intrinsics::interactive_process_inner( + &context, + interactive_process, + process_config, + )) }) .map(|v| v.into()) .map_err(|e| PyException::new_err(e.to_string())) @@ -1407,8 +1397,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R core.executor.enter(|| { let result = PyDict::new(py); for (rule, rule_dependencies) in core.rule_graph.rule_dependencies() { - let Rule::Task(task) = rule else { continue }; - + let task = rule.0; let function = &task.func; let mut dependencies = Vec::new(); for (dependency_key, rule) in rule_dependencies { @@ -1421,7 +1410,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R { continue; } - let Rule::Task(task) = rule else { continue }; + let function = &rule.0.func; let provided_params = dependency_key .provided_params @@ -1431,7 +1420,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R dependencies.push(( dependency_key.product.as_py_type(py), provided_params, - task.func.0.value.into_py(py), + function.0.value.into_py(py), )); } if dependencies.is_empty() { diff --git 
a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index 68d78aecd689..701f74709810 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -9,6 +9,8 @@ use std::collections::BTreeMap; use std::convert::TryInto; use std::fmt; +use futures::future::{BoxFuture, Future}; +use futures::FutureExt; use lazy_static::lazy_static; use pyo3::exceptions::{PyAssertionError, PyException, PyStopIteration, PyTypeError, PyValueError}; use pyo3::prelude::*; @@ -42,6 +44,7 @@ pub mod workunits; pub fn register(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -334,6 +337,8 @@ pub(crate) fn generator_send( // It isn't necessary to differentiate between `Get` and `Effect` here, as the static // analysis of `@rule`s has already validated usage. Ok(GeneratorResponse::Get(get.take()?)) + } else if let Ok(call) = response.extract::>(py) { + Ok(GeneratorResponse::NativeCall(call.take()?)) } else if let Ok(get_multi) = response.downcast::(py) { // Was an `All` or `MultiGet`. 
let gogs = get_multi @@ -463,6 +468,45 @@ fn interpret_get_inputs( } } +#[pyclass] +pub struct PyGeneratorResponseNativeCall(RefCell>); + +impl PyGeneratorResponseNativeCall { + pub fn new(call: impl Future> + 'static + Send) -> Self { + Self(RefCell::new(Some(NativeCall { call: call.boxed() }))) + } + + fn take(&self) -> Result { + self.0 + .borrow_mut() + .take() + .ok_or_else(|| "A `NativeCall` may only be consumed once.".to_owned()) + } +} + +#[pymethods] +impl PyGeneratorResponseNativeCall { + fn __await__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> { + self_ + } + + fn __iter__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> { + self_ + } + + fn __next__(self_: PyRef<'_, Self>) -> Option> { + Some(self_) + } + + fn send(&self, py: Python, value: PyObject) -> PyResult<()> { + // TODO: PyStopIteration::new_err(value) seems to attempt to interpret + // the value in some cases, which mangles the return type. + let locals = PyDict::new(py); + locals.set_item("v", value)?; + py.run("raise StopIteration(v)", None, Some(locals)) + } +} + #[pyclass(subclass)] pub struct PyGeneratorResponseCall(RefCell>); @@ -614,6 +658,10 @@ impl PyGeneratorResponseGet { } } +pub struct NativeCall { + pub call: BoxFuture<'static, Result>, +} + #[derive(Debug)] pub struct Call { pub rule_id: RuleId, @@ -681,6 +729,8 @@ pub enum GetOrGenerator { pub enum GeneratorResponse { /// The generator has completed with the given value of the given type. Break(Value, TypeId), + /// The generator is awaiting a call to an unknown native function. + NativeCall(NativeCall), /// The generator is awaiting a call to a known rule. Call(Call), /// The generator is awaiting a call to an unknown rule. 
diff --git a/src/rust/engine/src/intrinsics/dep_inference.rs b/src/rust/engine/src/intrinsics/dep_inference.rs index 2a5124ee143a..cd1794d02dae 100644 --- a/src/rust/engine/src/intrinsics/dep_inference.rs +++ b/src/rust/engine/src/intrinsics/dep_inference.rs @@ -9,22 +9,28 @@ use dep_inference::javascript::ParsedJavascriptDependencies; use dep_inference::python::ParsedPythonDependencies; use dep_inference::{javascript, python}; use fs::{DirectoryDigest, Entry, SymlinkBehavior}; -use futures::future::{BoxFuture, FutureExt}; use grpc_util::prost::MessageExt; use hashing::Digest; use protos::gen::pants::cache::{ dependency_inference_request, CacheKey, CacheKeyType, DependencyInferenceRequest, }; -use pyo3::{Python, ToPyObject}; +use pyo3::prelude::{pyfunction, wrap_pyfunction, PyModule, PyResult, Python, ToPyObject}; use store::Store; use workunit_store::{in_workunit, Level}; -use crate::context::Context; use crate::externs::dep_inference::PyNativeDependenciesRequest; -use crate::nodes::NodeResult; -use crate::python::Value; +use crate::externs::PyGeneratorResponseNativeCall; +use crate::nodes::{task_get_context, NodeResult}; +use crate::python::{Failure, Value}; use crate::{externs, Core}; +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(parse_python_deps, m)?)?; + m.add_function(wrap_pyfunction!(parse_javascript_deps, m)?)?; + + Ok(()) +} + pub(crate) struct PreparedInferenceRequest { digest: Digest, /// The request that's guaranteed to have been constructed via ::prepare(). 
@@ -36,7 +42,7 @@ pub(crate) struct PreparedInferenceRequest { impl PreparedInferenceRequest { pub async fn prepare( - args: Vec, + deps_request: Value, store: &Store, backend: &str, impl_hash: &str, @@ -44,7 +50,7 @@ impl PreparedInferenceRequest { let PyNativeDependenciesRequest { directory_digest, metadata, - } = Python::with_gil(|py| (*args[0]).as_ref(py).extract())?; + } = Python::with_gil(|py| deps_request.extract(py))?; let (path, digest) = Self::find_one_file(directory_digest, store, backend).await?; let str_path = path.display().to_string(); @@ -103,15 +109,16 @@ impl PreparedInferenceRequest { } } -pub(crate) fn parse_python_deps( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn parse_python_deps(deps_request: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let core = &context.core; let store = core.store(); let prepared_inference_request = - PreparedInferenceRequest::prepare(args, &store, "Python", python::IMPL_HASH).await?; + PreparedInferenceRequest::prepare(deps_request, &store, "Python", python::IMPL_HASH) + .await?; in_workunit!( "parse_python_dependencies", Level::Debug, @@ -141,24 +148,27 @@ pub(crate) fn parse_python_deps( ) }); - Ok(result) + Ok::<_, Failure>(result) } ) .await - } - .boxed() + }) } -pub(crate) fn parse_javascript_deps( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn parse_javascript_deps(deps_request: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let core = &context.core; let store = core.store(); - let prepared_inference_request = - PreparedInferenceRequest::prepare(args, &store, "Javascript", javascript::IMPL_HASH) - .await?; + let prepared_inference_request = PreparedInferenceRequest::prepare( + deps_request, + &store, + "Javascript", + 
javascript::IMPL_HASH, + ) + .await?; in_workunit!( "parse_javascript_dependencies", @@ -202,12 +212,11 @@ pub(crate) fn parse_javascript_deps( ) }); - Ok(result) + Ok::<_, Failure>(result) } ) .await - } - .boxed() + }) } pub(crate) async fn get_or_create_inferred_dependencies( diff --git a/src/rust/engine/src/intrinsics/digests.rs b/src/rust/engine/src/intrinsics/digests.rs index 97a8871adbb1..10191e08c35e 100644 --- a/src/rust/engine/src/intrinsics/digests.rs +++ b/src/rust/engine/src/intrinsics/digests.rs @@ -7,63 +7,78 @@ use std::path::PathBuf; use fs::{ DigestTrie, DirectoryDigest, GlobMatching, PathStat, RelativePath, SymlinkBehavior, TypedPath, }; -use futures::future::{BoxFuture, FutureExt}; use hashing::{Digest, EMPTY_DIGEST}; -use pyo3::{PyRef, Python}; +use pyo3::prelude::{pyfunction, wrap_pyfunction, PyModule, PyRef, PyResult, Python}; use store::{SnapshotOps, SubsetParams}; -use crate::context::Context; use crate::externs; use crate::externs::fs::{PyAddPrefix, PyFileDigest, PyMergeDigests, PyRemovePrefix}; +use crate::externs::PyGeneratorResponseNativeCall; use crate::nodes::{ - lift_directory_digest, unmatched_globs_additional_context, DownloadedFile, NodeResult, Snapshot, + lift_directory_digest, task_get_context, unmatched_globs_additional_context, DownloadedFile, + NodeResult, Snapshot, }; use crate::python::{throw, Key, Value}; use crate::Failure; -pub(crate) fn directory_digest_to_digest_contents( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(add_prefix_request_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(create_digest_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(digest_subset_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(digest_to_snapshot, m)?)?; + m.add_function(wrap_pyfunction!(directory_digest_to_digest_contents, m)?)?; + 
m.add_function(wrap_pyfunction!(directory_digest_to_digest_entries, m)?)?; + m.add_function(wrap_pyfunction!(download_file_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(merge_digests_request_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(path_globs_to_digest, m)?)?; + m.add_function(wrap_pyfunction!(path_globs_to_paths, m)?)?; + m.add_function(wrap_pyfunction!(remove_prefix_request_to_digest, m)?)?; + + Ok(()) +} + +#[pyfunction] +fn directory_digest_to_digest_contents(digest: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); + let py_digest = digest.as_ref().as_ref(py); lift_directory_digest(py_digest) })?; let digest_contents = context.core.store().contents_for_directory(digest).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_digest_contents(py, &context, &digest_contents) })?) - } - .boxed() + }) } -pub(crate) fn directory_digest_to_digest_entries( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn directory_digest_to_digest_entries(digest: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); + let py_digest = digest.as_ref().as_ref(py); lift_directory_digest(py_digest) })?; let digest_entries = context.core.store().entries_for_directory(digest).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_digest_entries(py, &context, &digest_entries) })?) 
- } - .boxed() + }) } -pub(crate) fn remove_prefix_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn remove_prefix_request_to_digest(remove_prefix: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let (digest, prefix) = Python::with_gil(|py| { - let py_remove_prefix = (*args[0]) + let py_remove_prefix = remove_prefix + .as_ref() .as_ref(py) .extract::>() .map_err(|e| throw(format!("{e}")))?; @@ -73,20 +88,20 @@ pub(crate) fn remove_prefix_request_to_digest( res })?; let digest = context.core.store().strip_prefix(digest, &prefix).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, digest) })?) - } - .boxed() + }) } -pub(crate) fn add_prefix_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn add_prefix_request_to_digest(add_prefix: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let (digest, prefix) = Python::with_gil(|py| { - let py_add_prefix = (*args[0]) + let py_add_prefix = add_prefix + .as_ref() .as_ref(py) .extract::>() .map_err(|e| throw(format!("{e}")))?; @@ -97,93 +112,90 @@ pub(crate) fn add_prefix_request_to_digest( res })?; let digest = context.core.store().add_prefix(digest, &prefix).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, digest) })?) 
- } - .boxed() + }) } -pub(crate) fn digest_to_snapshot( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let store = context.core.store(); - async move { +#[pyfunction] +fn digest_to_snapshot(digest: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let store = context.core.store(); + let digest = Python::with_gil(|py| { - let py_digest = (*args[0]).as_ref(py); + let py_digest = digest.as_ref().as_ref(py); lift_directory_digest(py_digest) })?; let snapshot = store::Snapshot::from_digest(store, digest).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_snapshot(py, snapshot) })?) - } - .boxed() + }) } -pub(crate) fn merge_digests_request_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let core = &context.core; - let store = core.store(); - async move { +#[pyfunction] +fn merge_digests_request_to_digest(digests: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + + let core = &context.core; + let store = core.store(); + let digests = Python::with_gil(|py| { - (*args[0]) + digests + .as_ref() .as_ref(py) .extract::>() .map(|py_merge_digests| py_merge_digests.0.clone()) .map_err(|e| throw(format!("{e}"))) })?; let digest = store.merge(digests).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, digest) })?) 
- } - .boxed() + }) } -pub(crate) fn download_file_to_digest( - context: Context, - mut args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { - let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from)?; +#[pyfunction] +fn download_file_to_digest(download_file: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + + let key = Key::from_value(download_file).map_err(Failure::from)?; let snapshot = context.get(DownloadedFile(key)).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, snapshot.into()) })?) - } - .boxed() + }) } -pub(crate) fn path_globs_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +#[pyfunction] +fn path_globs_to_digest(path_globs: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let path_globs = Python::with_gil(|py| { - let py_path_globs = (*args[0]).as_ref(py); + let py_path_globs = path_globs.as_ref().as_ref(py); Snapshot::lift_path_globs(py_path_globs) }) .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; let snapshot = context.get(Snapshot::from_path_globs(path_globs)).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, snapshot.into()) })?) 
- } - .boxed() + }) } -pub(crate) fn path_globs_to_paths( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let core = context.core.clone(); - async move { +#[pyfunction] +fn path_globs_to_paths(path_globs: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let core = &context.core; + let path_globs = Python::with_gil(|py| { - let py_path_globs = (*args[0]).as_ref(py); + let py_path_globs = path_globs.as_ref().as_ref(py); Snapshot::lift_path_globs(py_path_globs) }) .map_err(|e| throw(format!("Failed to parse PathGlobs: {e}")))?; @@ -213,7 +225,7 @@ pub(crate) fn path_globs_to_paths( } } } - Ok(externs::unsafe_call( + Ok::<_, Failure>(externs::unsafe_call( py, core.types.paths, &[ @@ -222,8 +234,7 @@ pub(crate) fn path_globs_to_paths( ], )) }) - } - .boxed() + }) } enum CreateDigestItem { @@ -233,96 +244,101 @@ enum CreateDigestItem { Dir(RelativePath), } -pub(crate) fn create_digest_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let mut new_file_count = 0; +#[pyfunction] +fn create_digest_to_digest(py: Python, create_digest: Value) -> PyGeneratorResponseNativeCall { + let (items_to_store, trie) = py.allow_threads(|| { + let mut new_file_count = 0; - let items: Vec = { - Python::with_gil(|py| { - let py_create_digest = (*args[0]).as_ref(py); - externs::collect_iterable(py_create_digest) - .unwrap() - .into_iter() - .map(|obj| { - let raw_path: String = externs::getattr(obj, "path").unwrap(); - let path = RelativePath::new(PathBuf::from(raw_path)).unwrap(); - if obj.hasattr("content").unwrap() { - let bytes = bytes::Bytes::from( - externs::getattr::>(obj, "content").unwrap(), - ); - let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); - new_file_count += 1; - CreateDigestItem::FileContent(path, bytes, is_executable) - } else if obj.hasattr("file_digest").unwrap() { - let py_file_digest: PyFileDigest = - 
externs::getattr(obj, "file_digest").unwrap(); - let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); - CreateDigestItem::FileEntry(path, py_file_digest.0, is_executable) - } else if obj.hasattr("target").unwrap() { - let target: String = externs::getattr(obj, "target").unwrap(); - CreateDigestItem::SymlinkEntry(path, PathBuf::from(target)) - } else { - CreateDigestItem::Dir(path) - } - }) - .collect() - }) - }; - - let mut typed_paths: Vec = Vec::with_capacity(items.len()); - let mut file_digests: HashMap = HashMap::with_capacity(items.len()); - let mut items_to_store = Vec::with_capacity(new_file_count); - - for item in &items { - match item { - CreateDigestItem::FileContent(path, bytes, is_executable) => { - let digest = Digest::of_bytes(bytes); - items_to_store.push((digest.hash, bytes.clone())); - typed_paths.push(TypedPath::File { - path, - is_executable: *is_executable, - }); - file_digests.insert(path.to_path_buf(), digest); - } - CreateDigestItem::FileEntry(path, digest, is_executable) => { - typed_paths.push(TypedPath::File { - path, - is_executable: *is_executable, - }); - file_digests.insert(path.to_path_buf(), *digest); - } - CreateDigestItem::SymlinkEntry(path, target) => { - typed_paths.push(TypedPath::Link { path, target }); - file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); - } - CreateDigestItem::Dir(path) => { - typed_paths.push(TypedPath::Dir(path)); - file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + let items: Vec = { + Python::with_gil(|py| { + let py_create_digest = create_digest.as_ref().as_ref(py); + externs::collect_iterable(py_create_digest) + .unwrap() + .into_iter() + .map(|obj| { + let raw_path: String = externs::getattr(obj, "path").unwrap(); + let path = RelativePath::new(PathBuf::from(raw_path)).unwrap(); + if obj.hasattr("content").unwrap() { + let bytes = bytes::Bytes::from( + externs::getattr::>(obj, "content").unwrap(), + ); + let is_executable: bool = + externs::getattr(obj, 
"is_executable").unwrap(); + new_file_count += 1; + CreateDigestItem::FileContent(path, bytes, is_executable) + } else if obj.hasattr("file_digest").unwrap() { + let py_file_digest: PyFileDigest = + externs::getattr(obj, "file_digest").unwrap(); + let is_executable: bool = + externs::getattr(obj, "is_executable").unwrap(); + CreateDigestItem::FileEntry(path, py_file_digest.0, is_executable) + } else if obj.hasattr("target").unwrap() { + let target: String = externs::getattr(obj, "target").unwrap(); + CreateDigestItem::SymlinkEntry(path, PathBuf::from(target)) + } else { + CreateDigestItem::Dir(path) + } + }) + .collect() + }) + }; + + let mut typed_paths: Vec = Vec::with_capacity(items.len()); + let mut file_digests: HashMap = HashMap::with_capacity(items.len()); + let mut items_to_store = Vec::with_capacity(new_file_count); + + for item in &items { + match item { + CreateDigestItem::FileContent(path, bytes, is_executable) => { + let digest = Digest::of_bytes(bytes); + items_to_store.push((digest.hash, bytes.clone())); + typed_paths.push(TypedPath::File { + path, + is_executable: *is_executable, + }); + file_digests.insert(path.to_path_buf(), digest); + } + CreateDigestItem::FileEntry(path, digest, is_executable) => { + typed_paths.push(TypedPath::File { + path, + is_executable: *is_executable, + }); + file_digests.insert(path.to_path_buf(), *digest); + } + CreateDigestItem::SymlinkEntry(path, target) => { + typed_paths.push(TypedPath::Link { path, target }); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } + CreateDigestItem::Dir(path) => { + typed_paths.push(TypedPath::Dir(path)); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } } } - } - let store = context.core.store(); - let trie = DigestTrie::from_unique_paths(typed_paths, &file_digests).unwrap(); - async move { + let trie = DigestTrie::from_unique_paths(typed_paths, &file_digests).unwrap(); + + (items_to_store, trie) + }); + + PyGeneratorResponseNativeCall::new(async move { + let 
context = task_get_context(); + let store = context.core.store(); store.store_file_bytes_batch(items_to_store, true).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, trie.into()) })?) - } - .boxed() + }) } -pub(crate) fn digest_subset_to_digest( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - let store = context.core.store(); - async move { +#[pyfunction] +fn digest_subset_to_digest(digest_subset: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + + let store = context.core.store(); let (path_globs, original_digest) = Python::with_gil(|py| { - let py_digest_subset = (*args[0]).as_ref(py); + let py_digest_subset = digest_subset.as_ref().as_ref(py); let py_path_globs = externs::getattr(py_digest_subset, "globs").unwrap(); let py_digest = externs::getattr(py_digest_subset, "digest").unwrap(); let res: NodeResult<_> = Ok(( @@ -333,9 +349,8 @@ pub(crate) fn digest_subset_to_digest( })?; let subset_params = SubsetParams { globs: path_globs }; let digest = store.subset(original_digest, subset_params).await?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { Snapshot::store_directory_digest(py, digest) })?) - } - .boxed() + }) } diff --git a/src/rust/engine/src/intrinsics/docker.rs b/src/rust/engine/src/intrinsics/docker.rs index 979f798b079e..a4986b86cfe3 100644 --- a/src/rust/engine/src/intrinsics/docker.rs +++ b/src/rust/engine/src/intrinsics/docker.rs @@ -2,26 +2,30 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use docker::docker::{ImagePullPolicy, ImagePullScope, DOCKER, IMAGE_PULL_CACHE}; -use futures::future::{BoxFuture, FutureExt}; use process_execution::Platform; +use pyo3::prelude::{pyfunction, wrap_pyfunction, PyAny, PyModule, PyResult, Python, ToPyObject}; use pyo3::types::PyString; -use pyo3::{Python, ToPyObject}; -use crate::context::Context; -use crate::externs; -use crate::nodes::NodeResult; -use crate::python::Value; +use crate::externs::{self, PyGeneratorResponseNativeCall}; +use crate::nodes::task_get_context; +use crate::python::{Failure, Value}; + +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(docker_resolve_image, m)?)?; + + Ok(()) +} + +#[pyfunction] +fn docker_resolve_image(docker_request: Value) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); -pub(crate) fn docker_resolve_image( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { let types = &context.core.types; let docker_resolve_image_result = types.docker_resolve_image_result; let (image_name, platform) = Python::with_gil(|py| { - let py_docker_request = (*args[0]).as_ref(py); + let py_docker_request: &PyAny = docker_request.as_ref().as_ref(py); let image_name: String = externs::getattr(py_docker_request, "image_name").unwrap(); let platform: String = externs::getattr(py_docker_request, "platform").unwrap(); (image_name, platform) @@ -55,13 +59,12 @@ pub(crate) fn docker_resolve_image( .id .ok_or_else(|| format!("Image does not exist: `{}`", &image_name))?; - Ok(Python::with_gil(|py| { + Ok::<_, Failure>(Python::with_gil(|py| { externs::unsafe_call( py, docker_resolve_image_result, &[Value::from(PyString::new(py, &image_id).to_object(py))], ) })) - } - .boxed() + }) } diff --git a/src/rust/engine/src/intrinsics/interactive_process.rs b/src/rust/engine/src/intrinsics/interactive_process.rs index 105138a18e2c..323e0271db96 100644 --- 
a/src/rust/engine/src/intrinsics/interactive_process.rs +++ b/src/rust/engine/src/intrinsics/interactive_process.rs @@ -6,74 +6,102 @@ use std::path::{Path, PathBuf}; use std::process::Stdio; use std::str::FromStr; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use futures::future::TryFutureExt; use process_execution::local::{ apply_chroot, create_sandbox, prepare_workdir, setup_run_sh_script, KeepSandboxes, }; use process_execution::{ManagedChild, ProcessExecutionStrategy}; -use pyo3::{PyAny, Python, ToPyObject}; +use pyo3::prelude::{pyfunction, wrap_pyfunction, PyAny, PyModule, PyResult, Python, ToPyObject}; use stdio::TryCloneAsFile; use tokio::process; use workunit_store::{in_workunit, Level}; use crate::context::Context; -use crate::externs; -use crate::nodes::{task_side_effected, ExecuteProcess, NodeResult}; -use crate::python::Value; - -pub(crate) fn interactive_process( - context: Context, - args: Vec, -) -> BoxFuture<'static, NodeResult> { - in_workunit!( - "interactive_process", - Level::Debug, - |_workunit| async move { - let types = &context.core.types; - let interactive_process_result = types.interactive_process_result; - - let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessExecutionEnvironment) = Python::with_gil(|py| { - let py_interactive_process = (*args[0]).as_ref(py); +use crate::externs::{self, PyGeneratorResponseNativeCall}; +use crate::nodes::{task_get_context, task_side_effected, ExecuteProcess, NodeResult}; +use crate::python::{Failure, Value}; + +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(interactive_process, m)?)?; + + Ok(()) +} + +#[pyfunction] +fn interactive_process( + interactive_process: Value, + process_config: Value, +) -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(in_workunit!( + "interactive_process", + Level::Debug, + |_workunit| async move { + let context = task_get_context(); + 
interactive_process_inner(&context, interactive_process, process_config).await + } + )) +} + +pub async fn interactive_process_inner( + context: &Context, + interactive_process: Value, + process_config: Value, +) -> NodeResult { + let types = &context.core.types; + let interactive_process_result = types.interactive_process_result; + + let (py_interactive_process, py_process, process_config): ( + Value, + Value, + externs::process::PyProcessExecutionEnvironment, + ) = Python::with_gil(|py| { + let py_interactive_process = interactive_process.as_ref().as_ref(py); let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap(); - let process_config = (*args[1]) - .as_ref(py) - .extract() - .unwrap(); - (py_interactive_process.extract().unwrap(), py_process, process_config) - }); - match process_config.environment.strategy { + let process_config = process_config.as_ref().as_ref(py).extract().unwrap(); + ( + py_interactive_process.extract().unwrap(), + py_process, + process_config, + ) + }); + + match process_config.environment.strategy { ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => { - // TODO: #17182 covers adding support for running processes interactively in Docker. - Err( - format!( - "Only local environments support running processes \ - interactively, but a {} environment was used.", - process_config.environment.strategy.strategy_type(), - ) - ) - }, - _ => Ok(()) - }?; - let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process; - let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| { + // TODO: #17182 covers adding support for running processes interactively in Docker. 
+ Err(format!( + "Only local environments support running processes \ + interactively, but a {} environment was used.", + process_config.environment.strategy.strategy_type(), + )) + } + _ => Ok(()), + }?; + let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config) + .await? + .process; + let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| { let py_interactive_process_obj = py_interactive_process.to_object(py); let py_interactive_process = py_interactive_process_obj.as_ref(py); - let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); + let run_in_workspace: bool = + externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); - let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap(); - let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap(); + let keep_sandboxes_value: &PyAny = + externs::getattr(py_interactive_process, "keep_sandboxes").unwrap(); + let keep_sandboxes = + KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()) + .unwrap(); (run_in_workspace, restartable, keep_sandboxes) - }); + }); - let session = context.session.clone(); + let session = context.session.clone(); - let mut tempdir = create_sandbox( + let mut tempdir = create_sandbox( context.core.executor.clone(), &context.core.local_execution_root_dir, "interactive process", keep_sandboxes, - )?; - prepare_workdir( + )?; + prepare_workdir( tempdir.path().to_owned(), &context.core.local_execution_root_dir, &process, @@ -83,113 +111,114 @@ pub(crate) fn interactive_process( &context.core.immutable_inputs, None, None, - ) - .await?; - apply_chroot(tempdir.path().to_str().unwrap(), &mut process); - - let p = Path::new(&process.argv[0]); - // TODO: Deprecate this program name 
calculation, and recommend `{chroot}` replacement in args - // instead. - let program_name = if !run_in_workspace && p.is_relative() { + ) + .await?; + apply_chroot(tempdir.path().to_str().unwrap(), &mut process); + + let p = Path::new(&process.argv[0]); + // TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args + // instead. + let program_name = if !run_in_workspace && p.is_relative() { let mut buf = PathBuf::new(); buf.push(tempdir.path()); buf.push(p); buf - } else { + } else { p.to_path_buf() - }; + }; - let mut command = process::Command::new(program_name); - if !run_in_workspace { + let mut command = process::Command::new(program_name); + if !run_in_workspace { command.current_dir(tempdir.path()); - } - for arg in process.argv[1..].iter() { + } + for arg in process.argv[1..].iter() { command.arg(arg); - } - - command.env_clear(); - command.envs(&process.env); - - if !restartable { - task_side_effected()?; - } - - let exit_status = session.clone() - .with_console_ui_disabled(async move { - // Once any UI is torn down, grab exclusive access to the console. - let (term_stdin, term_stdout, term_stderr) = - stdio::get_destination().exclusive_start(Box::new(|_| { - // A stdio handler that will immediately trigger logging. - Err(()) - }))?; - // NB: Command's stdio methods take ownership of a file-like to use, so we use - // `TryCloneAsFile` here to `dup` our thread-local stdio. 
- command - .stdin(Stdio::from( - term_stdin - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdin: {e}"))?, - )) - .stdout(Stdio::from( - term_stdout - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdout: {e}"))?, - )) - .stderr(Stdio::from( - term_stderr - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stderr: {e}"))?, - )); - let mut subprocess = - ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout)) - .map_err(|e| format!("Error executing interactive process: {e}"))?; - tokio::select! { - _ = session.cancelled() => { - // The Session was cancelled: attempt to kill the process group / process, and - // then wait for it to exit (to avoid zombies). - if let Err(e) = subprocess.attempt_shutdown_sync() { - // Failed to kill the PGID: try the non-group form. - log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ - This is unexpected: please file an issue about this problem at \ - [https://github.com/pantsbuild/pants/issues/new]", e); - subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?; - }; - subprocess.wait().await.map_err(|e| e.to_string()) - } - exit_status = subprocess.wait() => { - // The process exited. - exit_status.map_err(|e| e.to_string()) - } - } - }) - .await?; - - let code = exit_status.code().unwrap_or(-1); - if keep_sandboxes == KeepSandboxes::Always - || keep_sandboxes == KeepSandboxes::OnFailure && code != 0 { + } + + command.env_clear(); + command.envs(&process.env); + + if !restartable { + task_side_effected()?; + } + + let exit_status = session.clone() +.with_console_ui_disabled(async move { + // Once any UI is torn down, grab exclusive access to the console. + let (term_stdin, term_stdout, term_stderr) = + stdio::get_destination().exclusive_start(Box::new(|_| { + // A stdio handler that will immediately trigger logging. 
+ Err(()) + }))?; + // NB: Command's stdio methods take ownership of a file-like to use, so we use + // `TryCloneAsFile` here to `dup` our thread-local stdio. + command + .stdin(Stdio::from( + term_stdin + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdin: {e}"))?, + )) + .stdout(Stdio::from( + term_stdout + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdout: {e}"))?, + )) + .stderr(Stdio::from( + term_stderr + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stderr: {e}"))?, + )); + let mut subprocess = + ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout)) + .map_err(|e| format!("Error executing interactive process: {e}"))?; + tokio::select! { + _ = session.cancelled() => { + // The Session was cancelled: attempt to kill the process group / process, and + // then wait for it to exit (to avoid zombies). + if let Err(e) = subprocess.attempt_shutdown_sync() { + // Failed to kill the PGID: try the non-group form. + log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ + This is unexpected: please file an issue about this problem at \ + [https://github.com/pantsbuild/pants/issues/new]", e); + subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?; + }; + subprocess.wait().await.map_err(|e| e.to_string()) + } + exit_status = subprocess.wait() => { + // The process exited. 
+ exit_status.map_err(|e| e.to_string()) + } + } +}) +.await?; + + let code = exit_status.code().unwrap_or(-1); + if keep_sandboxes == KeepSandboxes::Always + || keep_sandboxes == KeepSandboxes::OnFailure && code != 0 + { tempdir.keep("interactive process"); let do_setup_run_sh_script = |workdir_path| -> Result<(), String> { - setup_run_sh_script(tempdir.path(), &process.env, &process.working_directory, &process.argv, workdir_path) + setup_run_sh_script( + tempdir.path(), + &process.env, + &process.working_directory, + &process.argv, + workdir_path, + ) }; if run_in_workspace { - let cwd = current_dir() - .map_err(|e| format!("Could not detect current working directory: {e}"))?; - do_setup_run_sh_script(cwd.as_path())?; + let cwd = current_dir() + .map_err(|e| format!("Could not detect current working directory: {e}"))?; + do_setup_run_sh_script(cwd.as_path())?; } else { - do_setup_run_sh_script(tempdir.path())?; + do_setup_run_sh_script(tempdir.path())?; } - } - - Ok( - Python::with_gil(|py| { + } + Ok::<_, Failure>(Python::with_gil(|py| { externs::unsafe_call( - py, - interactive_process_result, - &[externs::store_i64(py, i64::from(code))], + py, + interactive_process_result, + &[externs::store_i64(py, i64::from(code))], ) - - }) - ) - } - ).boxed() + })) } diff --git a/src/rust/engine/src/intrinsics/mod.rs b/src/rust/engine/src/intrinsics/mod.rs index c4e6e7d4e387..d42595063048 100644 --- a/src/rust/engine/src/intrinsics/mod.rs +++ b/src/rust/engine/src/intrinsics/mod.rs @@ -1,15 +1,7 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use futures::future::BoxFuture; -use indexmap::IndexMap; -use rule_graph::{DependencyKey, RuleId}; - -use crate::context::Context; -use crate::nodes::NodeResult; -use crate::python::Value; -use crate::tasks::Intrinsic; -use crate::types::Types; +use pyo3::prelude::{PyModule, PyResult, Python}; // Sub-modules with intrinsic implementations. 
mod dep_inference; @@ -19,175 +11,15 @@ mod interactive_process; mod process; mod values; -type IntrinsicFn = - Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; - -pub struct Intrinsics { - intrinsics: IndexMap, -} - -impl Intrinsics { - pub fn new(types: &Types) -> Intrinsics { - let mut intrinsics: IndexMap = IndexMap::new(); - intrinsics.insert( - Intrinsic::new( - "create_digest_to_digest", - types.directory_digest, - types.create_digest, - ), - Box::new(self::digests::create_digest_to_digest), - ); - intrinsics.insert( - Intrinsic::new( - "path_globs_to_digest", - types.directory_digest, - types.path_globs, - ), - Box::new(self::digests::path_globs_to_digest), - ); - intrinsics.insert( - Intrinsic::new("path_globs_to_paths", types.paths, types.path_globs), - Box::new(self::digests::path_globs_to_paths), - ); - intrinsics.insert( - Intrinsic::new( - "download_file_to_digest", - types.directory_digest, - types.native_download_file, - ), - Box::new(self::digests::download_file_to_digest), - ); - intrinsics.insert( - Intrinsic::new("digest_to_snapshot", types.snapshot, types.directory_digest), - Box::new(self::digests::digest_to_snapshot), - ); - intrinsics.insert( - Intrinsic::new( - "directory_digest_to_digest_contents", - types.digest_contents, - types.directory_digest, - ), - Box::new(self::digests::directory_digest_to_digest_contents), - ); - intrinsics.insert( - Intrinsic::new( - "directory_digest_to_digest_entries", - types.digest_entries, - types.directory_digest, - ), - Box::new(self::digests::directory_digest_to_digest_entries), - ); - intrinsics.insert( - Intrinsic::new( - "merge_digests_request_to_digest", - types.directory_digest, - types.merge_digests, - ), - Box::new(self::digests::merge_digests_request_to_digest), - ); - intrinsics.insert( - Intrinsic::new( - "remove_prefix_request_to_digest", - types.directory_digest, - types.remove_prefix, - ), - Box::new(self::digests::remove_prefix_request_to_digest), - ); - intrinsics.insert( - 
Intrinsic::new( - "add_prefix_request_to_digest", - types.directory_digest, - types.add_prefix, - ), - Box::new(self::digests::add_prefix_request_to_digest), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("process_request_to_process_result"), - product: types.process_result, - inputs: vec![ - DependencyKey::new(types.process), - DependencyKey::new(types.process_config_from_environment), - ], - }, - Box::new(self::process::process_request_to_process_result), - ); - intrinsics.insert( - Intrinsic::new( - "digest_subset_to_digest", - types.directory_digest, - types.digest_subset, - ), - Box::new(self::digests::digest_subset_to_digest), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("session_values"), - product: types.session_values, - inputs: vec![], - }, - Box::new(self::values::session_values), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("run_id"), - product: types.run_id, - inputs: vec![], - }, - Box::new(self::values::run_id), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("interactive_process"), - product: types.interactive_process_result, - inputs: vec![ - DependencyKey::new(types.interactive_process), - DependencyKey::new(types.process_config_from_environment), - ], - }, - Box::new(self::interactive_process::interactive_process), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("docker_resolve_image"), - product: types.docker_resolve_image_result, - inputs: vec![DependencyKey::new(types.docker_resolve_image_request)], - }, - Box::new(self::docker::docker_resolve_image), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("parse_python_deps"), - product: types.parsed_python_deps_result, - inputs: vec![DependencyKey::new(types.deps_request)], - }, - Box::new(self::dep_inference::parse_python_deps), - ); - intrinsics.insert( - Intrinsic { - id: RuleId::new("parse_javascript_deps"), - product: types.parsed_javascript_deps_result, - inputs: vec![DependencyKey::new(types.deps_request)], - }, - 
Box::new(self::dep_inference::parse_javascript_deps), - ); - Intrinsics { intrinsics } - } +pub use interactive_process::interactive_process_inner; - pub fn keys(&self) -> impl Iterator { - self.intrinsics.keys() - } +pub fn register(py: Python, m: &PyModule) -> PyResult<()> { + dep_inference::register(py, m)?; + digests::register(py, m)?; + docker::register(py, m)?; + interactive_process::register(py, m)?; + process::register(py, m)?; + values::register(py, m)?; - pub async fn run( - &self, - intrinsic: &Intrinsic, - context: Context, - args: Vec, - ) -> NodeResult { - let function = self - .intrinsics - .get(intrinsic) - .unwrap_or_else(|| panic!("Unrecognized intrinsic: {intrinsic:?}")); - function(context, args).await - } + Ok(()) } diff --git a/src/rust/engine/src/intrinsics/process.rs b/src/rust/engine/src/intrinsics/process.rs index 6340a3209ea2..4bdd6103e266 100644 --- a/src/rust/engine/src/intrinsics/process.rs +++ b/src/rust/engine/src/intrinsics/process.rs @@ -3,32 +3,33 @@ use std::time::Duration; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use futures::future::TryFutureExt; use futures::try_join; -use pyo3::{IntoPy, Python}; +use pyo3::prelude::{pyfunction, wrap_pyfunction, IntoPy, PyModule, PyResult, Python}; -use crate::context::Context; -use crate::externs; -use crate::nodes::{ExecuteProcess, NodeResult, Snapshot}; +use crate::externs::{self, PyGeneratorResponseNativeCall}; +use crate::nodes::{task_get_context, ExecuteProcess, NodeResult, Snapshot}; use crate::python::Value; -pub(crate) fn process_request_to_process_result( - context: Context, - mut args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(process_request_to_process_result, m)?)?; + + Ok(()) +} + +#[pyfunction] +fn process_request_to_process_result( + process: Value, + process_config: Value, +) -> PyGeneratorResponseNativeCall { + 
PyGeneratorResponseNativeCall::new(async move { + let context = task_get_context(); + let process_config: externs::process::PyProcessExecutionEnvironment = - Python::with_gil(|py| { - args.pop() - .unwrap() - .as_ref() - .extract(py) - .map_err(|e| format!("{e}")) - })?; - let process_request = - ExecuteProcess::lift(&context.core.store(), args.pop().unwrap(), process_config) - .map_err(|e| e.enrich("Error lifting Process")) - .await?; + Python::with_gil(|py| process_config.extract(py)).map_err(|e| format!("{e}"))?; + let process_request = ExecuteProcess::lift(&context.core.store(), process, process_config) + .map_err(|e| e.enrich("Error lifting Process")) + .await?; let result = context.get(process_request).await?.result; @@ -77,6 +78,5 @@ pub(crate) fn process_request_to_process_result( ], )) }) - } - .boxed() + }) } diff --git a/src/rust/engine/src/intrinsics/values.rs b/src/rust/engine/src/intrinsics/values.rs index 20cfd0d6ac17..2c6192a86de8 100644 --- a/src/rust/engine/src/intrinsics/values.rs +++ b/src/rust/engine/src/intrinsics/values.rs @@ -1,19 +1,24 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use futures::future::{BoxFuture, FutureExt}; +use pyo3::prelude::{pyfunction, wrap_pyfunction, PyModule, PyResult, Python}; -use crate::context::Context; -use crate::nodes::{NodeResult, RunId, SessionValues}; -use crate::python::Value; +use crate::externs::PyGeneratorResponseNativeCall; +use crate::nodes::{task_get_context, RunId, SessionValues}; -pub(crate) fn session_values( - context: Context, - _args: Vec, -) -> BoxFuture<'static, NodeResult> { - async move { context.get(SessionValues).await }.boxed() +pub fn register(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(session_values, m)?)?; + m.add_function(wrap_pyfunction!(run_id, m)?)?; + + Ok(()) +} + +#[pyfunction] +fn session_values() -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { task_get_context().get(SessionValues).await }) } -pub(crate) fn run_id(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult> { - async move { context.get(RunId).await }.boxed() +#[pyfunction] +fn run_id() -> PyGeneratorResponseNativeCall { + PyGeneratorResponseNativeCall::new(async move { task_get_context().get(RunId).await }) } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 295609f817e1..7e0902bfded3 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -23,9 +23,8 @@ mod types; pub use crate::context::{ Context, Core, ExecutionStrategyOptions, LocalStoreOptions, RemotingOptions, SessionCore, }; -pub use crate::intrinsics::Intrinsics; pub use crate::python::{Failure, Function, Key, Params, TypeId, Value}; pub use crate::scheduler::{ExecutionRequest, ExecutionTermination, Scheduler}; pub use crate::session::Session; -pub use crate::tasks::{Intrinsic, Rule, Tasks}; +pub use crate::tasks::{Rule, Tasks}; pub use crate::types::Types; diff --git a/src/rust/engine/src/nodes/mod.rs b/src/rust/engine/src/nodes/mod.rs index f5079dd78892..fd0c81b7ecc9 100644 --- a/src/rust/engine/src/nodes/mod.rs +++ 
b/src/rust/engine/src/nodes/mod.rs @@ -23,7 +23,7 @@ use crate::context::{Context, SessionCore}; use crate::externs; use crate::externs::engine_aware::{EngineAwareParameter, EngineAwareReturnType}; use crate::python::{display_sorted_in_parens, throw, Failure, Key, Params, TypeId, Value}; -use crate::tasks::{self, Rule}; +use crate::tasks::Rule; // Sub-modules for the differnt node kinds. mod digest_file; @@ -51,6 +51,7 @@ pub use self::task::Task; tokio::task_local! { static TASK_SIDE_EFFECTED: Arc; + static TASK_CONTEXT: Arc; } pub fn task_side_effected() -> Result<(), String> { @@ -65,15 +66,23 @@ pub fn task_side_effected() -> Result<(), String> { }) } -pub async fn maybe_side_effecting>( +pub fn task_get_context() -> Context { + TASK_CONTEXT.with(|c| (**c).clone()) +} + +pub async fn task_context>( + context: Context, is_side_effecting: bool, side_effected: &Arc, f: F, ) -> T { + let context = Arc::new(context); if is_side_effecting { - TASK_SIDE_EFFECTED.scope(side_effected.clone(), f).await + TASK_SIDE_EFFECTED + .scope(side_effected.clone(), TASK_CONTEXT.scope(context, f)) + .await } else { - f.await + TASK_CONTEXT.scope(context, f).await } } @@ -121,43 +130,18 @@ async fn select( }); match entry.as_ref() { &rule_graph::Entry::WithDeps(wd) => match wd.as_ref() { - rule_graph::EntryWithDeps::Rule(ref rule) => match rule.rule() { - tasks::Rule::Task(task) => { - context - .get(Task { - params: params.clone(), - args, - args_arity, - task: *task, - entry: entry, - side_effected: Arc::new(AtomicBool::new(false)), - }) - .await - } - Rule::Intrinsic(intrinsic) => { - let values = future::try_join_all( - intrinsic - .inputs - .iter() - .map(|dependency_key| { - select_product( - context.clone(), - params.clone(), - dependency_key, - "intrinsic", - entry, - ) - }) - .collect::>(), - ) - .await?; - context - .core - .intrinsics - .run(intrinsic, context.clone(), values) - .await - } - }, + rule_graph::EntryWithDeps::Rule(ref rule) => { + context + .get(Task { + 
params: params.clone(), + args, + args_arity, + task: rule.rule().0, + entry: entry, + side_effected: Arc::new(AtomicBool::new(false)), + }) + .await + } rule_graph::EntryWithDeps::Reentry(reentry) => { select_reentry(context, params, &reentry.query).await } @@ -208,32 +192,6 @@ fn select_reentry( .boxed() } -fn select_product<'a>( - context: Context, - params: Params, - dependency_key: &'a DependencyKey, - caller_description: &'a str, - entry: Intern>, -) -> BoxFuture<'a, NodeResult> { - let edges = context - .core - .rule_graph - .edges_for_inner(&entry) - .ok_or_else(|| { - throw(format!( - "Tried to request {dependency_key} for {caller_description} but found no edges" - )) - }); - async move { - let edges = edges?; - let entry = edges.entry_for(dependency_key).unwrap_or_else(|| { - panic!("{caller_description} did not declare a dependency on {dependency_key:?}") - }); - select(context, None, 0, params, entry).await - } - .boxed() -} - pub fn lift_directory_digest(digest: &PyAny) -> Result { let py_digest: externs::fs::PyDigest = digest.extract().map_err(|e| format!("{e}"))?; Ok(py_digest.0) diff --git a/src/rust/engine/src/nodes/task.rs b/src/rust/engine/src/nodes/task.rs index a54f8593ff4f..d3614f6308c7 100644 --- a/src/rust/engine/src/nodes/task.rs +++ b/src/rust/engine/src/nodes/task.rs @@ -15,7 +15,7 @@ use pyo3::{IntoPy, ToPyObject}; use rule_graph::DependencyKey; use workunit_store::{in_workunit, Level, RunningWorkunit}; -use super::{maybe_side_effecting, select, NodeKey, NodeResult, Params}; +use super::{select, task_context, NodeKey, NodeResult, Params}; use crate::context::Context; use crate::externs::engine_aware::EngineAwareReturnType; use crate::externs::{self, GeneratorInput, GeneratorResponse}; @@ -171,6 +171,19 @@ impl Task { externs::generator_send(py, &context.core.types.coroutine, &generator, input) })?; match response { + GeneratorResponse::NativeCall(call) => { + let _blocking_token = workunit.blocking(); + let result = (call.call).await; + 
match result { + Ok(value) => { + input = GeneratorInput::Arg(value); + } + Err(throw @ Failure::Throw { .. }) => { + input = GeneratorInput::Err(PyErr::from(throw)); + } + Err(failure) => break Err(failure), + } + } GeneratorResponse::Call(call) => { let _blocking_token = workunit.blocking(); let result = Self::gen_call(context, params.clone(), entry, call).await; @@ -257,8 +270,11 @@ impl Task { let args = self.args; - let (mut result_val, mut result_type) = - maybe_side_effecting(self.task.side_effecting, &self.side_effected, async move { + let (mut result_val, mut result_type) = task_context( + context.clone(), + self.task.side_effecting, + &self.side_effected, + async move { Python::with_gil(|py| { let func = (*self.task.func.0.value).as_ref(py); @@ -283,11 +299,13 @@ impl Task { }) .map_err(Failure::from) }) - }) - .await?; + }, + ) + .await?; if result_type == context.core.types.coroutine { - let (new_val, new_type) = maybe_side_effecting( + let (new_val, new_type) = task_context( + context.clone(), self.task.side_effecting, &self.side_effected, Self::generate(&context, workunit, params, self.entry, result_val), diff --git a/src/rust/engine/src/tasks.rs b/src/rust/engine/src/tasks.rs index 27721ec01396..2cdbca8c7cf7 100644 --- a/src/rust/engine/src/tasks.rs +++ b/src/rust/engine/src/tasks.rs @@ -3,7 +3,6 @@ use std::fmt; -use crate::intrinsics::Intrinsics; use crate::python::{Function, TypeId}; use deepsize::DeepSizeOf; @@ -13,59 +12,47 @@ use log::Level; use rule_graph::{DependencyKey, DisplayForGraph, DisplayForGraphArgs, Query, RuleId}; #[derive(DeepSizeOf, Eq, Hash, PartialEq, Clone, Debug)] -pub enum Rule { - // Intrinsic rules are implemented in rust. - Intrinsic(Intern), - // Task rules are implemented in python. 
- Task(Intern), -} +pub struct Rule(pub Intern); impl DisplayForGraph for Rule { fn fmt_for_graph(&self, display_args: DisplayForGraphArgs) -> String { - match self { - Rule::Task(ref task) => { - let task_name = task.func.full_name(); - let product = format!("{}", task.product); - - let clause_portion = Self::formatted_positional_arguments( - task.args.iter().map(|(_name, dk)| dk), - display_args, - ); - - let get_clauses = task - .gets - .iter() - .map(::std::string::ToString::to_string) - .collect::>(); - - let get_portion = if get_clauses.is_empty() { - "".to_string() - } else if get_clauses.len() > 1 { - format!( - ",{}gets=[{}{}{}]", - display_args.line_separator(), - display_args.optional_line_separator(), - get_clauses.join(&format!(",{}", display_args.line_separator())), - display_args.optional_line_separator(), - ) - } else { - format!(", gets=[{}]", get_clauses.join(", ")) - }; - - let rule_type = if task.cacheable { - "rule".to_string() - } else { - "goal_rule".to_string() - }; - - format!("@{rule_type}({task_name}({clause_portion}) -> {product}{get_portion})",) - } - Rule::Intrinsic(ref intrinsic) => format!( - "@rule(({}) -> {})", - Self::formatted_positional_arguments(&intrinsic.inputs, display_args), - intrinsic.product, - ), - } + let task = &self.0; + + let task_name = task.func.full_name(); + let product = format!("{}", task.product); + + let clause_portion = Self::formatted_positional_arguments( + task.args.iter().map(|(_name, dk)| dk), + display_args, + ); + + let get_clauses = task + .gets + .iter() + .map(::std::string::ToString::to_string) + .collect::>(); + + let get_portion = if get_clauses.is_empty() { + "".to_string() + } else if get_clauses.len() > 1 { + format!( + ",{}gets=[{}{}{}]", + display_args.line_separator(), + display_args.optional_line_separator(), + get_clauses.join(&format!(",{}", display_args.line_separator())), + display_args.optional_line_separator(), + ) + } else { + format!(", gets=[{}]", get_clauses.join(", ")) + }; + 
+ let rule_type = if task.cacheable { + "rule".to_string() + } else { + "goal_rule".to_string() + }; + + format!("@{rule_type}({task_name}({clause_portion}) -> {product}{get_portion})",) } } @@ -73,51 +60,33 @@ impl rule_graph::Rule for Rule { type TypeId = TypeId; fn id(&self) -> &RuleId { - match self { - Rule::Task(task) => &task.id, - Rule::Intrinsic(intrinsic) => &intrinsic.id, - } + &self.0.id } fn product(&self) -> TypeId { - match self { - Rule::Task(t) => t.product, - Rule::Intrinsic(i) => i.product, - } + self.0.product } fn dependency_keys(&self, explicit_args_arity: u16) -> Vec<&DependencyKey> { - match self { - Rule::Task(task) => task - .args - .iter() - .skip(explicit_args_arity.into()) - .map(|(_name, dk)| dk) - .chain(task.gets.iter()) - .collect(), - Rule::Intrinsic(intrinsic) => intrinsic.inputs.iter().collect(), - } + self.0 + .args + .iter() + .skip(explicit_args_arity.into()) + .map(|(_name, dk)| dk) + .chain(self.0.gets.iter()) + .collect() } fn masked_params(&self) -> Vec { - match self { - Rule::Task(task) => task.masked_types.clone(), - Rule::Intrinsic(_) => vec![], - } + self.0.masked_types.clone() } fn require_reachable(&self) -> bool { - match self { - Rule::Task(_) => true, - Rule::Intrinsic(_) => false, - } + true } fn color(&self) -> Option { - match self { - Rule::Task(_) => None, - Rule::Intrinsic(_) => Some(rule_graph::Palette::Gray), - } + None } } @@ -175,28 +144,8 @@ pub struct DisplayInfo { pub level: Level, } -#[derive(DeepSizeOf, Eq, Hash, PartialEq, Clone, Debug)] -pub struct Intrinsic { - pub id: RuleId, - pub product: TypeId, - pub inputs: Vec>, -} - -impl Intrinsic { - pub fn new(name: &str, product: TypeId, input: TypeId) -> Self { - // TODO: Python rule code that calls an intrinsic by name will need to be - // able to import that name, so we'll need to create stubs representing the - // intrinsics, in pants.engine.intrinsics.py. 
- Self { - id: RuleId::from_string(format!("pants.engine.intrinsics:{}", name)), - product, - inputs: vec![DependencyKey::new(input)], - } - } -} - /// -/// Registry of native (rust) Intrinsic tasks and user (python) Tasks. +/// Registry of user Tasks. /// #[derive(Clone, Debug)] pub struct Tasks { @@ -233,13 +182,6 @@ impl Tasks { &self.queries } - pub fn intrinsics_set(&mut self, intrinsics: &Intrinsics) { - for intrinsic in intrinsics.keys() { - self.rules - .insert(Rule::Intrinsic(Intern::new(intrinsic.clone()))); - } - } - /// /// The following methods define the Task registration lifecycle. /// @@ -328,7 +270,7 @@ impl Tasks { .preparing .take() .expect("Must `begin()` a task creation before ending it!"); - self.rules.insert(Rule::Task(Intern::new(task))); + self.rules.insert(Rule(Intern::new(task))); } pub fn query_add(&mut self, product: TypeId, params: Vec) {