Skip to content

Commit

Permalink
pycompss basic translator
Browse files Browse the repository at this point in the history
  • Loading branch information
danielrosendo committed Dec 16, 2024
1 parent c543825 commit de98541
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 0 deletions.
105 changes: 105 additions & 0 deletions wfcommons/wfbench/translator/pycompss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import pathlib
import shutil

from logging import Logger
from typing import Dict, Optional, Union

from .abstract_translator import Translator
from ...common import Workflow

this_dir = pathlib.Path(__file__).resolve().parent


class PyCompssTranslator(Translator):
"""
A WfFormat parser for creating PyCOMPSs workflow applications.
:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
:type workflow: Union[Workflow, pathlib.Path],
:param logger: The logger where to log information/warning or errors (optional).
:type logger: Logger
"""
def __init__(self,
workflow: Union[Workflow, pathlib.Path],
logger: Optional[Logger] = None) -> None:
"""Create an object of the translator."""
super().__init__(workflow, logger)
self.parsed_tasks = []
self.task_counter = 1
self.output_files_map = {}

def translate(self, output_folder: pathlib.Path) -> None:
"""
Translate a workflow benchmark description (WfFormat) into an actual workflow application.
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
:type output_folder: pathlib.Path
"""
self.script = "\n# workflow tasks\n"
# PyCOMPSs translator
self._pycompss_code()

# Generates pycompss workflow file: template + script
with open(this_dir.joinpath("templates/pycompss_template.py")) as fp:
run_workflow_code = fp.read()
run_workflow_code = run_workflow_code.replace("# Generated code goes here", self.script)
# write benchmark files
output_folder.mkdir(parents=True)
with open(output_folder.joinpath("pycompss_workflow.py"), "w") as fp:
fp.write(run_workflow_code)
# additional files
self._copy_binary_files(output_folder)
self._generate_input_files(output_folder)

def _pycompss_code(self) -> None:
# GENERATES PYCOMPSS TASKS (functions)
all_pycompss_tasks_as_functions = []
for task in self.tasks.values():
# @task parameters
if len(task.input_files) > 0 and len(task.output_files) > 0:
self.script += f"@task(filePath=FILE_INOUT)\n"
elif len(task.input_files) > 0:
self.script += f"@task(filePath=FILE_IN)\n"
elif len(task.output_files) > 0:
self.script += f"@task(filePath=FILE_OUT)\n"
else:
self.script += f"@task\n"
# function name
# function_name = f"{task.name}_{task.task_id}"
function_name = task.name
# function parameters
function_parameter_names = ""
for i in range(len(task.input_files)):
if len(task.input_files) == 1:
function_parameter_names += f"file{i}"
else:
if i == 0:
function_parameter_names += f"file{i}"
else:
function_parameter_names += f", file{i}"
self.script += f"def {function_name}"
self.script += "("
self.script += function_parameter_names
self.script += "):\n"
# function body
self.script += f"\tpass\n\n"
# PYCOMPSS TASKS METHOD CALL DEFINITION
function_parameters = ""
for i in range(len(task.input_files)):
if len(task.input_files) == 1:
function_parameters += f"{task.input_files[i].file_id}"
else:
if i == 0:
function_parameters += f"{task.input_files[i].file_id}"
else:
function_parameters += f", {task.input_files[i].file_id}"
all_pycompss_tasks_as_functions.append(f"{function_name}({function_parameters})")

# INVOKE PYCOMPSS TASKS (functions)
self.script += f"\n\ndef main_program():\n"
for func in all_pycompss_tasks_as_functions:
self.script += f"\t{func}\n"

# CALL TO MAIN METHOD
self.script += f"\n\nif __name__ == \"__main__\":\n"
self.script += f"\tmain_program()\n"
32 changes: 32 additions & 0 deletions wfcommons/wfbench/translator/templates/pycompss_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# IMPORTS
import sys

# PyCOMPSs imports
# All task arguments: https://compss-doc.readthedocs.io/en/stable/Sections/02_App_Development/02_Python/01_1_Task_definition/Sections/04_Task_parameters_summary.html
from pycompss.api.task import task
from pycompss.api.constraint import constraint
from pycompss.api.binary import binary
from pycompss.api.mpi import mpi
from pycompss.api.parameter import *
# All API functions: https://compss-doc.readthedocs.io/en/stable/Sections/02_App_Development/02_Python/01_2_Synchronization/01_API.html?highlight=compss_open#api-summary
from pycompss.api.api import compss_open
from pycompss.api.api import compss_wait_on
from pycompss.api.api import compss_barrier

# @binary
# @constraint(computing_units=24)
# @mpi(runner="mpirun", binary="gmx_mpi", computing_nodes=1)
# @task
# def task_id():
# pass
#
# def main_program(arg1, arg2):
# # Execute task
# task_id()
#
# if __name__ == "__main__":
# arg1 = sys.argv[1]
# arg2 = sys.argv[2]
# main_program(arg1, arg2)

# Generated code goes here

0 comments on commit de98541

Please sign in to comment.