Skip to content

Commit

Permalink
Merge branch 'master' into moe/gate
Browse files Browse the repository at this point in the history
  • Loading branch information
loadams authored Feb 23, 2024
2 parents 1657955 + 4ec8762 commit 05b2262
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 19 deletions.
53 changes: 53 additions & 0 deletions .github/workflows/nv-human-eval.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: nv-human-eval

on:
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
unit-tests:
runs-on: [self-hosted, nvidia, a6000]
container:
image: nvcr.io/nvidia/pytorch:23.03-py3
ports:
- 80
options: --gpus all --shm-size "8G"

steps:
- uses: actions/checkout@v3

- name: Check container state
run: |
ldd --version
nvcc --version
nvidia-smi
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
- name: Install transformers
run: |
git clone --depth=1 https://github.com/huggingface/transformers
cd transformers
git rev-parse --short HEAD
python -m pip install .
- name: Clone Human Eval
run: |
git clone --depth=1 https://github.com/openai/human-eval.git
sed -i '/exec(check_program, exec_globals)/ s/^# //' human-eval/human_eval/execution.py
cd human-eval
git rev-parse --short HEAD
python -m pip install .
- name: Install deepspeed
run: |
python -m pip install .[dev,1bit,autotuning]
ds_report
- name: Python environment
run: |
python -m pip list
- name: Unit tests
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
python -m pytest --color=yes --durations=0 --verbose -rF -m 'evaluation' -k "test_human_eval" unit/ --torch_ver="2.0" --cuda_ver="12"
2 changes: 1 addition & 1 deletion blogs/deepspeed-chat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ Furthermore, we would like to point out that our effective performance is 19x hi

***(II) Scalability Analysis.*** The best effective throughput for different model sizes is achieved at different GPU count. This is in part because some of the larger model sizes require more memory to run. However, a large part of this behavior stems from DeepSpeed-HE’s scalability properties that we discuss next.

Figure 7 shows that DeepSeed-RLHF has achieved good scaling overall on up to 64 GPUs. However, if we look more closely, it shows that DeepSpeed-RLHF training achieves super-linear scaling at small scale, followed by near linear or sub-linear scaling at larger scales. This is due to the interaction between memory availability and max global batch size.
Figure 7 shows that DeepSpeed-RLHF has achieved good scaling overall on up to 64 GPUs. However, if we look more closely, it shows that DeepSpeed-RLHF training achieves super-linear scaling at small scale, followed by near linear or sub-linear scaling at larger scales. This is due to the interaction between memory availability and max global batch size.

As DeepSpeed-HE is powered by ZeRO-based technology for training, it allows model states to be partitioned across the available GPUs. As a result, the memory consumption per GPU reduces with the increase in the number of GPUs, allowing DeepSpeed-HE to support a larger batch per GPU resulting in super-linear scaling. However, at large scale, while the available memory continues to increase, the maximum global batch size (1024, in our case, with a sequence length of 512) limits the batch size per GPU, resulting in near-linear or sub-linear scaling.
As a result, for a given max global batch size, DeepSpeed-HE achieves the best throughput and cost efficiency at the boundary of super-linear and sub-linear scalability, and the exact point is mostly determined by the largest batch size that can be run per GPU as the function of available memory and global batch size.
Expand Down
2 changes: 1 addition & 1 deletion blogs/deepspeed-chat/chinese/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ DeepSpeed-HE可以在训练和推理之间无缝更改模型分区,以支持

***(II) 可扩展性分析。*** 不同模型大小的最佳有效吞吐量取决于不同的 GPU 数量。部分原因是因为一些较大的模型大小需要更多的内存来运行。基于此,我们接下来讨论 DeepSpeed-HE 的可扩展性特性。

