
Docs on how to write a new executor #498

Open
TomNicholas opened this issue Jul 14, 2024 · 1 comment
Labels: documentation, runtime

Comments


TomNicholas commented Jul 14, 2024

There are a lot of parallel frameworks that Cubed plans could be converted to (e.g. #499). We have executors for Dask and Beam, but instead of trying to write more executors ourselves, it would be nice to clearly document how to create a new executor so that others can contribute.

TomNicholas added the documentation label on Jul 14, 2024
tomwhite commented

That's a great suggestion. I'll sketch out something here to start with.

A Cubed executor is a subclass of DagExecutor:

class DagExecutor:
    @property
    def name(self) -> str:
        raise NotImplementedError  # pragma: no cover

    def execute_dag(self, dag: MultiDiGraph, **kwargs) -> None:
        raise NotImplementedError  # pragma: no cover

The execute_dag method is responsible for taking a Cubed plan (a DAG of operations and arrays) and turning it into sets of parallel tasks that it runs in stages. It is also responsible for issuing callbacks whenever an operation starts or a task finishes; the client uses these to update progress bars and record statistics.

The simplest example of a Cubed executor is the SingleThreadedExecutor, which runs tasks sequentially:

class SingleThreadedExecutor(DagExecutor):
    """The default execution engine that runs tasks sequentially using Python loops."""

    @property
    def name(self) -> str:
        return "single-threaded"

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        resume: Optional[bool] = None,
        spec: Optional[Spec] = None,
        compute_id: Optional[str] = None,
        **kwargs,
    ) -> None:
        for name, node in visit_nodes(dag, resume=resume):
            handle_operation_start_callbacks(callbacks, name)
            pipeline: CubedPipeline = node["pipeline"]
            for m in pipeline.mappable:
                exec_stage_func(
                    m,
                    pipeline.function,
                    config=pipeline.config,
                    name=name,
                    compute_id=compute_id,
                )
                if callbacks is not None:
                    event = TaskEndEvent(name=name)
                    [callback.on_task_end(event) for callback in callbacks]
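
To make the contrast with a parallel backend concrete, here is a minimal sketch of what a thread-pool executor could look like, reusing the helpers shown above. This is not part of Cubed; the ThreadPoolDagExecutor name and max_workers parameter are made up for this example.

from concurrent.futures import ThreadPoolExecutor, as_completed


class ThreadPoolDagExecutor(DagExecutor):
    """Hypothetical executor that runs each stage's tasks on a local thread pool."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    @property
    def name(self) -> str:
        return "thread-pool"

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        resume: Optional[bool] = None,
        spec: Optional[Spec] = None,
        compute_id: Optional[str] = None,
        **kwargs,
    ) -> None:
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            for name, node in visit_nodes(dag, resume=resume):
                handle_operation_start_callbacks(callbacks, name)
                pipeline: CubedPipeline = node["pipeline"]
                # Submit every task input in the stage to the pool.
                futures = [
                    pool.submit(
                        exec_stage_func,
                        m,
                        pipeline.function,
                        config=pipeline.config,
                        name=name,
                        compute_id=compute_id,
                    )
                    for m in pipeline.mappable
                ]
                # Issue a callback as each task completes so progress bars update.
                for future in as_completed(futures):
                    future.result()  # re-raise any task exception
                    if callbacks is not None:
                        event = TaskEndEvent(name=name)
                        for callback in callbacks:
                            callback.on_task_end(event)

The only change from the single-threaded version is that task inputs are submitted to a pool and the task-end callbacks are issued as futures complete; Cubed's real local executors (in cubed.runtime.executors) are more involved, but the overall shape is similar.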

In practice, backends will have the following characteristics that make them suitable targets for Cubed:

  1. Parallel. The ability to efficiently run multiple tasks in parallel. (The task inputs are generated by the pipeline.mappable iterator.)
  2. Code distribution. The ability to run arbitrary code in the remote process via some distribution mechanism. (The task function is pipeline.function.)
  3. Memory guarantees. The backend should guarantee that the task gets a certain (configurable) amount of memory.
  4. Retries. The backend should have some way of retrying a task if it fails.
  5. Timeouts. Tasks should fail after a certain amount of time, so they can be retried.
  6. (Optional) Straggler mitigation. Very slow tasks are detected and retried with a backup task so as not to slow down the entire computation. Most backends do not support this, but it can be implemented as part of the executor.

These features (and a few more) are discussed in #276, which also has a table showing which executor has each feature.

Some of the executors in Cubed use asyncio APIs to call the backend (e.g. Modal, Dask, local threads and processes), which makes implementing some of these features easier since the code can be shared (e.g. backup tasks). However, backends do not have to offer an asyncio API to be integrated with Cubed (e.g. Lithops).
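
As a rough illustration (not Cubed's actual shared code), retries and timeouts can be layered over an async backend call like this, where run_remote_task is a hypothetical coroutine standing in for the backend's API:

import asyncio


async def run_with_retries(run_remote_task, task_input, timeout=300, retries=2):
    # Try the task up to retries + 1 times; each attempt is cancelled if it
    # exceeds the timeout, and the last failure is re-raised.
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(run_remote_task(task_input), timeout)
        except Exception:
            if attempt == retries:
                raise


async def run_stage(run_remote_task, mappable):
    # Run all of a stage's task inputs concurrently.
    await asyncio.gather(*(run_with_retries(run_remote_task, m) for m in mappable))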

All the executor implementations can be found in cubed.runtime.executors.

For testing, new executors can be conditionally added to the ALL_EXECUTORS list if the backend dependency is present.
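
For example, a new executor could be registered for the test suite along these lines (the module and class names here are made up):

# Skip the new executor when its backend dependency is not installed.
try:
    from cubed.runtime.executors.my_backend import MyBackendExecutor

    ALL_EXECUTORS.append(MyBackendExecutor())
except ImportError:
    pass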
