Commit

merge main

Cathy0908 committed Dec 10, 2024
2 parents 5191cf9 + 4b8b436 commit 93e0ddc
Showing 38 changed files with 801 additions and 131 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
@@ -9,3 +9,6 @@ omit =

# avoid measuring code of unittest
tests/*

[report]
ignore_errors = True
5 changes: 4 additions & 1 deletion .github/workflows/deploy_sphinx_docs.yml
@@ -12,13 +12,16 @@ on:
jobs:
pages:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [ "3.9", "3.10" ]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@master
with:
python_version: ${{ matrix.python-version }}
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
56 changes: 56 additions & 0 deletions .github/workflows/perf-bench.yml
@@ -0,0 +1,56 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: performance_benchmark

on:
workflow_dispatch:
push:
branches:
- main

permissions:
contents: read

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
perf_bench:
runs-on: [GPU, unittest]
environment: Testing
steps:
- uses: actions/checkout@v3
with:
path: dj-${{ github.run_id }}

- name: Setup docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose up -d
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head pip install -e .\[all\]
- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head rm -rf /data/huggingface/dataset
- name: Run performance benchmark standalone
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash tests/benchmark_performance/run.sh ${{ secrets.INTERNAL_WANDB_URL }} ${{ secrets.INTERNAL_WANDB_API_KEY }}
- name: Remove docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
if: always()
run: |
docker compose down --remove-orphans
- name: Cleanup workspace
if: always()
run: |
rm -rf dj-${{ github.run_id }}
10 changes: 10 additions & 0 deletions configs/config_all.yaml
@@ -15,6 +15,7 @@ text_keys: 'text' # the key name of fi
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of Hugging Face datasets. It might take up lots of disk space when using cache
ds_cache_dir: null # cache dir for Hugging Face datasets. In default, it\'s the same as the environment variable `HF_DATASETS_CACHE`, whose default value is usually "~/.cache/huggingface/datasets". If this argument is set to a valid path by users, it will override the default cache dir
open_monitor: true # Whether to open the monitor to trace resource utilization for each OP during data processing. It\'s True in default.
use_checkpoint: false # whether to use the checkpoint management to save the latest version of dataset to work dir when processing. Rerun the same config will reload the checkpoint and skip ops before it. Cache will be disabled when using checkpoint. If args of ops before the checkpoint are changed, all ops will be rerun from the beginning.
temp_dir: null # the path to the temp directory to store intermediate caches when cache is disabled, these cache files will be removed on-the-fly. In default, it's None, so the temp dir will be specified by system. NOTICE: you should be caution when setting this argument because it might cause unexpected program behaviors when this path is set to an unsafe directory.
open_tracer: false # whether to open the tracer to trace the changes during process. It might take more time when opening tracer
@@ -211,6 +212,7 @@ process:
radius: 2 # radius of blur kernel
- image_tagging_mapper: # Mapper to generate image tags.
tag_field_name: '__dj__image_tags__' # the field name to store the tags. It's "__dj__image_tags__" in default.
mem_required: '9GB'
- nlpaug_en_mapper: # simply augment texts in English based on the nlpaug library
sequential: false # whether combine all augmentation methods to a sequence. If it's True, a sample will be augmented by all opened augmentation methods sequentially. If it's False, each opened augmentation method would generate its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, there will be total aug_num augmented samples generated. If it's False, there will be (aug_num * #opened_aug_method) augmented samples generated.
@@ -257,6 +259,12 @@ process:
model_params: {} # Parameters for initializing the API model.
sampling_params: {} # Extra parameters passed to the API call.
- punctuation_normalization_mapper: # normalize unicode punctuations to English punctuations.
- python_file_mapper: # executing a Python function defined in a file.
file_path: '' # The path to the Python file containing the function to be executed.
function_name: 'process_single' # The name of the function defined in the file to be executed.
- python_lambda_mapper: # executing Python lambda function on data samples.
lambda_str: '' # A string representation of the lambda function to be executed on data samples. If empty, the identity function is used.
batched: False # A boolean indicating whether to process input data in batches.
- remove_bibliography_mapper: # remove bibliography from Latex text.
- remove_comments_mapper: # remove comments from Latex text, code, etc.
doc_type: tex # comment type you want to remove. Only support 'tex' for now.
@@ -375,6 +383,7 @@ process:
frame_sampling_method: 'all_keyframes' # sampling method of extracting frame images from the videos. Should be one of ["all_keyframes", "uniform"]. The former one extracts all key frames and the latter one extract specified number of frames uniformly from the video. Default: "all_keyframes".
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
mem_required: '9GB'
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.

# Filter ops
@@ -607,6 +616,7 @@ process:
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
any_or_all: any # keep this sample when any/all videos meet the filter condition
mem_required: '9GB'
- words_num_filter: # filter text with number of words out of specific range
lang: en # sample in which language
tokenization: false # whether to use model to tokenize documents
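Note: the new python_file_mapper and python_lambda_mapper entries above only reference user-supplied code. As a rough illustration, a file passed via `file_path` could look like the minimal sketch below; the 'text' field and the cleanup logic are assumptions for the example, not part of the project's schema.

```python
# my_mapper.py -- hypothetical module referenced by python_file_mapper's
# `file_path`; the function name matches the default `function_name`.

def process_single(sample):
    # `sample` is assumed to be a dict-like record with a 'text' field.
    sample['text'] = sample['text'].strip().lower()
    return sample
```

The same transformation could presumably be expressed inline for python_lambda_mapper, e.g. `lambda_str: 'lambda sample: {**sample, "text": sample["text"].strip().lower()}'`, assuming the lambda receives one sample dict and returns the modified dict.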
2 changes: 1 addition & 1 deletion data_juicer/__init__.py
@@ -1,4 +1,4 @@
__version__ = '1.0.0'
__version__ = '1.0.1'

import os
import subprocess
6 changes: 6 additions & 0 deletions data_juicer/config/config.py
@@ -230,6 +230,12 @@ def init_configs(args: Optional[List[str]] = None):
help='The compression method of the cache file, which can be'
'specified in ["gzip", "zstd", "lz4"]. If this parameter is'
'None, the cache file will not be compressed.')
parser.add_argument(
'--open_monitor',
type=bool,
default=True,
help='Whether to open the monitor to trace resource utilization for '
'each OP during data processing. It\'s True in default.')
parser.add_argument(
'--use_checkpoint',
type=bool,
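For reference, a hedged sketch of toggling the new option programmatically — assuming `init_configs` is re-exported from `data_juicer.config` and that the underlying parser resolves boolean command-line values (behaviour may differ between versions):

```python
# Hypothetical usage sketch; option handling may differ across versions.
from data_juicer.config import init_configs

# Load the full config but disable per-OP resource monitoring for this run.
cfg = init_configs(['--config', 'configs/config_all.yaml',
                    '--open_monitor', 'false'])
print(cfg.open_monitor)  # expected: False, so the Monitor wrapper is skipped
```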
90 changes: 39 additions & 51 deletions data_juicer/core/data.py
@@ -164,13 +164,16 @@ def __getitem__(self, key):
res = super().__getitem__(key)
return nested_obj_factory(res)

def process(self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None):
def process(
self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None,
open_monitor=True,
):
if operators is None:
return self

@@ -179,7 +182,8 @@ def process(self,
unforkable_operators = set(UNFORKABLE.modules.keys())

# resource utilization monitor
resource_util_list = []
if open_monitor:
resource_util_list = []

dataset = self
try:
@@ -196,12 +200,16 @@ def process(self,
'exporter': exporter,
'tracer': tracer,
}
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
if open_monitor:
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
else:
dataset = op.run(**run_args)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._op_cfg)
resource_util_list.append(resource_util_per_op)
if open_monitor:
resource_util_list.append(resource_util_per_op)
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
@@ -215,7 +223,10 @@ def process(self,
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
if work_dir:
if work_dir and open_monitor:
# get the analyzed version
resource_util_list = Monitor.analyze_resource_util_list(
resource_util_list)
monitor_dir = os.path.join(work_dir, 'monitor')
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, 'monitor.json'),
@@ -225,9 +236,7 @@ def process(self,
monitor_dir)
return dataset

def map(self, *args, **kargs):
"""Override the map func, which is called by most common operations,
such that the processed samples can be accessed by nested manner."""
def update_args(self, args, kargs, is_filter=False):
if args:
args = list(args)
# the first positional para is function
@@ -253,15 +262,17 @@ def map(self, *args, **kargs):
# batched is required for fault-tolerant or batched OP
if callable(getattr(
called_func.__self__,
'is_batched_op')) and called_func.__self__.is_batched_op(
) or not getattr(called_func.__self__, 'turbo', False):
'is_batched_op')) and called_func.__self__.is_batched_op():
kargs['batched'] = True
kargs['batch_size'] = kargs.pop('batch_size', 1)
elif not getattr(called_func.__self__, 'turbo', False):
kargs['batched'] = True
kargs['batch_size'] = 1
else:
kargs['batched'] = False

# rank is required for cuda model loading
if callable(
# rank is required for cuda model loading for map
if not is_filter and callable(
getattr(called_func.__self__,
'use_cuda')) and called_func.__self__.use_cuda():
kargs['with_rank'] = True
@@ -270,6 +281,14 @@ def map(self, *args, **kargs):
new_fingerprint = generate_fingerprint(self, *args, **kargs)
kargs['new_fingerprint'] = new_fingerprint

return args, kargs

def map(self, *args, **kargs):
"""Override the map func, which is called by most common operations,
such that the processed samples can be accessed by nested manner."""

args, kargs = self.update_args(args, kargs)

if cache_utils.CACHE_COMPRESS:
decompress(self, kargs['new_fingerprint'],
kargs['num_proc'] if 'num_proc' in kargs else 1)
@@ -288,38 +307,7 @@ def map(self, *args, **kargs):
def filter(self, *args, **kargs):
"""Override the filter func, which is called by most common operations,
such that the processed samples can be accessed by nested manner."""
if args:
args = list(args)
# the first positional para is function
if args[0] is None:
args[0] = lambda x: nested_obj_factory(x)
else:
args[0] = wrap_func_with_nested_access(args[0])
called_func = args[0]
else:
if 'function' not in kargs or kargs['function'] is None:
kargs['function'] = lambda x: nested_obj_factory(x)
else:
kargs['function'] = wrap_func_with_nested_access(
kargs['function'])
called_func = kargs['function']

# For wrapped function, try to get its unwrapped (bound) method
while not inspect.ismethod(called_func) and hasattr(
called_func, '__wrapped__'):
called_func = called_func.__wrapped__

# Batched is always required for fault tolerance
if inspect.ismethod(called_func):
if callable(getattr(
called_func.__self__,
'is_batched_op')) and called_func.__self__.is_batched_op():
kargs['batched'] = True
kargs['batch_size'] = kargs.pop('batch_size', 1)

if 'new_fingerprint' not in kargs or kargs['new_fingerprint'] is None:
new_fingerprint = generate_fingerprint(self, *args, **kargs)
kargs['new_fingerprint'] = new_fingerprint
args, kargs = self.update_args(args, kargs, is_filter=True)

# For filter, it involves a map and a filter operations, so the final
# cache files includes two sets with different fingerprint (before and
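In short, the refactored `process` only wraps each OP in the resource monitor when monitoring is requested, and the shared argument handling for `map`/`filter` now lives in `update_args`. A simplified sketch of the monitoring branch (checkpointing, tracing, cleanup and the full `run_args` composition are omitted; the import path is assumed):

```python
# Simplified sketch of the new open_monitor branch in NestedDataset.process;
# this is not the full method.
from data_juicer.core.monitor import Monitor  # assumed import path


def run_ops(dataset, operators, open_monitor=True):
    resource_util_list = [] if open_monitor else None
    for op in operators:
        run_args = {'dataset': dataset}  # abbreviated; the real dict has more keys
        if open_monitor:
            # Wrap the OP so CPU/GPU/memory usage is sampled while it runs.
            dataset, resource_util_per_op = Monitor.monitor_func(
                op.run, args=run_args)
            resource_util_list.append(resource_util_per_op)
        else:
            dataset = op.run(**run_args)
    if open_monitor:
        # Aggregate per-OP measurements before persisting them to monitor.json.
        resource_util_list = Monitor.analyze_resource_util_list(
            resource_util_list)
    return dataset, resource_util_list
```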
13 changes: 8 additions & 5 deletions data_juicer/core/executor.py
@@ -193,11 +193,14 @@ def run(self,
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
tstart = time()
dataset = dataset.process(ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
dataset = dataset.process(
ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer,
open_monitor=self.cfg.open_monitor,
)
tend = time()
logger.info(f'All OPs are done in {tend - tstart:.3f}s.')

5 changes: 4 additions & 1 deletion data_juicer/core/monitor.py
@@ -205,7 +205,10 @@ def monitor_func(func, args=None, sample_interval=0.5):
resource_util_dict = {}

# start monitor
ctx = get_context('fork')
start_method = 'fork'
if os.name == 'nt': # for Windows
start_method = 'spawn'
ctx = get_context(start_method)
with ctx.Manager() as manager:
mdict = manager.dict()
mdict['stop'] = False
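The change above avoids the 'fork' start method on Windows, where it does not exist and 'spawn' must be used instead. The same platform check in isolation:

```python
import os
from multiprocessing import get_context

# 'fork' is the cheapest start method on POSIX, but it is unavailable on
# Windows (os.name == 'nt'), where only 'spawn' can be used.
start_method = 'spawn' if os.name == 'nt' else 'fork'
ctx = get_context(start_method)

with ctx.Manager() as manager:
    mdict = manager.dict()   # shared dict visible to the monitor subprocess
    mdict['stop'] = False
    # ... a ctx.Process would poll resource usage here until mdict['stop'] ...
```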
17 changes: 10 additions & 7 deletions data_juicer/ops/base_op.py
@@ -70,7 +70,7 @@ def wrapper(samples, *args, **kwargs):
return wrapper


def catch_map_single_exception(method):
def catch_map_single_exception(method, return_sample=True):
"""
For single-map sample-level fault tolerance.
The input sample is expected batch_size = 1.
@@ -92,8 +92,11 @@ def wrapper(sample, *args, **kwargs):
if is_batched(sample):
try:
sample = convert_dict_list_to_list_dict(sample)[0]
res_sample = method(sample, *args, **kwargs)
return convert_list_dict_to_dict_list([res_sample])
res = method(sample, *args, **kwargs)
if return_sample:
return convert_list_dict_to_dict_list([res])
else:
return [res]
except Exception as e:
from loguru import logger
logger.error(
@@ -166,9 +169,8 @@ def __init__(self, *args, **kwargs):
method = wrap_func_with_nested_access(method)
setattr(self, name, method)

@classmethod
def is_batched_op(cls):
return cls._batched_op
def is_batched_op(self):
return self._batched_op

def process(self, *args, **kwargs):
raise NotImplementedError
@@ -326,7 +328,8 @@ def __init__(self, *args, **kwargs):
else:
self.compute_stats = catch_map_single_exception(
self.compute_stats_single)
self.process = catch_map_single_exception(self.process_single)
self.process = catch_map_single_exception(self.process_single,
return_sample=False)

# set the process method is not allowed to be overridden
def __init_subclass__(cls, **kwargs):
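The new `return_sample` switch lets one fault-tolerance wrapper serve both mappers, whose `process_single` returns a transformed sample, and filters, whose `process_single` returns a boolean keep/drop verdict. A condensed sketch of the idea (the real wrapper also converts between batched and single-sample dict layouts, which is omitted here, and its failure fallback may differ):

```python
from loguru import logger


def catch_map_single_exception(method, return_sample=True):
    """Condensed sketch of sample-level fault tolerance (batch handling omitted)."""

    def wrapper(sample, *args, **kwargs):
        try:
            res = method(sample, *args, **kwargs)
            # Mappers (return_sample=True): hand back the processed sample.
            # Filters (return_sample=False): wrap the boolean verdict in a
            # one-element list, matching the batched HF Dataset filter API.
            return res if return_sample else [res]
        except Exception as e:
            logger.error(f'Sample skipped due to error: {e}')
            # Illustrative fallback only: drop the sample / mark it not kept.
            return None if return_sample else [False]

    return wrapper
```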