图 7 显示 DeepSeed-RLHF 在多达 64 个 GPU的集群 上实现了良好的整体扩展。然而,如果我们仔细观察,可以发现 DeepSpeed-RLHF 训练在小规模时实现了超线性扩展,随后在较大规模时实现了接近线性或次线性扩展。这是由于内存可用性和最大全局批量大小之间的相互作用。
图 7 显示 DeepSpeed-RLHF 在多达 64 个 GPU的集群 上实现了良好的整体扩展。然而,如果我们仔细观察,可以发现 DeepSpeed-RLHF 训练在小规模时实现了超线性扩展,随后在较大规模时实现了接近线性或次线性扩展。这是由于内存可用性和最大全局批量大小之间的相互作用。

DeepSpeed-HE 的核心技术基于 ZeRO,用于训练过程中将模型状态分割到每个GPU上。这意味着随着 GPU 数量的增加,每个 GPU 的内存消耗会减少,使得 DeepSpeed-HE 能够在每个 GPU 上支持更大的批量,从而实现超线性扩展。然而,在大规模情况下,尽管可用内存持续增加,但最大全局批量大小仍然限制了每个 GPU 的批量大小,导致接近线性或次线性扩展。因此,在给定的最大全局批量大小(例如,我们设置为 1024 个句子,每个句子长度为 512)下,DeepSpeed-HE 在超线性和次线性可扩展性之间实现了最佳的吞吐量和成本效益。具体的平衡点主要取决于每个 GPU 上可运行的最大批量大小,而这又受到可用内存和全局批量大小的函数所决定。

Expand Down
2 changes: 1 addition & 1 deletion blogs/deepspeed-chat/japanese/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ DeepSpeed-RLHFは、Colossal-AIや、ネイティブのPyTorchを用いたHuggin

***(II) スケーラビリティ分析*** モデルサイズごとに、最良のスループットを得られるGPU数は異なります。これは、モデルサイズが大きくなると、実行に多くのメモリを必要とすることに加え、以下に説明する DeepSpeed-HE のスケーラビリティ特性にも起因しています。

図7は、DeepSeed-RLHF が最大 64 GPU で全体的に良好なスケーラビリティを達成したことを示しています。しかし、より詳細に見ると、DeepSpeed-RLHFの訓練では、小規模な環境では超線形(super linear)なスケーリングを達成し、大規模では線形(linear)またはそれ以下のスケーラビリティになっていることが分かります。これは、メモリの可用性と最大グローバルバッチサイズとの間の相互作用によるものです。
図7は、DeepSpeed-RLHF が最大 64 GPU で全体的に良好なスケーラビリティを達成したことを示しています。しかし、より詳細に見ると、DeepSpeed-RLHFの訓練では、小規模な環境では超線形(super linear)なスケーリングを達成し、大規模では線形(linear)またはそれ以下のスケーラビリティになっていることが分かります。これは、メモリの可用性と最大グローバルバッチサイズとの間の相互作用によるものです。

DeepSpeed-HEはトレーニングにZeROの技術を採用しているため、利用可能なGPU間でモデルを分割することが可能です。その結果、GPUあたりのメモリ消費量はGPU数の増加とともに減少し、DeepSpeed-HEはGPUあたりでより大きなバッチサイズをサポートできるようになり、超線形のスケーリングが実現できます。しかし、より大規模になると、利用可能なメモリが増加し続ける一方で、最大グローバルバッチサイズが制限されているため、GPUあたりのバッチサイズを小さくすることになり、線形またはそれ以下のスケーリングになります。その結果、与えられた最大グローバルバッチサイズに対して、DeepSpeed-HEは、スーパーリニアとサブリニアのスケーラビリティの境界で最高のスループットとコスト効率を達成し、正確なポイントは、利用可能なメモリとグローバルバッチサイズの関数としてGPUごとに実行できる最大バッチサイズによってほぼ決定されます。

Expand Down
25 changes: 17 additions & 8 deletions deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import torch
from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset

