From f4cb866c2eba60a72d79c3741d12d098fe4e45b8 Mon Sep 17 00:00:00 2001 From: Nadav Elyahu <88962733+nelyahu@users.noreply.github.com> Date: Tue, 4 Jun 2024 11:25:36 +0300 Subject: [PATCH 1/4] estimate_zero2_model_states_mem_needs: fixing memory estiamtion (#5099) was considering 4 bytes per model param, and 4 bytes per gradient. fixed it to 2 bytes - under the assumption of FP16/BF16 --------- Co-authored-by: Olatunji Ruwase --- deepspeed/runtime/zero/stage_1_and_2.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deepspeed/runtime/zero/stage_1_and_2.py b/deepspeed/runtime/zero/stage_1_and_2.py index 3d5ff5e6b43e..61652854211d 100755 --- a/deepspeed/runtime/zero/stage_1_and_2.py +++ b/deepspeed/runtime/zero/stage_1_and_2.py @@ -2432,7 +2432,9 @@ def estimate_zero2_model_states_mem_needs(total_params, gpu_mem = 2 * total_params cpu_mem = total_params * max(4 * total_gpus, 16) * additional_buffer_factor else: - gpu_mem = 4 * total_params + int(16 * total_params / total_gpus) + # GPU's total_params multipliers: 2 = params_16bit, + # 18 = 2_grads_16bit + 4_grads_32bit + 4_params_32bit + 8_optimizer_states_32bit(momentum and variance) + gpu_mem = 2 * total_params + int(18 * total_params / total_gpus) cpu_mem = total_params * 4 * num_gpus_per_node * additional_buffer_factor return int(cpu_mem), int(gpu_mem) From af4356b348c64d227d08cdc192ac7c053a0c10b6 Mon Sep 17 00:00:00 2001 From: Liangliang-Ma Date: Wed, 5 Jun 2024 23:08:29 +0800 Subject: [PATCH 2/4] Fix cuda hardcode for inference woq (#5565) This is a simple fix for inference woq part, changing from `'cuda'` to `get_accelerator().device_name()`. --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- deepspeed/inference/quantization/utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/deepspeed/inference/quantization/utils.py b/deepspeed/inference/quantization/utils.py index 712abc384a44..a5e8f28bdec9 100644 --- a/deepspeed/inference/quantization/utils.py +++ b/deepspeed/inference/quantization/utils.py @@ -14,14 +14,14 @@ device = get_accelerator().device_name() if get_accelerator().is_available() else 'cpu' -quantizer_cuda_module = None +quantizer_module = None -def get_quantizer_cuda_module(): - global quantizer_cuda_module - if quantizer_cuda_module is None: - quantizer_cuda_module = deepspeed.ops.op_builder.QuantizerBuilder().load() - return quantizer_cuda_module +def get_quantizer_module(): + global quantizer_module + if quantizer_module is None: + quantizer_module = deepspeed.ops.op_builder.QuantizerBuilder().load() + return quantizer_module def tensor_clamp(tensor: Tensor, min, max) -> Tensor: @@ -107,19 +107,19 @@ def dequantize(self, tensor: Tensor, quant_scale: Tensor, quant_min: Tensor) -> if self.config['group_size'] % 8 == 0 and \ (self.config['num_bits'] == 4 or self.config['num_bits'] == 8) and \ self.config['group_dim'] == len(tensor.shape) - 1 and \ - self.dtype == torch.float16 and device == 'cuda': + self.dtype == torch.float16 and device == get_accelerator().device_name(): last_dimension_size = self.config['group_size'] if self.config['num_bits'] == 4: last_dimension_size = last_dimension_size // 2 - quantized_tensor = get_quantizer_cuda_module().dequantize_int4_to_half_experimental( + quantized_tensor = get_quantizer_module().dequantize_int4_to_half_experimental( tensor.reshape(-1, last_dimension_size), quant_scale, quant_min, tensor.numel() // last_dimension_size, self.config['group_size']) shape = list(tensor.shape) shape[-1] = shape[-1] * 2 elif self.config['num_bits'] == 8: # last_dimension_size = last_dimension_size // 2 - quantized_tensor = get_quantizer_cuda_module().dequantize_int8_to_half_experimental( + quantized_tensor = get_quantizer_module().dequantize_int8_to_half_experimental( tensor.reshape(-1, last_dimension_size), quant_scale, quant_min, tensor.numel() // last_dimension_size, self.config['group_size']) shape = list(tensor.shape) From 6b6d64185708e194ad06a8b6e08a15af3f59d1d4 Mon Sep 17 00:00:00 2001 From: inkcherry Date: Thu, 6 Jun 2024 01:23:54 +0800 Subject: [PATCH 3/4] fix sequence parallel(Ulysses) grad scale for zero0 (#5555) use dp_world_size for grad reduction, instead of seq_dp_world_size. Currently, for zero0, only sparse tensors use the correct world_size. tiny model with sp=4 grad norm test: grad_norm | step1 | step2 | step3 | step4 |step5 | step100 -- | -- | -- | -- | -- | --| -- zero1 | 15.825 | 16.646|15.853 | 16.159 | 17.333 | 15.555 zero0 | 3.956 | 4.161 | 3.963 | 4.040 | 4.333| 3.889 zero0(this patch) | 15.825 | 16.646 | 15.853| 16.159 | 17.333 | 15.554 --- deepspeed/runtime/engine.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index be153b4b4948..08ab05d79b6a 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -2407,18 +2407,22 @@ def _reduce_non_expert_gradients(self, grads, elements_per_buffer): split_sparse_tensor_buckets, split_dense_tensor_buckets = split_half_float_double_sparse(grads) if self.pipeline_parallelism: dp_group = self.mpu.get_data_parallel_group() + dp_world_size = dist.get_world_size(dp_group) else: dp_group = groups._get_sequence_data_parallel_group() - + dp_world_size = dist.get_world_size(dp_group) / float(self.sequence_parallel_size) for _, sparse_bucket_tuple in enumerate(split_sparse_tensor_buckets): if sparse_bucket_tuple: bucket_type, sparse_bucket = sparse_bucket_tuple - self.sparse_allreduce_no_retain(sparse_bucket, dp_group=dp_group) + self.sparse_allreduce_no_retain(sparse_bucket, dp_group=dp_group, dp_world_size=dp_world_size) for _, dense_bucket_tuple in enumerate(split_dense_tensor_buckets): if dense_bucket_tuple: bucket_type, dense_bucket = dense_bucket_tuple - self.allreduce_no_retain(dense_bucket, dp_group=dp_group, numel_per_bucket=elements_per_buffer) + self.allreduce_no_retain(dense_bucket, + dp_group=dp_group, + numel_per_bucket=elements_per_buffer, + dp_world_size=dp_world_size) def _reduce_expert_gradients(self, expert_grads, elements_per_buffer): # to maintain the gradients value unaffected by ep_size setting, @@ -2490,9 +2494,9 @@ def sparse_allreduce(self, sparse, dp_group, dp_world_size=None): dp_world_size = dist.get_world_size(group=dp_group) if self.postscale_gradients(): if self.gradient_average: - values.mul_(self.gradient_predivide_factor() / (dp_world_size / float(self.sequence_parallel_size))) + values.mul_(self.gradient_predivide_factor() / (dp_world_size)) else: - values.mul_(1. / (dp_world_size / float(self.sequence_parallel_size))) + values.mul_(1. / (dp_world_size)) indices_device_list = self.sparse_all_gather(indices, dp_group) values_device_list = self.sparse_all_gather(values, dp_group) From 11a62a06351d8711d61d25ba22e870b197f453f9 Mon Sep 17 00:00:00 2001 From: Liangliang-Ma Date: Thu, 6 Jun 2024 04:28:46 +0800 Subject: [PATCH 4/4] Add Compressedbackend for Onebit optimizers (#5473) In the process of adding onebit optimizers support for XPU devices, we have noticed that for different accelerator, the main difference of implementation of `compressed_allreduce` lies on `packbits` and `unpackbits`. CUDA uses cupy and NPU uses torch_npu. Instead of replace these to xpu only functions, we provided a CompressedBackend to do the `compressed_allreduce` work where users can add their own packbits/unpackbits kernels, which is a general path for all kinds of accelerators. In this PR, we: 1. Add CompressedBackend for onebitAdam, onebitLamb and zerooneAdam 2. Add XPU implement of packbits/unpackbits with SYCL, built in PackbitsBuilder 3. Add tests for onebit with CompressedBackend --------- Co-authored-by: Olatunji Ruwase --- accelerator/xpu_accelerator.py | 6 +- csrc/xpu/packbits/packing.cpp | 100 +++++++++++++++++ deepspeed/runtime/comm/compressed.py | 137 ++++++++++++++++++++++++ deepspeed/runtime/fp16/onebit/adam.py | 4 + deepspeed/runtime/fp16/onebit/lamb.py | 4 + deepspeed/runtime/fp16/onebit/zoadam.py | 4 + op_builder/xpu/__init__.py | 1 + op_builder/xpu/packbits.py | 26 +++++ tests/onebit/README.md | 31 ++++++ tests/onebit/test_compressed_backend.py | 96 +++++++++++++++++ tests/onebit/test_compressed_perf.py | 97 +++++++++++++++++ 11 files changed, 504 insertions(+), 2 deletions(-) create mode 100644 csrc/xpu/packbits/packing.cpp create mode 100644 deepspeed/runtime/comm/compressed.py create mode 100644 op_builder/xpu/packbits.py create mode 100644 tests/onebit/README.md create mode 100644 tests/onebit/test_compressed_backend.py create mode 100644 tests/onebit/test_compressed_perf.py diff --git a/accelerator/xpu_accelerator.py b/accelerator/xpu_accelerator.py index 3fed89d7200f..dc3d7a9e473f 100644 --- a/accelerator/xpu_accelerator.py +++ b/accelerator/xpu_accelerator.py @@ -267,9 +267,9 @@ def get_op_builder(self, class_name): # is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed # if successful this also means we're doing a local install and not JIT compile path from op_builder import __deepspeed__ # noqa: F401 # type: ignore - from op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder + from op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder, PackbitsBuilder except ImportError: - from deepspeed.ops.op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder + from deepspeed.ops.op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder, PackbitsBuilder if class_name == "AsyncIOBuilder": return AsyncIOBuilder @@ -279,6 +279,8 @@ def get_op_builder(self, class_name): return CPUAdamBuilder elif class_name == "FusedAdamBuilder": return FusedAdamBuilder + elif class_name == "PackbitsBuilder": + return PackbitsBuilder else: return None diff --git a/csrc/xpu/packbits/packing.cpp b/csrc/xpu/packbits/packing.cpp new file mode 100644 index 000000000000..fb09bb16efdd --- /dev/null +++ b/csrc/xpu/packbits/packing.cpp @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. +// SPDX-License-Identifier: Apache-2.0 + +// DeepSpeed Team + +#include +#include +#include +#include + +using namespace sycl; +using namespace xpu; + +void packbitskernel(const float* input, uint8_t* output, const int input_size, id<1> item_ct1) +{ + // get the sign bit of each float and pack them into byte + int i = item_ct1; + for (int j = 0; j < 8; ++j) { + int k = i * 8 + j; + int bit = k < input_size && (!sycl::signbit(input[k])); + output[i] |= bit << (7 - j); + } +} + +void unpackbitskernel(const uint8_t* input, float* output, id<1> item_ct1) +{ + // use the bit value to set float, bit 0 -> float -1, bit 1 -> float 1 + int i = item_ct1; + output[i] = (float((input[i / 8] >> (7 - i % 8)) & 1) - 0.5) * 2; +} + +sycl::queue get_current_queue(at::Device device) +{ + c10::impl::VirtualGuardImpl impl(device.type()); + c10::Stream _stream = impl.getStreamFromGlobalPool(device, /*isHighPriority=*/false); + sycl::queue queue = xpu::get_queue_from_stream(_stream); + return queue; +} + +/* +pack float tensor into uint8 tensor. Every eight float elements get packed into one uint8 +if float x >= 0, will be packed as a '1' bit, or will be packed as '0' +Arguments: + tensor: A bool tensor that get packed. + input_size: numel of input tensor + rank: device id in order to get corresponding stream +*/ +at::Tensor packbits(at::Tensor tensor, int input_size, int rank) +{ + at::Device device = "xpu:" + std::to_string(rank); + sycl::queue q = get_current_queue(device); + + int packed_size = (input_size + 7) / 8; + auto unit8_options = at::TensorOptions().dtype(at::kByte).device(at::kXPU); + at::Tensor packed = torch::zeros({packed_size}, unit8_options); + + float* input = (float*)tensor.data_ptr(); + uint8_t* output = (uint8_t*)packed.data_ptr(); + + auto event = q.submit([&](sycl::handler& cgh) { + cgh.parallel_for<>(range(packed_size), [=](id<1> item_ct1) { + packbitskernel(input, output, input_size, item_ct1); + }); + }); + + return packed; +} + +/* +unpack uint8 tensor into float tensor. Every uint8 element get unpacked into eight float +a '1' bit will be converted to a float(1), a '0' bit will be converted to a float(-1). +Arguments: + tensor: A uint8 tensor that get unpacked. + input_size: numel of input tensor + rank: device id in order to get corresponding stream +*/ +at::Tensor unpackbits(at::Tensor tensor, int input_size, int rank) +{ + at::Device device = "xpu:" + std::to_string(rank); + sycl::queue q = get_current_queue(device); + + auto float_options = at::TensorOptions().dtype(at::kFloat).device(at::kXPU); + at::Tensor unpacked = torch::empty({input_size * 8}, float_options); + + uint8_t* input = (uint8_t*)tensor.data_ptr(); + float* output = (float*)unpacked.data_ptr(); + + auto event = q.submit([&](sycl::handler& cgh) { + cgh.parallel_for<>(range(input_size * 8), + [=](id<1> item_ct1) { unpackbitskernel(input, output, item_ct1); }); + }); + + return unpacked; +} + +PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) +{ + m.def("packbits", &packbits, "DeepSpeed XPU packbits (C++)"); + m.def("unpackbits", &unpackbits, "DeepSpeed XPU unpackbits (C++)"); +} diff --git a/deepspeed/runtime/comm/compressed.py b/deepspeed/runtime/comm/compressed.py new file mode 100644 index 000000000000..7f8c7395451d --- /dev/null +++ b/deepspeed/runtime/comm/compressed.py @@ -0,0 +1,137 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team + +import numpy as np +import torch +import deepspeed.comm as dist +from deepspeed.accelerator import get_accelerator +from deepspeed.ops.op_builder import PackbitsBuilder + + +class CompressedBackend(object): + + def __init__(self, mpu=None): + if mpu is None: + self.world_group = dist.new_group(ranks=range(dist.get_world_size())) + else: + self.mpu = mpu + self.world_group = self.mpu.get_data_parallel_group() + self.size = dist.get_world_size(group=self.world_group) + self.rank = dist.get_rank(group=self.world_group) + self.packer = PackbitsBuilder().load() + + def my_igather(self, rank, size, group, sendbuf, recvbuf, root): + req = [] + if rank == root: + for idx in range(size): + if idx != rank: + req.append(dist.irecv(recvbuf[idx], src=idx, group=group)) + else: + recvbuf[rank] = sendbuf + else: + req.append(dist.isend(sendbuf, group=group, dst=root)) + return req + + def my_gather(self, rank, size, group, sendbuf, recvbuf, root): + if rank == root: + for idx in range(size): + if idx != rank: + dist.recv(recvbuf[idx], src=idx, group=group) + else: + recvbuf[rank] = sendbuf + else: + dist.send(sendbuf, group=group, dst=root) + + def pack(self, buffer, size): + # pack float tensor into uint8 tensor + packed = self.packer.packbits(buffer.float(), buffer.numel(), self.rank) + return packed.reshape(size, -1) + + def unpack(self, buffer, size, dtype): + # unpack uint8 to float tensor + unpacked = self.packer.unpackbits(buffer, buffer.numel(), self.rank) + return unpacked.reshape(size, -1).to(dtype) + + def compressed_allreduce(self, buffer_m: torch.tensor, worker_error, server_error, local_rank): + original_shape = buffer_m.size() + if len(original_shape) > 1: + buffer_m = torch.flatten(buffer_m) + + # align size of original_buffer and error + original_size = buffer_m.numel() + worker_error_size = worker_error.numel() + if original_size != worker_error_size: + empty_tensor = torch.zeros(worker_error_size - original_size, device=buffer_m.device) + buffer_m = torch.cat([buffer_m, empty_tensor]) + + buffer_m.add_(worker_error) + worker_scale = torch.linalg.norm(buffer_m) / np.sqrt(torch.numel(buffer_m)) + + worker_error.set_(buffer_m - worker_scale * buffer_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0)) + + sign_list_packed_tmp = self.pack(buffer_m, self.size).type(torch.int8) + + recvbuf_sign = torch.zeros([self.size, len(sign_list_packed_tmp[self.rank])], + dtype=sign_list_packed_tmp[0].dtype, + device=sign_list_packed_tmp.device) + + sign_list_packed = [sign_list_packed_tmp[idx] for idx in range(self.size)] + + recvbuf_scale = [ + torch.zeros(1, dtype=worker_scale.dtype, device=get_accelerator().current_device_name()) + for _ in range(self.size) + ] + + # communication phase 1 + # all to all for sign + dist.all_to_all_single(recvbuf_sign, torch.stack(sign_list_packed), group=self.world_group) + # all gather for scale + dist.all_gather(recvbuf_scale, worker_scale, group=self.world_group) + + flattened_recvbuf_sign = recvbuf_sign.type(torch.uint8).flatten() + compensated_server_m = self.unpack(flattened_recvbuf_sign, self.size, torch.float32) \ + .mul_(torch.stack(recvbuf_scale).mul_(1 / self.size)).sum(0) + + compensated_server_m.add_(server_error) + + server_scale = torch.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel()) + + server_error.set_(compensated_server_m - + server_scale * compensated_server_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0)) + + server_sign_packed = self.pack(compensated_server_m, 1).type(torch.int8) + + # recvbuf_sign_server + recvbuf_sign_server_tmp = torch.zeros([self.size, len(server_sign_packed[0])], + dtype=recvbuf_sign.dtype, + device=server_sign_packed.device) + + recvbuf_sign_server = [recvbuf_sign_server_tmp[idx] for idx in range(self.size)] + + # recvbuf_scale_server + recvbuf_scale_server_tmp = torch.zeros([self.size, 1], + dtype=worker_scale.dtype, + device=server_sign_packed.device) + + recvbuf_scale_server = [recvbuf_scale_server_tmp[idx] for idx in range(self.size)] + + # communication Phase 2 + dist.all_gather(recvbuf_sign_server, server_sign_packed[0], group=self.world_group) + dist.all_gather(recvbuf_scale_server, server_scale, group=self.world_group) + + recvbuf_sign_server = torch.stack(recvbuf_sign_server) + + flattened_recvbuf_sign_server = recvbuf_sign_server.type(torch.uint8).flatten() + + buffer_m.data.copy_( + self.unpack(flattened_recvbuf_sign_server, self.size, + torch.float32).mul_(recvbuf_scale_server_tmp).flatten().data) + + if original_size != worker_error_size: + buffer_m = buffer_m[0:original_size] + if len(original_shape) > 1: + buffer_m = buffer_m.reshape(original_shape) + + return buffer_m diff --git a/deepspeed/runtime/fp16/onebit/adam.py b/deepspeed/runtime/fp16/onebit/adam.py index f8a50393ac5d..fa817573f734 100644 --- a/deepspeed/runtime/fp16/onebit/adam.py +++ b/deepspeed/runtime/fp16/onebit/adam.py @@ -101,6 +101,10 @@ def __init__(self, from deepspeed.runtime.comm.hccl import HcclBackend self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') self.comm_backend_handle = HcclBackend(self.deepspeed.mpu) + elif self.comm_backend_name == 'compressed': + from deepspeed.runtime.comm.compressed import CompressedBackend + self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') + self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu) self.size = self.comm_backend_handle.size self.divider = int(self.size * 8 / np.gcd(self.size, 8)) diff --git a/deepspeed/runtime/fp16/onebit/lamb.py b/deepspeed/runtime/fp16/onebit/lamb.py index 0f70782fd3ff..89b6f40a308c 100644 --- a/deepspeed/runtime/fp16/onebit/lamb.py +++ b/deepspeed/runtime/fp16/onebit/lamb.py @@ -123,6 +123,10 @@ def __init__(self, from deepspeed.runtime.comm.hccl import HcclBackend self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') self.comm_backend_handle = HcclBackend(self.deepspeed.mpu) + elif self.comm_backend_name == 'compressed': + from deepspeed.runtime.comm.compressed import CompressedBackend + self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') + self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu) self.size = self.comm_backend_handle.size diff --git a/deepspeed/runtime/fp16/onebit/zoadam.py b/deepspeed/runtime/fp16/onebit/zoadam.py index bd75ccd4f7a0..803bd929742d 100644 --- a/deepspeed/runtime/fp16/onebit/zoadam.py +++ b/deepspeed/runtime/fp16/onebit/zoadam.py @@ -114,6 +114,10 @@ def __init__(self, from deepspeed.runtime.comm.hccl import HcclBackend self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') self.comm_backend_handle = HcclBackend(self.deepspeed.mpu) + elif self.comm_backend_name == 'compressed': + from deepspeed.runtime.comm.compressed import CompressedBackend + self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce') + self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu) self.size = self.comm_backend_handle.size self.divider = int(self.size * 8 / np.gcd(self.size, 8)) diff --git a/op_builder/xpu/__init__.py b/op_builder/xpu/__init__.py index 2815f164e5f2..bf82e4248338 100755 --- a/op_builder/xpu/__init__.py +++ b/op_builder/xpu/__init__.py @@ -7,3 +7,4 @@ from .cpu_adagrad import CPUAdagradBuilder from .fused_adam import FusedAdamBuilder from .async_io import AsyncIOBuilder +from .packbits import PackbitsBuilder diff --git a/op_builder/xpu/packbits.py b/op_builder/xpu/packbits.py new file mode 100644 index 000000000000..cf5b5ebc59e4 --- /dev/null +++ b/op_builder/xpu/packbits.py @@ -0,0 +1,26 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team +from .builder import SYCLOpBuilder + + +class PackbitsBuilder(SYCLOpBuilder): + BUILD_VAR = "DS_BUILD_PACK_BITS" + NAME = "pack_bits" + + def __init__(self): + super().__init__(name=self.NAME) + + def absolute_name(self): + return f'deepspeed.ops.{self.NAME}_op' + + def sources(self): + return ['csrc/xpu/packbits/packing.cpp'] + + def include_paths(self): + return ['csrc/xpu/includes'] + + def cxx_args(self): + args = super().cxx_args() + return args + self.version_dependent_macros() diff --git a/tests/onebit/README.md b/tests/onebit/README.md new file mode 100644 index 000000000000..d62c25421d00 --- /dev/null +++ b/tests/onebit/README.md @@ -0,0 +1,31 @@ +# One-Bit tests + +In this folder, you can test the functionality and performance of different backend for doing compressed allreduce, which is the main algorithm in one-bit optimizers like [One-Bit Adam](https://www.deepspeed.ai/tutorials/onebit-adam/), [One-Bit Lamb](https://www.deepspeed.ai/tutorials/onebit-lamb/) and [Zero-One Adam](https://www.deepspeed.ai/tutorials/zero-one-adam/). + +## How to run + +### NCCL and MPI backend + +Basically it requires your environment have relative communication backend installed, the NCCL backend of PyTorch distributed or Message Passing Interface (MPI) like MVAPICH2-GDR and OpenMPI. [Detailed Pre-requisites](https://www.deepspeed.ai/tutorials/zero-one-adam/#12-pre-requisites-for-01-adam). + +To test accuracy and performance of NCCL backend: +```bash +python test_nccl_backend.py +python test_nccl_perf.py +``` +Similarly, for MPI backend: +```bash +python test_mpi_backend.py +python test_mpi_perf.py +``` + +### Compressed backend + +This backend provides an approach to abstract the generic part of one-bit optimizers and implements accelerator dependent part with DeepSpeed custom op builder. To use this `CompressedBackend` and test it, you should make sure that your current accelerator supports `PackbitsBuilder`, so that it could be loaded to do high performance packing and unpacking between float and Byte datatype. +An example can be found in `Deepspeed/op_builder/xpu/packbits.py`. + +The test usage is same as others: +```bash +python test_compressed_backend.py +python test_compressed_perf.py +``` diff --git a/tests/onebit/test_compressed_backend.py b/tests/onebit/test_compressed_backend.py new file mode 100644 index 000000000000..f6919a09a54b --- /dev/null +++ b/tests/onebit/test_compressed_backend.py @@ -0,0 +1,96 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team + +import torch +import deepspeed.comm as dist +import numpy as np +import argparse +import deepspeed +import os + +from deepspeed.runtime.comm.compressed import CompressedBackend +from deepspeed.accelerator import get_accelerator + +parser = argparse.ArgumentParser() +parser.add_argument('--local_rank', type=int, default=-1) +args = parser.parse_args() + +deepspeed.init_distributed(dist_backend=get_accelerator().communication_backend_name()) +args.local_rank = int(os.environ['LOCAL_RANK']) + +get_accelerator().set_device(args.local_rank) +device = torch.device(get_accelerator().device_name(), args.local_rank) + +size = dist.get_world_size() +rank = dist.get_rank() + +backend = CompressedBackend() +local_rank = args.local_rank + + +# A simulated compression function using deepspeed.comm +def torch_sim(a): + a_sign = a.sign().add_(1).bool().float().add_(-0.5).mul_(2.0) + scale = a.norm() / np.sqrt(a.numel()) + a_compressed = scale * a_sign + a_sign = None + worker_error = a - a_compressed + dist.all_reduce(a_compressed) + a_compressed.mul_(1 / dist.get_world_size()) + a_server_sign = a_compressed.sign().add_(1).bool().float().add_(-0.5).mul_(2.0) + a_list = torch.chunk(a_compressed, chunks=dist.get_world_size()) + server_scale = [chunk_a.norm() / np.sqrt(chunk_a.numel()) for chunk_a in a_list] + a_sign_list = torch.chunk(a_server_sign, dist.get_world_size()) + a_server_compressed = torch.cat([server_scale[i] * a_sign_list[i] for i in range(dist.get_world_size())]) + rank = dist.get_rank() + server_error = a_list[rank] - server_scale[rank] * a_sign_list[rank] + get_accelerator().synchronize() + dist.barrier() + return a_server_compressed, worker_error, server_error + + +tensor_size = 300 * 2**20 +server_size = int(tensor_size / size) +if tensor_size % (8 * size) != 0: + right_tensor_size = tensor_size + (8 * size - (tensor_size % (8 * size))) +else: + right_tensor_size = tensor_size +right_server_size = right_tensor_size // size + +# Adding bias to the initialization of the gradient we are communicating +# In order to get rid of the case where some elements in the gradient are too small +a = (torch.rand(tensor_size, device=device) - 0.5) + 0.01 * rank + +worker_error = torch.zeros(right_tensor_size, device=device) +server_error = torch.zeros(right_server_size, device=device) + +a_torch, worker_error_torch, server_error_torch = torch_sim(a) +get_accelerator().empty_cache() + +a_after = backend.compressed_allreduce(a, worker_error, server_error, local_rank) + +print(a_torch.cpu()) +print(a_after.cpu()) + +threshold = 1e-6 +magnitude_threshold = 1e-6 +diff_mask = (a_after - a_torch) > threshold +diff_server_mask = torch.chunk(diff_mask, size)[rank] +mpi_server = torch.chunk(a_after, size)[rank] + server_error +torch_server = torch.chunk(a_torch, size)[rank] + server_error_torch + +test_correctness = True + +# If the number in the compensated_server_m is too small (e.g 1e-8), then calling sign() might be problematic +# The test would skip those numbers that are too small in compensated_server_m +if test_correctness: + if torch.sum(diff_server_mask) == 0: + print('Successfully passed the test for Compressed Backend at Rank {}'.format(rank)) + else: + check_mag_mask = mpi_server[diff_server_mask] > magnitude_threshold + if torch.sum(check_mag_mask) == 0: + print('Successfully passed the test for Compressed Backend at Rank {}'.format(rank)) + else: + print('Fails at {} of positions'.format(torch.sum(check_mag_mask))) diff --git a/tests/onebit/test_compressed_perf.py b/tests/onebit/test_compressed_perf.py new file mode 100644 index 000000000000..a686af0f6b8d --- /dev/null +++ b/tests/onebit/test_compressed_perf.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team + +import torch +import deepspeed.comm as dist +import numpy as np +import argparse +import deepspeed +import os + +from deepspeed.runtime.comm.compressed import CompressedBackend +from deepspeed.utils.timer import SynchronizedWallClockTimer +from deepspeed.accelerator import get_accelerator +from statistics import mean + +timers = SynchronizedWallClockTimer() + +parser = argparse.ArgumentParser() +parser.add_argument('--local_rank', type=int, default=-1) +args = parser.parse_args() + +deepspeed.init_distributed(dist_backend=get_accelerator().communication_backend_name()) +args.local_rank = int(os.environ['LOCAL_RANK']) + +get_accelerator().set_device(args.local_rank) +device = torch.device(get_accelerator().device_name(), args.local_rank) + +size = dist.get_world_size() +rank = dist.get_rank() + +backend = CompressedBackend() +local_rank = args.local_rank + +# Setting tensor_size (BERT-Large) +tensor_size = 300 * 2**20 +server_size = int(tensor_size / size) +if tensor_size % (8 * size) != 0: + right_tensor_size = tensor_size + (8 * size - (tensor_size % (8 * size))) +else: + right_tensor_size = tensor_size +right_server_size = right_tensor_size // size + +# Adding bias to the initialization of the gradient we are communicating +# In order to get rid of the case where some elements in the gradient are too small +a = (torch.rand(tensor_size, device=device) - 0.5) + 0.01 * rank + +worker_error = torch.zeros(right_tensor_size, device=device) +server_error = torch.zeros(right_server_size, device=device) + +warmup = 10 +iters = 10 + +# Warmup +for i in range(warmup): + backend.compressed_allreduce(a, worker_error, server_error, local_rank) + +time_list = [] + +a_sign = a.sign().add_(1).bool().float().add_(-0.5).mul_(2.0) +scale = a.norm() / np.sqrt(a.numel()) +a_compressed = scale * a_sign + +print("Shape of the compressed buffer:", a_compressed.shape) if rank == 0 else None + +for i in range(iters): + timers('compressed_allreduce').start() + backend.compressed_allreduce(a, worker_error, server_error, local_rank) + #deepspeed.comm.all_reduce(a_compressed) + timers('compressed_allreduce').stop() + time_list.append(timers('compressed_allreduce').elapsed()) + +#timer_names = ['compressed_allreduce'] +#timers.log(names=timer_names, normalizer=1, memory_breakdown=None) + +places = 2 +convert = 1e3 +float_size = 4 + +if rank == 0: + for i in range(iters): + lat = time_list[i] + print("latency = ", lat * convert) + +minlat = round(min(time_list) * convert) +maxlat = round(max(time_list) * convert) +meanlat = round(mean(time_list) * convert, places) +print("min, max, and mean = {} ms, {} ms, {} ms".format(minlat, maxlat, meanlat)) if rank == 0 else None +#print("tensor shape", a.shape) +duration = meanlat / 1e3 +tput = ((tensor_size * 4) / duration) +print("algo throughput: %f Bytes/s, %f GB/s" % (tput, tput / 1e9)) if rank == 0 else None +size = tensor_size * 4 +n = dist.get_world_size() +busbw = (size / duration) * (2 * (n - 1) / n) +print("busbw: %f GB/s" % (busbw / 1e9)) if rank == 0 else None