Skip to content

Commit

Permalink
debug for gpu rank for analyser (#329)
Browse files Browse the repository at this point in the history
* debug for gpu rank for analyser

* spec_numprocs -> num_proc
  • Loading branch information
BeachWang authored Jun 25, 2024
1 parent 1c7bdc4 commit a8305bc
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
12 changes: 11 additions & 1 deletion data_juicer/core/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from loguru import logger

from data_juicer import use_cuda
from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
from data_juicer.config import init_configs
from data_juicer.format import load_formatter
from data_juicer.ops import Filter, load_ops
from data_juicer.utils import cache_utils
from data_juicer.utils.constant import Fields
from data_juicer.utils.process_utils import calculate_np

from .data import add_same_content_to_new_column
from .exporter import Exporter
Expand Down Expand Up @@ -89,6 +91,13 @@ def run(self, load_data_np=None, skip_export=False):
stats_collected = False
for op_cfg, op in zip(self.cfg.process, self.ops):
op_name = list(op_cfg.keys())[0]
with_rank = use_cuda() and op._accelerator == 'cuda'
if op.num_proc != 0:
op_proc = op.num_proc
logger.info(f'Op [{op_name}] running with sepcified '
f'number of procs:{op.num_proc}')
else:
op_proc = calculate_np(self.cfg.np, op, op_name)
if isinstance(op, Filter):
if Fields.stats not in dataset.features:
# only add stats when calling filter op
Expand All @@ -100,7 +109,8 @@ def run(self, load_data_np=None, skip_export=False):
num_proc=self.cfg.np,
desc='Adding new column for stats')
dataset = dataset.map(op.compute_stats,
num_proc=self.cfg.np,
num_proc=op_proc,
with_rank=with_rank,
desc=op_name + '_compute_stats')
stats_collected = True
if not stats_collected:
Expand Down
6 changes: 3 additions & 3 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ def run(self, load_data_np=None):
op_name, op_args = list(op_cfg.items())[0]
prev = dataset # record last dataset
with_rank = use_cuda() and op._accelerator == 'cuda'
if op.spec_numprocs != 0:
op_proc = op.spec_numprocs
if op.num_proc != 0:
op_proc = op.num_proc
logger.info(f'Op [{op_name}] running with sepcified '
f'number of procs:{op.spec_numprocs}')
f'number of procs:{op.num_proc}')
else:
op_proc = calculate_np(self.cfg.np, op, op_name)
try:
Expand Down
2 changes: 1 addition & 1 deletion data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, *args, **kwargs):
self._accelerator = kwargs.get('accelerator', 'cpu')

# parameters to determine the number of procs for this op
self.spec_numprocs = kwargs.get('spec_numprocs', 0)
self.num_proc = kwargs.get('num_proc', 0)
self.cpu_required = kwargs.get('cpu_required', 1)
self.mem_required = kwargs.get('mem_required', 0)
if isinstance(self.mem_required, str):
Expand Down
16 changes: 8 additions & 8 deletions tests/config/test_config_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_yaml_cfg_file(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
Expand All @@ -63,7 +63,7 @@ def test_yaml_cfg_file(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand Down Expand Up @@ -130,7 +130,7 @@ def test_mixture_cfg(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand All @@ -147,7 +147,7 @@ def test_mixture_cfg(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand All @@ -164,7 +164,7 @@ def test_mixture_cfg(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand All @@ -181,7 +181,7 @@ def test_mixture_cfg(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand All @@ -198,7 +198,7 @@ def test_mixture_cfg(self):
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'num_proc': 0,
'stats_export_path': None,
'cpu_required': 1,
'mem_required': 0,
Expand All @@ -213,7 +213,7 @@ def test_op_params_parsing(self):

base_class_params = {
'text_key', 'image_key', 'audio_key', 'video_key', 'accelerator',
'spec_numprocs', 'cpu_required', 'mem_required', 'use_actor',
'num_proc', 'cpu_required', 'mem_required', 'use_actor',
}

parser = ArgumentParser(default_env=True, default_config_files=None)
Expand Down

0 comments on commit a8305bc

Please sign in to comment.