Skip to content

Commit

Permalink
Expose intrinsics in intrinsics.py via a native call response for g…
Browse files Browse the repository at this point in the history
…enerators.
  • Loading branch information
stuhood committed May 25, 2024
1 parent 3b0ebf3 commit d26ef1d
Show file tree
Hide file tree
Showing 16 changed files with 755 additions and 782 deletions.
126 changes: 126 additions & 0 deletions src/python/pants/engine/intrinsics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

from pants.engine.internals import native_engine
from pants.engine.rules import collect_rules, rule, _uncacheable_rule
from pants.engine.fs import Digest, Paths, Snapshot, DigestContents, DigestEntries, MergeDigests, RemovePrefix, AddPrefix, DigestSubset, CreateDigest, PathGlobs, NativeDownloadFile
from pants.engine.process import FallibleProcessResult, InteractiveProcessResult, InteractiveProcess
from pants.engine.internals.native_engine import NativeDependenciesRequest
from pants.engine.internals.session import SessionValues, RunId
from pants.engine.process import (
Process,
ProcessExecutionEnvironment,
ProcessExecutionFailure,
ProcessResultMetadata,
)
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
NativeParsedJavascriptDependencies,
NativeParsedPythonDependencies,
)


@rule
async def create_digest_to_digest(
create_digest: CreateDigest,
) -> Digest:
return await native_engine.create_digest_to_digest(create_digest)


@rule
async def path_globs_to_digest(
path_globs: PathGlobs,
) -> Digest:
return await native_engine.path_globs_to_digest(path_globs)


@rule
async def path_globs_to_paths(
path_globs: PathGlobs,
) -> Paths:
return await native_engine.path_globs_to_paths(path_globs)


@rule
async def download_file_to_digest(
native_download_file: NativeDownloadFile,
) -> Digest:
return await native_engine.download_file_to_digest(native_download_file)


@rule
async def digest_to_snapshot(digest: Digest) -> Snapshot:
return await native_engine.digest_to_snapshot(digest)


@rule
async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents:
return await native_engine.directory_digest_to_digest_contents(digest)


@rule
async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries:
return await native_engine.directory_digest_to_digest_entries(digest)


@rule
async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest:
return await native_engine.merge_digests_request_to_digest(merge_digests)


@rule
async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest:
return await native_engine.remove_prefix_request_to_digest(remove_prefix)


@rule
async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest:
return await native_engine.add_prefix_request_to_digest(add_prefix)


@rule
async def process_request_to_process_result(process: Process, process_execution_environment: ProcessExecutionEnvironment) -> FallibleProcessResult:
return await native_engine.process_request_to_process_result(process, process_execution_environment)


@rule
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest:
return await native_engine.digest_subset_to_digest(digest_subset)


@rule
async def session_values() -> SessionValues:
return await native_engine.session_values()


@rule
async def run_id() -> RunId:
return await native_engine.run_id()


@_uncacheable_rule
async def interactive_process(process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment) -> InteractiveProcessResult:
return await native_engine.interactive_process(process, process_execution_environment)


@rule
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult:
return await native_engine.docker_resolve_image(request)


@rule
async def parse_python_deps(deps_request: NativeDependenciesRequest) -> NativeParsedPythonDependencies:
return await native_engine.parse_python_deps(deps_request)


@rule
async def parse_javascript_deps(deps_request: NativeDependenciesRequest) -> NativeParsedJavascriptDependencies:
return await native_engine.parse_javascript_deps(deps_request)


