Refactor OP & Dataset (#336)
* modelscope-sora news (#323)

* News/modelscope sora (#327)

* modelscope-sora news

* remove empower

* debug for gpu rank for analyser (#329)

* debug for gpu rank for analyser

* spec_numprocs -> num_proc

* Add more unittests (#304)

* add unittest env with gpu

* fix unittest yml

* add environment for unittest

* update workflow trigger

* update install step

* fix install command

* update working dir

* update container

* update working dir

* change working directory

* change working directory

* change working directory

* change working directory

* change unittest

* use test tag

* finish tag support

* support running OPs with different executors

* fix pre-commit

* add hf mirror

* add hf mirror

* run all test in standalone mode by default

* ignore image face ratio

* update tags

* add ray testcase

* add ray test in workflow

* update ray unittest workflow

* delete old unittest

---------

Co-authored-by: root <panxuchen>

* Add source tag (#317)

* add source tag for some mapper op

* fix no attribute 'current_tag' when executing local tests

* move op process logic from executor to base op

* fix typo

* move export outside op

* init refactor

* update analyser

* fix format

* clean up

* bring back batch mapper

* Improve fault tolerance & Fix Ray executor

* fix wrapper

* fix batched filter

* Remove use_actor, as it is not compatible with the refactored OP class unless the dataset class is also refactored

* make wrappers work with unittests

* Compatible with unit tests and works with ray

* fix unittest

* fix wrappers with ray, map, filter

* unify unittests

* wrap deduplicators

* Compatible with non-batched calls

* Class-level wrappers

- compatible with dataset.filter
- bring back nested wrappers

* Instance-level wrappers

* Refined instance-level wrappers

- Remove incomplete dataset.filter wrappers
- Simplify code
- Stack wrappers

* fix use_cuda

* Refactor dataset (#348)

* refactor dataset

* update unittest with DJDataset

* fix unittest

* update ray data load

* add test

* ray read json

* update docker image version

* actor is no longer supported

* Regress filter's stats export logic

---------

Co-authored-by: BeachWang <[email protected]>
Co-authored-by: Xuchen Pan <[email protected]>
Co-authored-by: chenhesen <[email protected]>
Co-authored-by: garyzhang99 <[email protected]>
5 people authored Jul 17, 2024
1 parent da79345 commit b4cef5d
Showing 108 changed files with 1,012 additions and 592 deletions.
65 changes: 65 additions & 0 deletions .github/workflows/docker/docker-compose.yml
@@ -0,0 +1,65 @@
version: '3'
services:
  ray-head:
    image: data-juicer-unittest:0.2.1
    pull_policy: never
    command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
      - RAY_ADDRESS=auto
    working_dir: /workspace
    networks:
      - ray-network
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    ports:
      - "6379:6379"
      - "8265:8265"
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['0', '1']
              capabilities: [gpu]

  ray-worker:
    image: data-juicer-unittest:0.2.1
    pull_policy: never
    command: ray start --address=ray-head:6379 --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
    working_dir: /workspace
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    depends_on:
      - ray-head
    networks:
      - ray-network
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['2', '3']
              capabilities: [gpu]

networks:
  ray-network:
    driver: bridge

volumes:
  huggingface_cache:
    external: true
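The unit-test workflow drives this compose file from the repository checkout. A sketch of the equivalent manual invocation (same commands as the workflow steps; it assumes the `data-juicer-unittest:0.2.1` image and the external `huggingface_cache` volume already exist on the runner):

```shell
# run from .github/workflows/docker inside a data-juicer checkout
docker compose up -d                                  # start the 2-node Ray cluster
docker compose exec ray-head pip install -e .\[all\]  # install DJ on head...
docker compose exec ray-worker pip install -e .\[all\]  # ...and on the worker
docker compose exec ray-head rm -rf /data/huggingface/dataset  # drop cached datasets
docker compose exec ray-head python tests/run.py --tag standalone
docker compose exec ray-head python tests/run.py --tag ray
docker compose down --remove-orphans                  # tear down, even after failures
```

Because both services bind-mount `../../..` at `/workspace` and share the `huggingface_cache` volume at `/data`, the editable installs and model caches are visible to every node in the cluster.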
87 changes: 46 additions & 41 deletions .github/workflows/unit-test.yml
@@ -1,58 +1,63 @@
 # This workflow will install Python dependencies, run tests and lint with a single version of Python
 # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
 
-name: Unit Test
+name: unittest
 
-on: [push, pull_request, workflow_dispatch]
+on:
+  workflow_dispatch:
+  pull_request:
+  push:
+    branches:
+      - main
 
 permissions:
   contents: read
 
-jobs:
-  build:
-
-    runs-on: ubuntu-latest
+env:
+  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
+
+jobs:
+  unittest-single:
+    runs-on: [self-hosted, linux]
+    environment: Testing
     steps:
     - uses: actions/checkout@v3
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Set up Python 3.8
-      uses: actions/setup-python@v3
       with:
-        python-version: "3.8"
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Install dependencies
-      run: |
-        sudo apt-get install ffmpeg
-        python -m pip install --upgrade pip
-        pip install -v -e .[all]
-        pip install -v -e .[sandbox]
-    - name: Increase swapfile
-      run: |
-        df -h
-        free -h
-        sudo swapoff -a
-        sudo fallocate -l 12G /mnt/swapfile
-        sudo chmod 600 /mnt/swapfile
-        sudo mkswap /mnt/swapfile
-        sudo swapon /mnt/swapfile
-        sudo swapon --show
-    - name: Clean data-juicer assets and models after cached
-      uses: webiny/[email protected]
-      with:
-        run: rm -rf ~/.cache/data_juicer
-    - name: Cache data-juicer assets and models
-      uses: actions/cache@v3
-      with:
-        path: ~/.cache/data_juicer
-        key: dj-assets-models
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Run the test
-      run: |
-        python tests/run.py
+        path: dj-${{ github.run_id }}
+
+    - name: Setup docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose up -d
+
+    - name: Install data-juicer
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head pip install -e .\[all\]
+        docker compose exec ray-worker pip install -e .\[all\]
+
+    - name: Clean dataset cache
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head rm -rf /data/huggingface/dataset
+
+    - name: Run unittest standalone
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag standalone
+
+    - name: Run unittest ray
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag ray
+
+    - name: Remove docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      if: always()
+      run: |
+        docker compose down --remove-orphans
+
+    - name: Cleanup workspace
+      if: always()
+      run: |
+        rm -rf dj-${{ github.run_id }}
1 change: 1 addition & 0 deletions README.md
@@ -39,6 +39,7 @@ or [DingDing group](https://qr.dingtalk.com/action/joingroup?spm=a2c22.12281976.
 ----
 
 ## News
+- ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-06-01] ModelScope-Sora "Data Directors" creative sprint—Our third data-centric LLM competition has kicked off! Please visit the competition's [official website](https://tianchi.aliyun.com/competition/entrance/532219) for more information.
 - ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-03-07] We release **Data-Juicer [v0.2.0](https://github.com/alibaba/data-juicer/releases/tag/v0.2.0)** now!
 In this new version, we support more features for **multimodal data (including video now)**, and introduce **[DJ-SORA](docs/DJ_SORA.md)** to provide open large-scale, high-quality datasets for SORA-like models.
 - ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-02-20] We have actively maintained an *awesome list of LLM-Data*, welcome to [visit](docs/awesome_llm_data.md) and contribute!
1 change: 1 addition & 0 deletions README_ZH.md
@@ -33,6 +33,7 @@ Data-Juicer(包含[DJ-SORA](docs/DJ_SORA_ZH.md))正在积极更新和维护
 ----
 
 ## 新消息
+- ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-06-01] ModelScope-Sora“数据导演”创意竞速——第三届Data-Juicer大模型数据挑战赛已经正式启动!立即访问[竞赛官网](https://tianchi.aliyun.com/competition/entrance/532219),了解赛事详情。
 - ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-03-07] 我们现在发布了 **Data-Juicer [v0.2.0](https://github.com/alibaba/data-juicer/releases/tag/v0.2.0)**! 在这个新版本中,我们支持了更多的 **多模态数据(包括视频)** 相关特性。我们还启动了 **[DJ-SORA](docs/DJ_SORA_ZH.md)** ,为SORA-like大模型构建开放的大规模高质量数据集!
 - ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-02-20] 我们在积极维护一份关于LLM-Data的*精选列表*,欢迎[访问](docs/awesome_llm_data.md)并参与贡献!
 - ![new](https://img.alicdn.com/imgextra/i4/O1CN01kUiDtl1HVxN6G56vN_!!6000000000764-2-tps-43-19.png) [2024-02-05] 我们的论文被SIGMOD'24 industrial track接收!
21 changes: 5 additions & 16 deletions data_juicer/core/analyser.py
@@ -7,9 +7,7 @@
 from data_juicer.format import load_formatter
 from data_juicer.ops import Filter, load_ops
 from data_juicer.utils import cache_utils
-from data_juicer.utils.constant import Fields
 
-from .data import add_same_content_to_new_column
 from .exporter import Exporter
 
 
@@ -87,21 +85,12 @@ def run(self, load_data_np=None, skip_export=False):
         # 2. stats precompute only for filter ops
         logger.info('Computing the stats of dataset...')
         stats_collected = False
-        for op_cfg, op in zip(self.cfg.process, self.ops):
-            op_name = list(op_cfg.keys())[0]
+        for op in self.ops:
             if isinstance(op, Filter):
-                if Fields.stats not in dataset.features:
-                    # only add stats when calling filter op
-                    dataset = dataset.map(add_same_content_to_new_column,
-                                          fn_kwargs={
-                                              'new_column_name': Fields.stats,
-                                              'initial_value': {}
-                                          },
-                                          num_proc=self.cfg.np,
-                                          desc='Adding new column for stats')
-                dataset = dataset.map(op.compute_stats,
-                                      num_proc=self.cfg.np,
-                                      desc=op_name + '_compute_stats')
+                original_process = op.process
+                op.process = None
+                dataset = dataset.process(op)
+                op.process = original_process
                 stats_collected = True
         if not stats_collected:
             logger.warning('No stats collected. Please add some Filter ops to '
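The refactored Analyser reuses the unified `dataset.process(op)` path but temporarily nulls out `op.process`, so only the stats-computation stage of each Filter runs and no samples are dropped. A minimal sketch of that trick, using stub classes in place of the real data_juicer OP and dataset APIs (all names here are illustrative, not the library's):

```python
class StubFilter:
    """Stands in for a Filter OP: a stats stage plus a filtering stage."""

    def __init__(self):
        # the filtering stage; the Analyser disables it by setting it to None
        self.process = self.keep_long

    def compute_stats(self, sample):
        sample['stats'] = {'length': len(sample['text'])}
        return sample

    def keep_long(self, sample):
        return sample['stats']['length'] > 3


def run_op(dataset, op):
    """Mimic dataset.process(op): compute stats, then filter only if enabled."""
    dataset = [op.compute_stats(dict(s)) for s in dataset]
    if op.process is not None:
        dataset = [s for s in dataset if op.process(s)]
    return dataset


op = StubFilter()
original_process, op.process = op.process, None   # stats only, as in Analyser.run
analysed = run_op([{'text': 'hi'}, {'text': 'hello'}], op)
op.process = original_process                     # restore the filtering stage
print([s['stats'] for s in analysed])             # [{'length': 2}, {'length': 5}]
```

With `process` disabled, every sample survives with its stats attached, which is exactly what the Analyser needs for its reports; restoring `original_process` afterwards leaves the OP usable for real filtering.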
68 changes: 59 additions & 9 deletions data_juicer/core/data.py
@@ -1,6 +1,10 @@
+from __future__ import annotations
+
 import copy
 import inspect
+from abc import ABC, abstractmethod
 from functools import wraps
+from time import time
 from typing import Union
 
 from datasets import Dataset, DatasetDict, is_caching_enabled
@@ -14,6 +18,21 @@
 from data_juicer.utils.fingerprint_utils import generate_fingerprint
 
 
+class DJDataset(ABC):
+    """Base dataset of DJ"""
+
+    @abstractmethod
+    def process(
+            self,
+            operators,  # TODO: add type hint
+            *,
+            exporter=None,
+            checkpointer=None,
+            tracer=None) -> DJDataset:
+        """process a list of operators on the dataset."""
+        pass
+
+
 def wrap_func_with_nested_access(f):
     """
     Before conducting actual function `f`, wrap its args and kargs into nested
@@ -116,7 +135,7 @@ def map(self, **args):
         return super().map(**args)
 
 
-class NestedDataset(Dataset):
+class NestedDataset(Dataset, DJDataset):
     """Enhanced HuggingFace-Dataset for better usability and efficiency."""
 
     def __init__(self, *args, **kargs):
@@ -139,6 +158,37 @@ def __getitem__(self, key):
         res = super().__getitem__(key)
         return nested_obj_factory(res)
 
+    def process(self,
+                operator,
+                *,
+                exporter=None,
+                checkpointer=None,
+                tracer=None):
+        if operator is None:
+            return self
+
+        if not isinstance(operator, list):
+            ops = [operator]
+        else:
+            ops = operator
+
+        start = time()
+        tstart = start
+        dataset = self
+        for op in ops:
+            dataset = op(dataset,
+                         exporter=exporter,
+                         checkpointer=checkpointer,
+                         tracer=tracer)
+            end = time()
+            logger.info(
+                f'OP [{op._name}] Done in {"%.3f" % (end - start)}(s). '
+                f'Left {len(dataset)} samples.')
+            start = end
+        tend = time()
+        logger.info(f'All OPs are done in {"%.3f" % (tend - tstart)}(s).')
+        return dataset
+
     def map(self, *args, **kargs):
         """Override the map func, which is called by most common operations,
         such that the processed samples can be accessed by nested manner."""
@@ -158,16 +208,16 @@ def map(self, *args, **kargs):
                 kargs['function'])
             called_func = kargs['function']
 
-        # For wrapped function, try to get its original unwrapped method
-        while hasattr(called_func, '__wrapped__'):
+        # For wrapped function, try to get its unwrapped (bound) method
+        while not inspect.ismethod(called_func) and hasattr(
+                called_func, '__wrapped__'):
             called_func = called_func.__wrapped__
-        # Does the called function belong to a batched OP?
-        if inspect.ismethod(called_func) \
-                and 'is_batched_op' in dir(called_func.__self__) \
-                and callable(getattr(called_func.__self__, 'is_batched_op')) \
-                and called_func.__self__.is_batched_op():
-
+
+        # Batched is always required for fault tolerance
+        if inspect.ismethod(called_func):
             kargs['batched'] = True
-            kargs['batch_size'] = 1
+            kargs['batch_size'] = kargs.pop(
+                'batch_size', 1) if called_func.__self__.is_batched_op() else 1
 
         if 'new_fingerprint' not in kargs or kargs['new_fingerprint'] is None:
             new_fingerprint = generate_fingerprint(self, *args, **kargs)