From a9e6213b2da89c58e26f06dcf2ed05293ca8960f Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 24 Oct 2024 16:10:24 -0700 Subject: [PATCH 01/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/README.md | 1 + .../flytekitplugins/memray/__init__.py | 15 +++++ .../flytekitplugins/memray/profiling.py | 57 +++++++++++++++++++ plugins/flytekit-memray/setup.py | 37 ++++++++++++ 4 files changed, 110 insertions(+) create mode 100644 plugins/flytekit-memray/README.md create mode 100644 plugins/flytekit-memray/flytekitplugins/memray/__init__.py create mode 100644 plugins/flytekit-memray/flytekitplugins/memray/profiling.py create mode 100644 plugins/flytekit-memray/setup.py diff --git a/plugins/flytekit-memray/README.md b/plugins/flytekit-memray/README.md new file mode 100644 index 0000000000..eca9dab7c6 --- /dev/null +++ b/plugins/flytekit-memray/README.md @@ -0,0 +1 @@ +# Memray \ No newline at end of file diff --git a/plugins/flytekit-memray/flytekitplugins/memray/__init__.py b/plugins/flytekit-memray/flytekitplugins/memray/__init__.py new file mode 100644 index 0000000000..a24c7c51d7 --- /dev/null +++ b/plugins/flytekit-memray/flytekitplugins/memray/__init__.py @@ -0,0 +1,15 @@ +""" +.. currentmodule:: flytekitplugins.wandb + +This package contains things that are useful when extending Flytekit. + +.. 
autosummary:: + :template: custom.rst + :toctree: generated/ + + wandb_init +""" + +from .profiling import mem_profiling + +__all__ = ["mem_profiling"] diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py new file mode 100644 index 0000000000..e092db76c8 --- /dev/null +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -0,0 +1,57 @@ +import os +from typing import Callable, Optional, Union +import memray +import time +from flytekit.core.context_manager import FlyteContextManager +from flytekit.core.utils import ClassDecorator +from flytekit import Deck + + +class mem_profiling(ClassDecorator): + + def __init__( + self, + task_function: Optional[Callable] = None, + **init_kwargs: dict, + ): + """Memray Profiling Plugin. + Args: + """ + self.init_kwargs = init_kwargs + + # All kwargs need to be passed up so that the function wrapping works for both + # `@wandb_init` and `@wandb_init(...)` + super().__init__( + task_function, + **init_kwargs, + ) + + def execute(self, *args, **kwargs): + ctx = FlyteContextManager.current_context() + is_local_execution = ctx.execution_state.is_local_execution() + + dir_name = "memray" + + if not os.path.exists(dir_name): + os.makedirs(dir_name) + + bin_filepath = f"{dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" + with memray.Tracker(bin_filepath): + output = self.task_function(*args, **kwargs) + + os.system(f"memray flamegraph {bin_filepath}") + with open(bin_filepath, "r", encoding="ISO-8859-1") as file: + html_content = file.read() + + Deck("flamegraph", html_content) + + # os.system(f"memray flamegraph {bin_filepath}") + # with open(bin_filepath, "r", encoding="ISO-8859-1") as file: + # html_content = file.read() + + # Deck("flamegraph", html_content) + + return output + + def get_extra_config(self): + return {} diff --git a/plugins/flytekit-memray/setup.py b/plugins/flytekit-memray/setup.py new file mode 
100644 index 0000000000..10f8513b04 --- /dev/null +++ b/plugins/flytekit-memray/setup.py @@ -0,0 +1,37 @@ +from setuptools import setup + +PLUGIN_NAME = "memray" + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["flytekit>=1.12.0", "memray"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="flyteorg", + author_email="admin@flyte.org", + description="This package enables memory profiling for tasks with memray", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.8", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) From 08562b633c5ea5a267e6e6420a8f2a88d1b1c0d3 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 24 Oct 2024 16:29:52 -0700 Subject: [PATCH 02/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index e092db76c8..fb6de7bc2f 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -36,11 +36,13 @@ def execute(self, *args, **kwargs): os.makedirs(dir_name) bin_filepath = 
f"{dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" + html_filepath = bin_filepath.replace(".bin", ".html") + with memray.Tracker(bin_filepath): output = self.task_function(*args, **kwargs) - os.system(f"memray flamegraph {bin_filepath}") - with open(bin_filepath, "r", encoding="ISO-8859-1") as file: + os.system(f"memray flamegraph -o {html_filepath} {bin_filepath}") + with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() Deck("flamegraph", html_content) From d3a5b3c57ab93c235193e67c8ae9c63627a29693 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 24 Oct 2024 18:49:25 -0700 Subject: [PATCH 03/23] wip Signed-off-by: Jan Fiedler --- .../flytekitplugins/memray/profiling.py | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index fb6de7bc2f..ba15798de8 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -1,8 +1,7 @@ import os -from typing import Callable, Optional, Union +from typing import Callable, Optional import memray import time -from flytekit.core.context_manager import FlyteContextManager from flytekit.core.utils import ClassDecorator from flytekit import Deck @@ -27,33 +26,32 @@ def __init__( ) def execute(self, *args, **kwargs): - ctx = FlyteContextManager.current_context() - is_local_execution = ctx.execution_state.is_local_execution() dir_name = "memray" + memray_html_reporter = ["flamegraph", "table"] if not os.path.exists(dir_name): os.makedirs(dir_name) bin_filepath = f"{dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" - html_filepath = bin_filepath.replace(".bin", ".html") with memray.Tracker(bin_filepath): output = self.task_function(*args, **kwargs) - os.system(f"memray flamegraph -o {html_filepath} {bin_filepath}") - 
with open(html_filepath, "r", encoding="utf-8") as file: - html_content = file.read() + for reporter in memray_html_reporter: + self.generate_flytedeck_html(reporter=reporter, bin_filepath=bin_filepath) - Deck("flamegraph", html_content) - - # os.system(f"memray flamegraph {bin_filepath}") - # with open(bin_filepath, "r", encoding="ISO-8859-1") as file: - # html_content = file.read() + return output - # Deck("flamegraph", html_content) + def generate_flytedeck_html(self, reporter, bin_filepath): + html_filepath = bin_filepath.replace( + self.task_function.__name__, f"{reporter}.{self.task_function.__name__}" + ).replace(".bin", ".html") + os.system(f"memray {reporter} -o {html_filepath} {bin_filepath}") + with open(html_filepath, "r", encoding="utf-8") as file: + html_content = file.read() - return output + Deck(f"Memray {reporter.capitalize()}", html_content) def get_extra_config(self): return {} From 2917fea62bb9474d122e0f508e12a2d452134130 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 24 Oct 2024 22:12:14 -0700 Subject: [PATCH 04/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index ba15798de8..52f1110acd 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -50,6 +50,7 @@ def generate_flytedeck_html(self, reporter, bin_filepath): os.system(f"memray {reporter} -o {html_filepath} {bin_filepath}") with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() + html_content = html_content.replace("const packed_data", "var packed_data") Deck(f"Memray {reporter.capitalize()}", html_content) From 5e8f035815f864f7a18cbb37ba15af5317dc58f7 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 24 Oct 2024 22:27:29 -0700 Subject: [PATCH 
05/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 52f1110acd..540f224555 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -50,7 +50,11 @@ def generate_flytedeck_html(self, reporter, bin_filepath): os.system(f"memray {reporter} -o {html_filepath} {bin_filepath}") with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() + # print(html_content.find("packed_data")) html_content = html_content.replace("const packed_data", "var packed_data") + html_content = html_content.replace( + "const merge_threads", "var merge_threads" + ) Deck(f"Memray {reporter.capitalize()}", html_content) From c9b903c00acd51f9eadae664f67631b79b2c47b4 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 25 Oct 2024 09:56:02 -0700 Subject: [PATCH 06/23] wip Signed-off-by: Jan Fiedler --- .../flytekit-memray/flytekitplugins/memray/profiling.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 540f224555..4b4adf96fa 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -28,7 +28,7 @@ def __init__( def execute(self, *args, **kwargs): dir_name = "memray" - memray_html_reporter = ["flamegraph", "table"] + memray_html_reporter = ["flamegraph"] if not os.path.exists(dir_name): os.makedirs(dir_name) @@ -50,11 +50,6 @@ def generate_flytedeck_html(self, reporter, bin_filepath): os.system(f"memray {reporter} -o {html_filepath} {bin_filepath}") with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() 
- # print(html_content.find("packed_data")) - html_content = html_content.replace("const packed_data", "var packed_data") - html_content = html_content.replace( - "const merge_threads", "var merge_threads" - ) Deck(f"Memray {reporter.capitalize()}", html_content) From 509ee63f89f679daa4788bb3c246fe775f85f081 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 25 Oct 2024 11:37:02 -0700 Subject: [PATCH 07/23] wip Signed-off-by: Jan Fiedler --- .../flytekitplugins/memray/profiling.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 4b4adf96fa..d9f6c11959 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -44,6 +44,13 @@ def execute(self, *args, **kwargs): return output def generate_flytedeck_html(self, reporter, bin_filepath): + html_reporter_constants = [ + "packed_data", + "merge_threads", + "memory_records", + "inverted", + "temporal", + ] html_filepath = bin_filepath.replace( self.task_function.__name__, f"{reporter}.{self.task_function.__name__}" ).replace(".bin", ".html") @@ -51,6 +58,12 @@ def generate_flytedeck_html(self, reporter, bin_filepath): with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() + for constant in html_reporter_constants: + html_content = html_content.replace(f"const {constant}", f"var {constant}") + + with open("output.html", "w") as f: + f.write(html_content) + Deck(f"Memray {reporter.capitalize()}", html_content) def get_extra_config(self): From 77fcfdd9b40d7c7943f482b363c84c9ea209be92 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 25 Oct 2024 11:38:04 -0700 Subject: [PATCH 08/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index d9f6c11959..06b0429f31 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -28,7 +28,7 @@ def __init__( def execute(self, *args, **kwargs): dir_name = "memray" - memray_html_reporter = ["flamegraph"] + memray_html_reporter = ["flamegraph", "table"] if not os.path.exists(dir_name): os.makedirs(dir_name) @@ -61,8 +61,8 @@ def generate_flytedeck_html(self, reporter, bin_filepath): for constant in html_reporter_constants: html_content = html_content.replace(f"const {constant}", f"var {constant}") - with open("output.html", "w") as f: - f.write(html_content) + # with open("output.html", "w") as f: + # f.write(html_content) Deck(f"Memray {reporter.capitalize()}", html_content) From 7744e45e696a66dbbdffd1d5d557a0eb94e267a5 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 25 Oct 2024 13:11:31 -0700 Subject: [PATCH 09/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 06b0429f31..313e3de80e 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -59,7 +59,7 @@ def generate_flytedeck_html(self, reporter, bin_filepath): html_content = file.read() for constant in html_reporter_constants: - html_content = html_content.replace(f"const {constant}", f"var {constant}") + html_content = html_content.replace(f"const {constant}", f"let {constant}") # with open("output.html", "w") as f: # f.write(html_content) From 7ed2420b7f17f0f291ec1841421c12f1a42d0b04 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 25 Oct 2024 13:20:06 -0700 Subject: 
[PATCH 10/23] wip Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 313e3de80e..4f133f0469 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -48,7 +48,7 @@ def generate_flytedeck_html(self, reporter, bin_filepath): "packed_data", "merge_threads", "memory_records", - "inverted", + # "inverted", "temporal", ] html_filepath = bin_filepath.replace( @@ -59,7 +59,7 @@ def generate_flytedeck_html(self, reporter, bin_filepath): html_content = file.read() for constant in html_reporter_constants: - html_content = html_content.replace(f"const {constant}", f"let {constant}") + html_content = html_content.replace(f"{constant}", f"{reporter}_{constant}") # with open("output.html", "w") as f: # f.write(html_content) From 45d9094e89b1c049390750bdbb1673cb3f249a74 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 28 Oct 2024 20:34:58 -0700 Subject: [PATCH 11/23] rename memray_profiling Signed-off-by: Jan Fiedler --- .../flytekitplugins/memray/__init__.py | 4 +- .../flytekitplugins/memray/profiling.py | 58 ++++++++++--------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/__init__.py b/plugins/flytekit-memray/flytekitplugins/memray/__init__.py index a24c7c51d7..e70d47ebe4 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/__init__.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/__init__.py @@ -10,6 +10,6 @@ wandb_init """ -from .profiling import mem_profiling +from .profiling import memray_profiling -__all__ = ["mem_profiling"] +__all__ = ["memray_profiling"] diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py 
b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 4f133f0469..8f8d8c6bef 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -1,70 +1,72 @@ import os -from typing import Callable, Optional +from typing import Callable, Optional, List import memray import time from flytekit.core.utils import ClassDecorator from flytekit import Deck -class mem_profiling(ClassDecorator): +class memray_profiling(ClassDecorator): def __init__( self, task_function: Optional[Callable] = None, - **init_kwargs: dict, + memray_html_reporter: str = "flamegraph", + memray_reporter_args: Optional[List[str]] = [], ): """Memray Profiling Plugin. Args: """ - self.init_kwargs = init_kwargs + if memray_html_reporter not in ["flamegraph", "table"]: + raise ValueError( + f"{memray_html_reporter} is not a supported html reporter." + ) + + self.dir_name = "memray" + self.memray_html_reporter = memray_html_reporter + self.memray_reporter_args = memray_reporter_args # All kwargs need to be passed up so that the function wrapping works for both # `@wandb_init` and `@wandb_init(...)` super().__init__( task_function, - **init_kwargs, + memray_html_reporter=memray_html_reporter, + memray_reporter_args=memray_reporter_args, ) def execute(self, *args, **kwargs): - dir_name = "memray" - memray_html_reporter = ["flamegraph", "table"] - - if not os.path.exists(dir_name): - os.makedirs(dir_name) + if not os.path.exists(self.dir_name): + os.makedirs(self.dir_name) - bin_filepath = f"{dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" + bin_filepath = f"{self.dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" with memray.Tracker(bin_filepath): output = self.task_function(*args, **kwargs) - for reporter in memray_html_reporter: - self.generate_flytedeck_html(reporter=reporter, bin_filepath=bin_filepath) + self.generate_flytedeck_html( + 
reporter=self.memray_html_reporter, bin_filepath=bin_filepath + ) return output def generate_flytedeck_html(self, reporter, bin_filepath): - html_reporter_constants = [ - "packed_data", - "merge_threads", - "memory_records", - # "inverted", - "temporal", - ] html_filepath = bin_filepath.replace( self.task_function.__name__, f"{reporter}.{self.task_function.__name__}" ).replace(".bin", ".html") - os.system(f"memray {reporter} -o {html_filepath} {bin_filepath}") - with open(html_filepath, "r", encoding="utf-8") as file: - html_content = file.read() - for constant in html_reporter_constants: - html_content = html_content.replace(f"{constant}", f"{reporter}_{constant}") + memray_reporter_args_str = " ".join(self.memray_reporter_args) - # with open("output.html", "w") as f: - # f.write(html_content) + if ( + os.system( + f"memray {reporter} -o {html_filepath} {memray_reporter_args_str} {bin_filepath}" + ) + == 0 + ): + with open(html_filepath, "r", encoding="utf-8") as file: + html_content = file.read() - Deck(f"Memray {reporter.capitalize()}", html_content) + Deck(f"Memray {reporter.capitalize()}", html_content) def get_extra_config(self): return {} From fab1c5770f520e0ea7d4f15e51ac4e5ce3e799a2 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 29 Oct 2024 09:50:09 -0700 Subject: [PATCH 12/23] finish readme Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/README.md | 56 ++++++++++++++++++- .../flytekitplugins/memray/profiling.py | 20 +++++-- .../memray/tests/test_memray_profiling.py | 39 +++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) create mode 100644 plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py diff --git a/plugins/flytekit-memray/README.md b/plugins/flytekit-memray/README.md index eca9dab7c6..1919011441 100644 --- a/plugins/flytekit-memray/README.md +++ b/plugins/flytekit-memray/README.md @@ -1 +1,55 @@ -# Memray \ No newline at end of file +# Memray Profiling Plugin + +Memray tracks and reports memory 
allocations, both in python code and in compilled extension modules. +This Memray Profiling plugin enables memory tracking on the Flyte task level and renders a memgraph profiling graph on Flyte Deck. + +To install the plugin, run the following command: + +```bash +pip install flytekitplugins-memray +``` + +Example +```python +from flytekit import workflow, task, ImageSpec +from flytekitplugins.memray import memray_profiling +import time + + +image = ImageSpec( + name="memray_demo", + packages=["flytekitplugins_memray"], + env={"PYTHONMALLOC": "malloc"}, + registry="", +) + + +def generate_data(n: int): + leak_list = [] + for _ in range(n): # Arbitrary large number for demonstration + large_data = " " * 10**6 # 1 MB string + leak_list.append(large_data) # Keeps appending without releasing + time.sleep(0.1) # Slow down the loop to observe memory changes + + +@task(container_image=image, enable_deck=True) +@memray_profiling(memray_html_reporter="table") +def memory_usage(n: int) -> str: + generate_data(n=n) + + return "Well" + + +@task(container_image=image, enable_deck=True) +@memray_profiling(memray_reporter_args=["--leaks"]) +def memory_leakage(n: int) -> str: + generate_data(n=n) + + return "Well" + + +@workflow +def wf(n: int = 500): + memory_usage(n=n) + memory_leakage(n=n) +``` diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 8f8d8c6bef..70f0233273 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -12,22 +12,34 @@ def __init__( self, task_function: Optional[Callable] = None, memray_html_reporter: str = "flamegraph", - memray_reporter_args: Optional[List[str]] = [], + memray_reporter_args: List[str] = [], ): - """Memray Profiling Plugin. + """Memray profiling plugin. Args: + task_function (function, optional): The user function to be decorated. Defaults to None. 
+ memray_html_reporter (str): The name of the memray reporter which generates an html report. + Today there is only 'flamegraph' & 'table'. + memray_reporter_args (List[str], optional): A list of arguments to pass to the reporter commands. + See the [flamegraph](https://bloomberg.github.io/memray/flamegraph.html#reference) + and [table](https://bloomberg.github.io/memray/table.html#cli-reference) docs for details on supported arguments. """ + if memray_html_reporter not in ["flamegraph", "table"]: raise ValueError( f"{memray_html_reporter} is not a supported html reporter." ) + if not all( + isinstance(arg, str) and "--" in arg for arg in memray_reporter_args + ): + raise ValueError( + f"unrecognized arguments for {memray_html_reporter} reporter. Please check https://bloomberg.github.io/memray/{memray_html_reporter}.html" + ) + self.dir_name = "memray" self.memray_html_reporter = memray_html_reporter self.memray_reporter_args = memray_reporter_args - # All kwargs need to be passed up so that the function wrapping works for both - # `@wandb_init` and `@wandb_init(...)` super().__init__( task_function, memray_html_reporter=memray_html_reporter, diff --git a/plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py new file mode 100644 index 0000000000..776ddd4219 --- /dev/null +++ b/plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py @@ -0,0 +1,39 @@ +from unittest.mock import Mock, patch +import pytest +from flytekit import task, current_context +from flytekitplugins.memray import memray_profiling + + +@task(enable_deck=True) +@memray_profiling +def heavy_compute(i: int) -> int: + return i + 1 + + +def test_local_exec(): + heavy_compute(i=7) + assert ( + len(current_context().decks) == 6 + ) # memray flamegraph, timeline, input, and output, source code, dependencies + + +def test_errors(): + reporter = "summary" + with pytest.raises( + 
ValueError, match=f"{reporter} is not a supported html reporter." + ): + memray_profiling(memray_html_reporter=reporter) + + reporter = "flamegraph" + with pytest.raises( + ValueError, + match=f"unrecognized arguments for {reporter} reporter. Please check https://bloomberg.github.io/memray/{reporter}.html", + ): + memray_profiling(memray_reporter_args=["--leaks", "trash"]) + + reporter = "flamegraph" + with pytest.raises( + ValueError, + match=f"unrecognized arguments for {reporter} reporter. Please check https://bloomberg.github.io/memray/{reporter}.html", + ): + memray_profiling(memray_reporter_args=[0, 1, 2]) From 4b242283d703fc5a9dc52abdfefcc520f83fc2c8 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 29 Oct 2024 10:59:37 -0700 Subject: [PATCH 13/23] adjust memray_reporter_args type Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 70f0233273..bbf55ad22d 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -12,7 +12,7 @@ def __init__( self, task_function: Optional[Callable] = None, memray_html_reporter: str = "flamegraph", - memray_reporter_args: List[str] = [], + memray_reporter_args: Optional[List[str]] = None, ): """Memray profiling plugin. Args: @@ -29,7 +29,7 @@ def __init__( f"{memray_html_reporter} is not a supported html reporter." 
) - if not all( + if memray_reporter_args is not None and not all( isinstance(arg, str) and "--" in arg for arg in memray_reporter_args ): raise ValueError( @@ -38,7 +38,7 @@ def __init__( self.dir_name = "memray" self.memray_html_reporter = memray_html_reporter - self.memray_reporter_args = memray_reporter_args + self.memray_reporter_args = memray_reporter_args if memray_reporter_args else [] super().__init__( task_function, From 4b4b3714a1adf969544e69758bfaebc02a9ef439 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 29 Oct 2024 11:05:01 -0700 Subject: [PATCH 14/23] ruff check --fix Signed-off-by: Jan Fiedler --- .../flytekit-memray/flytekitplugins/memray/profiling.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index bbf55ad22d..53bc8aef01 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -1,9 +1,10 @@ import os -from typing import Callable, Optional, List -import memray import time -from flytekit.core.utils import ClassDecorator +from typing import Callable, List, Optional + +import memray from flytekit import Deck +from flytekit.core.utils import ClassDecorator class memray_profiling(ClassDecorator): From c9fa0644e95bc3c8bed725e2a16d6f714c50de6a Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 29 Oct 2024 11:06:01 -0700 Subject: [PATCH 15/23] ruff format Signed-off-by: Jan Fiedler --- .../flytekitplugins/memray/profiling.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 53bc8aef01..5baf785051 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -8,7 +8,6 @@ class 
memray_profiling(ClassDecorator): - def __init__( self, task_function: Optional[Callable] = None, @@ -26,9 +25,7 @@ def __init__( """ if memray_html_reporter not in ["flamegraph", "table"]: - raise ValueError( - f"{memray_html_reporter} is not a supported html reporter." - ) + raise ValueError(f"{memray_html_reporter} is not a supported html reporter.") if memray_reporter_args is not None and not all( isinstance(arg, str) and "--" in arg for arg in memray_reporter_args @@ -48,7 +45,6 @@ def __init__( ) def execute(self, *args, **kwargs): - if not os.path.exists(self.dir_name): os.makedirs(self.dir_name) @@ -57,9 +53,7 @@ def execute(self, *args, **kwargs): with memray.Tracker(bin_filepath): output = self.task_function(*args, **kwargs) - self.generate_flytedeck_html( - reporter=self.memray_html_reporter, bin_filepath=bin_filepath - ) + self.generate_flytedeck_html(reporter=self.memray_html_reporter, bin_filepath=bin_filepath) return output @@ -70,12 +64,7 @@ def generate_flytedeck_html(self, reporter, bin_filepath): memray_reporter_args_str = " ".join(self.memray_reporter_args) - if ( - os.system( - f"memray {reporter} -o {html_filepath} {memray_reporter_args_str} {bin_filepath}" - ) - == 0 - ): + if os.system(f"memray {reporter} -o {html_filepath} {memray_reporter_args_str} {bin_filepath}") == 0: with open(html_filepath, "r", encoding="utf-8") as file: html_content = file.read() From 8e67334f983c945861656445ed0923692f3a6751 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 29 Oct 2024 11:19:23 -0700 Subject: [PATCH 16/23] codespell Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-memray/README.md b/plugins/flytekit-memray/README.md index 1919011441..b2b8bc0f24 100644 --- a/plugins/flytekit-memray/README.md +++ b/plugins/flytekit-memray/README.md @@ -1,6 +1,6 @@ # Memray Profiling Plugin -Memray tracks and reports memory allocations, both in python code and in 
compilled extension modules. +Memray tracks and reports memory allocations, both in python code and in compiled extension modules. This Memray Profiling plugin enables memory tracking on the Flyte task level and renders a memgraph profiling graph on Flyte Deck. To install the plugin, run the following command: From 00f13cae550734066cef3b661993b1e669c9bc84 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 11:26:40 -0800 Subject: [PATCH 17/23] add flytekit-memray to pythonbuild workflows Signed-off-by: Jan Fiedler --- .github/workflows/pythonbuild.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 6721e9afff..d4e1830884 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -342,6 +342,7 @@ jobs: - flytekit-kf-mpi - flytekit-kf-pytorch - flytekit-kf-tensorflow + - flytekit-memray - flytekit-mlflow - flytekit-mmcloud - flytekit-modin From 55350c62d27dfba815018c7f4c3c83ab99c72c9d Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 11:46:13 -0800 Subject: [PATCH 18/23] allow memray.Tracker arguments in profiling Signed-off-by: Jan Fiedler --- .../flytekitplugins/memray/profiling.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 5baf785051..7df30847c1 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -11,6 +11,10 @@ class memray_profiling(ClassDecorator): def __init__( self, task_function: Optional[Callable] = None, + native_traces: bool = False, + trace_python_allocators: bool = False, + follow_fork: bool = False, + memory_interval_ms: int = 10, memray_html_reporter: str = "flamegraph", memray_reporter_args: Optional[List[str]] = None, ): @@ -34,12 +38,20 @@ def __init__( 
f"unrecognized arguments for {memray_html_reporter} reporter. Please check https://bloomberg.github.io/memray/{memray_html_reporter}.html" ) + self.native_traces = native_traces + self.trace_python_allocators = trace_python_allocators + self.follow_fork = follow_fork + self.memory_interval_ms = memory_interval_ms self.dir_name = "memray" self.memray_html_reporter = memray_html_reporter self.memray_reporter_args = memray_reporter_args if memray_reporter_args else [] super().__init__( task_function, + native_traces=native_traces, + trace_python_allocators=trace_python_allocators, + follow_fork=follow_fork, + memory_interval_ms=memory_interval_ms, memray_html_reporter=memray_html_reporter, memray_reporter_args=memray_reporter_args, ) @@ -50,7 +62,13 @@ def execute(self, *args, **kwargs): bin_filepath = f"{self.dir_name}/{self.task_function.__name__}.{time.strftime('%Y%m%d%H%M%S')}.bin" - with memray.Tracker(bin_filepath): + with memray.Tracker( + bin_filepath, + native_traces=self.native_traces, + trace_python_allocators=self.trace_python_allocators, + follow_fork=self.follow_fork, + memory_interval_ms=self.memory_interval_ms, + ): output = self.task_function(*args, **kwargs) self.generate_flytedeck_html(reporter=self.memray_html_reporter, bin_filepath=bin_filepath) From d08cc59a0c563eae15697f8ffb0dcceb1ed7944e Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 11:53:27 -0800 Subject: [PATCH 19/23] extend memray_profiling args description Signed-off-by: Jan Fiedler --- .../flytekit-memray/flytekitplugins/memray/profiling.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 7df30847c1..4e14887654 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -21,6 +21,13 @@ def __init__( """Memray profiling plugin. 
Args: task_function (function, optional): The user function to be decorated. Defaults to None. + native_traces (bool): Whether or not to capture native stack frames, in addition to Python stack frames (see [Native tracking](https://bloomberg.github.io/memray/run.html#native-tracking)) + trace_python_allocators (bool): Whether or not to trace Python allocators as independent allocations. (see [Python allocators](https://bloomberg.github.io/memray/python_allocators.html#python-allocators)) + follow_fork (bool): Whether or not to continue tracking in a subprocess that is forked from the tracked process (see [Tracking across forks](https://bloomberg.github.io/memray/run.html#tracking-across-forks)) + memory_interval_ms (int): How many milliseconds to wait between sending periodic resident set size updates. + By default, every 10 milliseconds a record is written that contains the current timestamp and the total number of bytes of virtual memory allocated by the process. + These records are used to create the graph of memory usage over time that appears at the top of the flame graph, for instance. + This parameter lets you adjust the frequency between updates, though you shouldnt need to change it. memray_html_reporter (str): The name of the memray reporter which generates an html report. Today there is only 'flamegraph' & 'table'. memray_reporter_args (List[str], optional): A list of arguments to pass to the reporter commands. 
From 14d5af0990db23905170d225b725e5bcad4284a9 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 11:58:12 -0800 Subject: [PATCH 20/23] spelling Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/flytekitplugins/memray/profiling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py index 4e14887654..7957804e23 100644 --- a/plugins/flytekit-memray/flytekitplugins/memray/profiling.py +++ b/plugins/flytekit-memray/flytekitplugins/memray/profiling.py @@ -27,7 +27,7 @@ def __init__( memory_interval_ms (int): How many milliseconds to wait between sending periodic resident set size updates. By default, every 10 milliseconds a record is written that contains the current timestamp and the total number of bytes of virtual memory allocated by the process. These records are used to create the graph of memory usage over time that appears at the top of the flame graph, for instance. - This parameter lets you adjust the frequency between updates, though you shouldnt need to change it. + This parameter lets you adjust the frequency between updates, though you shouldn't need to change it. memray_html_reporter (str): The name of the memray reporter which generates an html report. Today there is only 'flamegraph' & 'table'. memray_reporter_args (List[str], optional): A list of arguments to pass to the reporter commands. 
From 53037b3f4390b4c9ca708f80255974228f4dfc53 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 14:24:43 -0800 Subject: [PATCH 21/23] move tests Signed-off-by: Jan Fiedler --- .../flytekitplugins/{memray => }/tests/test_memray_profiling.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plugins/flytekit-memray/flytekitplugins/{memray => }/tests/test_memray_profiling.py (100%) diff --git a/plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py b/plugins/flytekit-memray/flytekitplugins/tests/test_memray_profiling.py similarity index 100% rename from plugins/flytekit-memray/flytekitplugins/memray/tests/test_memray_profiling.py rename to plugins/flytekit-memray/flytekitplugins/tests/test_memray_profiling.py From 7b1c0bc0f3274b4f48211f01c187bc21faef67bd Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 15:25:57 -0800 Subject: [PATCH 22/23] move tests again :clown_face: Signed-off-by: Jan Fiedler --- .../{flytekitplugins => }/tests/test_memray_profiling.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plugins/flytekit-memray/{flytekitplugins => }/tests/test_memray_profiling.py (100%) diff --git a/plugins/flytekit-memray/flytekitplugins/tests/test_memray_profiling.py b/plugins/flytekit-memray/tests/test_memray_profiling.py similarity index 100% rename from plugins/flytekit-memray/flytekitplugins/tests/test_memray_profiling.py rename to plugins/flytekit-memray/tests/test_memray_profiling.py From ed5cdaf195efcb2ded066ce61e62d832fa1c96e4 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 4 Nov 2024 15:28:15 -0800 Subject: [PATCH 23/23] adjust README.md to not use PYTHONMALLOC env variable Signed-off-by: Jan Fiedler --- plugins/flytekit-memray/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/flytekit-memray/README.md b/plugins/flytekit-memray/README.md index b2b8bc0f24..a231fb2a1b 100644 --- a/plugins/flytekit-memray/README.md +++ 
b/plugins/flytekit-memray/README.md @@ -19,7 +19,6 @@ import time image = ImageSpec( name="memray_demo", packages=["flytekitplugins_memray"], - env={"PYTHONMALLOC": "malloc"}, registry="", ) @@ -41,7 +40,7 @@ def memory_usage(n: int) -> str: @task(container_image=image, enable_deck=True) -@memray_profiling(memray_reporter_args=["--leaks"]) +@memray_profiling(trace_python_allocators=True, memray_reporter_args=["--leaks"]) def memory_leakage(n: int) -> str: generate_data(n=n)