Skip to content

Commit

Permalink
Merge branch 'master' into mrwyattii/pydantic-2-support
Browse files Browse the repository at this point in the history
  • Loading branch information
adk9 authored Jun 6, 2024
2 parents 75640e3 + 11a62a0 commit d72db03
Show file tree
Hide file tree
Showing 14 changed files with 525 additions and 17 deletions.
6 changes: 4 additions & 2 deletions accelerator/xpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ def get_op_builder(self, class_name):
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
# if successful this also means we're doing a local install and not JIT compile path
from op_builder import __deepspeed__ # noqa: F401 # type: ignore
from op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder
from op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder, PackbitsBuilder
except ImportError:
from deepspeed.ops.op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder
from deepspeed.ops.op_builder.xpu import CPUAdagradBuilder, CPUAdamBuilder, FusedAdamBuilder, AsyncIOBuilder, PackbitsBuilder

if class_name == "AsyncIOBuilder":
return AsyncIOBuilder
Expand All @@ -279,6 +279,8 @@ def get_op_builder(self, class_name):
return CPUAdamBuilder
elif class_name == "FusedAdamBuilder":
return FusedAdamBuilder
elif class_name == "PackbitsBuilder":
return PackbitsBuilder
else:
return None

Expand Down
100 changes: 100 additions & 0 deletions csrc/xpu/packbits/packing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation.
// SPDX-License-Identifier: Apache-2.0

// DeepSpeed Team

#include <ipex.h>
#include <torch/extension.h>
#include <iostream>
#include <sycl/sycl.hpp>

using namespace sycl;
using namespace xpu;

// Packs the sign information of up to eight consecutive floats into one byte.
//
// Work-item `item_ct1` owns output byte i and input elements [8*i, 8*i + 8).
// Element 8*i + j is stored in bit (7 - j), so the first element of the group
// becomes the most significant bit. A bit is 1 when the element's sign bit is
// clear and 0 when it is set; positions at or past `input_size` contribute 0.
// The bits are OR-ed into output[i], so bits already set there are preserved.
void packbitskernel(const float* input, uint8_t* output, const int input_size, id<1> item_ct1)
{
    const int byte_idx = item_ct1;
    const int first_elem = byte_idx * 8;

    // Collect the eight bits locally, then merge them into the output byte once.
    int bits = 0;
    for (int offset = 0; offset < 8; ++offset) {
        const int src = first_elem + offset;
        const int sign_clear = src < input_size && (!sycl::signbit(input[src]));
        bits |= sign_clear << (7 - offset);
    }
    output[byte_idx] |= bits;
}

// Expands one packed bit into one float.
//
// Work-item `item_ct1` writes output element i from bit (7 - i % 8) of
// input[i / 8]: a set bit becomes 1.0f and a cleared bit becomes -1.0f.
void unpackbitskernel(const uint8_t* input, float* output, id<1> item_ct1)
{
    const int elem = item_ct1;
    const int shift = 7 - elem % 8;
    const bool bit_set = (input[elem / 8] >> shift) & 1;
    output[elem] = bit_set ? 1.0f : -1.0f;
}

// Returns a SYCL queue for `device`, obtained by asking the device's guard
// implementation for a normal-priority stream from the global pool and
// converting that stream to a queue.
//
// NOTE(review): despite the name, this requests a stream from the global pool
// rather than querying the device's current stream -- confirm that this is
// the intended queue for work that must be ordered with other tensor ops.
sycl::queue get_current_queue(at::Device device)
{
    c10::impl::VirtualGuardImpl guard(device.type());
    return xpu::get_queue_from_stream(
        guard.getStreamFromGlobalPool(device, /*isHighPriority=*/false));
}