def rules():
return [
*collect_rules(),
]
2 changes: 2 additions & 0 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pants.engine.environment import EnvironmentName
from pants.engine.fs import PathGlobs, Snapshot, Workspace
from pants.engine.goal import CurrentExecutingGoals, Goal
from pants.engine import intrinsics
from pants.engine.internals import (
build_files,
dep_rules,
Expand Down Expand Up @@ -276,6 +277,7 @@ def current_executing_goals(session_values: SessionValues) -> CurrentExecutingGo
rules = FrozenOrderedSet(
(
*collect_rules(locals()),
*intrinsics.rules(),
*build_files.rules(),
*fs.rules(),
*dep_rules.rules(),
Expand Down
4 changes: 0 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use crate::intrinsics::Intrinsics;
use crate::nodes::{ExecuteProcess, NodeKey, NodeOutput, NodeResult};
use crate::python::{throw, Failure};
use crate::session::{Session, Sessions};
Expand Down Expand Up @@ -62,7 +61,6 @@ pub struct Core {
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
pub intrinsics: Intrinsics,
pub executor: Executor,
store: Store,
/// The CommandRunners to use for execution, in ascending order of reliability (for the purposes
Expand Down Expand Up @@ -519,7 +517,6 @@ impl Core {
executor: Executor,
tasks: Tasks,
types: Types,
intrinsics: Intrinsics,
build_root: PathBuf,
ignore_patterns: Vec<String>,
use_gitignore: bool,
Expand Down Expand Up @@ -683,7 +680,6 @@ impl Core {
tasks,
rule_graph,
types,
intrinsics,
executor: executor.clone(),
store,
command_runners,
Expand Down
43 changes: 16 additions & 27 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType};
use pyo3::{create_exception, IntoPy, PyAny, PyRef};
use regex::Regex;
use remote::remote_cache::RemoteCacheWarningsBehavior;
use rule_graph::{self, DependencyKey, RuleGraph, RuleId};
use rule_graph::{self, RuleGraph};
use store::RemoteProvider;
use task_executor::Executor;
use workunit_store::{
Expand All @@ -49,14 +49,16 @@ use workunit_store::{

use crate::externs::fs::{possible_store_missing_digest, PyFileDigest};
use crate::externs::process::PyProcessExecutionEnvironment;
use crate::intrinsics;
use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination,
Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions,
Rule, Scheduler, Session, SessionCore, Tasks, TypeId, Types, Value,
Failure, Function, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
SessionCore, Tasks, TypeId, Types, Value,
};

#[pymodule]
fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> {
intrinsics::register(py, m)?;
externs::register(py, m)?;
externs::address::register(py, m)?;
externs::fs::register(m)?;
Expand Down Expand Up @@ -701,9 +703,7 @@ fn scheduler_create(
.borrow_mut()
.take()
.ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?;
let intrinsics = Intrinsics::new(&types);
let mut tasks = py_tasks.0.replace(Tasks::new());
tasks.intrinsics_set(&intrinsics);
let tasks = py_tasks.0.replace(Tasks::new());

// NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to
// use `tokio::spawn` since Python does not setup Tokio for the main thread. This also
Expand All @@ -716,7 +716,6 @@ fn scheduler_create(
py_executor.0.clone(),
tasks,
types,
intrinsics,
build_root,
ignore_patterns,
use_gitignore,
Expand Down Expand Up @@ -1022,22 +1021,13 @@ fn session_run_interactive_process(
let interactive_process: Value = interactive_process.into();
let process_config = Value::new(process_config_from_environment.into_py(py));
py.allow_threads(|| {
core.executor.clone().block_on(nodes::maybe_side_effecting(
true,
&Arc::new(std::sync::atomic::AtomicBool::new(true)),
core.intrinsics.run(
&Intrinsic {
id: RuleId::new("interactive_process"),
product: core.types.interactive_process_result,
inputs: vec![
DependencyKey::new(core.types.interactive_process),
DependencyKey::new(core.types.process_config_from_environment),
],
},
context,
vec![interactive_process, process_config],
),
))
core.executor
.clone()
.block_on(intrinsics::interactive_process_inner(
&context,
interactive_process,
process_config,
))
})
.map(|v| v.into())
.map_err(|e| PyException::new_err(e.to_string()))
Expand Down Expand Up @@ -1407,8 +1397,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
core.executor.enter(|| {
let result = PyDict::new(py);
for (rule, rule_dependencies) in core.rule_graph.rule_dependencies() {
let Rule::Task(task) = rule else { continue };

let task = rule.0;
let function = &task.func;
let mut dependencies = Vec::new();
for (dependency_key, rule) in rule_dependencies {
Expand All @@ -1421,7 +1410,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
{
continue;
}
let Rule::Task(task) = rule else { continue };
let function = &rule.0.func;

let provided_params = dependency_key
.provided_params
Expand All @@ -1431,7 +1420,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
dependencies.push((
dependency_key.product.as_py_type(py),
provided_params,
task.func.0.value.into_py(py),
function.0.value.into_py(py),
));
}
if dependencies.is_empty() {
Expand Down
50 changes: 50 additions & 0 deletions src/rust/engine/src/externs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::collections::BTreeMap;
use std::convert::TryInto;
use std::fmt;

use futures::future::{BoxFuture, Future};
use futures::FutureExt;
use lazy_static::lazy_static;
use pyo3::exceptions::{PyAssertionError, PyException, PyStopIteration, PyTypeError, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -42,6 +44,7 @@ pub mod workunits;

pub fn register(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyFailure>()?;
m.add_class::<PyGeneratorResponseNativeCall>()?;
m.add_class::<PyGeneratorResponseCall>()?;
m.add_class::<PyGeneratorResponseGet>()?;

Expand Down Expand Up @@ -334,6 +337,8 @@ pub(crate) fn generator_send(
// It isn't necessary to differentiate between `Get` and `Effect` here, as the static
// analysis of `@rule`s has already validated usage.
Ok(GeneratorResponse::Get(get.take()?))
} else if let Ok(call) = response.extract::<PyRef<PyGeneratorResponseNativeCall>>(py) {
Ok(GeneratorResponse::NativeCall(call.take()?))
} else if let Ok(get_multi) = response.downcast::<PySequence>(py) {
// Was an `All` or `MultiGet`.
let gogs = get_multi
Expand Down Expand Up @@ -463,6 +468,45 @@ fn interpret_get_inputs(
}
}

#[pyclass]
pub struct PyGeneratorResponseNativeCall(RefCell<Option<NativeCall>>);

impl PyGeneratorResponseNativeCall {
pub fn new(call: impl Future<Output = Result<Value, Failure>> + 'static + Send) -> Self {
Self(RefCell::new(Some(NativeCall { call: call.boxed() })))
}

fn take(&self) -> Result<NativeCall, String> {
self.0
.borrow_mut()
.take()
.ok_or_else(|| "A `NativeCall` may only be consumed once.".to_owned())
}
}

#[pymethods]
impl PyGeneratorResponseNativeCall {
fn __await__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> {
self_
}

fn __iter__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> {
self_
}

fn __next__(self_: PyRef<'_, Self>) -> Option<PyRef<'_, Self>> {
Some(self_)
}

fn send(&self, py: Python, value: PyObject) -> PyResult<()> {
// TODO: PyStopIteration::new_err(value) seems to attempt to interpret
// the value in some cases, which mangles the return type.
let locals = PyDict::new(py);
locals.set_item("v", value)?;
py.run("raise StopIteration(v)", None, Some(locals))
}
}

#[pyclass(subclass)]
pub struct PyGeneratorResponseCall(RefCell<Option<Call>>);

Expand Down Expand Up @@ -614,6 +658,10 @@ impl PyGeneratorResponseGet {
}
}

pub struct NativeCall {
pub call: BoxFuture<'static, Result<Value, Failure>>,
}

#[derive(Debug)]
pub struct Call {
pub rule_id: RuleId,
Expand Down Expand Up @@ -681,6 +729,8 @@ pub enum GetOrGenerator {
pub enum GeneratorResponse {
/// The generator has completed with the given value of the given type.
Break(Value, TypeId),
/// The generator is awaiting a call to a unknown native function.
NativeCall(NativeCall),
/// The generator is awaiting a call to a known rule.
Call(Call),
/// The generator is awaiting a call to an unknown rule.
Expand Down
Loading

0 comments on commit d26ef1d

Please sign in to comment.