from deepspeed.utils import logger, groups
from deepspeed.utils import logger
import deepspeed.comm as dist
from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes
from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype
Expand Down Expand Up @@ -482,17 +482,17 @@ def __init__(
dist.init_distributed()

# comm_group and worker_id+num_workers are mutually exclusive
if comm_group is not None:
self.comm_group = comm_group
self.num_workers = self.comm_group.size()
self.worker_id = self.comm_group.rank()
self.comm_group = comm_group
if self.comm_group is None:
# self.comm_group = deepspeed.utils.groups._clone_world_group()
self.num_workers = num_workers
self.worker_id = worker_id
else:
self.comm_group = groups._clone_world_group()
self.num_workers = self.comm_group.size()
self.worker_id = self.comm_group.rank()

if self.worker_id == 0:
logger.info(f"Data analyzer initialized with {self.num_workers} workers.")
logger.info(f"Distributed data analyzer initialized with {self.num_workers} workers.")

def run_map_reduce(self):

Expand Down Expand Up @@ -635,9 +635,18 @@ def file_write_ordered(self, tensor_list, fname, numpy_dtype):
# method to deserializes a buffer into rows of different lengths and write them to file
def write_buffer_to_file(buff, src, builder):
assert self.worker_id == 0, "only rank 0 can write to file"

# # write one buffer at a time
# for row_len in row_lens[src]:
# builder.add_item(buff[:row_len].cpu())
# buff = buff[row_len:]

# collect all buffers and write them all at once
buffer_list = []
for row_len in row_lens[src]:
builder.add_item(buff[:row_len].cpu())
buffer_list.append(buff[:row_len].cpu())
buff = buff[row_len:]
builder.add_items(buffer_list)

# 5. rank 0 prepares output folder and file
if self.worker_id == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,18 @@ def __init__(self, out_file, dtype=np.int64):
self._doc_idx = [0]

def add_item(self, tensor):
""" write the tensor to the file and update its size in the index"""
np_array = np.array(tensor.numpy(), dtype=self._dtype)
self._data_file.write(np_array.tobytes(order='C'))
self._sizes.append(np_array.size)

def add_items(self, tensor_list):
""" write a list of tensors to the file and update their sizes in the index"""
np_arrays = [np.array(t.numpy(), dtype=self._dtype) for t in tensor_list]
self._data_file.writelines([arr.tobytes(order='C') for arr in np_arrays])
for arr in np_arrays:
self._sizes.append(arr.size)

def add_item_numpy(self, np_array):
if np_array.dtype != self._dtype:
np_array = np_array.astype(self._dtype)
Expand Down
9 changes: 5 additions & 4 deletions deepspeed/runtime/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,9 +1911,6 @@ def print_forward_breakdown(self, fwd_time):

@instrument_w_nvtx
def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
assert not (self.bfloat16_enabled() and self.pipeline_parallelism), \
f'allreduce_gradients() is not valid when bfloat+pipeline_parallelism is enabled'

# Pass (PP) gas boundary flag to optimizer (required for zero)
self.optimizer.is_gradient_accumulation_boundary = self.is_gradient_accumulation_boundary()
# ZeRO stage >= 2 communicates during non gradient accumulation boundaries as well
Expand All @@ -1926,7 +1923,11 @@ def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
self.optimizer, 'reduce_gradients'):
self.optimizer.reduce_gradients(pipeline_parallel=self.pipeline_parallelism)
else:
self.buffered_allreduce_fallback(elements_per_buffer=bucket_size)
grads = None
if hasattr(self.optimizer, "get_grads_for_reduction"):
# This is currently for BF16 optimizer
grads = self.optimizer.get_grads_for_reduction()
self.buffered_allreduce_fallback(grads=grads, elements_per_buffer=bucket_size)

@instrument_w_nvtx
def backward(self, loss, allreduce_gradients=True, release_loss=False, retain_graph=False, scale_wrt_gas=True):
Expand Down
4 changes: 3 additions & 1 deletion deepspeed/runtime/zero/stage3.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,9 @@ def _create_fp32_partitions(self):
self.device).clone().float().detach())