/*
Pack a float tensor into a uint8 tensor. Every eight float elements are packed
into one uint8, first element in the most significant bit. An element whose
sign bit is clear is packed as a '1' bit, otherwise as a '0' bit; missing
elements in the last (partial) byte are packed as '0'.

Arguments:
    tensor: A contiguous float32 tensor to pack.
    input_size: number of elements of `tensor` to pack (at most tensor.numel())
    rank: device id, used to build the "xpu:<rank>" device whose queue runs the kernel

Returns:
    A uint8 tensor with ceil(input_size / 8) elements.

Raises (via TORCH_CHECK):
    if `tensor` is not float32, is not contiguous, or `input_size` is outside
    [0, tensor.numel()]. The kernel reads the storage through a raw float
    pointer, so any of these would otherwise read wrong or out-of-bounds memory.
*/
at::Tensor packbits(at::Tensor tensor, int input_size, int rank)
{
    TORCH_CHECK(tensor.scalar_type() == at::kFloat, "packbits expects a float32 tensor");
    TORCH_CHECK(tensor.is_contiguous(), "packbits expects a contiguous tensor");
    TORCH_CHECK(input_size >= 0 && input_size <= tensor.numel(),
                "packbits: input_size (",
                input_size,
                ") must be in [0, ",
                tensor.numel(),
                "]");

    int packed_size = (input_size + 7) / 8;
    auto uint8_options = at::TensorOptions().dtype(at::kByte).device(at::kXPU);
    // Must be zero-initialised: the kernel ORs bits into the output bytes.
    at::Tensor packed = torch::zeros({packed_size}, uint8_options);

    // Nothing to pack: skip the launch instead of submitting an empty range.
    if (packed_size == 0) { return packed; }

    at::Device device = "xpu:" + std::to_string(rank);
    sycl::queue q = get_current_queue(device);

    float* input = (float*)tensor.data_ptr();
    uint8_t* output = (uint8_t*)packed.data_ptr();

    // One work-item per output byte.
    q.submit([&](sycl::handler& cgh) {
        cgh.parallel_for<>(range(packed_size), [=](id<1> item_ct1) {
            packbitskernel(input, output, input_size, item_ct1);
        });
    });

    return packed;
}

/*
Unpack a uint8 tensor into a float tensor. Every uint8 element is unpacked into
eight floats, most significant bit first: a '1' bit becomes float(1), a '0' bit
becomes float(-1).

Arguments:
    tensor: A contiguous uint8 tensor to unpack.
    input_size: number of uint8 elements of `tensor` to unpack (at most tensor.numel())
    rank: device id, used to build the "xpu:<rank>" device whose queue runs the kernel

Returns:
    A float32 tensor with input_size * 8 elements.

Raises (via TORCH_CHECK):
    if `tensor` is not uint8, is not contiguous, or `input_size` is outside
    [0, tensor.numel()]. The kernel reads the storage through a raw byte
    pointer, so any of these would otherwise read wrong or out-of-bounds memory.
*/
at::Tensor unpackbits(at::Tensor tensor, int input_size, int rank)
{
    TORCH_CHECK(tensor.scalar_type() == at::kByte, "unpackbits expects a uint8 tensor");
    TORCH_CHECK(tensor.is_contiguous(), "unpackbits expects a contiguous tensor");
    TORCH_CHECK(input_size >= 0 && input_size <= tensor.numel(),
                "unpackbits: input_size (",
                input_size,
                ") must be in [0, ",
                tensor.numel(),
                "]");

    auto float_options = at::TensorOptions().dtype(at::kFloat).device(at::kXPU);
    // Every output element is written by the kernel, so no zero-fill is needed.
    at::Tensor unpacked = torch::empty({input_size * 8}, float_options);

    // Nothing to unpack: skip the launch instead of submitting an empty range.
    if (input_size == 0) { return unpacked; }

    at::Device device = "xpu:" + std::to_string(rank);
    sycl::queue q = get_current_queue(device);

    uint8_t* input = (uint8_t*)tensor.data_ptr();
    float* output = (float*)unpacked.data_ptr();

    // One work-item per output float.
    q.submit([&](sycl::handler& cgh) {
        cgh.parallel_for<>(range(input_size * 8),
                           [=](id<1> item_ct1) { unpackbitskernel(input, output, item_ct1); });
    });

    return unpacked;
}

