Commit 84803cd

Merge branch 'main' into feat/cyruszhang/data-downloader

cyruszhang committed Dec 9, 2024
2 parents 4fb6e17 + 4b8b436
Showing 28 changed files with 608 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
@@ -9,3 +9,6 @@ omit =

# avoid measuring code of unittest
tests/*

[report]
ignore_errors = True
5 changes: 4 additions & 1 deletion .github/workflows/deploy_sphinx_docs.yml
@@ -12,13 +12,16 @@ on:
jobs:
pages:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [ "3.9", "3.10" ]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@master
with:
python_version: ${{ matrix.python-version }}
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
56 changes: 56 additions & 0 deletions .github/workflows/perf-bench.yml
@@ -0,0 +1,56 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: performance_benchmark

on:
workflow_dispatch:
push:
branches:
- main

permissions:
contents: read

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
perf_bench:
runs-on: [GPU, unittest]
environment: Testing
steps:
- uses: actions/checkout@v3
with:
path: dj-${{ github.run_id }}

- name: Setup docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose up -d
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head pip install -e .\[all\]
- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head rm -rf /data/huggingface/dataset
- name: Run performance benchmark standalone
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash tests/benchmark_performance/run.sh ${{ secrets.INTERNAL_WANDB_URL }} ${{ secrets.INTERNAL_WANDB_API_KEY }}
- name: Remove docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
if: always()
run: |
docker compose down --remove-orphans
- name: Cleanup workspace
if: always()
run: |
rm -rf dj-${{ github.run_id }}
7 changes: 7 additions & 0 deletions configs/config_all.yaml
@@ -15,6 +15,7 @@ text_keys: 'text' # the key name of fi
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of Hugging Face datasets. It might take up lots of disk space when using cache
ds_cache_dir: null # cache dir for Hugging Face datasets. In default, it's the same as the environment variable `HF_DATASETS_CACHE`, whose default value is usually "~/.cache/huggingface/datasets". If this argument is set to a valid path by users, it will override the default cache dir
open_monitor: true # Whether to open the monitor to trace resource utilization for each OP during data processing. It's True in default.
use_checkpoint: false # whether to use the checkpoint management to save the latest version of dataset to work dir when processing. Rerun the same config will reload the checkpoint and skip ops before it. Cache will be disabled when using checkpoint. If args of ops before the checkpoint are changed, all ops will be rerun from the beginning.
temp_dir: null # the path to the temp directory to store intermediate caches when cache is disabled, these cache files will be removed on-the-fly. In default, it's None, so the temp dir will be specified by system. NOTICE: you should be caution when setting this argument because it might cause unexpected program behaviors when this path is set to an unsafe directory.
open_tracer: false # whether to open the tracer to trace the changes during process. It might take more time when opening tracer
@@ -211,6 +212,7 @@ process:
radius: 2 # radius of blur kernel
- image_tagging_mapper: # Mapper to generate image tags.
tag_field_name: '__dj__image_tags__' # the field name to store the tags. It's "__dj__image_tags__" in default.
mem_required: '9GB'
- nlpaug_en_mapper: # simply augment texts in English based on the nlpaug library
sequential: false # whether combine all augmentation methods to a sequence. If it's True, a sample will be augmented by all opened augmentation methods sequentially. If it's False, each opened augmentation method would generate its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, there will be total aug_num augmented samples generated. If it's False, there will be (aug_num * #opened_aug_method) augmented samples generated.
@@ -257,6 +259,9 @@ process:
model_params: {} # Parameters for initializing the API model.
sampling_params: {} # Extra parameters passed to the API call.
- punctuation_normalization_mapper: # normalize unicode punctuations to English punctuations.
- python_file_mapper: # executing Python function defined in a file.
file_path: '' # The path to the Python file containing the function to be executed.
function_name: 'process_single' # The name of the function defined in the file to be executed.
- python_lambda_mapper: # executing Python lambda function on data samples.
lambda_str: '' # A string representation of the lambda function to be executed on data samples. If empty, the identity function is used.
batched: False # A boolean indicating whether to process input data in batches.
@@ -378,6 +383,7 @@ process:
frame_sampling_method: 'all_keyframes' # sampling method of extracting frame images from the videos. Should be one of ["all_keyframes", "uniform"]. The former one extracts all key frames and the latter one extract specified number of frames uniformly from the video. Default: "all_keyframes".
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
mem_required: '9GB'
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.

# Filter ops
@@ -610,6 +616,7 @@ process:
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
any_or_all: any # keep this sample when any/all videos meet the filter condition
mem_required: '9GB'
- words_num_filter: # filter text with number of words out of specific range
lang: en # sample in which language
tokenization: false # whether to use model to tokenize documents
2 changes: 1 addition & 1 deletion data_juicer/__init__.py
@@ -1,4 +1,4 @@
__version__ = '1.0.0'
__version__ = '1.0.1'

import os
import subprocess
6 changes: 6 additions & 0 deletions data_juicer/config/config.py
@@ -237,6 +237,12 @@ def init_configs(args: Optional[List[str]] = None):
help='The compression method of the cache file, which can be'
'specified in ["gzip", "zstd", "lz4"]. If this parameter is'
'None, the cache file will not be compressed.')
parser.add_argument(
'--open_monitor',
type=bool,
default=True,
help='Whether to open the monitor to trace resource utilization for '
'each OP during data processing. It\'s True in default.')
parser.add_argument(
'--use_checkpoint',
type=bool,
35 changes: 23 additions & 12 deletions data_juicer/core/data/dj_dataset.py
@@ -164,13 +164,16 @@ def __getitem__(self, key):
res = super().__getitem__(key)
return nested_obj_factory(res)

def process(self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None):
def process(
self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None,
open_monitor=True,
):
if operators is None:
return self

@@ -179,7 +182,8 @@ def process(self,
unforkable_operators = set(UNFORKABLE.modules.keys())

# resource utilization monitor
resource_util_list = []
if open_monitor:
resource_util_list = []

dataset = self
try:
@@ -196,12 +200,16 @@
'exporter': exporter,
'tracer': tracer,
}
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
if open_monitor:
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
else:
dataset = op.run(**run_args)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._op_cfg)
resource_util_list.append(resource_util_per_op)
if open_monitor:
resource_util_list.append(resource_util_per_op)
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
@@ -215,7 +223,10 @@
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
if work_dir:
if work_dir and open_monitor:
# get the analyzed version
resource_util_list = Monitor.analyze_resource_util_list(
resource_util_list)
monitor_dir = os.path.join(work_dir, 'monitor')
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, 'monitor.json'),
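Taken together, the new keyword lets callers turn the per-OP Monitor.monitor_func wrapping off entirely. Below is a hedged sketch of a direct call (not part of the diff; dataset, ops, and the work dir are placeholders; the executor change that follows wires the same flag from the config):

# Hedged sketch: calling the updated NestedDataset.process() with monitoring disabled.
# `dataset` is a NestedDataset and `ops` a list of instantiated OPs; both are assumed here.
dataset = dataset.process(
    ops,
    work_dir='./outputs',   # monitor/monitor.json is only written when monitoring is on
    open_monitor=False,     # run each op directly instead of wrapping it in Monitor.monitor_func
)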
13 changes: 8 additions & 5 deletions data_juicer/core/executor/local_executor.py
@@ -194,11 +194,14 @@ def run(self,
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
tstart = time()
dataset = dataset.process(ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
dataset = dataset.process(
ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer,
open_monitor=self.cfg.open_monitor,
)
tend = time()
logger.info(f'All OPs are done in {tend - tstart:.3f}s.')

5 changes: 4 additions & 1 deletion data_juicer/core/monitor.py
@@ -205,7 +205,10 @@ def monitor_func(func, args=None, sample_interval=0.5):
resource_util_dict = {}

# start monitor
ctx = get_context('fork')
start_method = 'fork'
if os.name == 'nt': # for Windows
start_method = 'spawn'
ctx = get_context(start_method)
with ctx.Manager() as manager:
mdict = manager.dict()
mdict['stop'] = False
5 changes: 2 additions & 3 deletions data_juicer/ops/base_op.py
@@ -169,9 +169,8 @@ def __init__(self, *args, **kwargs):
method = wrap_func_with_nested_access(method)
setattr(self, name, method)

@classmethod
def is_batched_op(cls):
return cls._batched_op
def is_batched_op(self):
return self._batched_op

def process(self, *args, **kwargs):
raise NotImplementedError
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/__init__.py
@@ -30,6 +30,7 @@
from .optimize_response_mapper import OptimizeResponseMapper
from .pair_preference_mapper import PairPreferenceMapper
from .punctuation_normalization_mapper import PunctuationNormalizationMapper
from .python_file_mapper import PythonFileMapper
from .python_lambda_mapper import PythonLambdaMapper
from .remove_bibliography_mapper import RemoveBibliographyMapper
from .remove_comments_mapper import RemoveCommentsMapper
@@ -76,8 +77,8 @@
'ImageTaggingMapper', 'NlpaugEnMapper', 'NlpcdaZhMapper',
'OptimizeQAMapper', 'OptimizeQueryMapper', 'OptimizeResponseMapper',
'PairPreferenceMapper', 'PunctuationNormalizationMapper',
'PythonLambdaMapper', 'RemoveBibliographyMapper', 'RemoveCommentsMapper',
'RemoveHeaderMapper', 'RemoveLongWordsMapper',
'PythonFileMapper', 'PythonLambdaMapper', 'RemoveBibliographyMapper',
'RemoveCommentsMapper', 'RemoveHeaderMapper', 'RemoveLongWordsMapper',
'RemoveNonChineseCharacterlMapper', 'RemoveRepeatSentencesMapper',
'RemoveSpecificCharsMapper', 'RemoveTableTextMapper',
'RemoveWordsWithIncorrectSubstringsMapper', 'ReplaceContentMapper',
97 changes: 97 additions & 0 deletions data_juicer/ops/mapper/python_file_mapper.py
@@ -0,0 +1,97 @@
import importlib.util
import inspect
import os

from ..base_op import OPERATORS, Mapper

OP_NAME = 'python_file_mapper'


@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
"""Mapper for executing Python function defined in a file."""

def __init__(self,
file_path: str = '',
function_name: str = 'process_single',
batched: bool = False,
**kwargs):
"""
Initialization method.
:param file_path: The path to the Python file containing the function
to be executed.
:param function_name: The name of the function defined in the file
to be executed.
:param batched: A boolean indicating whether to process input data in
batches.
:param kwargs: Additional keyword arguments passed to the parent class.
"""
self._batched_op = bool(batched)
super().__init__(**kwargs)

self.file_path = file_path
self.function_name = function_name
if not file_path:
self.func = lambda sample: sample
else:
self.func = self._load_function()

def _load_function(self):
if not os.path.isfile(self.file_path):
raise FileNotFoundError(
f"The file '{self.file_path}' does not exist.")

if not self.file_path.endswith('.py'):
raise ValueError(
f"The file '{self.file_path}' is not a Python file.")

# Load the module from the file
module_name = os.path.splitext(os.path.basename(self.file_path))[0]
spec = importlib.util.spec_from_file_location(module_name,
self.file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# Fetch the specified function from the module
if not hasattr(module, self.function_name):
raise ValueError(
f"Function '{self.function_name}' not found in '{self.file_path}'." # noqa: E501
)

func = getattr(module, self.function_name)

if not callable(func):
raise ValueError(
f"The attribute '{self.function_name}' is not callable.")

# Check that the function has exactly one argument
argspec = inspect.getfullargspec(func)
if len(argspec.args) != 1:
raise ValueError(
f"The function '{self.function_name}' must take exactly one argument" # noqa: E501
)

return func

def process_single(self, sample):
"""Invoke the loaded function with the provided sample."""
result = self.func(sample)

if not isinstance(result, dict):
raise ValueError(
f'Function must return a dictionary, got {type(result).__name__} instead.' # noqa: E501
)

return result

def process_batched(self, samples):
"""Invoke the loaded function with the provided samples."""
result = self.func(samples)

if not isinstance(result, dict):
raise ValueError(
f'Function must return a dictionary, got {type(result).__name__} instead.' # noqa: E501
)

return result
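To make the new op concrete, here is a hedged sketch (not part of the diff) of a user file that PythonFileMapper could load, followed by a direct instantiation; the file name, field name, and sample contents are illustrative assumptions:

# my_mapper.py: hypothetical user file for python_file_mapper (illustration only).
# The loaded function must accept exactly one argument and return a dict.
def process_single(sample):
    sample['text'] = sample.get('text', '').strip().lower()
    return sample

# Hedged usage sketch: loading the file above through the new mapper.
from data_juicer.ops.mapper.python_file_mapper import PythonFileMapper

op = PythonFileMapper(file_path='my_mapper.py', function_name='process_single')
print(op.process_single({'text': '  Hello World  '}))  # expected: {'text': 'hello world'}

In a config file, the same pairing is expressed by the python_file_mapper entry added to configs/config_all.yaml above, with file_path pointing at the user file and function_name naming the function to load.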
5 changes: 5 additions & 0 deletions data_juicer/utils/model_utils.py
@@ -51,6 +51,11 @@
'punkt.*.pickle':
'https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/'
'data_juicer/models/',

# ram
'ram_plus_swin_large_14m.pth':
'http://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/models/'
'ram_plus_swin_large_14m.pth',
}

