Skip to content

Commit

Permalink
WIP poe scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
nat-n committed Oct 12, 2024
1 parent 21ccd40 commit 6bea3ef
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 46 deletions.
11 changes: 10 additions & 1 deletion poethepoet/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,18 @@ def get_executor(
invocation: Tuple[str, ...],
env: "EnvVarsManager",
working_dir: Path,
*,
executor_config: Optional[Mapping[str, str]] = None,
capture_stdout: Union[str, bool] = False,
delegate_dry_run: bool = False,
) -> "PoeExecutor":
"""
Get an Executor object for use with this invocation.
if delegate_dry_run is set then the task will always be executed and be
entrusted to not have any side effects when the dry-run flag is provided.
"""

from .executor import PoeExecutor

if not executor_config:
Expand All @@ -108,5 +117,5 @@ def get_executor(
env=env,
working_dir=working_dir,
capture_stdout=capture_stdout,
dry=self.dry,
dry=False if delegate_dry_run else self.dry,
)
3 changes: 3 additions & 0 deletions poethepoet/env/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __init__( # TODO: check if we still need all these args!
self._vars["POE_CWD"] = self.cwd
self._vars["POE_PWD"] = self.cwd

if self._ui:
self._vars["POE_VERBOSITY"] = str(self._ui.verbosity)

self._git_repo = GitRepo(config.project_dir)

def __getitem__(self, key):
Expand Down
91 changes: 83 additions & 8 deletions poethepoet/helpers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
import ast
import re
import sys
from typing import Any, Collection, Container, Dict, Iterator, List, Optional, Tuple
from typing import (
Any,
Collection,
Container,
Dict,
Iterator,
List,
NamedTuple,
Optional,
Tuple,
cast,
)

from ..exceptions import ExpressionParseError

Expand Down Expand Up @@ -67,21 +78,77 @@
Substitution = Tuple[Tuple[int, int], str]


class FunctionCall(NamedTuple):
"""
Model for a python expression consisting of a function call
"""

expression: str
function_ref: str
referenced_args: Tuple[str, ...] = tuple()
referenced_globals: Tuple[str, ...] = tuple()

@classmethod
def parse(
cls,
source: str,
arguments: Container[str],
*,
args_prefix: str = "__args.",
allowed_vars: Container[str] = tuple(),
) -> "FunctionCall":
root_node = cast(ast.Call, parse_and_validate(source, True, "script"))
name_nodes = _validate_nodes_and_get_names(root_node, source)

substitutions: List[Substitution] = []
referenced_args: List[str] = []
referenced_globals: List[str] = []
for node in name_nodes:
if node.id in arguments:
substitutions.append(
(_get_name_node_abs_range(source, node), args_prefix + node.id)
)
referenced_args.append(node.id)
elif node.id in _ALLOWED_BUILTINS or node.id in allowed_vars:
referenced_globals.append(node.id)
else:
raise ExpressionParseError(
"Invalid variable reference in script: "
+ _get_name_source_segment(source, node)
)

# Prefix references to arguments with args_prefix
expression = _apply_substitutions(source, substitutions)

ref_parts = []
func_node = root_node.func
while isinstance(func_node, ast.Attribute):
ref_parts.append(func_node.attr)
func_node = func_node.value
assert isinstance(func_node, ast.Name)
function_ref = ".".join((func_node.id, *reversed(ref_parts)))

return cls(
expression=_clean_linebreaks(expression),
function_ref=function_ref,
referenced_args=tuple(referenced_args),
referenced_globals=tuple(referenced_globals),
)


def resolve_expression(
source: str,
arguments: Container[str],
*,
call_only: bool = True,
args_prefix: str = "__args.",
allowed_vars: Container[str] = tuple(),
):
) -> str:
"""
Validate function call and substitute references to arguments with their namespaced
counterparts (e.g. `my_arg` => `__args.my_arg`).
"""

task_type = "script" if call_only else "expr"
root_node = parse_and_validate(source, call_only, task_type)
root_node = parse_and_validate(source, False, "expr")
name_nodes = _validate_nodes_and_get_names(root_node, source)

substitutions: List[Substitution] = []
Expand All @@ -92,12 +159,12 @@ def resolve_expression(
)
elif node.id not in _ALLOWED_BUILTINS and node.id not in allowed_vars:
raise ExpressionParseError(
f"Invalid variable reference in {task_type}: "
"Invalid variable reference in expr: "
+ _get_name_source_segment(source, node)
)

# Prefix references to arguments with args_prefix
return _apply_substitutions(source, substitutions)
return _clean_linebreaks(_apply_substitutions(source, substitutions))