// Python bindings: exposes the two host entry points of this file as
// module-level functions named `packbits` and `unpackbits`.
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
{
// packbits(tensor, input_size, rank) -> uint8 tensor
m.def("packbits", &packbits, "DeepSpeed XPU packbits (C++)");
// unpackbits(tensor, input_size, rank) -> float tensor
m.def("unpackbits", &unpackbits, "DeepSpeed XPU unpackbits (C++)");
}
18 changes: 9 additions & 9 deletions deepspeed/inference/quantization/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

device = get_accelerator().device_name() if get_accelerator().is_available() else 'cpu'

quantizer_cuda_module = None
quantizer_module = None


def get_quantizer_cuda_module():
    """Return the quantizer op module, building and loading it on first use.

    The loaded module is cached in the module-level ``quantizer_cuda_module``
    so that ``QuantizerBuilder().load()`` runs at most once.
    """
    global quantizer_cuda_module
    if quantizer_cuda_module is not None:
        return quantizer_cuda_module
    quantizer_cuda_module = deepspeed.ops.op_builder.QuantizerBuilder().load()
    return quantizer_cuda_module
def get_quantizer_module():
    """Return the quantizer op module, building and loading it on first use.

    The loaded module is cached in the module-level ``quantizer_module`` so
    that ``QuantizerBuilder().load()`` runs at most once.
    """
    global quantizer_module
    if quantizer_module is not None:
        return quantizer_module
    quantizer_module = deepspeed.ops.op_builder.QuantizerBuilder().load()
    return quantizer_module


def tensor_clamp(tensor: Tensor, min, max) -> Tensor:
Expand Down Expand Up @@ -107,19 +107,19 @@ def dequantize(self, tensor: Tensor, quant_scale: Tensor, quant_min: Tensor) ->
if self.config['group_size'] % 8 == 0 and \
(self.config['num_bits'] == 4 or self.config['num_bits'] == 8) and \
self.config['group_dim'] == len(tensor.shape) - 1 and \
self.dtype == torch.float16 and device == 'cuda':
self.dtype == torch.float16 and device == get_accelerator().device_name():

