Add more unittest (#304)
* add unittest env with gpu

* fix unittest yml

* add environment for unittest

* update workflow trigger

* update install step

* fix install command

* update working dir

* update container

* update working dir

* change working directory

* change working directory

* change working directory

* change working directory

* change unittest

* use test tag

* finish tag support

* support run op with different executor

* fix pre-commit

* add hf mirror

* add hf mirror

* run all tests in standalone mode by default

* ignore image face ratio

* update tags

* add ray testcase

* add ray test in workflow

* update ray unittest workflow

* delete old unittest

---------

Co-authored-by: root <panxuchen>
pan-x-c authored Jun 26, 2024
1 parent a8305bc commit c749a28
Showing 8 changed files with 325 additions and 169 deletions.
65 changes: 65 additions & 0 deletions .github/workflows/docker/docker-compose.yml
@@ -0,0 +1,65 @@
version: '3'
services:
  ray-head:
    image: data-juicer-unittest:0.2.0
    pull_policy: never
    command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
      - RAY_ADDRESS=auto
    working_dir: /workspace
    networks:
      - ray-network
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    ports:
      - "6379:6379"
      - "8265:8265"
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['0', '1']
              capabilities: [gpu]

  ray-worker:
    image: data-juicer-unittest:0.2.0
    pull_policy: never
    command: ray start --address=ray-head:6379 --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
    working_dir: /workspace
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    depends_on:
      - ray-head
    networks:
      - ray-network
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['2', '3']
              capabilities: [gpu]

networks:
  ray-network:
    driver: bridge

volumes:
  huggingface_cache:
    external: true
84 changes: 43 additions & 41 deletions .github/workflows/unit-test.yml
@@ -1,58 +1,60 @@
 # This workflow will install Python dependencies, run tests and lint with a single version of Python
 # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
 
-name: Unit Test
+name: unittest
 
-on: [push, pull_request, workflow_dispatch]
+on:
+  workflow_dispatch:
+  pull_request:
+  push:
+    branches:
+      - main
 
 permissions:
   contents: read
 
 jobs:
-  build:
-
-    runs-on: ubuntu-latest
-
+  unittest-single:
+    runs-on: [self-hosted, linux]
+    environment: Testing
     steps:
     - uses: actions/checkout@v3
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Set up Python 3.8
-      uses: actions/setup-python@v3
       with:
-        python-version: "3.8"
-    - name: Check disk space
+        path: dj-${{ github.run_id }}
+
+    - name: Setup docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-    - name: Install dependencies
+        docker compose up -d
+
+    - name: Install data-juicer
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        sudo apt-get install ffmpeg
-        python -m pip install --upgrade pip
-        pip install -v -e .[all]
-        pip install -v -e .[sandbox]
-    - name: Increase swapfile
+        docker compose exec ray-head pip install -e .\[all\]
+        docker compose exec ray-worker pip install -e .\[all\]
+
+    - name: Clean dataset cache
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-        free -h
-        sudo swapoff -a
-        sudo fallocate -l 12G /mnt/swapfile
-        sudo chmod 600 /mnt/swapfile
-        sudo mkswap /mnt/swapfile
-        sudo swapon /mnt/swapfile
-        sudo swapon --show
-    - name: Clean data-juicer assets and models after cached
-      uses: webiny/[email protected]
-      with:
-        run: rm -rf ~/.cache/data_juicer
-    - name: Cache data-juicer assets and models
-      uses: actions/cache@v3
-      with:
-        path: ~/.cache/data_juicer
-        key: dj-assets-models
-    - name: Check disk space
+        docker compose exec ray-head rm -rf /data/huggingface/dataset
+
+    - name: Run unittest standalone
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag standalone
+
+    - name: Run unittest ray
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-    - name: Run the test
+        docker compose exec ray-head python tests/run.py --tag ray
+
+    - name: Remove docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      if: always()
+      run: |
+        docker compose down --remove-orphans
+
+    - name: Cleanup workspace
+      if: always()
       run: |
-        python tests/run.py
+        rm -rf dj-${{ github.run_id }}
122 changes: 60 additions & 62 deletions data_juicer/core/ray_executor.py
@@ -102,6 +102,65 @@ def get_num_gpus(self, op, op_proc):
         proc_per_gpu = op_proc / cuda_device_count()
         return 1.0 / proc_per_gpu
 
+    def run_op(self, op, op_cfg, dataset):
+        op_name, op_args = list(op_cfg.items())[0]
+        op_cls = OPERATORS.modules[op_name]
+        op_proc = calculate_np(self.cfg.np, op, op_name)
+        num_gpus = self.get_num_gpus(op, op_proc)
+        use_actor = op.use_actor() or num_gpus
+        try:
+            if isinstance(op, Mapper):
+                if op.is_batched_op():
+                    if use_actor:
+                        dataset = dataset.map_batches(
+                            op_cls,
+                            compute=ActorPoolStrategy(),
+                            concurrency=op_proc,
+                            fn_constructor_kwargs=op_args,
+                            batch_format='pyarrow',
+                            num_gpus=num_gpus,
+                            batch_size=1)
+                        # The batch size here is same as in data.py
+                    else:
+                        dataset = dataset.map_batches(partial(
+                            ray_batch_mapper_wrapper, fn=op.process),
+                                                      batch_format='pyarrow',
+                                                      num_gpus=num_gpus,
+                                                      batch_size=1)
+                        # The batch size here is same as in data.py
+                else:
+                    if use_actor:
+                        dataset = dataset.map(op_cls,
+                                              compute=ActorPoolStrategy(),
+                                              concurrency=op_proc,
+                                              fn_constructor_kwargs=op_args,
+                                              num_gpus=num_gpus)
+                    else:
+                        dataset = dataset.map(op.process, num_gpus=num_gpus)
+
+            elif isinstance(op, Filter):
+                if use_actor:
+                    dataset = dataset.map(op_cls,
+                                          compute=ActorPoolStrategy(),
+                                          concurrency=op_proc,
+                                          fn_constructor_kwargs=op_args,
+                                          num_gpus=num_gpus)
+                else:
+                    dataset = dataset.map(op.compute_stats, num_gpus=num_gpus)
+                if op.stats_export_path is not None:
+                    dataset.write_json(op.stats_export_path, force_ascii=False)
+                dataset = dataset.filter(op.process)
+            else:
+                logger.error(
+                    'Ray executor only support Filter and Mapper OPs for '
+                    'now')
+                raise NotImplementedError
+            # hand the processed dataset back so the run() loop can chain ops
+            return dataset
+        except:  # noqa: E722
+            logger.error(f'An error occurred during Op [{op_name}].')
+            import traceback
+            traceback.print_exc()
+            exit(1)
 
     def run(self, load_data_np=None):
         """
         Running the dataset process pipeline.
@@ -140,68 +199,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
         logger.info('Processing data...')
         tstart = time.time()
         for op_cfg, op in zip(self.process_list, self.ops):
-            op_name, op_args = list(op_cfg.items())[0]
-            op_cls = OPERATORS.modules[op_name]
-            op_proc = calculate_np(self.cfg.np, op, op_name)
-            num_gpus = self.get_num_gpus(op, op_proc)
-            use_actor = op.use_actor() or num_gpus
-            try:
-                if isinstance(op, Mapper):
-                    if op.is_batched_op():
-                        if use_actor:
-                            dataset = dataset.map_batches(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                        else:
-                            dataset = dataset.map_batches(
-                                partial(ray_batch_mapper_wrapper,
-                                        fn=op.process),
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                    else:
-                        if use_actor:
-                            dataset = dataset.map(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                num_gpus=num_gpus)
-                        else:
-                            dataset = dataset.map(op.process,
-                                                  num_gpus=num_gpus)
-
-                elif isinstance(op, Filter):
-                    if use_actor:
-                        dataset = dataset.map(op_cls,
-                                              compute=ActorPoolStrategy(),
-                                              concurrency=op_proc,
-                                              fn_constructor_kwargs=op_args,
-                                              num_gpus=num_gpus)
-                    else:
-                        dataset = dataset.map(op.compute_stats,
-                                              num_gpus=num_gpus)
-                    if op.stats_export_path is not None:
-                        dataset.write_json(op.stats_export_path,
-                                           force_ascii=False)
-                    dataset = dataset.filter(op.process)
-                else:
-                    logger.error(
-                        'Ray executor only support Filter and Mapper OPs for '
-                        'now')
-                    raise NotImplementedError
-            except:  # noqa: E722
-                logger.error(f'An error occurred during Op [{op_name}].')
-                import traceback
-                traceback.print_exc()
-                exit(1)
+            dataset = self.run_op(op, op_cfg, dataset)
 
         # 4. data export
         logger.info('Exporting dataset to disk...')
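As a side note on the fractional-GPU arithmetic in `get_num_gpus` above: each replica of an op simply reserves an equal share of the visible GPUs. A minimal sketch with illustrative numbers (the helper name and the example values are assumptions, not part of this commit):

def fractional_gpus(op_proc: int, num_cuda_devices: int) -> float:
    # Mirrors the formula in RayExecutor.get_num_gpus: each of `op_proc`
    # replicas of an op reserves an equal slice of the visible GPUs.
    proc_per_gpu = op_proc / num_cuda_devices
    return 1.0 / proc_per_gpu

# e.g. 8 op processes over the 2 GPUs reserved for `ray-head` in the
# docker-compose.yml above -> each replica asks Ray for 0.25 GPU
print(fractional_gpus(8, 2))  # 0.25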
80 changes: 80 additions & 0 deletions data_juicer/utils/unittest_utils.py
@@ -2,11 +2,30 @@
import shutil
import unittest

import numpy
import pyarrow as pa
import ray.data as rd
from datasets import Dataset

from data_juicer.ops import Filter
from data_juicer.utils.constant import Fields
from data_juicer.utils.registry import Registry

SKIPPED_TESTS = Registry('SkippedTests')


def TEST_TAG(*tags):
    """Tag a test case with the executors it supports.
    Currently, `standalone` and `ray` are supported.
    """

    def decorator(func):
        setattr(func, '__test_tags__', tags)
        return func

    return decorator


class DataJuicerTestCaseBase(unittest.TestCase):

    @classmethod
@@ -32,3 +51,64 @@ def tearDownClass(cls, hf_model_name=None) -> None:
        if os.path.exists(transformers.TRANSFORMERS_CACHE):
            print('CLEAN all TRANSFORMERS_CACHE')
            shutil.rmtree(transformers.TRANSFORMERS_CACHE)

    def generate_dataset(self, data):
        """Generate a dataset for the executor selected by the current tag
        (`standalone` or `ray`).

        Args:
            data (list): a list of sample dicts to build the dataset from.
        """
        if self.current_tag.startswith('standalone'):
            return Dataset.from_list(data)
        elif self.current_tag.startswith('ray'):
            dataset = rd.from_items(data)
            if Fields.stats not in dataset.columns(fetch_if_missing=False):

                def process_batch_arrow(table: pa.Table) -> pa.Table:
                    new_column_data = [{} for _ in range(len(table))]
                    new_table = table.append_column(Fields.stats,
                                                    [new_column_data])
                    return new_table

                dataset = dataset.map_batches(process_batch_arrow,
                                              batch_format='pyarrow')
            return dataset
        else:
            raise ValueError('Unsupported type')

    def run_single_op(self, dataset, op, column_names):
        """Run a single op with the executor selected by the current tag."""
        if self.current_tag.startswith('standalone'):
            if isinstance(op, Filter) and Fields.stats not in dataset.features:
                dataset = dataset.add_column(name=Fields.stats,
                                             column=[{}] * dataset.num_rows)
            dataset = dataset.map(op.compute_stats)
            dataset = dataset.filter(op.process)
            dataset = dataset.select_columns(column_names=column_names)
            return dataset.to_list()
        elif self.current_tag.startswith('ray'):
            dataset = dataset.map(op.compute_stats)
            dataset = dataset.filter(op.process)
            dataset = dataset.to_pandas().get(column_names)
            if dataset is None:
                return []
            return dataset.to_dict(orient='records')
        else:
            raise ValueError('Unsupported type')

    def assertDatasetEqual(self, first, second):

        def convert_record(rec):
            for key in rec.keys():
                # Convert incomparable `list` to comparable `tuple`
                if isinstance(rec[key], numpy.ndarray) or isinstance(
                        rec[key], list):
                    rec[key] = tuple(rec[key])
            return rec

        first = [convert_record(d) for d in first]
        second = [convert_record(d) for d in second]
        first = sorted(first, key=lambda x: tuple(sorted(x.items())))
        second = sorted(second, key=lambda x: tuple(sorted(x.items())))
        return self.assertEqual(first, second)
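To illustrate how the new helpers are meant to be combined, here is a minimal sketch of a tagged test case. The op used (`TextLengthFilter`), its `min_len`/`max_len` arguments, and the sample texts are illustrative assumptions, and `current_tag` is assumed to be set by `tests/run.py --tag ...` as invoked in the workflow above:

import unittest

from data_juicer.ops.filter.text_length_filter import TextLengthFilter
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG


class TextLengthFilterTest(DataJuicerTestCaseBase):

    # runs under both `--tag standalone` and `--tag ray`
    @TEST_TAG('standalone', 'ray')
    def test_keep_long_text(self):
        ds_list = [{'text': 'short'},
                   {'text': 'a sample that is long enough to keep'}]
        tgt_list = [{'text': 'a sample that is long enough to keep'}]
        op = TextLengthFilter(min_len=10, max_len=100)  # illustrative params
        dataset = self.generate_dataset(ds_list)
        res = self.run_single_op(dataset, op, ['text'])
        self.assertDatasetEqual(res, tgt_list)


if __name__ == '__main__':
    unittest.main()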