self.fp32_partitioned_groups_flat[i].requires_grad = True # keep this in case internal optimizer uses it
self.fp32_partitioned_groups_flat[i].ds_id = '_'.join(map(str, self.fp16_partitioned_groups_flat_id[i]))
ds_id_begin = str(self.fp16_partitioned_groups_flat_id[i][0])
ds_id_end = str(self.fp16_partitioned_groups_flat_id[i][-1])
self.fp32_partitioned_groups_flat[i].ds_id = ds_id_begin + '_' + ds_id_end

if len(swappable_fp32_tensors) > 0:
self.optimizer_swapper.initialize_parameters(parameters=swappable_fp32_tensors,
Expand Down
7 changes: 4 additions & 3 deletions tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[pytest]
addopts = -m "not sequential and not nightly and not inference and not seq_inference and not inference_ops and not inference_v2 and not inference_v2_ops and not stable_diffusion"
addopts = -m "not sequential and not nightly and not inference and not seq_inference and not inference_ops and not inference_v2 and not inference_v2_ops and not stable_diffusion and not evaluation"
markers =
sequential:Tests that need to be run sequentially
inference:Inference model tests
inference_ops:Individual inference operator tests
inference_v2: Inference tests for the v2 stack
inference_v2_ops: Op tests for the v2 stack
inference_v2:Inference tests for the v2 stack
inference_v2_ops:Op tests for the v2 stack
seq_inference:Inference model tests to run sequentially
nightly:Tests that should be run nightly
world_size:Change world size of individual tests in a class
stable_diffusion:Tests that run Stable Diffusion
evaluation:Tests that evaluate model correctness
73 changes: 73 additions & 0 deletions tests/unit/inference/test_human_eval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import pytest
import os
import torch
from deepspeed.accelerator import get_accelerator


@pytest.mark.evaluation
@pytest.mark.parametrize("model_name", ["codellama/CodeLlama-7b-Python-hf"])
def test_human_eval(model_name):
import mii
import numpy
from transformers import pipeline
from human_eval.data import write_jsonl, read_problems
from human_eval.evaluation import evaluate_functional_correctness

def generate_base_completion(pipe, problem_prompt: str) -> str:
return pipe(problem_prompt, do_sample=True)[0]["generated_text"]

def generate_mii_completion(pipe, problem_prompt: str) -> str:
return pipe(problem_prompt, max_new_tokens=512)[0].generated_text

def generate_samples(pipe, generation_function):
samples = [
dict(task_id=task_id, completion=generation_function(pipe, problems[task_id]["prompt"]))
for task_id in problems for _ in range(num_samples_per_task)
]
return samples

# Loading Problems
problems = read_problems("../../human-eval/data/HumanEval.jsonl.gz")
num_samples_per_task = 20

# Initializing HuggingFace Pipeline
local_rank = os.getenv("LOCAL_RANK", "0")
device = torch.device(get_accelerator().device_name(local_rank))
base_pipe = pipeline(model=model_name,
device=torch.device(get_accelerator().device_name(local_rank)),
max_length=512,
return_full_text=False)

# Generating Base Samples
base_samples = generate_samples(base_pipe, generate_base_completion)

# Base Pipeline Teardown
del base_pipe
get_accelerator().empty_cache()

# Initializing DeepSpeed-MII Pipeline
mii_pipe = mii.pipeline(model_name)

# Generating MII Samples
mii_samples = generate_samples(mii_pipe, generate_mii_completion)

# MII Pipeline Teardown
mii_pipe.destroy()

# Writing Samples
write_jsonl("base_samples.jsonl", base_samples)
write_jsonl("mii_samples.jsonl", mii_samples)

# Evaluating Samples
base_results = evaluate_functional_correctness("base_samples.jsonl")
mii_results = evaluate_functional_correctness("mii_samples.jsonl")

# Executing Assertions
for key in base_results.keys():
assert numpy.allclose(base_results[key], mii_results[key], rtol=0.10), \
f"Base result: {base_results[key]}, MII result: {mii_results[key]}, outside of rtol."

0 comments on commit 05b2262

Please sign in to comment.