last_dimension_size = self.config['group_size']
if self.config['num_bits'] == 4:
last_dimension_size = last_dimension_size // 2
quantized_tensor = get_quantizer_cuda_module().dequantize_int4_to_half_experimental(
quantized_tensor = get_quantizer_module().dequantize_int4_to_half_experimental(
tensor.reshape(-1, last_dimension_size), quant_scale, quant_min,
tensor.numel() // last_dimension_size, self.config['group_size'])
shape = list(tensor.shape)
shape[-1] = shape[-1] * 2
elif self.config['num_bits'] == 8:
# last_dimension_size = last_dimension_size // 2
quantized_tensor = get_quantizer_cuda_module().dequantize_int8_to_half_experimental(
quantized_tensor = get_quantizer_module().dequantize_int8_to_half_experimental(
tensor.reshape(-1, last_dimension_size), quant_scale, quant_min,
tensor.numel() // last_dimension_size, self.config['group_size'])
shape = list(tensor.shape)
Expand Down
137 changes: 137 additions & 0 deletions deepspeed/runtime/comm/compressed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import numpy as np
import torch
import deepspeed.comm as dist
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder import PackbitsBuilder


class CompressedBackend(object):
    """Sign-compressed allreduce backend built on the ``PackbitsBuilder`` op.

    Tensors are reduced by exchanging one sign bit per element plus a single
    scale per rank, with the compression error fed back through the
    ``worker_error`` / ``server_error`` buffers supplied by the caller.
    """

    def __init__(self, mpu=None):
        """Set up the process group and load the bit-packing op.

        Args:
            mpu: optional object exposing ``get_data_parallel_group()``. When
                omitted, a new group spanning every rank is created.
        """
        # Always define the attribute so later reads of ``self.mpu`` cannot
        # raise AttributeError when no mpu was supplied.
        self.mpu = mpu
        if mpu is None:
            self.world_group = dist.new_group(ranks=range(dist.get_world_size()))
        else:
            self.world_group = self.mpu.get_data_parallel_group()
        self.size = dist.get_world_size(group=self.world_group)
        self.rank = dist.get_rank(group=self.world_group)
        self.packer = PackbitsBuilder().load()

    def my_igather(self, rank, size, group, sendbuf, recvbuf, root):
        """Non-blocking gather to ``root`` built from point-to-point ops.

        On the root, posts one ``irecv`` per other rank into ``recvbuf[idx]``
        and stores ``sendbuf`` itself in ``recvbuf[rank]``; on every other
        rank, posts a single ``isend`` to the root.

        Returns:
            list of the request handles returned by ``irecv`` / ``isend``.
        """
        req = []
        if rank == root:
            for idx in range(size):
                if idx != rank:
                    req.append(dist.irecv(recvbuf[idx], src=idx, group=group))
                else:
                    recvbuf[rank] = sendbuf
        else:
            req.append(dist.isend(sendbuf, group=group, dst=root))
        return req

    def my_gather(self, rank, size, group, sendbuf, recvbuf, root):
        """Blocking counterpart of :meth:`my_igather` (``recv`` / ``send``)."""
        if rank == root:
            for idx in range(size):
                if idx != rank:
                    dist.recv(recvbuf[idx], src=idx, group=group)
                else:
                    recvbuf[rank] = sendbuf
        else:
            dist.send(sendbuf, group=group, dst=root)

    def _device_index(self, buffer):
        """Return the device ordinal to hand to the native packer for ``buffer``.

        The packer expects a device id. The rank inside the process group is
        not a device id in general, so prefer the ordinal of the device that
        actually holds ``buffer`` and only fall back to the group rank when
        the tensor's device carries no index.
        """
        index = getattr(buffer.device, "index", None)
        return self.rank if index is None else index

    def pack(self, buffer, size):
        """Pack the signs of ``buffer`` into uint8 and reshape to ``size`` rows."""
        # pack float tensor into uint8 tensor
        packed = self.packer.packbits(buffer.float(), buffer.numel(), self._device_index(buffer))
        return packed.reshape(size, -1)

    def unpack(self, buffer, size, dtype):
        """Unpack uint8 ``buffer`` into +/-1 values, reshaped to ``size`` rows, as ``dtype``."""
        # unpack uint8 to float tensor
        unpacked = self.packer.unpackbits(buffer, buffer.numel(), self._device_index(buffer))
        return unpacked.reshape(size, -1).to(dtype)

    def compressed_allreduce(self, buffer_m: torch.Tensor, worker_error, server_error, local_rank):
        """Allreduce ``buffer_m`` using sign compression with error feedback.

        Args:
            buffer_m: tensor to reduce. It is flattened, zero-padded up to
                ``worker_error.numel()`` when the sizes differ, and may be
                modified in place.
            worker_error: error-feedback buffer for the local compression
                step; added to the input and then overwritten with the new
                compression error.
            server_error: error-feedback buffer for the second compression
                step; used and overwritten the same way.
            local_rank: not used by this implementation.

        Returns:
            The reduced tensor, trimmed back to the original element count and
            reshaped to the original shape when the input had more than one
            dimension.
        """
        original_shape = buffer_m.size()
        if len(original_shape) > 1:
            buffer_m = torch.flatten(buffer_m)

        # align size of original_buffer and error
        original_size = buffer_m.numel()
        worker_error_size = worker_error.numel()
        if original_size != worker_error_size:
            empty_tensor = torch.zeros(worker_error_size - original_size, device=buffer_m.device)
            buffer_m = torch.cat([buffer_m, empty_tensor])

        buffer_m.add_(worker_error)
        # scale = L2 norm / sqrt(numel)
        worker_scale = torch.linalg.norm(buffer_m) / np.sqrt(torch.numel(buffer_m))

        # sign().add_(1).bool().float().add_(-0.5).mul_(2.0) maps values >= 0
        # to +1 and values < 0 to -1; the error is what that approximation loses.
        worker_error.set_(buffer_m - worker_scale * buffer_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))

        sign_list_packed_tmp = self.pack(buffer_m, self.size).type(torch.int8)

        recvbuf_sign = torch.zeros([self.size, len(sign_list_packed_tmp[self.rank])],
                                   dtype=sign_list_packed_tmp[0].dtype,
                                   device=sign_list_packed_tmp.device)

        sign_list_packed = [sign_list_packed_tmp[idx] for idx in range(self.size)]

        recvbuf_scale = [
            torch.zeros(1, dtype=worker_scale.dtype, device=get_accelerator().current_device_name())
            for _ in range(self.size)
        ]

        # communication phase 1
        # all to all for sign
        dist.all_to_all_single(recvbuf_sign, torch.stack(sign_list_packed), group=self.world_group)
        # all gather for scale
        dist.all_gather(recvbuf_scale, worker_scale, group=self.world_group)

        # Rebuild each rank's +/-1 chunk, weight it by that rank's scale
        # divided by the group size, and sum over ranks.
        flattened_recvbuf_sign = recvbuf_sign.type(torch.uint8).flatten()
        compensated_server_m = self.unpack(flattened_recvbuf_sign, self.size, torch.float32) \
            .mul_(torch.stack(recvbuf_scale).mul_(1 / self.size)).sum(0)

        compensated_server_m.add_(server_error)

        server_scale = torch.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())

        server_error.set_(compensated_server_m -
                          server_scale * compensated_server_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))

        server_sign_packed = self.pack(compensated_server_m, 1).type(torch.int8)

        # recvbuf_sign_server
        recvbuf_sign_server_tmp = torch.zeros([self.size, len(server_sign_packed[0])],
                                              dtype=recvbuf_sign.dtype,
                                              device=server_sign_packed.device)

        recvbuf_sign_server = [recvbuf_sign_server_tmp[idx] for idx in range(self.size)]

        # recvbuf_scale_server
        recvbuf_scale_server_tmp = torch.zeros([self.size, 1],
                                               dtype=worker_scale.dtype,
                                               device=server_sign_packed.device)

        # These list entries index into recvbuf_scale_server_tmp, which is the
        # tensor used for the final multiplication after the all_gather below.
        recvbuf_scale_server = [recvbuf_scale_server_tmp[idx] for idx in range(self.size)]

        # communication Phase 2
        dist.all_gather(recvbuf_sign_server, server_sign_packed[0], group=self.world_group)
        dist.all_gather(recvbuf_scale_server, server_scale, group=self.world_group)

        recvbuf_sign_server = torch.stack(recvbuf_sign_server)

        flattened_recvbuf_sign_server = recvbuf_sign_server.type(torch.uint8).flatten()

        buffer_m.data.copy_(
            self.unpack(flattened_recvbuf_sign_server, self.size,
                        torch.float32).mul_(recvbuf_scale_server_tmp).flatten().data)

        # Undo the padding and flattening applied at the top.
        if original_size != worker_error_size:
            buffer_m = buffer_m[0:original_size]
        if len(original_shape) > 1:
            buffer_m = buffer_m.reshape(original_shape)

        return buffer_m
14 changes: 9 additions & 5 deletions deepspeed/runtime/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2407,18 +2407,22 @@ def _reduce_non_expert_gradients(self, grads, elements_per_buffer):
split_sparse_tensor_buckets, split_dense_tensor_buckets = split_half_float_double_sparse(grads)
if self.pipeline_parallelism:
dp_group = self.mpu.get_data_parallel_group()
dp_world_size = dist.get_world_size(dp_group)
else:
dp_group = groups._get_sequence_data_parallel_group()

dp_world_size = dist.get_world_size(dp_group) / float(self.sequence_parallel_size)
for _, sparse_bucket_tuple in enumerate(split_sparse_tensor_buckets):
if sparse_bucket_tuple:
bucket_type, sparse_bucket = sparse_bucket_tuple
self.sparse_allreduce_no_retain(sparse_bucket, dp_group=dp_group)
self.sparse_allreduce_no_retain(sparse_bucket, dp_group=dp_group, dp_world_size=dp_world_size)

for _, dense_bucket_tuple in enumerate(split_dense_tensor_buckets):
if dense_bucket_tuple:
bucket_type, dense_bucket = dense_bucket_tuple
self.allreduce_no_retain(dense_bucket, dp_group=dp_group, numel_per_bucket=elements_per_buffer)
self.allreduce_no_retain(dense_bucket,
dp_group=dp_group,
numel_per_bucket=elements_per_buffer,
dp_world_size=dp_world_size)

def _reduce_expert_gradients(self, expert_grads, elements_per_buffer):
# to maintain the gradients value unaffected by ep_size setting,
Expand Down Expand Up @@ -2490,9 +2494,9 @@ def sparse_allreduce(self, sparse, dp_group, dp_world_size=None):
dp_world_size = dist.get_world_size(group=dp_group)
if self.postscale_gradients():
if self.gradient_average:
values.mul_(self.gradient_predivide_factor() / (dp_world_size / float(self.sequence_parallel_size)))
values.mul_(self.gradient_predivide_factor() / (dp_world_size))
else:
values.mul_(1. / (dp_world_size / float(self.sequence_parallel_size)))
values.mul_(1. / (dp_world_size))

indices_device_list = self.sparse_all_gather(indices, dp_group)
values_device_list = self.sparse_all_gather(values, dp_group)
Expand Down
4 changes: 4 additions & 0 deletions deepspeed/runtime/fp16/onebit/adam.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def __init__(self,
from deepspeed.runtime.comm.hccl import HcclBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)
elif self.comm_backend_name == 'compressed':
from deepspeed.runtime.comm.compressed import CompressedBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu)
self.size = self.comm_backend_handle.size

