Replies: 3 comments 3 replies
-
Thanks for the proposal @andylizf! It is awesome. Several suggestions:
In Approach 1,
In Approach 2, can we quickly investigate how Airflow implements data movement? I think it would help us design the API ;) Also, one design principle would be to imagine a user who doesn't want to change their existing code structure (including where to output the data). Can we support this situation?
Related to the extensions, I think those are awesome, but one concern is that the user needs to deeply integrate with our library in the code. cc @Michaelvll for some inputs here.
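For reference on the Airflow question, a minimal sketch of Airflow's TaskFlow/XCom pattern (recent Airflow 2.x): small return values travel between tasks via XCom, while bulk data is expected to stay in external storage and be passed by reference. Task and bucket names below are illustrative.

```python
# Sketch of Airflow's data-passing model (TaskFlow API, recent Airflow 2.x).
# Return values are serialized into XCom (the metadata DB); bulk artifacts are
# passed by URI and fetched by the downstream task itself.
import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def feature_pipeline():
    @task
    def preprocess() -> dict:
        # Only this small dict goes through XCom; Airflow does not move files.
        return {"features_uri": "s3://bucket/features.parquet"}

    @task
    def train(meta: dict):
        # Downstream reads the URI and fetches the data from external storage.
        print("training on", meta["features_uri"])

    train(preprocess())


feature_pipeline()
```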
-
After extensive offline discussion with @cblmemo, we found Approach 1 has clear issues with multi-upstream and multi-downstream setups. For example, specifying
We believe Approach 2's added complexity is worthwhile. With syntax like `(preprocess >> [train_a, train_b]).with_data('/data', size_gb=2.0)` we can simplify common cases while reducing verbosity, addressing Approach 2's main drawback in simple scenarios. This also means modifying the current YAML format to specify data flow edge by edge, instead of listing downstreams per node.
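For concreteness, a sketch of how that shorthand could read in a full DAG definition. The `>>`-with-a-list form and `with_data` are the proposed syntax from this comment, not an existing API; task names and the size estimate are illustrative.

```python
import sky

with sky.Dag() as dag:
    preprocess = sky.Task(name='preprocess', run='python3 preprocess.py')
    train_a = sky.Task(name='train_a', run='python3 train_a.py')
    train_b = sky.Task(name='train_b', run='python3 train_b.py')

    # Proposed edge-wise shorthand: ship the upstream's /data directory to both
    # downstream tasks; size_gb is a size estimate used for transfer planning.
    (preprocess >> [train_a, train_b]).with_data('/data', size_gb=2.0)
```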
-
Approach 0: The Ideal
Claim: Users understand dependencies, not necessarily DAGs.

```python
@Task
def preprocess():
    # preprocessing ...
    ...

@Task
def train_a():
    preprocess()
    # do training
    ...

@Task
def train_b():
    preprocess()
    # do training
    ...

launch(train_b)
launch(train_a)
```

Other examples of this approach: Regent/Legion, Parsl, Dask.
Pros:
Cons:
Approach 3: Reasonable
Inspired by Approach 2 in andy's proposal.
In YAML, dependencies should be specified within the task definition:

```yaml
name: preprocess
resources:
  cloud: aws
setup: |
  pip install -r requirements.txt
run: |
  python3 preprocess.py
```

```yaml
name: train_a
resources:
  cloud: aws
setup: |
  pip install -r requirements.txt
dependson:
  preprocess:
    - /data/train_a
run: |
  python3 train_a.py
```

```yaml
name: train_b
resources:
  cloud: aws
setup: |
  pip install -r requirements.txt
dependson:
  preprocess:
    - /data/train_b
run: |
  python3 train_b.py
```

Similarly, in the Python API, task dependencies should be specified within the task definition:

```python
preprocess = Task(name="preprocess", run="python3 preprocess.py")
train_a = Task(name="train_a", run="python3 train_a.py", depends_on=["preprocess:/data/train_a"])
train_b = Task(name="train_b", run="python3 train_b.py", depends_on=["preprocess:/data/train_b"])
```
-
Background
Currently, data transfer between tasks lacks proper abstraction:
Proposed Approaches
Approach 1: Task Output Registration
This approach introduces automatic data transfer by establishing default paths and enabling customization with `set_output` and `set_input`. Environment variables provide easy path access, reducing hardcoded paths.
Custom Path Configuration with `set_output` and `set_input`
For cases where users prefer not to modify existing scripts with fixed paths, `set_output` and `set_input` allow specifying custom paths directly in the DAG. Suppose `preprocess.py` saves data to `/mnt/volume1/features.parquet` and `train.py` expects the data in `/mnt/data/features.parquet`. Previously, a manual script was required to move data between clusters; now we can configure this directly in the DAG, enabling automatic data transfer without modifying existing scripts:
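A minimal sketch of what that configuration could look like. `set_output` and `set_input` are the names proposed above; their exact signatures, the `from_task` argument, and the `sky.Dag`/`>>` construction are illustrative assumptions rather than an existing API.

```python
import sky

with sky.Dag() as dag:
    preprocess = sky.Task(name='preprocess', run='python3 preprocess.py')
    train = sky.Task(name='train', run='python3 train.py')
    preprocess >> train

    # preprocess.py already writes to /mnt/volume1/features.parquet and
    # train.py already reads from /mnt/data/features.parquet. The calls below
    # (hypothetical API) only declare those paths so the framework can move
    # the data between clusters automatically.
    preprocess.set_output('/mnt/volume1/features.parquet')
    train.set_input('/mnt/data/features.parquet', from_task=preprocess)
```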
Default Path Mapping with Environment Variables
For newly created scripts, the default path setup minimizes the need for custom configuration. By default, each task binds its output to `.sky/{task_name}/output`, making this path available to downstream tasks. In the DAG sketched below, `preprocess` has its output bound to `.sky/preprocess/output`, and this same path structure is mapped to `train` as its input.
In this setup, users configure `preprocess.py` to write output data to `.sky/preprocess/output` and `train.py` to read from this location. Additionally, environment variables provide dynamic access to paths, so user scripts remain unaffected by any changes to the input or output paths set in the DAG. By referencing `TASK_{TASK_NAME}_OUTPUT_PATH`, users avoid hardcoded paths, keeping scripts path-agnostic.
This configuration allows users to modify paths solely within the DAG while scripts adapt automatically, providing seamless data transfer with minimal setup.
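A minimal sketch of both conventions, assuming the `TASK_{TASK_NAME}_OUTPUT_PATH` template expands with an uppercased task name (e.g. `TASK_PREPROCESS_OUTPUT_PATH`) and that the `sky.Dag`/`>>` construction follows the syntax used elsewhere in this thread; everything here is illustrative, not an existing API.

```python
import sky

with sky.Dag() as dag:
    # Default mapping: preprocess's output is bound to .sky/preprocess/output
    # and exposed to downstream tasks under the same path.
    preprocess = sky.Task(
        name='preprocess',
        # Path-agnostic script: the output location is read from the
        # TASK_PREPROCESS_OUTPUT_PATH environment variable instead of being
        # hardcoded to .sky/preprocess/output.
        run='python3 preprocess.py --output "$TASK_PREPROCESS_OUTPUT_PATH"',
    )
    train = sky.Task(
        name='train',
        # The downstream reads from the same variable, so changing the path in
        # the DAG never requires touching the scripts.
        run='python3 train.py --input "$TASK_PREPROCESS_OUTPUT_PATH"',
    )
    preprocess >> train
```

The design choice here is that the DAG remains the single place where paths are decided; scripts only consume environment variables.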
Pros & Cons
✓ Simple, intuitive API
✓ Transparent path handling
✓ Automatic data transfer
✓ Clean separation of mechanism and convenience
✗ Lacks support for partitioned outputs to multiple downstreams
Approach 2: Edge-Based Data Flow
Specify data transfer on edges between tasks, enabling different paths for different downstream tasks.
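The original example for this approach is not reproduced in this thread, so the following is a purely hypothetical sketch of an edge-based declaration; `send_to` and its arguments are illustrative only (and the calls are assumed to imply the corresponding edges).

```python
import sky

with sky.Dag() as dag:
    preprocess = sky.Task(name='preprocess', run='python3 preprocess.py')
    train_a = sky.Task(name='train_a', run='python3 train_a.py')
    train_b = sky.Task(name='train_b', run='python3 train_b.py')

    # Hypothetical per-edge declaration: each downstream task receives a
    # different slice of the upstream output, mounted at its own path.
    preprocess.send_to(train_a, src='/outputs/shard_a', dst='/data/train_a')
    preprocess.send_to(train_b, src='/outputs/shard_b', dst='/data/train_b')
```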
Pros & Cons
✓ Explicit data routing
✓ Different paths for different downstream tasks
✓ Clear data flow visualization
✗ More verbose for simple cases
Recommendation
Approach 1 provides the best balance of simplicity and functionality.
Possible Extensions & Discussion
Dynamic Resource Requirements: Scale downstream resources based on actual data size. Particularly useful when data size varies significantly between runs.
Python-Level Data API: Replace file operations with Python objects for cleaner data access (see the sketch after this list).
Computation Hiding: Building on the Python-level data API, enable downstream tasks to start as soon as the upstream produces partial results.
Data Contracts: Validate and filter data dependencies between tasks. This also enhances Approach 1 with a cleaner abstraction for multi-downstream data routing.
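A purely hypothetical sketch of the Python-level data API idea; `output()` and `consume()` are illustrative names, not part of the proposal or of any existing library.

```python
import sky

with sky.Dag() as dag:
    preprocess = sky.Task(name='preprocess', run='python3 preprocess.py')
    train = sky.Task(name='train', run='python3 train.py')

    # Hypothetical object-based handle: instead of wiring raw file paths,
    # the downstream consumes a named output object (which also implies the
    # preprocess -> train edge). The handle is a natural hook for data
    # contracts (schema/size validation before transfer) and for streaming
    # partial results (computation hiding).
    features = preprocess.output('features', path='/outputs/features.parquet')
    train.consume(features, mount_path='/data/features.parquet')
```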
Discussion
We welcome feedback on the proposed designs and extensions.