Package upload plugin #8698

Open
dbalabka opened this issue Jun 13, 2024 · 3 comments · May be fixed by #8884
Labels
enhancement Improve existing functionality or make things work better

Comments

@dbalabka

dbalabka commented Jun 13, 2024

Dask's remote task execution is straightforward when the submitted function does not depend on any external package. However, the most common use case relies on installed libraries or project packages. There are two types of dependencies:

  1. pip/poetry installed packages
  2. dependencies located in the project source/package

To deliver both dependencies to workers, we do the following:

  1. We build a custom Dask image that contains all required extra pip packages. The primary assumption is that dependencies change rarely, so a project-specific Dask image can stay untouched for a while and does not need frequent rebuilds. To simplify the process, we use automation that exports all required packages with poetry export -f requirements.txt --output requirements.txt and builds a Docker image remotely using the Kubernetes driver. The PipInstall plugin is another way to do it, but it can add minutes to the cluster start time. In our case, cluster startup takes less than a minute once the image is warm on the Kubernetes nodes.
  2. The project source is more dynamic and has to be uploaded each time we spin up a cluster. We use the existing client.upload_file() function, which relies on the UploadFile plugin. To clarify, we keep the cluster running only while the Python script executes and tear it down when the script finishes.

While we successfully solved the delivery of extra dependencies to remote worker nodes, our solution requires a deep understanding of Dask cluster deployment and extra helper functions that do not come with Dask out of the box. I propose improving the developer experience in this direction. I would focus first on delivering local source to worker nodes. To be more specific:

  1. Create a new function upload_package(module: ModuleType) as a complementary function to the existing upload_file(path).
  2. Automate egg file creation inside the new upload_package() function.
  3. Make it possible to update existing venv packages, such as Dask-specific modules, on remote worker nodes, which should simplify debugging. While investigating #11160, I already proved that this is possible (please see Can not process datasets created by the older version of Dask dask#11160 (comment))
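
To make the first two points more concrete, here is a minimal sketch of how a module object could be turned into an uploadable archive. It uses only the standard library and produces a plain zip rather than an egg. The name build_module_archive is hypothetical; it is not part of Dask and not necessarily how the prototype mentioned in this issue works.

```python
import io
import os
import zipfile
from types import ModuleType


def build_module_archive(module: ModuleType) -> bytes:
    """Zip the source of ``module`` and return the archive as bytes.

    Hypothetical helper for illustration only (not a Dask API).
    """
    source = getattr(module, "__file__", None)
    if source is None:
        # Namespace packages and some built-in modules have no source file
        raise ValueError(f"{module.__name__} has no source file to upload")

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        if os.path.basename(source) == "__init__.py":
            # A package: archive the whole directory and keep the package
            # name as the top-level folder inside the zip
            package_dir = os.path.dirname(source)
            parent_dir = os.path.dirname(package_dir)
            for root, dirs, files in os.walk(package_dir):
                # Prune bytecode caches in place so os.walk skips them
                dirs[:] = [d for d in dirs if d != "__pycache__"]
                for name in files:
                    if name.endswith(".pyc"):
                        continue
                    full_path = os.path.join(root, name)
                    archive.write(full_path, os.path.relpath(full_path, parent_dir))
        else:
            # A single-file module: archive just that file.
            # Note: for a submodule this drops the parent package layout.
            archive.write(source, os.path.basename(source))
    return buffer.getvalue()
```

The returned bytes could then be shipped by a plugin and extracted on the receiving side into a directory that is added to sys.path.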

We already have a working prototype of a Worker/Scheduler plugin that does everything described above. If there is demand for such a plugin, we look forward to contributing our source. Any comments and suggestions are very welcome 🤗

Here are some usage examples:

Project source uploading to all workers:

import my_project_source

cluster = KubeCluster()  
client = cluster.get_client()

# Upload the entire project source to all worker nodes in a very convenient way
client.register_plugin(UploadModule(my_project_source))

# It will be even more convenient with a new client function
client.upload_package(my_project_source)

We can replace part of the Dask source on all worker nodes for debugging purposes:

from dask.dataframe import backends

client.upload_package(backends)

Here is an example of an adjusted function:
dask/dask#11160 (comment)
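
For readers wondering how replacing a submodule of an installed package can work at all, here is a rough standard-library sketch of one possible mechanism. It is an assumption for illustration, not the code from the linked comment or from the prototype: the helper name override_submodule is hypothetical, and it assumes the target is a single-file module inside a regular (non-namespace) package.

```python
import importlib
import os
import sys
import tempfile


def override_submodule(module_name: str, new_source: str) -> str:
    """Replace an importable submodule with ``new_source`` in this process.

    Hypothetical helper for illustration; returns the override directory.
    """
    parent_name, _, short_name = module_name.rpartition(".")
    if not parent_name:
        raise ValueError("expected a dotted name such as 'package.module'")

    parent = importlib.import_module(parent_name)
    module = importlib.import_module(module_name)

    # Write the replacement source into a fresh directory
    override_dir = tempfile.mkdtemp(prefix="override-")
    with open(os.path.join(override_dir, short_name + ".py"), "w") as f:
        f.write(new_source)

    # Submodules are looked up on the parent's __path__, not on sys.path,
    # so the override directory has to go to the front of that list.
    parent.__path__.insert(0, override_dir)
    importlib.invalidate_caches()

    # Re-execute the module in place so existing references see the new code
    importlib.reload(module)
    return override_dir
```

On a cluster, something like this would have to run on every worker (for example from a plugin's setup hook) before the tasks that depend on the patched module are executed.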

@NewbiZ

NewbiZ commented Jul 19, 2024

I would be very interested in such a plugin. At the moment there is no easy way to use a Dask cluster in "development" mode, that is, when you are working on unpackaged source code.

@jlynchMicron

jlynchMicron commented Oct 3, 2024

I spent a few days dealing with these Python package upload issues and env/dependency management. I came up with this solution to upload my CWD project, and it seems to be working:

dask_plugins.py:

from distributed.diagnostics.plugin import SchedulerPlugin,WorkerPlugin
from dask.utils import tmpfile
import os
import sys
import uuid
import zipfile

#Modeled after UploadDirectory nanny plugin.
#Needed since ~2023 Dask serialize/deserialize architecture now requires scheduler and workers to have dependencies present.
class UploadDirectory_Base():
    """A plugin template to upload a local directory to scheduler/workers.

    Parameters
    ----------
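    path: str
        A path to the directory to upload
    skip_words: tuple of str
        Path components (typically subfolder names) to ignore
    skip: tuple of callables
        Predicates applied to each file path; matching files are ignored
    update_path: bool
        Update Python path with uploaded directory.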
    """

    def __init__(
        self, 
        path,
        skip_words=(".git", ".github", ".pytest_cache", "tests", "docs"),
        skip=(lambda fn: os.path.splitext(fn)[1] == ".pyc",),
        update_path=False,
    ):
        """
        Initialize the plugin mixin by zipping the given directory and keeping the archive bytes in memory.
        """
        path = os.path.expanduser(path)
        self.path = os.path.split(path)[-1]
        self.name = "upload-directory-" + os.path.split(path)[-1]
        self.update_path = update_path

        with tmpfile(extension="zip") as fn:
            with zipfile.ZipFile(fn, "w", zipfile.ZIP_DEFLATED) as z:
                for root, dirs, files in os.walk(path):
                    for file in files:
                        filename = os.path.join(root, file)
                        if any(predicate(filename) for predicate in skip):
                            continue
                        # Use a separate name to avoid shadowing os.walk's `dirs`
                        parts = filename.split(os.sep)
                        if any(word in parts for word in skip_words):
                            continue

                        archive_name = os.path.relpath(
                            os.path.join(root, file), os.path.join(path, "..")
                        )
                        z.write(filename, archive_name)

            with open(fn, "rb") as f:
                self.data = f.read()
    
    async def extract(self, local_directory):
        """
        Extract the in-memory zip archive into a new temporary folder under the local directory.
        """
        fn = os.path.join(local_directory, f"tmp-{uuid.uuid4()}.zip")
        with open(fn, "wb") as f:
            f.write(self.data)

        import zipfile

        tmp_dir = os.path.join(local_directory, f"tmp-{uuid.uuid4()}")
        os.mkdir(tmp_dir)
        with zipfile.ZipFile(fn) as z:
            z.extractall(path=tmp_dir)

        if self.update_path:
            path = tmp_dir
            #Add local_directory/tmp_dir to path
            if path not in sys.path:
                sys.path.insert(0, path)
            #Add local_directory/tmp_dir/{upload directory name} to path
            if os.path.join(tmp_dir, self.path) not in sys.path:
                sys.path.insert(0, os.path.join(tmp_dir, self.path))

        os.remove(fn)

class UploadDirectory_Scheduler(UploadDirectory_Base, SchedulerPlugin):
    async def start(self, scheduler):
        await self.extract(scheduler.local_directory)

class UploadDirectory_Worker(UploadDirectory_Base, WorkerPlugin):
    async def setup(self, worker):
        await self.extract(worker.local_directory)

Some file using dask client:

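import os

import cloudpickle
from . import dask_plugins
# Pickle the plugin classes by value so the scheduler and workers do not need dask_plugins installed
cloudpickle.register_pickle_by_value(dask_plugins)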

...

plugin = dask_plugins.UploadDirectory_Scheduler(os.getcwd(), update_path=True)
client.register_plugin(plugin)
plugin = dask_plugins.UploadDirectory_Worker(os.getcwd(), update_path=True)
client.register_plugin(plugin)

I think all dependency uploaders now need to upload dependencies to BOTH the scheduler and the workers because of the serialization changes mentioned in #7797.
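
The general failure mode behind this can be reproduced without Dask. The sketch below uses the standard pickle module, which stores functions by reference (module name plus function name), so the receiving side must be able to import the module. This is only an analogy under stated assumptions: it does not exercise Dask's actual serialization path, and the module name my_project_tasks is made up for the example.

```python
import importlib
import os
import pickle
import sys
import tempfile


def roundtrip_without_module() -> str:
    """Pickle a function, hide its module, and try to unpickle it."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical project module that exists only on the "client"
        with open(os.path.join(tmp, "my_project_tasks.py"), "w") as f:
            f.write("def double(x):\n    return 2 * x\n")

        # "Client" side: the project source is importable
        sys.path.insert(0, tmp)
        tasks = importlib.import_module("my_project_tasks")
        payload = pickle.dumps(tasks.double)

        # "Scheduler/worker" side: same bytes, but the module is missing
        sys.path.remove(tmp)
        del sys.modules["my_project_tasks"]
        importlib.invalidate_caches()
        try:
            pickle.loads(payload)
        except ModuleNotFoundError as exc:
            return f"failed: {exc}"
        return "ok"


print(roundtrip_without_module())
```

Any process that has to deserialize such a payload needs the module on its import path, which is why uploading the source to only one side may not be enough.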

dbalabka added a commit to dbalabka/dask_distributed that referenced this issue Oct 4, 2024
@dbalabka dbalabka linked a pull request Oct 4, 2024 that will close this issue
2 tasks
@dbalabka
Author

dbalabka commented Oct 4, 2024

I've added our implementation in PR #8884
I'll add more details about the implementation later.

@jacobtomlinson jacobtomlinson added enhancement Improve existing functionality or make things work better and removed needs triage labels Oct 17, 2024