From c749a2837b4591f5b1042deb80aa32c7b19ecd97 Mon Sep 17 00:00:00 2001
From: Xuchen Pan <32844285+pan-x-c@users.noreply.github.com>
Date: Wed, 26 Jun 2024 15:51:56 +0800
Subject: [PATCH] Add more unittest (#304)

* add unittest env with gpu

* fix unittest yml

* add environment for unittest

* update workflow trigger

* update install step

* fix install command

* update working dir

* update container

* update working dir

* change working directory

* change working directory

* change working directory

* change working directory

* change unittest

* use test tag

* finish tag support

* support run op with different executor

* fix pre-commit

* add hf mirror

* add hf mirror

* run all tests in standalone mode by default

* ignore image face ratio

* update tags

* add ray testcase

* add ray test in workflow

* update ray unittest workflow

* delete old unittest

---------

Co-authored-by: root
---
 .github/workflows/docker/docker-compose.yml  |  65 ++++++++++
 .github/workflows/unit-test.yml              |  84 ++++++------
 data_juicer/core/ray_executor.py             | 122 +++++++++---------
 data_juicer/utils/unittest_utils.py          |  80 ++++++++++++
 tests/ops/filter/test_alphanumeric_filter.py |  27 ++--
 .../ops/filter/test_audio_duration_filter.py |  47 ++++---
 .../filter/test_image_face_ratio_filter.py   |   3 +-
 tests/run.py                                 |  66 +++++-----
 8 files changed, 325 insertions(+), 169 deletions(-)
 create mode 100644 .github/workflows/docker/docker-compose.yml

diff --git a/.github/workflows/docker/docker-compose.yml b/.github/workflows/docker/docker-compose.yml
new file mode 100644
index 000000000..fb2aa7a3e
--- /dev/null
+++ b/.github/workflows/docker/docker-compose.yml
@@ -0,0 +1,65 @@
+version: '3'
+services:
+  ray-head:
+    image: data-juicer-unittest:0.2.0
+    pull_policy: never
+    command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
+    environment:
+      - HF_HOME=/data/huggingface
+      - HF_ENDPOINT=https://hf-mirror.com
+      - TORCH_HOME=/data/torch
+      - NLTK_DATA=/data/nltk
+      - DATA_JUICER_CACHE_HOME=/data/dj
+      - RAY_ADDRESS=auto
+    working_dir: /workspace
+    networks:
+      - ray-network
+    volumes:
+      - huggingface_cache:/data
+      - ../../..:/workspace
+    ports:
+      - "6379:6379"
+      - "8265:8265"
+    shm_size: "64G"
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['0', '1']
+              capabilities: [gpu]
+
+  ray-worker:
+    image: data-juicer-unittest:0.2.0
+    pull_policy: never
+    command: ray start --address=ray-head:6379 --block
+    environment:
+      - HF_HOME=/data/huggingface
+      - HF_ENDPOINT=https://hf-mirror.com
+      - TORCH_HOME=/data/torch
+      - NLTK_DATA=/data/nltk
+      - DATA_JUICER_CACHE_HOME=/data/dj
+    working_dir: /workspace
+    volumes:
+      - huggingface_cache:/data
+      - ../../..:/workspace
+    depends_on:
+      - ray-head
+    networks:
+      - ray-network
+    shm_size: "64G"
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['2', '3']
+              capabilities: [gpu]
+
+networks:
+  ray-network:
+    driver: bridge
+
+volumes:
+  huggingface_cache:
+    external: true
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index 4962d78dd..28b2e6f3f 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -1,58 +1,60 @@
 # This workflow will install Python dependencies, run tests and lint with a single version of Python
 # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
 
-name: Unit Test
+name: unittest
 
-on: [push, pull_request, workflow_dispatch]
+on:
+  workflow_dispatch:
+  pull_request:
+  push:
+    branches:
+      - main
 
 permissions:
   contents: read
 
 jobs:
-  build:
-
-    runs-on: ubuntu-latest
-
+  unittest-single:
+    runs-on: [self-hosted, linux]
+    environment: Testing
     steps:
     - uses: actions/checkout@v3
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Set up Python 3.8
-      uses: actions/setup-python@v3
       with:
-        python-version: "3.8"
-    - name: Check disk space
+        path: dj-${{ github.run_id }}
+
+    - name: Setup docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-    - name: Install dependencies
+        docker compose up -d
+
+    - name: Install data-juicer
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        sudo apt-get install ffmpeg
-        python -m pip install --upgrade pip
-        pip install -v -e .[all]
-        pip install -v -e .[sandbox]
-    - name: Increase swapfile
+        docker compose exec ray-head pip install -e .\[all\]
+        docker compose exec ray-worker pip install -e .\[all\]
+
+    - name: Clean dataset cache
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-        free -h
-        sudo swapoff -a
-        sudo fallocate -l 12G /mnt/swapfile
-        sudo chmod 600 /mnt/swapfile
-        sudo mkswap /mnt/swapfile
-        sudo swapon /mnt/swapfile
-        sudo swapon --show
-    - name: Clean data-juicer assets and models after cached
-      uses: webiny/action-post-run@3.1.0
-      with:
-        run: rm -rf ~/.cache/data_juicer
-    - name: Cache data-juicer assets and models
-      uses: actions/cache@v3
-      with:
-        path: ~/.cache/data_juicer
-        key: dj-assets-models
-    - name: Check disk space
+        docker compose exec ray-head rm -rf /data/huggingface/dataset
+
+    - name: Run unittest standalone
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag standalone
+
+    - name: Run unittest ray
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
       run: |
-        df -h
-    - name: Run the test
+        docker compose exec ray-head python tests/run.py --tag ray
+
+    - name: Remove docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      if: always()
+      run: |
+        docker compose down --remove-orphans
+
+    - name: Cleanup workspace
+      if: always()
       run: |
-        python tests/run.py
+        rm -rf dj-${{ github.run_id }}
diff --git a/data_juicer/core/ray_executor.py b/data_juicer/core/ray_executor.py
index d42d72f95..1c066ea02 100644
--- a/data_juicer/core/ray_executor.py
+++ b/data_juicer/core/ray_executor.py
@@ -102,6 +102,65 @@ def get_num_gpus(self, op, op_proc):
         proc_per_gpu = op_proc / cuda_device_count()
         return 1.0 / proc_per_gpu
 
+    def run_op(self, op, op_cfg, dataset):
+        op_name, op_args = list(op_cfg.items())[0]
+        op_cls = OPERATORS.modules[op_name]
+        op_proc = calculate_np(self.cfg.np, op, op_name)
+        num_gpus = self.get_num_gpus(op, op_proc)
+        use_actor = op.use_actor() or num_gpus
+        try:
+            if isinstance(op, Mapper):
+                if op.is_batched_op():
+                    if use_actor:
+                        dataset = dataset.map_batches(
+                            op_cls,
+                            compute=ActorPoolStrategy(),
+                            concurrency=op_proc,
+                            fn_constructor_kwargs=op_args,
+                            batch_format='pyarrow',
+                            num_gpus=num_gpus,
+                            batch_size=1)
+                        # The batch size here is same as in data.py
+                    else:
+                        dataset = dataset.map_batches(partial(
+                            ray_batch_mapper_wrapper, fn=op.process),
+                                                      batch_format='pyarrow',
+                                                      num_gpus=num_gpus,
+                                                      batch_size=1)
+                        # The batch size here is same as in data.py
+                else:
+                    if use_actor:
+                        dataset = dataset.map(op_cls,
+                                              compute=ActorPoolStrategy(),
+                                              concurrency=op_proc,
+                                              fn_constructor_kwargs=op_args,
+                                              num_gpus=num_gpus)
+                    else:
+                        dataset = dataset.map(op.process,
+                                              num_gpus=num_gpus)
+
+            elif isinstance(op, Filter):
+                if use_actor:
+                    dataset = dataset.map(op_cls,
+                                          compute=ActorPoolStrategy(),
+                                          concurrency=op_proc,
+                                          fn_constructor_kwargs=op_args,
+                                          num_gpus=num_gpus)
+                else:
+                    dataset = dataset.map(op.compute_stats, num_gpus=num_gpus)
+                if op.stats_export_path is not None:
+                    dataset.write_json(op.stats_export_path, force_ascii=False)
+                dataset = dataset.filter(op.process)
+            else:
+                logger.error(
+                    'Ray executor only supports Filter and Mapper OPs for '
+                    'now')
+                raise NotImplementedError
+        except:  # noqa: E722
+            logger.error(f'An error occurred during Op [{op_name}].')
+            import traceback
+            traceback.print_exc()
+            exit(1)
+
     def run(self, load_data_np=None):
         """
         Running the dataset process pipeline.
@@ -140,68 +199,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
         logger.info('Processing data...')
         tstart = time.time()
         for op_cfg, op in zip(self.process_list, self.ops):
-            op_name, op_args = list(op_cfg.items())[0]
-            op_cls = OPERATORS.modules[op_name]
-            op_proc = calculate_np(self.cfg.np, op, op_name)
-            num_gpus = self.get_num_gpus(op, op_proc)
-            use_actor = op.use_actor() or num_gpus
-            try:
-                if isinstance(op, Mapper):
-                    if op.is_batched_op():
-                        if use_actor:
-                            dataset = dataset.map_batches(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                        else:
-                            dataset = dataset.map_batches(
-                                partial(ray_batch_mapper_wrapper,
-                                        fn=op.process),
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                    else:
-                        if use_actor:
-                            dataset = dataset.map(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                num_gpus=num_gpus)
-                        else:
-                            dataset = dataset.map(op.process,
-                                                  num_gpus=num_gpus)
-
-                elif isinstance(op, Filter):
-                    if use_actor:
-                        dataset = dataset.map(op_cls,
-                                              compute=ActorPoolStrategy(),
-                                              concurrency=op_proc,
-                                              fn_constructor_kwargs=op_args,
-                                              num_gpus=num_gpus)
-                    else:
-                        dataset = dataset.map(op.compute_stats,
-                                              num_gpus=num_gpus)
-                    if op.stats_export_path is not None:
-                        dataset.write_json(op.stats_export_path,
-                                           force_ascii=False)
-                    dataset = dataset.filter(op.process)
-                else:
-                    logger.error(
-                        'Ray executor only support Filter and Mapper OPs for '
-                        'now')
-                    raise NotImplementedError
-            except:  # noqa: E722
-                logger.error(f'An error occurred during Op [{op_name}].')
-                import traceback
-                traceback.print_exc()
-                exit(1)
+            dataset = self.run_op(op, op_cfg, dataset)
 
         # 4. data export
         logger.info('Exporting dataset to disk...')
diff --git a/data_juicer/utils/unittest_utils.py b/data_juicer/utils/unittest_utils.py
index b9d18dbf1..d7ee7d87d 100644
--- a/data_juicer/utils/unittest_utils.py
+++ b/data_juicer/utils/unittest_utils.py
@@ -2,11 +2,30 @@
 import shutil
 import unittest
 
+import numpy
+import pyarrow as pa
+import ray.data as rd
+from datasets import Dataset
+
+from data_juicer.ops import Filter
+from data_juicer.utils.constant import Fields
 from data_juicer.utils.registry import Registry
 
 SKIPPED_TESTS = Registry('SkippedTests')
 
 
+def TEST_TAG(*tags):
+    """Tags for test case.
+    Currently, `standalone`, `ray` are supported.
+    """
+
+    def decorator(func):
+        setattr(func, '__test_tags__', tags)
+        return func
+
+    return decorator
+
+
 class DataJuicerTestCaseBase(unittest.TestCase):
 
     @classmethod
@@ -32,3 +51,64 @@ def tearDownClass(cls, hf_model_name=None) -> None:
             if os.path.exists(transformers.TRANSFORMERS_CACHE):
                 print('CLEAN all TRANSFORMERS_CACHE')
                 shutil.rmtree(transformers.TRANSFORMERS_CACHE)
+
+    def generate_dataset(self, data):
+        """Generate a dataset for the executor selected by
+        ``self.current_tag``: "standalone" builds a HuggingFace
+        dataset, "ray" builds a Ray dataset with an empty stats
+        column attached.
+        """
+        if self.current_tag.startswith('standalone'):
+            return Dataset.from_list(data)
+        elif self.current_tag.startswith('ray'):
+            dataset = rd.from_items(data)
+            if Fields.stats not in dataset.columns(fetch_if_missing=False):
+
+                def process_batch_arrow(table: pa.Table) -> pa.Table:
+                    new_column_data = [{} for _ in range(len(table))]
+                    new_table = table.append_column(Fields.stats,
+                                                    [new_column_data])
+                    return new_table
+
+                dataset = dataset.map_batches(process_batch_arrow,
+                                              batch_format='pyarrow')
+            return dataset
+        else:
+            raise ValueError('Unsupported type')
+
+    def run_single_op(self, dataset, op, column_names):
+        """Run operator in the specific executor."""
+        if self.current_tag.startswith('standalone'):
+            if isinstance(op, Filter) and Fields.stats not in dataset.features:
+                dataset = dataset.add_column(name=Fields.stats,
+                                             column=[{}] * dataset.num_rows)
+            dataset = dataset.map(op.compute_stats)
+            dataset = dataset.filter(op.process)
+            dataset = dataset.select_columns(column_names=column_names)
+            return dataset.to_list()
+        elif self.current_tag.startswith('ray'):
+            dataset = dataset.map(op.compute_stats)
+            dataset = dataset.filter(op.process)
+            dataset = dataset.to_pandas().get(column_names)
+            if dataset is None:
+                return []
+            return dataset.to_dict(orient='records')
+        else:
+            raise ValueError('Unsupported type')
+
+    def assertDatasetEqual(self, first, second):
+
+        def convert_record(rec):
+            for key in rec.keys():
+                # Convert incomparable `list` to comparable `tuple`
+                if isinstance(rec[key], numpy.ndarray) or isinstance(
+                        rec[key], list):
+                    rec[key] = tuple(rec[key])
+            return rec
+
+        first = [convert_record(d) for d in first]
+        second = [convert_record(d) for d in second]
+        first = sorted(first, key=lambda x: tuple(sorted(x.items())))
+        second = sorted(second, key=lambda x: tuple(sorted(x.items())))
+        return self.assertEqual(first, second)
diff --git a/tests/ops/filter/test_alphanumeric_filter.py b/tests/ops/filter/test_alphanumeric_filter.py
index efca696c2..594432207 100644
--- a/tests/ops/filter/test_alphanumeric_filter.py
+++ b/tests/ops/filter/test_alphanumeric_filter.py
@@ -4,24 +4,12 @@
 
 from data_juicer.ops.filter.alphanumeric_filter import AlphanumericFilter
 from data_juicer.utils.constant import Fields
-from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
+from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG
 
 
 class AlphanumericFilterTest(DataJuicerTestCaseBase):
 
-    def _run_alphanumeric_filter(self, dataset: Dataset, target_list, op):
-        if Fields.stats not in dataset.features:
-            # TODO:
-            # this is a temp solution,
-            # only add stats when calling filter op
-            dataset = dataset.add_column(name=Fields.stats,
-                                         column=[{}] * dataset.num_rows)
-        dataset = dataset.map(op.compute_stats)
-        dataset = dataset.filter(op.process)
-        dataset = dataset.select_columns(column_names=['text'])
-        res_list = dataset.to_list()
-        self.assertEqual(res_list, target_list)
-
+    @TEST_TAG("standalone", "ray")
     def test_case(self):
         ds_list = [{
@@ -50,10 +38,12 @@ def test_case(self):
         }, {
             'text': 'emoji表情测试下😊,😸31231\n'
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AlphanumericFilter(min_ratio=0.2, max_ratio=0.9)
-        self._run_alphanumeric_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, ["text"])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_token_case(self):
         ds_list = [{
@@ -76,9 +66,10 @@ def test_token_case(self):
         }, {
             'text': 'Do you need a cup of coffee?'
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AlphanumericFilter(tokenization=True, min_ratio=1.5)
-        self._run_alphanumeric_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, ["text"])
+        self.assertDatasetEqual(result, tgt_list)
 
 
 if __name__ == '__main__':
diff --git a/tests/ops/filter/test_audio_duration_filter.py b/tests/ops/filter/test_audio_duration_filter.py
index 91a39bfd8..f7363969d 100644
--- a/tests/ops/filter/test_audio_duration_filter.py
+++ b/tests/ops/filter/test_audio_duration_filter.py
@@ -5,7 +5,7 @@
 
 from data_juicer.ops.filter.audio_duration_filter import AudioDurationFilter
 from data_juicer.utils.constant import Fields
-from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
+from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG
 
 
 class AudioDurationFilterTest(DataJuicerTestCaseBase):
@@ -30,6 +30,7 @@ def _run_audio_duration_filter(self,
         res_list = dataset.to_list()
         self.assertEqual(res_list, target_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_default_filter(self):
         ds_list = [{
@@ -46,10 +47,13 @@ def test_default_filter(self):
         }, {
             'audios': [self.aud3_path]
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter()
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
+
+    @TEST_TAG("standalone", "ray")
     def test_filter_long_audios(self):
         ds_list = [{
@@ -60,10 +64,12 @@ def test_filter_long_audios(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud1_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(max_duration=10)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_short_audios(self):
         ds_list = [{
@@ -74,10 +80,12 @@ def test_filter_short_audios(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud3_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=60)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_audios_within_range(self):
         ds_list = [{
@@ -88,12 +96,13 @@ def test_filter_audios_within_range(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud2_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10, max_duration=20)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
+
+    @TEST_TAG("standalone", "ray")
     def test_any(self):
-
         ds_list = [{
             'audios': [self.aud1_path, self.aud2_path]
         }, {
             'audios': [self.aud2_path, self.aud3_path]
         }, {
             'audios': [self.aud1_path, self.aud3_path]
         }]
         tgt_list = [{
             'audios': [self.aud1_path, self.aud2_path]
@@ -106,12 +115,15 @@ def test_any(self):
         }, {
             'audios': [self.aud2_path, self.aud3_path]
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10,
                                  max_duration=20,
                                  any_or_all='any')
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_all(self):
         ds_list = [{
@@ -122,12 +134,14 @@ def test_all(self):
             'audios': [self.aud1_path, self.aud3_path]
         }]
         tgt_list = []
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10,
                                  max_duration=20,
                                  any_or_all='all')
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_in_parallel(self):
         ds_list = [{
@@ -138,9 +152,10 @@ def test_filter_in_parallel(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud2_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10, max_duration=20)
-        self._run_audio_duration_filter(dataset, tgt_list, op, np=2)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
 
 if __name__ == '__main__':
diff --git a/tests/ops/filter/test_image_face_ratio_filter.py b/tests/ops/filter/test_image_face_ratio_filter.py
index 2a2327b8f..2c6cf5b40 100644
--- a/tests/ops/filter/test_image_face_ratio_filter.py
+++ b/tests/ops/filter/test_image_face_ratio_filter.py
@@ -5,9 +5,10 @@
 
 from data_juicer.ops.filter.image_face_ratio_filter import ImageFaceRatioFilter
 from data_juicer.utils.constant import Fields
-from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
+from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, SKIPPED_TESTS
 
 
+@SKIPPED_TESTS.register_module()
 class ImageFaceRatioFilterTest(DataJuicerTestCaseBase):
 
     maxDiff = None
diff --git a/tests/run.py b/tests/run.py
index 8ff91e459..81028ee01 100644
--- a/tests/run.py
+++ b/tests/run.py
@@ -19,7 +19,9 @@
 sys.path.append(file_dir)
 
 parser = argparse.ArgumentParser('test runner')
-parser.add_argument('--list_tests', action='store_true', help='list all tests')
+parser.add_argument('--tag', choices=["standalone", "ray"],
+                    default="standalone",
+                    help="the tag of tests being run")
 parser.add_argument('--pattern', default='test_*.py', help='test file pattern')
 parser.add_argument('--test_dir',
                     default='tests',
@@ -27,45 +29,47 @@
 args = parser.parse_args()
 
 
-def gather_test_cases(test_dir, pattern, list_tests):
-    test_suite = unittest.TestSuite()
-    discover = unittest.defaultTestLoader.discover(test_dir,
-                                                   pattern=pattern,
-                                                   top_level_dir=None)
+class TaggedTestLoader(unittest.TestLoader):
+    def __init__(self, tag="standalone"):
+        super().__init__()
+        self.tag = tag
+
+    def loadTestsFromTestCase(self, testCaseClass):
+        # set tag to testcase class
+        setattr(testCaseClass, 'current_tag', self.tag)
+        test_names = self.getTestCaseNames(testCaseClass)
+        loaded_suite = self.suiteClass()
+        for test_name in test_names:
+            test_case = testCaseClass(test_name)
+            test_method = getattr(test_case, test_name)
+            if self.tag in getattr(test_method, '__test_tags__', ["standalone"]):
+                loaded_suite.addTest(test_case)
+        return loaded_suite
+
+
+def gather_test_cases(test_dir, pattern, tag):
+    test_to_run = unittest.TestSuite()
+    test_loader = TaggedTestLoader(tag)
+    discover = test_loader.discover(test_dir, pattern=pattern, top_level_dir=None)
     print(f'These tests will be skipped due to some reasons: '
           f'{SKIPPED_TESTS.modules}')
     for suite_discovered in discover:
-
-        for test_case in suite_discovered:
-            logger.info(f'Prepare for test [{test_case}]')
-            # filter out those tests that need to be skipped
-            filtered_test_suite = unittest.TestSuite()
-            for tc in test_case:
-                if type(tc) in SKIPPED_TESTS.modules.values():
+        for test_suite in suite_discovered:
+            for test_case in test_suite:
+                if type(test_case) in SKIPPED_TESTS.modules.values():
                     continue
-                filtered_test_suite.addTest(tc)
-            if filtered_test_suite.countTestCases() == 0:
-                continue
-
-        test_suite.addTest(test_case)
-        if hasattr(test_case, '__iter__'):
-            for subcase in test_case:
-                if list_tests:
-                    print(subcase)
-        else:
-            if list_tests:
-                print(test_case)
-    return test_suite
+                logger.info(f'Add test case [{test_case._testMethodName}]'
+                            f' from {test_case.__class__.__name__}')
+                test_to_run.addTest(test_case)
+    return test_to_run
 
 
 def main():
     runner = unittest.TextTestRunner()
     test_suite = gather_test_cases(os.path.abspath(args.test_dir),
-                                   args.pattern, args.list_tests)
-    if not args.list_tests:
-        res = runner.run(test_suite)
-        if not res.wasSuccessful():
-            exit(1)
+                                   args.pattern, args.tag)
+    res = runner.run(test_suite)
+    if not res.wasSuccessful():
+        exit(1)
 
 
 if __name__ == '__main__':
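
Usage sketch (illustrative, not part of the diff above): with these pieces, a test case opts into both executors by subclassing DataJuicerTestCaseBase and tagging each test method with TEST_TAG; TaggedTestLoader in tests/run.py sets `current_tag` on the class and keeps only the methods whose tags match. The test class below is hypothetical, but every import, helper, and the AlphanumericFilter op comes from this patch:

# hypothetical example test module, assuming data-juicer is installed
from data_juicer.ops.filter.alphanumeric_filter import AlphanumericFilter
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG


class ExampleTaggedTest(DataJuicerTestCaseBase):

    @TEST_TAG("standalone", "ray")  # selected by `tests/run.py --tag <tag>`
    def test_keeps_mostly_alphanumeric_text(self):
        ds_list = [{'text': 'plain ascii text 123'}]
        tgt_list = [{'text': 'plain ascii text 123'}]
        # generate_dataset() and run_single_op() dispatch on
        # self.current_tag, which TaggedTestLoader sets before loading.
        dataset = self.generate_dataset(ds_list)
        op = AlphanumericFilter(min_ratio=0.2, max_ratio=0.9)
        result = self.run_single_op(dataset, op, ['text'])
        self.assertDatasetEqual(result, tgt_list)

Run with `python tests/run.py --tag standalone` (the default) or `--tag ray`, matching the two workflow steps above.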