Commit

Merge branch 'master' into enable_gaudi2_ci

vshekhawat-hlab authored Apr 3, 2024
2 parents 0316764 + dc3554f commit c81f1dd
Showing 31 changed files with 717 additions and 200 deletions.
71 changes: 70 additions & 1 deletion .github/workflows/hpu-gaudi2.yml
@@ -29,6 +29,58 @@ jobs:
- 80
options: --runtime=habana -e HABANA_VISIBLE_DEVICES=all -e OMPI_MCA_btl_vader_single_copy_mechanism=none --cap-add=sys_nice

env:
PT_HPU_LAZY_MODE: 0
TEST_LIST: |
test_accelerator.py
test_autotuning.py
test_compression.py
test_dist.py
test_elastic.py
(test_intX_quantization.py and test_quantized_linear)
test_ds_arguments.py
test_run.py
test_multinode_runner.py
test_moe_tp.py
test_monitor.py
(test_zero_optimizer.py and (TestSaveTensorClone or TestZeRONonDistributed))
(test_latest_checkpoint.py and test_missing_latest)
test_reshape_checkpoint.py
test_shared_weights.py
test_sparse.py
test_tag_validation.py
test_pipe_module.py
(test_flops_profiler.py and test_flops_profiler_in_inference)
test_get_optim_files.py
test_groups.py
test_init_on_device.py
test_partition_balanced.py
(test_adamw.py and TestAdamConfigs)
test_coalesced_collectives.py
test_activation_checkpointing_non_reentrant.py
test_activation_checkpointing.py
test_data.py
(test_ds_config_dict.py and (TestBasicConfig or TestBatchConfig))
test_ds_config_model.py
test_mup_optimizers.py
(test_pld.py and test_pld_schedule)
test_runtime_utils.py
test_pipe_schedule.py
test_topology.py
(test_ds_initialize.py and (TestClientOptimizer or TestClientLrScheduler))
test_csr.py
(test_fp16.py and (TestZeroEmptyGrad or TestZeroAllowUntestedOptimizer))
(test_bf16.py and TestZeroDtypeCocktail)
test_partition.py
test_ignore_unused_parameters.py
test_zero_config.py
test_zero_context_ancestry.py
(test_zero_context.py and not TestSerialContext)
test_zero_dynamic_class.py
test_zero_nesting_init.py
test_zeropp.py
(test_zero.py and (TestZero3ParamPartitioningLargeParam or TestZero3ParamPartitioningLargeParam))
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
@@ -41,11 +93,28 @@ jobs:
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
- name: Install transformers
run: |
git clone https://github.com/huggingface/transformers
cd transformers
git rev-parse --short HEAD
pip install .
- name: Install deepspeed
run: |
pip install .[dev]
pip install .[dev,autotuning]
ds_report
- name: Python environment
run: |
pip list
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
export PT_HPU_LAZY_MODE=${PT_HPU_LAZY_MODE}
TEST_LIST=$(echo "$TEST_LIST" | awk 'NF{printf "%s%s", (NR>1 ? " or " : ""), $0} END{if (NR>1) print ""}')
echo "TEST_LIST ${TEST_LIST}"
echo "PT_HPU_LAZY_MODE ${PT_HPU_LAZY_MODE}"
pytest --verbose unit/ -k "${TEST_LIST}"
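
Note: the awk one-liner above folds the newline-separated TEST_LIST into a single pytest -k expression joined by " or ". A minimal Python sketch of the same transform (the helper name is hypothetical, not part of the workflow):

def build_pytest_k_expr(test_list: str) -> str:
    # Join non-empty lines with " or ", mirroring the awk filter above.
    entries = [line.strip() for line in test_list.splitlines() if line.strip()]
    return " or ".join(entries)

# build_pytest_k_expr("test_dist.py\ntest_moe_tp.py") -> "test_dist.py or test_moe_tp.py"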
3 changes: 2 additions & 1 deletion README.md
@@ -132,7 +132,7 @@ DeepSpeed has been integrated with several different popular open-source DL frameworks
| NVIDIA | [![nv-torch110-p40](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch110-p40.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch110-p40.yml) [![nv-torch110-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch110-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch110-v100.yml) [![nv-torch-latest-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-latest-v100.yml) [![nv-h100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-h100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-h100.yml) [![nv-inference](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-inference.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-inference.yml) [![nv-nightly](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-nightly.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-nightly.yml) |
| AMD | [![amd-mi200](https://github.com/microsoft/DeepSpeed/actions/workflows/amd-mi200.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/amd-mi200.yml) |
| CPU | [![torch-latest-cpu](https://github.com/microsoft/DeepSpeed/actions/workflows/cpu-torch-latest.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/cpu-torch-latest.yml) [![cpu-inference](https://github.com/microsoft/DeepSpeed/actions/workflows/cpu-inference.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/cpu-inference.yml) |
| Habana | [![hpu-gaudi2](https://github.com/microsoft/DeepSpeed/actions/workflows/hpu-gaudi2.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/hpu-gaudi2.yml) |
| Intel Gaudi | [![hpu-gaudi2](https://github.com/microsoft/DeepSpeed/actions/workflows/hpu-gaudi2.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/hpu-gaudi2.yml) |
| PyTorch Nightly | [![nv-torch-nightly-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml) |
| Integrations | [![nv-transformers-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-transformers-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-transformers-v100.yml) [![nv-lightning-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-lightning-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-lightning-v100.yml) [![nv-accelerate-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-accelerate-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-accelerate-v100.yml) [![nv-mii](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-mii.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-mii.yml) [![nv-ds-chat](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-ds-chat.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-ds-chat.yml) [![nv-sd](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-sd.yml/badge.svg)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-sd.yml) |
| Misc | [![Formatting](https://github.com/microsoft/DeepSpeed/actions/workflows/formatting.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/formatting.yml) [![pages-build-deployment](https://github.com/microsoft/DeepSpeed/actions/workflows/pages/pages-build-deployment/badge.svg)](https://github.com/microsoft/DeepSpeed/actions/workflows/pages/pages-build-deployment) [![Documentation Status](https://readthedocs.org/projects/deepspeed/badge/?version=latest)](https://deepspeed.readthedocs.io/en/latest/?badge=latest)[![python](https://github.com/microsoft/DeepSpeed/actions/workflows/python.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/python.yml) |
@@ -160,6 +160,7 @@ dynamically link them at runtime.

| Contributor | Hardware | Accelerator Name | Contributor validated | Upstream validated |
| ----------- | -------- | ---------------- | --------------------- | ------------------ |
| Intel | Intel(R) Gaudi(R) 2 AI accelerator | hpu | Yes | Yes |
| Intel | Intel(R) Xeon(R) Processors | cpu | Yes | Yes |
| Intel | Intel(R) Data Center GPU Max series | xpu | Yes | No |

9 changes: 7 additions & 2 deletions deepspeed/__init__.py
@@ -26,6 +26,7 @@
from . import module_inject

from .accelerator import get_accelerator
from .constants import TORCH_DISTRIBUTED_DEFAULT_PORT
from .runtime.engine import DeepSpeedEngine, DeepSpeedOptimizerCallable, DeepSpeedSchedulerCallable
from .runtime.engine import ADAM_OPTIMIZER, LAMB_OPTIMIZER
from .runtime.hybrid_engine import DeepSpeedHybridEngine
@@ -42,7 +43,6 @@
from .comm.comm import init_distributed

from .runtime import zero
from .runtime import DeepSpeedOptimizer, ZeROOptimizer
from .runtime.compiler import is_compile_supported

from .pipe import PipelineModule
@@ -72,6 +72,7 @@ def initialize(args=None,
model_parameters: Optional[torch.nn.Module] = None,
training_data: Optional[torch.utils.data.Dataset] = None,
lr_scheduler: Optional[Union[_LRScheduler, DeepSpeedSchedulerCallable]] = None,
distributed_port: int = TORCH_DISTRIBUTED_DEFAULT_PORT,
mpu=None,
dist_init_required: Optional[bool] = None,
collate_fn=None,
@@ -96,6 +97,8 @@
lr_scheduler: Optional: Learning Rate Scheduler Object or a Callable that takes an Optimizer and returns a Scheduler object.
The scheduler object should define a get_lr(), step(), state_dict(), and load_state_dict() methods
distributed_port: Optional: Master node (rank 0)'s free port that needs to be used for communication during distributed training
mpu: Optional: A model parallelism unit object that implements
get_{model,data}_parallel_{rank,group,world_size}()
@@ -137,7 +140,9 @@
global dist
from deepspeed import comm as dist
dist_backend = get_accelerator().communication_backend_name()
dist.init_distributed(dist_backend=dist_backend, dist_init_required=dist_init_required)
dist.init_distributed(dist_backend=dist_backend,
distributed_port=distributed_port,
dist_init_required=dist_init_required)

# Set config using config_params for backwards compat
if config is None and config_params is not None:
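
For context, a hedged usage sketch of the new distributed_port argument; model and ds_config are user-supplied placeholders, and 29500 is the TORCH_DISTRIBUTED_DEFAULT_PORT value it overrides:

import deepspeed

# model and ds_config are placeholders for your own module and config dict.
engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config=ds_config,
    distributed_port=29501,  # any free port on rank 0; default is 29500
)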
2 changes: 1 addition & 1 deletion deepspeed/autotuning/utils.py
@@ -42,7 +42,7 @@ def find_replace_str(value, replace_dict):
if not isinstance(value, str):
return str(value)

matches = re.findall(r"\$[A-Za-z0-9_]+", value)
matches = re.findall(r"\$[\w]+", value)
for var in matches:
var_key = var.replace("$", "").lower()
if var_key == "nvme_path":
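
The widened pattern behaves the same on ASCII input, since \w covers [A-Za-z0-9_] (plus Unicode word characters). A quick check:

import re

cmd = "--nvme_path $NVME_PATH --micro_batch $MBS"  # example autotuning string
print(re.findall(r"\$[\w]+", cmd))  # ['$NVME_PATH', '$MBS']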
1 change: 1 addition & 0 deletions deepspeed/checkpoint/constants.py
@@ -16,6 +16,7 @@
BASE_OPTIMIZER_STATE = 'base_optimizer_state'
BASE_OPTIMIZER_STATE_STEP = 'base_optimizer_state_step'
SINGLE_PARTITION_OF_FP32_GROUPS = "single_partition_of_fp32_groups"
PARAM_GROUPS = 'param_groups'
GROUP_PADDINGS = 'group_paddings'
PARTITION_COUNT = 'partition_count'
ZERO_STAGE = 'zero_stage'
54 changes: 41 additions & 13 deletions deepspeed/checkpoint/ds_to_universal.py
@@ -22,6 +22,7 @@
OPTIMIZER_STATE_DICT,
BASE_OPTIMIZER_STATE,
SINGLE_PARTITION_OF_FP32_GROUPS,
PARAM_GROUPS,
PARAM_SLICE_MAPPINGS,
PARAM_SHAPES,
PARAM,
@@ -110,6 +111,9 @@ def extract_zero_shards(dir, ds_checkpoint, indices_3D):
fp32=fp32_groups[param_group_id],
)

if "step" in state_groups[param_group_id]:
flat_state["step"] = state_groups[param_group_id]["step"]

for name, fragment_mapping in param_slice_mappings[param_group_id].items():
if pp_index > 0 and any(re.match(pattern, name) for pattern in pipeline_replicated_params):
# Skip tied weights that are replicated in first and last pp stages
@@ -138,17 +142,28 @@ def dump_param_fragment(dir, tp_index, dp_index, state_name, state_flat_tensor,

#print(f"{param_name}: {offset}: {numel} => {path}")

t = state_flat_tensor.narrow(0, offset, numel).clone()
_save_checkpoint(path, t)
# State might be a python int or a tensor
if state_name != "step" and torch.is_tensor(state_flat_tensor):
state_flat_tensor = state_flat_tensor.narrow(0, offset, numel).clone()
_save_checkpoint(path, state_flat_tensor)


def _merge_zero_shards(param_base_path, state, tp_degree, slice_shape):
slices = []
for tp_index in range(tp_degree):
prefix_path = os.path.join(param_base_path, str(tp_index), f"{state}")
paths = sorted(list(glob.glob(f"{prefix_path}.*")))
if len(paths) == 0:
continue

shards = [torch.load(p) for p in paths]
slice = torch.cat(shards, dim=0).reshape(slice_shape)

if state == "step":
assert all(v == shards[0] for v in shards), "All shards must have the same step value"
slice = shards[0]
else:
slice = torch.cat(shards, dim=0).reshape(slice_shape)

slices.append(slice)
return slices

@@ -177,6 +192,10 @@ def get_matched_pattern(patterns_, name_):
return pattern_
return None

step_merged = _merge_zero_shards(slice_base_path, "step", tp_degree, shape)
if step_merged:
_save_checkpoint(os.path.join(param_base_path, f"step.pt"), step_merged[0])

for state in ("fp32", "exp_avg", "exp_avg_sq"):
slices = _merge_zero_shards(slice_base_path, state, tp_degree, shape)
final_path = os.path.join(param_base_path, f"{state}.pt")
Expand Down Expand Up @@ -227,13 +246,21 @@ def _get_chunks(l, n):


def _do_parallel_work(do_work, work_chunks, num_workers):
pool = multiprocessing.Pool(num_workers)
results = []
for batch in tqdm.tqdm(work_chunks):
res = pool.map(do_work, batch)
results.extend(res)
pool.close()
pool.join()
if num_workers > 1:
pool = multiprocessing.Pool(num_workers)
results = []
for batch in tqdm.tqdm(work_chunks):
res = pool.map(do_work, batch)
results.extend(res)
pool.close()
pool.join()
else:
# No parallel pass for unit testing
# We can't create child processes in tests
results = []
for batch in tqdm.tqdm(work_chunks):
res = [do_work(x) for x in batch]
results.extend(res)
return results


@@ -273,6 +300,7 @@ def _save_optimizer_state(args, ds_checkpoint):

optim_sd = sd[OPTIMIZER_STATE_DICT]
output_sd = {k: v for k, v in optim_sd.items() if k not in sharded_states}
output_sd[PARAM_GROUPS] = optim_sd[BASE_OPTIMIZER_STATE][PARAM_GROUPS]
zero_output_folder = os.path.join(args.output_folder, "zero")
output_file_path = os.path.join(zero_output_folder, f"optimizer_state.pt")
_save_checkpoint(output_file_path, output_sd)
@@ -283,10 +311,9 @@ def _check_for_required_state(ds_checkpoint):
assert universal_checkpoint_info is not None, f'Required {UNIVERSAL_CHECKPOINT_INFO} state is missing in checkpoint. Verify that client creates this state.'


def main():
def main(args):
print(f'Convert DeepSpeed Checkpoint to Universal Checkpoint')

args = parse_arguments()
print(f'Converting DeepSpeed checkpoint in {args.input_folder} to Universal checkpoint in {args.output_folder}')

ds_checkpoint = DeepSpeedCheckpoint(args.input_folder)
@@ -332,4 +359,5 @@ def main():


if __name__ == "__main__":
main()
args = parse_arguments()
main(args)
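
The num_workers > 1 guard in _do_parallel_work matters because unit tests cannot spawn child processes; with a single worker the same batches run inline. A standalone sketch of the dispatch pattern (the function name is illustrative):

import multiprocessing
import tqdm

def run_batches(do_work, work_chunks, num_workers):
    # Pool-based map when parallelism is requested, plain loop otherwise.
    results = []
    if num_workers > 1:
        with multiprocessing.Pool(num_workers) as pool:
            for batch in tqdm.tqdm(work_chunks):
                results.extend(pool.map(do_work, batch))
    else:
        for batch in tqdm.tqdm(work_chunks):
            results.extend(do_work(x) for x in batch)
    return results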
21 changes: 19 additions & 2 deletions deepspeed/checkpoint/reshape_utils.py
@@ -4,9 +4,10 @@
# DeepSpeed Team

import os
import re
import torch
from collections import OrderedDict
from .constants import (ZERO_FILE_PREFIX, FP16_ZERO_FILE_PREFIX, BF16_ZERO_FILE_PREFIX)
from .constants import (ZERO_FILE_PREFIX, FP16_ZERO_FILE_PREFIX, BF16_ZERO_FILE_PREFIX, MODEL_FILE_PREFIX)


def basic_folder_validation(dir):
@@ -38,12 +39,28 @@ def get_files(dir):
return file_list


def sort_zero_files(files, prefix):
pattern = f"{prefix}([0-9]+)_{MODEL_FILE_PREFIX}([0-9]+)"
rank_pairs = []
for f in files:
m = re.search(pattern, f)
if m:
dp_rank = int(m.group(1))
mp_rank = int(m.group(2))
rank_pairs.append((dp_rank, mp_rank, f))
else:
raise ValueError(f"Cannot parse dp_rank and mp_rank from {f}")

sorted_files = sorted(rank_pairs, key=lambda x: (x[0], x[1]))
return [f for _, _, f in sorted_files]


def get_zero_files(dir):
file_list = get_files(dir)
for prefix in [ZERO_FILE_PREFIX, FP16_ZERO_FILE_PREFIX, BF16_ZERO_FILE_PREFIX]:
zero_files = get_files_with_prefix(file_list, prefix)
if len(zero_files) > 0:
return zero_files
return sort_zero_files(zero_files, prefix)

return []

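
sort_zero_files orders shards numerically by (dp_rank, mp_rank) instead of relying on lexicographic filename order, where rank 10 would sort before rank 2. An illustration, assuming filenames of the zero_pp_rank_<dp>_mp_rank_<mp> form:

import re

files = [
    "zero_pp_rank_10_mp_rank_00_optim_states.pt",
    "zero_pp_rank_2_mp_rank_00_optim_states.pt",
]
pattern = r"zero_pp_rank_([0-9]+)_mp_rank_([0-9]+)"
files.sort(key=lambda f: tuple(int(g) for g in re.search(pattern, f).groups()))
print(files[0])  # zero_pp_rank_2_mp_rank_00_optim_states.pt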
8 changes: 8 additions & 0 deletions deepspeed/checkpoint/universal_checkpoint.py
@@ -22,9 +22,15 @@ def load_hp_checkpoint_state(self, folder, tp_rank, tp_world_size):
if match:
hp_keys.append(match.group(1))

step = None
for key in hp_keys:
ckpt_file = os.path.join(folder, f"{key}.pt")
ckpt_dict = torch.load(ckpt_file)

if key == "step":
step = ckpt_dict
continue

full_hp_param = ckpt_dict[PARAM]

# need to deal with slices that were averaged.
@@ -103,6 +109,8 @@ def load_hp_checkpoint_state(self, folder, tp_rank, tp_world_size):

hp_mapping.optim_fragment[key] = tp_hp_fragment.clone().detach()

return step


def enable_universal_checkpoint(param_list):
for param in param_list:
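
A condensed sketch of the new control flow: "step" is saved as a bare scalar shared by every parameter, so it is captured once and returned rather than sliced like the tensor states (a <folder>/<key>.pt layout is assumed here):

import os
import torch

def load_fragments(folder, hp_keys):
    # Pull the scalar step out; everything else stays a per-key fragment.
    step = None
    fragments = {}
    for key in hp_keys:
        ckpt = torch.load(os.path.join(folder, f"{key}.pt"))
        if key == "step":
            step = ckpt
            continue
        fragments[key] = ckpt
    return step, fragments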
4 changes: 3 additions & 1 deletion deepspeed/checkpoint/zero_checkpoint.py
@@ -105,9 +105,11 @@ def _strip_tensor_paddings(self, sd):
if group_paddings[key] == 0:
continue
for state_name, state_value in group_state.items():
if torch.is_tensor(state_value):
if state_name != "step" and torch.is_tensor(state_value):
raw_length = state_value.numel() - group_paddings[key]
group_state[state_name] = torch.narrow(state_value, 0, 0, raw_length).clone()
else:
group_state[state_name] = state_value

def _clear_group_paddings(self, sd):
group_paddings = self._get_optimizer_state(sd, GROUP_PADDINGS)
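
The added name check keeps the scalar "step" (and any other non-tensor state) untouched while tensor states still have their trailing alignment padding trimmed. The rule in isolation, as a minimal sketch:

import torch

def strip_state_padding(state_name, state_value, padding):
    # Only tensor states carry padding; "step" passes through unchanged.
    if state_name != "step" and torch.is_tensor(state_value):
        raw_length = state_value.numel() - padding
        return torch.narrow(state_value, 0, 0, raw_length).clone()
    return state_value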
3 changes: 2 additions & 1 deletion deepspeed/launcher/multinode_runner.py
@@ -74,7 +74,8 @@ def name(self):
def get_cmd(self, environment, active_resources):
environment['PDSH_RCMD_TYPE'] = 'ssh'
if self.args.ssh_port is not None: # only specify ssh port if it is specified
environment["PDSH_SSH_ARGS_APPEND"] += f" -p {self.args.ssh_port}"
environment["PDSH_SSH_ARGS_APPEND"] = f"{environment.get('PDSH_SSH_ARGS_APPEND', '')} \
-p {self.args.ssh_port}"

active_workers = ",".join(active_resources.keys())
logger.info("Running on the following workers: %s" % active_workers)
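
The old += raised a KeyError whenever PDSH_SSH_ARGS_APPEND was not already set in the environment; the dict.get form appends to an empty default instead. A minimal repro:

environment = {}  # PDSH_SSH_ARGS_APPEND unset, as on a fresh launcher env

# old: environment["PDSH_SSH_ARGS_APPEND"] += " -p 2222"  -> KeyError
environment["PDSH_SSH_ARGS_APPEND"] = f"{environment.get('PDSH_SSH_ARGS_APPEND', '')} -p 2222"
print(environment["PDSH_SSH_ARGS_APPEND"])  # " -p 2222"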