def parse_and_validate(
Expand Down Expand Up @@ -246,7 +313,7 @@ def _validate_nodes_and_get_names(
)


def _apply_substitutions(content: str, subs: List[Substitution]):
def _apply_substitutions(content: str, subs: List[Substitution]) -> str:
"""
Returns a copy of content with all of the substitutions applied.
Uses a single pass for efficiency.
Expand Down Expand Up @@ -319,3 +386,11 @@ def _get_name_source_segment(source: str, node: ast.Name):
partial_result = partial_result[:-1]

return partial_result


def _clean_linebreaks(expression: str):
"""
Strip out any new lines because they can be problematic on windows
"""
expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
return re.sub(r"(\r\n|\r|\n)", " ", expression)
48 changes: 48 additions & 0 deletions poethepoet/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# ruff: noqa: E501
from pathlib import Path
from typing import Union


def rm(
*patterns: str,
cwd: str = ".",
verbosity: Union[int, str] = 0,
dry_run: bool = False,
):
"""
This function is intended for use in a script task to delete files and directories
matching the given patterns, as a platform agnostic alternative to the `rm -rf`
Example usage:
.. code-block:: toml
[tool.poe.tasks.clean]
script = "poethepoet.scripts:rm('.mypy_cache', '.pytest_cache', './**/__pycache__')"
"""
verbosity = int(verbosity)

for pattern in patterns:
matches = list(Path(cwd).glob(pattern))
if verbosity > 0 and not matches:
print(f"No files or directories to delete matching {pattern!r}")
elif verbosity >= 0 and len(matches) > 1:
print(f"Deleting paths matching {pattern!r}")

for match in matches:
_delete_path(match, verbosity, dry_run)


def _delete_path(path: Path, verbosity: int, dry_run: bool):
import shutil

if path.is_dir():
if verbosity > 0:
print(f"Deleting directory '{path}'")
if not dry_run:
shutil.rmtree(path)
else:
if verbosity > 0:
print(f"Deleting file '{path}'")
if not dry_run:
path.unlink()
9 changes: 8 additions & 1 deletion poethepoet/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,13 +485,20 @@ def _handle_run(
"""
raise NotImplementedError

def _get_executor(self, context: "RunContext", env: "EnvVarsManager"):
def _get_executor(
self,
context: "RunContext",
env: "EnvVarsManager",
*,
delegate_dry_run: bool = False,
):
return context.get_executor(
self.invocation,
env,
working_dir=self.get_working_dir(env),
executor_config=self.spec.options.get("executor"),
capture_stdout=self.capture_stdout,
delegate_dry_run=delegate_dry_run,
)

def get_working_dir(
Expand Down
1 change: 0 additions & 1 deletion poethepoet/task/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def parse_content(
expression = resolve_expression(
source=expression,
arguments=set(args or tuple()),
call_only=False,
allowed_vars={"sys", "__env", *imports},
)
# Strip out any new lines because they can be problematic on windows
Expand Down
45 changes: 24 additions & 21 deletions poethepoet/task/script.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import re
import shlex
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple

Expand All @@ -9,6 +8,7 @@
from ..config import PoeConfig
from ..context import RunContext
from ..env.manager import EnvVarsManager
from ..helpers.python import FunctionCall
from .base import TaskSpecFactory


Expand Down Expand Up @@ -68,7 +68,7 @@ def _handle_run(
# TODO: do something about extra_args, error?

target_module, function_call = self.parse_content(named_arg_values)
function_ref = function_call[: function_call.index("(")]
function_ref = function_call.function_ref

argv = [
self.name,
Expand All @@ -78,16 +78,20 @@ def _handle_run(
# TODO: check whether the project really does use src layout, and don't do
# sys.path.append('src') if it doesn't

has_dry_run_ref = "_dry_run" in function_call.referenced_globals
dry_run = self.ctx.ui["dry_run"]

script = [
"import asyncio,os,sys;",
"from inspect import iscoroutinefunction as _c;",
"from os import environ;",
"from importlib import import_module as _i;",
f"_dry_run = {'True' if dry_run else 'False'};" if has_dry_run_ref else "",
f"sys.argv = {argv!r}; sys.path.append('src');",
f"{format_class(named_arg_values)}",
f"_m = _i('{target_module}');",
f"_r = asyncio.run(_m.{function_call}) if _c(_m.{function_ref})",
f" else _m.{function_call};",
f"_r = asyncio.run(_m.{function_call.expression}) if _c(_m.{function_ref})",
f" else _m.{function_call.expression};",
]

if self.spec.options.get("print_result"):
Expand All @@ -99,19 +103,21 @@ def _handle_run(
cmd = ("python", "-c", "".join(script))

self._print_action(shlex.join(argv), context.dry)
return self._get_executor(context, env).execute(
cmd, use_exec=self.spec.options.get("use_exec", False)
)
return self._get_executor(
context, env, delegate_dry_run=has_dry_run_ref
).execute(cmd, use_exec=self.spec.options.get("use_exec", False))

def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:
def parse_content(
self, args: Optional[Dict[str, Any]]
) -> Tuple[str, "FunctionCall"]:
"""
Returns the module to load, and the function call to execute.
Will raise an exception if the function call contains invalid syntax or
references variables that are not in scope.
"""

from ..helpers.python import resolve_expression
from ..helpers.python import FunctionCall

try:
target_module, target_ref = self.spec.content.strip().split(":", 1)
Expand All @@ -122,17 +128,14 @@ def parse_content(self, args: Optional[Dict[str, Any]]) -> Tuple[str, str]:

if target_ref.isidentifier():
if args:
return target_module, f"{target_ref}(**({args}))"
return target_module, f"{target_ref}()"

function_call = resolve_expression(
target_ref,
set(args or tuple()),
call_only=True,
allowed_vars={"sys", "os", "environ"},
)
# Strip out any new lines because they can be problematic on windows
function_call = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", function_call)
function_call = re.sub(r"(\r\n|\r|\n)", " ", function_call)
function_call = FunctionCall(f"{target_ref}(**({args}))", target_ref)
else:
function_call = FunctionCall(f"{target_ref}()", target_ref)
else:
function_call = FunctionCall.parse(
source=target_ref,
arguments=set(args or tuple()),
allowed_vars={"sys", "os", "environ", "_dry_run"},
)

return target_module, function_call
Loading

0 comments on commit 6bea3ef

Please sign in to comment.