self.divider = int(self.size * 8 / np.gcd(self.size, 8))
Expand Down
4 changes: 4 additions & 0 deletions deepspeed/runtime/fp16/onebit/lamb.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ def __init__(self,
from deepspeed.runtime.comm.hccl import HcclBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)
elif self.comm_backend_name == 'compressed':
from deepspeed.runtime.comm.compressed import CompressedBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu)

self.size = self.comm_backend_handle.size

Expand Down
4 changes: 4 additions & 0 deletions deepspeed/runtime/fp16/onebit/zoadam.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ def __init__(self,
from deepspeed.runtime.comm.hccl import HcclBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = HcclBackend(self.deepspeed.mpu)
elif self.comm_backend_name == 'compressed':
from deepspeed.runtime.comm.compressed import CompressedBackend
self.using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
self.comm_backend_handle = CompressedBackend(self.deepspeed.mpu)
self.size = self.comm_backend_handle.size

self.divider = int(self.size * 8 / np.gcd(self.size, 8))
Expand Down
4 changes: 3 additions & 1 deletion deepspeed/runtime/zero/stage_1_and_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2433,7 +2433,9 @@ def estimate_zero2_model_states_mem_needs(total_params,
gpu_mem = 2 * total_params
cpu_mem = total_params * max(4 * total_gpus, 16) * additional_buffer_factor
else:
gpu_mem = 4 * total_params + int(16 * total_params / total_gpus)
# GPU's total_params multipliers: 2 = params_16bit,
# 18 = 2_grads_16bit + 4_grads_32bit + 4_params_32bit + 8_optimizer_states_32bit(momentum and variance)
gpu_mem = 2 * total_params + int(18 * total_params / total_gpus)
cpu_mem = total_params * 4 * num_gpus_per_node * additional_buffer_factor

return int(cpu_mem), int(gpu_mem)
Expand Down
1 change: 1 addition & 0 deletions op_builder/xpu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
from .cpu_adagrad import CPUAdagradBuilder
from .fused_adam import FusedAdamBuilder
from .async_io import AsyncIOBuilder
from .packbits import PackbitsBuilder
Loading

0 comments on commit d72db03

Please sign in to comment.