Probe-based OP Fusion & Reordering (#464)
* * init adapter

* + add basic logic of workload adaptation

* * update the adaptation logic

* + add unittests for Monitor

* * use multiprocessing to monitor resource utilization

* + add unittests for adapter

* * modifications and fixes for gece's comments

* * bug fixed: there is no attr _name in FusedFilter

* * support OP fusion based on probed speed of each OP

* * fix bugs in fused OP speed calculation
+ add unit tests for probe-based OP fusion

* * bug fixed

* * bug fixed: enable batched when is_batched_op is True

* * expand the test dataset batch according to the num_proc

* * support OP-wise adaptive batch size setting

* * support batched processing for 4 image OPs

* * set mp method for each OP during probing as well

* + add visualization graphs for monitor results

* * share the same context space for a batch

* * share the same context space for a batch but with idx info

* * support batched processing for fused filter

* * bug fixed: add idx info to inter vars of word repetition filter, reduce the logic value for each op in fused filter

* * extract batched to an outer func
* modified for text_length_filter

* * use a branch to decide which funcs are used

* * update context for each sample as well

* * modify for mapper and whitespace_normalization_mapper

* * modify for two filters with context

* * allow optional args for batched funcs

* * restore to batched version and rename to xxx_batched

* * update docs for this modification

* * DO NOT allow overriding the compute_stats or process methods in subclasses of Mapper and Filter

* * rename the methods for the newly-added OP image_face_count_filter

* * rename FusedFilter with "_batched" suffix

* - bug fixed: update context organization for FusedFilter

* * merge main into this branch

* * support probe-based op fusion for ray mode

* * restore to separate contexts of different samples

* * avoid storing contexts from different samples into the same dict object

* * set default fusion strategy to 'probe'
- remove useless arguments

* - skip tests with randomness
HYLcool authored Nov 25, 2024
1 parent 2a3a8d4 commit 8ade9b5
Showing 16 changed files with 1,211 additions and 78 deletions.
2 changes: 2 additions & 0 deletions configs/config_all.yaml
@@ -21,9 +21,11 @@ open_tracer: false # whether to open th
op_list_to_trace: [] # only ops in this list will be traced by tracer. If it's empty, all ops will be traced. Only available when tracer is opened.
trace_num: 10 # number of samples to show the differences between datasets before and after each op. Only available when tracer is opened.
op_fusion: false # whether to fuse operators that share the same intermediate variables automatically. Op fusion might slightly reduce memory requirements and speed up the whole process.
fusion_strategy: 'probe' # OP fusion strategy. Supports ['greedy', 'probe'] for now. 'greedy' keeps the basic OP order and puts each fused OP at the end of its fused OP group. 'probe' means Data-Juicer will probe the running speed of each OP at the beginning and reorder the OPs and fused OPs according to their probed speeds (fast to slow). It's 'probe' by default.
cache_compress: null # the compression method of the cache file, which can be specified in ['gzip', 'zstd', 'lz4']. If this parameter is None, the cache file will not be compressed. We recommend you turn on this argument when your input dataset is larger than tens of GB and your disk space is not enough.
keep_stats_in_res_ds: false # whether to keep the computed stats in the result dataset. The intermediate fields to store the stats computed by Filters will be removed if it's False. It's False by default.
keep_hashes_in_res_ds: false # whether to keep the computed hashes in the result dataset. The intermediate fields to store the hashes computed by Deduplicators will be removed if it's False. It's False by default.
adaptive_batch_size: false # whether to use adaptive batch sizes for each OP according to the probed results. It's False by default.

# for multimodal data processing
image_key: 'images' # key name of field to store the list of sample image paths.
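For reference, the two new knobs above pair with the existing `op_fusion` switch. A minimal sketch of enabling them programmatically via `init_configs` from `data_juicer/config/config.py` (the config path here is hypothetical; the flag names follow the options added above):

```python
from data_juicer.config import init_configs

# hypothetical config path; flag names match the options added in this commit
cfg = init_configs(args=[
    '--config', 'configs/demo/process.yaml',
    '--op_fusion', 'true',
    '--fusion_strategy', 'probe',
    '--adaptive_batch_size', 'true',
])
assert cfg.fusion_strategy == 'probe'
```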
22 changes: 22 additions & 0 deletions data_juicer/config/config.py
@@ -15,6 +15,7 @@
from loguru import logger

from data_juicer.ops.base_op import OPERATORS
from data_juicer.ops.op_fusion import FUSION_STRATEGIES
from data_juicer.utils.logger_utils import setup_logger
from data_juicer.utils.mm_utils import SpecialTokens

@@ -275,6 +276,22 @@ def init_configs(args: Optional[List[str]] = None):
help='Whether to fuse operators that share the same intermediate '
'variables automatically. Op fusion might slightly reduce memory '
'requirements and speed up the whole process.')
parser.add_argument(
'--fusion_strategy',
type=str,
default='probe',
help='OP fusion strategy. Supports ["greedy", "probe"] for now. '
'"greedy" keeps the basic OP order and puts each fused OP at '
'the end of its fused OP group. "probe" means Data-Juicer will '
'probe the running speed of each OP at the beginning and '
'reorder the OPs and fused OPs according to their probed '
'speeds (fast to slow). It\'s "probe" by default.')
parser.add_argument(
'--adaptive_batch_size',
type=bool,
default=False,
help='Whether to use adaptive batch sizes for each OP according to '
'the probed results. It\'s False by default.')
parser.add_argument(
'--process',
type=List[Dict],
@@ -436,6 +453,11 @@ def init_setup_from_cfg(cfg: Namespace):
# The checkpoint mode is not compatible with op fusion for now.
if cfg.op_fusion:
cfg.use_checkpoint = False
cfg.fusion_strategy = cfg.fusion_strategy.lower()
if cfg.fusion_strategy not in FUSION_STRATEGIES:
raise NotImplementedError(
f'Unsupported OP fusion strategy [{cfg.fusion_strategy}]. '
f'Should be one of {FUSION_STRATEGIES}.')

# update huggingface datasets cache directory only when ds_cache_dir is set
from datasets import config
51 changes: 41 additions & 10 deletions data_juicer/core/adapter.py
@@ -1,6 +1,9 @@
from datasets import concatenate_datasets
from datasets.config import DEFAULT_MAX_BATCH_SIZE

from data_juicer.core.monitor import Monitor
from data_juicer.ops import UNFORKABLE
from data_juicer.utils.process_utils import setup_mp


class Adapter:
@@ -27,28 +30,43 @@ def execute_and_probe(dataset, operators, sample_interval=0.5):
if operators is None or len(operators) == 0:
return []

# number of test samples
sample_num = len(dataset)

# resource utilization list
resource_util_list = []
# probe for each OP
unforkable_operators = set(UNFORKABLE.modules.keys())
for op in operators:
# set num_proc to 1 for each OP to focus on the influence of batch
# size only.
old_num_proc = op.num_proc
op.num_proc = 1
# select suitable mp method for each OP
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda() or op._name in unforkable_operators) else None
setup_mp(mp_context)
# expand the test dataset according to the runtime number of
# processes to ensure enough data for a batch and probe the true
# resource utilization for each OP
expanded_dataset = concatenate_datasets([dataset] *
op.runtime_np())

# set the test batch size and save the old one
if op.is_batched_op():
old_batch_size = op.batch_size
op.batch_size = sample_num

# number of test samples
sample_num = len(dataset)
# run single op and monitor the resource utilization
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=(dataset, ), sample_interval=sample_interval)
_, resource_util_per_op = Monitor.monitor_func(
op.run,
args=(expanded_dataset, ),
sample_interval=sample_interval)

# calculate speed
resource_util_per_op[
'speed'] = sample_num / resource_util_per_op['time']
resource_util_list.append(resource_util_per_op)

# restore to the original num_proc
op.num_proc = old_num_proc
# restore the batch size
if op.is_batched_op():
op.batch_size = old_batch_size

return resource_util_list

@@ -96,18 +114,31 @@ def probe_small_batch(self, dataset, operators):
current load and estimated OP speed, returning load factors and speed
ranks for each OP.
Notice: the probe should be run with cache enabled.
:param dataset: The dataset to pre-execute a small batch on
:param operators: The OP list to be pre-executed and probed
:return: A list of probe results for each OP and the length of the
data batch used for probing.
"""
# record the cache state and enable the cache
from datasets import (disable_caching, enable_caching,
is_caching_enabled)
previous_state = is_caching_enabled()
if not previous_state:
enable_caching()

# take a small batch
data_batch = self.take_batch(dataset, self.cfg)
# process and monitor the resource utilization
resource_util_list = self.execute_and_probe(data_batch, operators)
# analyze resource utilization
analysis_res = Monitor.analyze_resource_util_list(resource_util_list)

# if the cache was disabled before, disable it again
if not previous_state:
disable_caching()

return analysis_res, len(data_batch)

def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9):
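A hedged usage sketch of the probing flow above, assuming `cfg`, `dataset`, and `ops` are already prepared (see `executor.py` below for the real call site) and that `probe_small_batch` returns one analyzed result per OP in the original OP order:

```python
from data_juicer.core.adapter import Adapter

adapter = Adapter(cfg)
probe_res, probed_batch_len = adapter.probe_small_batch(dataset, ops)
for op, res in zip(ops, probe_res):
    # 'speed' is filled in by execute_and_probe (samples per second)
    print(op._name, res['speed'])
```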
15 changes: 14 additions & 1 deletion data_juicer/core/analyzer.py
@@ -9,8 +9,10 @@
from data_juicer.config import init_configs
from data_juicer.format import load_formatter
from data_juicer.ops import Filter, load_ops
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.utils import cache_utils

from .adapter import Adapter
from .exporter import Exporter


@@ -88,7 +90,18 @@

# extract processes
logger.info('Preparing process operators...')
ops = load_ops(self.cfg.process, self.cfg.op_fusion)
ops = load_ops(self.cfg.process)

if self.cfg.op_fusion:
probe_res = None
if self.cfg.fusion_strategy == 'probe':
logger.info('Probe the OP speed for OP reordering...')
adapter = Adapter(self.cfg)
probe_res, _ = adapter.probe_small_batch(dataset, ops)

logger.info(f'Start OP fusion and reordering with strategy '
f'[{self.cfg.fusion_strategy}]...')
ops = fuse_operators(ops, probe_res)

# 2. stats precompute only for filter ops
logger.info('Computing the stats of dataset...')
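For intuition, the reordering step of the 'probe' strategy boils down to sorting by probed speed. This is a sketch of the idea only; `fuse_operators` additionally merges OPs that share intermediate variables, and its internals may differ:

```python
# sort OPs from fast to slow by the probed 'speed' metric
ranked = sorted(zip(ops, probe_res),
                key=lambda pair: pair[1]['speed'],
                reverse=True)
ops = [op for op, _ in ranked]
```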
11 changes: 7 additions & 4 deletions data_juicer/core/data.py
@@ -216,8 +216,13 @@ def process(self,
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
if work_dir:
with open(os.path.join(work_dir, 'monitor.json'), 'w') as out:
monitor_dir = os.path.join(work_dir, 'monitor')
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, 'monitor.json'),
'w') as out:
json.dump(resource_util_list, out)
Monitor.draw_resource_util_graph(resource_util_list,
monitor_dir)
return dataset

def map(self, *args, **kargs):
@@ -251,9 +256,7 @@ def map(self, *args, **kargs):
'is_batched_op')) and called_func.__self__.is_batched_op(
) or not getattr(called_func.__self__, 'turbo', False):
kargs['batched'] = True
kargs['batch_size'] = kargs.pop('batch_size', 1) if hasattr(
called_func.__self__, 'is_batched_op'
) and called_func.__self__.is_batched_op() else 1
kargs['batch_size'] = kargs.pop('batch_size', 1)
else:
kargs['batched'] = False

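For a batched OP, the wrapper above effectively arranges the underlying Hugging Face `datasets.map` call as follows (a sketch only; `op` is a hypothetical batched OP and the `process_batched` name follows the renaming convention in the commit messages):

```python
# batched=True with the OP's own batch size, as set up by the wrapper
processed = dataset.map(op.process_batched,
                        batched=True,
                        batch_size=op.batch_size)
```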
30 changes: 28 additions & 2 deletions data_juicer/core/executor.py
@@ -11,13 +11,15 @@
from data_juicer.format.load import load_formatter
from data_juicer.format.mixture_formatter import MixtureFormatter
from data_juicer.ops import OPERATORS, load_ops
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.utils import cache_utils
from data_juicer.utils.ckpt_utils import CheckpointManager

from ..ops.selector.frequency_specified_field_selector import \
FrequencySpecifiedFieldSelector
from ..ops.selector.topk_specified_field_selector import \
TopkSpecifiedFieldSelector
from .adapter import Adapter
from .exporter import Exporter
from .tracer import Tracer

@@ -43,6 +45,8 @@ def __init__(self, cfg: Optional[Namespace] = None):
self.tracer = None
self.ckpt_manager = None

self.adapter = Adapter(self.cfg)

# only enable it when using cache
if self.cfg.use_cache:
logger.info(f'Using cache compression method: '
@@ -158,9 +162,31 @@
load_data_np = self.cfg.np
dataset = self.formatter.load_dataset(load_data_np, self.cfg)

# 2. extract processes
# 2. extract processes and optimize their orders
logger.info('Preparing process operators...')
ops = load_ops(self.cfg.process, self.cfg.op_fusion)
ops = load_ops(self.cfg.process)

# OP fusion
if self.cfg.op_fusion:
probe_res = None
if self.cfg.fusion_strategy == 'probe':
logger.info('Probe the OP speed for OP reordering...')
probe_res, _ = self.adapter.probe_small_batch(dataset, ops)

logger.info(f'Start OP fusion and reordering with strategy '
f'[{self.cfg.fusion_strategy}]...')
ops = fuse_operators(ops, probe_res)

# adaptive batch size
if self.cfg.adaptive_batch_size:
# calculate the adaptive batch size
bs_per_op = self.adapter.adapt_workloads(dataset, ops)
assert len(bs_per_op) == len(ops)
# update the adaptive batch size
logger.info(f'Adapt batch sizes for each OP to {bs_per_op}')
for i, op in enumerate(ops):
if op.is_batched_op():
op.batch_size = bs_per_op[i]

# 3. data process
# - If tracer is open, trace each op after it's processed
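For intuition, a minimal stand-in for `batch_size_strategy(load_analysis_res, base_bs=1, util_th=0.9)`: each OP's batch size grows with its utilization headroom below the threshold. The real heuristic in `adapter.py` may differ:

```python
def simple_bs_strategy(peak_utils, base_bs=1, util_th=0.9):
    """peak_utils: probed peak resource utilization per OP, in [0, 1]."""
    bs_per_op = []
    for util in peak_utils:
        # more headroom below the threshold -> larger batch size
        factor = max(util_th / max(util, 1e-6), 1.0)
        bs_per_op.append(max(int(base_bs * factor), 1))
    return bs_per_op

print(simple_bs_strategy([0.3, 0.9, 0.05], base_bs=16))  # -> [48, 16, 288]
```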
24 changes: 24 additions & 0 deletions data_juicer/core/monitor.py
@@ -1,3 +1,4 @@
import os
import time
from functools import partial
from multiprocessing import get_context
@@ -28,6 +29,7 @@ class Monitor:
'''python
{
'time': 10,
'sampling interval': 0.5,
'resource': [
{
'timestamp': xxx,
Expand All @@ -50,6 +52,7 @@ class Monitor:
'''python
{
'time': 10,
'sampling interval': 0.5,
'resource': [...],
'resource_analysis': {
'GPU free mem.': {
@@ -118,6 +121,24 @@ def monitor_current_resources():

return resource_dict

@staticmethod
def draw_resource_util_graph(resource_util_list, store_dir):
import matplotlib.pyplot as plt
for idx, resource_util_dict in enumerate(resource_util_list):
resource_list = resource_util_dict['resource']
interval = resource_util_dict['sampling interval']
for focus_metric in Monitor.DYNAMIC_FIELDS:
fn = f'func_{idx}_{focus_metric.replace(" ", "_")}.jpg'
ylbl = '%' if focus_metric.endswith('util.') else 'MB'
metric_list = [item[focus_metric] for item in resource_list]
plt.plot([i * interval for i in range(len(metric_list))],
metric_list)
plt.title(focus_metric)
plt.xlabel('Time (s)')
plt.ylabel(ylbl)
plt.savefig(os.path.join(store_dir, fn), bbox_inches='tight')
plt.clf()

@staticmethod
def analyze_resource_util_list(resource_util_list):
"""
@@ -209,6 +230,9 @@ def monitor_func(func, args=None, sample_interval=0.5):

resource_util_dict['resource'] = mdict['resource']

# record interval
resource_util_dict['sampling interval'] = sample_interval

# calculate speed
resource_util_dict['time'] = end - start

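A hedged usage sketch of the Monitor additions above (the monitored function is a trivial picklable stand-in; `draw_resource_util_graph` writes one .jpg per dynamic metric into `store_dir`):

```python
import os

from data_juicer.core.monitor import Monitor

# returns (function result, resource utilization dict)
_, util = Monitor.monitor_func(sum, args=(range(10_000_000), ),
                               sample_interval=0.5)
print(util['time'], util['sampling interval'])

os.makedirs('./monitor', exist_ok=True)
Monitor.draw_resource_util_graph([util], store_dir='./monitor')
```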
17 changes: 16 additions & 1 deletion data_juicer/core/ray_executor.py
@@ -5,8 +5,11 @@
from data_juicer.config import init_configs
from data_juicer.core.ray_data import RayDataset
from data_juicer.ops import load_ops
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.utils.lazy_loader import LazyLoader

from .adapter import Adapter

ray = LazyLoader('ray', 'ray')
rd = LazyLoader('rd', 'ray.data')

@@ -33,6 +36,8 @@ def __init__(self, cfg=None):

self.work_dir = self.cfg.work_dir

self.adapter = Adapter(self.cfg)

# init ray
logger.info('Initing Ray ...')
ray.init(self.cfg.ray_address)
@@ -62,7 +67,17 @@ def run(self, load_data_np=None):
dataset = RayDataset(dataset, self.cfg.dataset_path, self.cfg)
# 2. extract processes
logger.info('Preparing process operators...')
ops = load_ops(self.cfg.process, self.cfg.op_fusion)
ops = load_ops(self.cfg.process)

if self.cfg.op_fusion:
probe_res = None
if self.cfg.fusion_strategy == 'probe':
logger.info('Probe the OP speed for OP reordering...')
probe_res, _ = self.adapter.probe_small_batch(dataset, ops)

logger.info(f'Start OP fusion and reordering with strategy '
f'[{self.cfg.fusion_strategy}]...')
ops = fuse_operators(ops, probe_res)

# 3. data process
logger.info('Processing data...')
2 changes: 2 additions & 0 deletions data_juicer/ops/filter/image_aspect_ratio_filter.py
@@ -14,6 +14,8 @@ class ImageAspectRatioFilter(Filter):
AspectRatio = W / H.
"""

_batched_op = True

def __init__(self,
min_ratio: float = 0.333,
max_ratio: float = 3.0,
2 changes: 2 additions & 0 deletions data_juicer/ops/filter/image_shape_filter.py
@@ -15,6 +15,8 @@
"""Filter to keep samples with image shape (w, h) within specific ranges.
"""

_batched_op = True

def __init__(self,
min_width: int = 1,
max_width: int = sys.maxsize,
2 changes: 2 additions & 0 deletions data_juicer/ops/filter/image_size_filter.py
@@ -12,6 +12,8 @@
specific range.
"""

_batched_op = True

def __init__(self,
min_size: str = '0',
max_size: str = '1TB',
1 change: 1 addition & 0 deletions data_juicer/ops/filter/image_text_similarity_filter.py
@@ -19,6 +19,7 @@ class ImageTextSimilarityFilter(Filter):
within a specific range."""

_accelerator = 'cuda'
_batched_op = True

def __init__(self,
hf_clip: str = 'openai/clip-vit-base-patch32',
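The `_batched_op = True` flags above opt these image filters into the batched path. For orientation, a minimal hypothetical batched Filter in the same style — the `*_batched` hook names follow this commit's renaming, and exact base-class details may differ:

```python
from data_juicer.ops.base_op import Filter
from data_juicer.utils.constant import Fields


class TextLengthProbeFilter(Filter):  # hypothetical example OP
    _batched_op = True

    def compute_stats_batched(self, samples):
        # one stats dict per sample in the column-wise batch
        samples[Fields.stats] = [{'text_len': len(t)}
                                 for t in samples[self.text_key]]
        return samples

    def process_batched(self, samples):
        # one keep/drop decision per sample
        return [s['text_len'] >= 10 for s in samples[Fields.stats]]
```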