From 06731a009e47a7be9eeceef92f380981b7a96510 Mon Sep 17 00:00:00 2001 From: jiangzishan Date: Wed, 25 Sep 2024 11:52:02 +0800 Subject: [PATCH] remove amd backend; optimize code structure, improve robustness. --- byte_micro_perf/backends/AMD/backend_amd.py | 302 ---------------- byte_micro_perf/backends/AMD/custom_ops.py | 63 ---- byte_micro_perf/backends/AMD/requirements.txt | 2 - byte_micro_perf/backends/GPU/backend_gpu.py | 341 ++++++++---------- byte_micro_perf/backends/GPU/requirements.txt | 5 +- byte_micro_perf/backends/backend.py | 297 +++++++++------ byte_micro_perf/backends/module_store.py | 21 +- byte_micro_perf/core/perf_engine.py | 106 ++++-- byte_micro_perf/launch.py | 37 +- byte_micro_perf/workloads/gemm.json | 15 +- 10 files changed, 465 insertions(+), 724 deletions(-) delete mode 100644 byte_micro_perf/backends/AMD/backend_amd.py delete mode 100644 byte_micro_perf/backends/AMD/custom_ops.py delete mode 100644 byte_micro_perf/backends/AMD/requirements.txt diff --git a/byte_micro_perf/backends/AMD/backend_amd.py b/byte_micro_perf/backends/AMD/backend_amd.py deleted file mode 100644 index 57b8bf62..00000000 --- a/byte_micro_perf/backends/AMD/backend_amd.py +++ /dev/null @@ -1,302 +0,0 @@ -# Copyright 2023 ByteDance and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import logging -import math -import os -from datetime import timedelta -from typing import Any, Dict, List - -import torch -import torch.distributed as dist -import torch.distributed.distributed_c10d as dist_c10d - -from backends.backend import Backend -from backends.module_store import * -from backends.utils import get_dtype_bytes - -from .custom_ops import GPUGemmOp, GPUBatchGemmOp, GPUGroupGemmOp - - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger("PerfEngine") - - -class BackendAMD(Backend): - - def get_device_count(self): - return torch.cuda.device_count() - - def set_device(self, device_index): - torch.cuda.set_device(device_index) - - def get_device(self): - return torch.cuda.current_device() - - def all_gather_object(self, obj): - gather_object_list = [None for _ in range(self.world_size)] - dist.all_gather_object( - object_list=gather_object_list, - obj=obj, - group=self.group - ) - return gather_object_list - - - def get_device_name(self): - return torch.cuda.get_device_name(0) - - def get_backend_properties(self): - self.memory_limit = int( - torch.cuda.get_device_properties(0).total_memory / (1024**3) - ) - - if self.vendor_path is not None and os.path.exists(self.vendor_path) and (self.vendor_path).endswith(".json"): - with open(self.vendor_path, "r") as f: - self.hw_info_dict = json.load(f) - # if the vendor path does not exist, please set this param manaually - self.bandwidth_limit = self.hw_info_dict["内存参数"]["内存"]["内存带宽(GB/s)"] - else: - log.warning( - "Vendor_path: [ {} ] was not found or not a full path points to json, please check your path!!! 
Otherwise, please set the hardware info manaually.".format( - self.vendor_path - ) - ) - - - # device/host ops - def host2device(self): - self.op = Host2DeviceOp(torch.device("cuda")) - - def device2host(self): - self.op = Device2HostOp() - - - # communication ops - def allreduce(self): - self.op = AllReduceOp(self.group) - - def allgather(self): - self.op = AllGatherOp(self.group) - - def reducescatter(self): - self.op = ReduceScatterOp(self.group) - - def alltoall(self): - self.op = AllToAllOp(self.group) - - def broadcast(self): - self.op = BroadcastOp(self.group) - - def p2p(self): - self.op = P2POp(self.group, self.ranks, self.rank) - - # compute ops - # unary ops - def sin(self): - self.op = SinOp() - - def cos(self): - self.op = CosOp() - - def exp(self): - self.op = ExpOp() - - def exponential(self): - self.op = ExponentialOp() - - def silu(self): - self.op = SiluOp() - - def gelu(self): - self.op = GeluOp() - - def swiglu(self): - self.op = SwiGLUOp() - - def cast(self): - self.op = CastOp() - - - # binary ops - def add(self): - self.op = AddOp() - - def mul(self): - self.op = MulOp() - - def sub(self): - self.op = SubOp() - - def div(self): - self.op = DivOp() - - - # reduce ops - def layernorm(self): - self.op = LayerNormOp() - - def softmax(self): - self.op = SoftmaxOp() - - def reduce_sum(self): - self.op = ReduceSumOp() - - def reduce_min(self): - self.op = ReduceMinOp() - - def reduce_max(self): - self.op = ReduceMaxOp() - - - # index ops - def index_add(self): - self.op = IndexAddOp() - - def sort(self): - self.op = SortOp() - - def unique(self): - self.op = UniqueOp() - - def scatter(self): - self.op = ScatterOp() - - def gather(self): - self.op = GatherOp() - - # gemm ops - def gemm(self): - self.op = GPUGemmOp() - - def gemv(self): - self.op = GPUGemmOp() - - def batch_gemm(self): - self.op = GPUBatchGemmOp() - - def group_gemm(self): - self.op = GPUGroupGemmOp() - - - - # create input tensors - def build_tensor(self, input_shapes, dtype): - torch.cuda.empty_cache() - torch_dtype = getattr(torch, dtype) - - # compute size of input and output tensors - if hasattr(self.op, "compute_size"): - bytes_per_cnt = self.op.compute_size(input_shapes, dtype) - # default: input_tensors_size == output_tensor_size, all tensors have same dtype - else: - dtype_size = get_dtype_bytes(dtype) - element_num = 2 * sum([math.prod(shape) for shape in input_shapes]) - bytes_per_cnt = dtype_size * element_num - - # compute max avail tensors for compute - avail_bytes = (self.memory_limit - 4) * 1024**3 - avail_cnts = avail_bytes // bytes_per_cnt - max_data_cnt = min(self.iterations, avail_cnts) - - # create input tensors for each op - input_tensors_list = [] - for _ in range(max_data_cnt): - # create input tensors - if hasattr(self.op, "custom_create_tensors"): - input_tensors = self.op.custom_create_tensors(input_shapes, torch_dtype, "cuda") - input_tensors_list.append(input_tensors) - # default: all input tensors have same dtype - else: - if torch_dtype in [torch.int8, torch.int32]: - input_tensors = [ - torch.randint(-3, 3, size=shape, dtype=torch_dtype, device="cuda") - for shape in input_shapes - ] - else: - input_tensors = [ - torch.randn(shape, dtype=torch_dtype, device="cuda") - for shape in input_shapes - ] - input_tensors_list.append(input_tensors) - if hasattr(self.op, "process_inputs"): - input_tensors_list = [ - self.op.process_inputs(*(input_tensor)) - for input_tensor in input_tensors_list - ] - return input_tensors_list, max_data_cnt, bytes_per_cnt - - - - def _run_operation(self, 
operation, inputs): - result = operation(*inputs) - return result - - def device_synchronize(self): - torch.cuda.synchronize() - return True - - def initialize_ccl(self, rank, world_size): - """ - initialize distributed process groups and relevant ENVs - """ - # check device_count - device_count = torch.cuda.device_count() - if world_size > device_count: - world_size = device_count - if rank >= world_size: - return False - - # set envs - os.environ["MASTER_ADDR"] = "127.0.0.1" - os.environ["MASTER_PORT"] = "49373" - os.environ["LOCAL_RANK"] = str(rank) - os.environ["RANK"] = str(rank) - os.environ["WORLD_SIZE"] = str(world_size) - - torch.cuda.set_device(rank) - - # Call the init process - timeout_seconds = int(os.environ.get("MEGATRON_NCCL_TIMEOUT_SECOND", 30)) - torch.distributed.init_process_group( - backend="nccl", - world_size=world_size, - rank=rank, - store=None, - timeout=timedelta(seconds=timeout_seconds), - ) - self.setup_2d_group() - log.warning("DIST: rank {}, world_size {}".format(rank, world_size)) - return True - - def setup_2d_group(self): - self.rank = dist.get_rank() - torch.cuda.set_device(self.rank) - origin_store_based_barrier = dist_c10d._store_based_barrier - dist_c10d._store_based_barrier = lambda *a, **kw: None - self.world_size = dist.get_world_size() - self.ranks = range(0, self.world_size) - group = dist.new_group(self.ranks) - if self.rank in self.ranks: - self.group = group - dist_c10d._store_based_barrier = origin_store_based_barrier - # wait for all ranks finish group initializing - torch.distributed.barrier() - - def destroy_process_group(self): - dist.destroy_process_group() - - def barier(self): - dist.barrier(self.group) \ No newline at end of file diff --git a/byte_micro_perf/backends/AMD/custom_ops.py b/byte_micro_perf/backends/AMD/custom_ops.py deleted file mode 100644 index 8a6ee7ce..00000000 --- a/byte_micro_perf/backends/AMD/custom_ops.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import List - -import torch - -from backends.module_store import GemmOp, BatchGemmOp, GroupGemmOp - - -# gemm(pytorch) float32/float16/bfloat16 --> float32/float16/bfloat16 -# gemm(cutlass) int8 --> int32 -class GPUGemmOp(GemmOp): - def __init__(self): - super().__init__() - - def forward( - self, - input_tensor_a : torch.Tensor, - input_tensor_b : torch.Tensor - ): - compute_dtype = input_tensor_a.dtype - if compute_dtype == torch.int8: - output_tensor = input_tensor_a - else: - output_tensor = torch.mm(input_tensor_a, input_tensor_b) - return output_tensor - - -# batch_gemm(pytorch) float32/float16/bfloat16 --> float32/float16/bfloat16 -# batch_gemm(cutlass) int8 --> int32 -class GPUBatchGemmOp(BatchGemmOp): - def __init__(self): - super().__init__() - - def forward( - self, - input_tensor_a : torch.Tensor, - input_tensor_b : torch.Tensor - ): - compute_dtype = input_tensor_a.dtype - - output_tensor = None - if compute_dtype == torch.int8: - output_tensor = input_tensor_a - else: - output_tensor = torch.bmm(input_tensor_a, input_tensor_b) - return output_tensor - - -# group_gemm(pytorch) float32/float16/bfloat16 --> float32/float16/bfloat16 -# group_gemm(cutlass) int8 --> int32 -class GPUGroupGemmOp(GroupGemmOp): - def __init__(self): - super().__init__() - - def forward(self, - a_list : List[torch.Tensor], - b_list : List[torch.Tensor] - ): - compute_dtype = a_list[0].dtype - if compute_dtype == torch.int8: - output_tensors = a_list - else: - output_tensors = [a @ b for a, b in zip(a_list, b_list)] - return output_tensors \ No newline at end of file diff --git 
a/byte_micro_perf/backends/AMD/requirements.txt b/byte_micro_perf/backends/AMD/requirements.txt deleted file mode 100644 index 3c7b834c..00000000 --- a/byte_micro_perf/backends/AMD/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ --i https://download.pytorch.org/whl/rocm6.1 -torch \ No newline at end of file diff --git a/byte_micro_perf/backends/GPU/backend_gpu.py b/byte_micro_perf/backends/GPU/backend_gpu.py index 1fb5baf0..d7c65ca9 100644 --- a/byte_micro_perf/backends/GPU/backend_gpu.py +++ b/byte_micro_perf/backends/GPU/backend_gpu.py @@ -23,39 +23,33 @@ import torch.distributed as dist import torch.distributed.distributed_c10d as dist_c10d +from backends import module_store from backends.backend import Backend -from backends.module_store import * from backends.utils import get_dtype_bytes -from .custom_ops import GPUGemmOp, GPUBatchGemmOp, GPUGroupGemmOp - logging.basicConfig(level=logging.INFO) log = logging.getLogger("PerfEngine") class BackendGPU(Backend): + def __init__(self, workload_dict: Dict[str, Any], vendor_path: str): + super().__init__(workload_dict, vendor_path) + def get_device_count(self): return torch.cuda.device_count() - def set_device(self, device_index): + def get_device_name(self): + return torch.cuda.get_device_name(0) + + def set_device(self, device_index : int): torch.cuda.set_device(device_index) def get_device(self): return torch.cuda.current_device() - def all_gather_object(self, obj): - gather_object_list = [None for _ in range(self.world_size)] - dist.all_gather_object( - object_list=gather_object_list, - obj=obj, - group=self.group - ) - return gather_object_list - - - def get_device_name(self): - return torch.cuda.get_device_name(0) + def device_synchronize(self): + torch.cuda.synchronize() def get_backend_properties(self): self.memory_limit = int( @@ -75,125 +69,60 @@ def get_backend_properties(self): ) - # device/host ops - def host2device(self): - self.op = Host2DeviceOp() - - def device2host(self): - self.op = Device2HostOp() - - # communication ops - def allreduce(self): - self.setup_2d_group() - self.op = AllReduceOp(self.group) - - def allgather(self): - self.setup_2d_group() - self.op = AllGatherOp(self.group) - - def reducescatter(self): - self.setup_2d_group() - self.op = ReduceScatterOp(self.group) + def initialize_ccl(self, rank, world_size): + torch.cuda.set_device(rank) - def alltoall(self): - self.setup_2d_group() - self.op = AllToAllOp(self.group) + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "49373" + os.environ["LOCAL_RANK"] = str(rank) + os.environ["RANK"] = str(rank) + os.environ["WORLD_SIZE"] = str(world_size) - def broadcast(self): - self.setup_2d_group() - self.op = BroadcastOp(self.group) + dist.init_process_group( + backend="nccl", + world_size=world_size, + rank=rank, + timeout=timedelta(seconds=1800) + ) - def p2p(self): self.setup_2d_group() - self.op = P2POp(self.group, self.ranks, self.rank) - - # compute ops - # unary ops - def sin(self): - self.op = SinOp() - - def cos(self): - self.op = CosOp() - - def exp(self): - self.op = ExpOp() - - def exponential(self): - self.op = ExponentialOp() - - def silu(self): - self.op = SiluOp() - - def gelu(self): - self.op = GeluOp() - - def swiglu(self): - self.op = SwiGLUOp() - - def cast(self): - self.op = CastOp() - - - # binary ops - def add(self): - self.op = AddOp() - - def mul(self): - self.op = MulOp() - - def sub(self): - self.op = SubOp() - - def div(self): - self.op = DivOp() - - - # reduce ops - def layernorm(self): - self.op = LayerNormOp() - - def 
softmax(self): - self.op = SoftmaxOp() - - def reduce_sum(self): - self.op = ReduceSumOp() - - def reduce_min(self): - self.op = ReduceMinOp() - - def reduce_max(self): - self.op = ReduceMaxOp() + return True - # index ops - def index_add(self): - self.op = IndexAddOp() + def setup_2d_group(self): + # get rank and set device + self.rank = dist.get_rank() + torch.cuda.set_device(self.rank) - def sort(self): - self.op = SortOp() + origin_store_based_barrier = dist_c10d._store_based_barrier + dist_c10d._store_based_barrier = lambda *a, **kw: None - def unique(self): - self.op = UniqueOp() + self.world_size = dist.get_world_size() + self.ranks = range(0, self.world_size) + group = dist.new_group(self.ranks) + if self.rank in self.ranks: + self.group = group - def scatter(self): - self.op = ScatterOp() - - def gather(self): - self.op = GatherOp() + dist_c10d._store_based_barrier = origin_store_based_barrier - # gemm ops - def gemm(self): - self.op = GPUGemmOp() + # wait for all ranks finish group initializing + dist.barrier() - def gemv(self): - self.op = GPUGemmOp() + def destroy_process_group(self): + dist.destroy_process_group() - def batch_gemm(self): - self.op = GPUBatchGemmOp() + def barrier(self): + dist.barrier(self.group) - def group_gemm(self): - self.op = GPUGroupGemmOp() + def all_gather_object(self, obj): + gather_object_list = [None for _ in range(self.world_size)] + dist.all_gather_object( + object_list=gather_object_list, + obj=obj, + group=self.group + ) + return gather_object_list # create input tensors @@ -211,15 +140,18 @@ def build_tensor(self, input_shapes, dtype): bytes_per_cnt = dtype_size * element_num - # avoid use L2 Cache: assume 256 MB currently - # data_per_cnt > 256 MB, only one buffer - # data_per_cnt < 256 MB, malloc multiple buffer to exceed 256MB, and use first and last buffer + # avoid use L2 Cache: assume max 1GB currently + # data_per_cnt > 1GB, use two buffers + # data_per_cnt < 1GB, malloc multiple buffer to exceed 1GB + assume_l2_cache_size = 1 * 1024**3 + assume_avail_bytes = self.memory_limit * 0.9 * 1024**3 - assume_l2_cache_size = 256 * 1024**2 - if bytes_per_cnt > self.memory_limit * 0.9 * 1024 ** 3: + if bytes_per_cnt > assume_avail_bytes: return [], 0, bytes_per_cnt - elif bytes_per_cnt > assume_l2_cache_size: + elif 2 * bytes_per_cnt > assume_avail_bytes: max_data_cnt = 1 + elif bytes_per_cnt > assume_l2_cache_size: + max_data_cnt = 2 else: max_data_cnt = math.ceil(assume_l2_cache_size / bytes_per_cnt) @@ -249,78 +181,125 @@ def build_tensor(self, input_shapes, dtype): for input_tensor in input_tensors_list ] - - if max_data_cnt > 2: - max_data_cnt = 2 - new_tensor_list = [] - new_tensor_list.append(input_tensors_list[0]) - new_tensor_list.append(input_tensors_list[-1]) - else: - new_tensor_list = input_tensors_list - - return new_tensor_list, max_data_cnt, bytes_per_cnt - + return input_tensors_list, max_data_cnt, bytes_per_cnt def _run_operation(self, operation, inputs): result = operation(*inputs) return result - def device_synchronize(self): - torch.cuda.synchronize() - return True - def initialize_ccl(self, rank, world_size): - """ - initialize distributed process groups and relevant ENVs - """ + # device/host ops + def host2device(self): + self.op = module_store.Host2DeviceOp() - # set cuda device - torch.cuda.set_device(rank) + def device2host(self): + self.op = module_store.Device2HostOp() - # set envs - os.environ["MASTER_ADDR"] = "127.0.0.1" - os.environ["MASTER_PORT"] = "49373" - os.environ["LOCAL_RANK"] = str(rank) - 
os.environ["RANK"] = str(rank) - os.environ["WORLD_SIZE"] = str(world_size) + # communication ops + def allreduce(self): + self.op = module_store.AllReduceOp(self.group) - # Call the init process - torch.distributed.init_process_group( - backend="nccl", - world_size=world_size, - rank=rank - ) + def allgather(self): + self.op = module_store.AllGatherOp(self.group) - # create group - self.setup_2d_group() + def reducescatter(self): + self.op = module_store.ReduceScatterOp(self.group) - log.info(f"DIST: rank {rank}, world_size {world_size}") - return True + def alltoall(self): + self.op = module_store.AllToAllOp(self.group) + def broadcast(self): + self.op = module_store.BroadcastOp(self.group) - def setup_2d_group(self): - # get rank and set device - self.rank = dist.get_rank() - torch.cuda.set_device(self.rank) + def p2p(self): + self.op = module_store.P2POp(self.group, self.ranks, self.rank) - origin_store_based_barrier = dist_c10d._store_based_barrier - dist_c10d._store_based_barrier = lambda *a, **kw: None + # compute ops + # unary ops + def sin(self): + self.op = module_store.SinOp() - self.world_size = dist.get_world_size() - self.ranks = range(0, self.world_size) - group = dist.new_group(self.ranks) - if self.rank in self.ranks: - self.group = group + def cos(self): + self.op = module_store.CosOp() - dist_c10d._store_based_barrier = origin_store_based_barrier + def exp(self): + self.op = module_store.ExpOp() - # wait for all ranks finish group initializing - torch.distributed.barrier() + def exponential(self): + self.op = module_store.ExponentialOp() + + def silu(self): + self.op = module_store.SiluOp() + + def gelu(self): + self.op = module_store.GeluOp() + + def swiglu(self): + self.op = module_store.SwiGLUOp() + + def cast(self): + self.op = module_store.CastOp() - def destroy_process_group(self): - dist.destroy_process_group() - def barier(self): - dist.barrier(self.group) \ No newline at end of file + # binary ops + def add(self): + self.op = module_store.AddOp() + + def mul(self): + self.op = module_store.MulOp() + + def sub(self): + self.op = module_store.SubOp() + + def div(self): + self.op = module_store.DivOp() + + + # reduce ops + def layernorm(self): + self.op = module_store.LayerNormOp() + + def softmax(self): + self.op = module_store.SoftmaxOp() + + def reduce_sum(self): + self.op = module_store.ReduceSumOp() + + def reduce_min(self): + self.op = module_store.ReduceMinOp() + + def reduce_max(self): + self.op = module_store.ReduceMaxOp() + + + # index ops + def index_add(self): + self.op = module_store.IndexAddOp() + + def sort(self): + self.op = module_store.SortOp() + + def unique(self): + self.op = module_store.UniqueOp() + + def scatter(self): + self.op = module_store.ScatterOp() + + def gather(self): + self.op = module_store.GatherOp() + + + # gemm ops + def gemm(self): + self.op = module_store.GemmOp() + + def gemv(self): + self.op = module_store.GemmOp() + + def batch_gemm(self): + self.op = module_store.BatchGemmOp() + + def group_gemm(self): + self.op = module_store.GroupGemmOp() diff --git a/byte_micro_perf/backends/GPU/requirements.txt b/byte_micro_perf/backends/GPU/requirements.txt index 46e955c4..44ceb261 100644 --- a/byte_micro_perf/backends/GPU/requirements.txt +++ b/byte_micro_perf/backends/GPU/requirements.txt @@ -1,3 +1,2 @@ -nvidia-cutlass --i https://download.pytorch.org/whl/cu118 -torch==2.1.2 +-i https://download.pytorch.org/whl/cu121 +torch==2.3.1 diff --git a/byte_micro_perf/backends/backend.py b/byte_micro_perf/backends/backend.py index 
59cfa6df..4a57599b 100644 --- a/byte_micro_perf/backends/backend.py +++ b/byte_micro_perf/backends/backend.py @@ -14,13 +14,22 @@ import os import time -import random +import logging import traceback from abc import ABC, abstractmethod +from datetime import timedelta from typing import Any, Dict, List +import torch +import torch.distributed as dist +import torch.distributed.distributed_c10d as dist_c10d + from backends.utils import dump_communication_ops_report, dump_computation_ops_report +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("PerfEngine") + + class Backend(ABC): def __init__(self, workload_dict: Dict[str, Any], vendor_path: str): self.op_name = workload_dict["operator"] @@ -42,62 +51,206 @@ def __init__(self, workload_dict: Dict[str, Any], vendor_path: str): self.target_dtype = None + """ + device management related + """ @abstractmethod def get_device_count(self): - pass + raise NotImplementedError @abstractmethod - def set_device(self, device_index): - pass + def get_device_name(self): + raise NotImplementedError + + @abstractmethod + def set_device(self, device_index : int): + raise NotImplementedError @abstractmethod def get_device(self): - pass + raise NotImplementedError @abstractmethod - def all_gather_object(self, obj): - pass + def device_synchronize(self): + raise NotImplementedError + @abstractmethod + def get_backend_properties(self): + raise NotImplementedError @abstractmethod - def get_device_name(self): - pass + def initialize_ccl(self, rank, world_size): + torch.cuda.set_device(rank) + + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "49373" + os.environ["LOCAL_RANK"] = str(rank) + os.environ["RANK"] = str(rank) + os.environ["WORLD_SIZE"] = str(world_size) + + dist.init_process_group( + backend="nccl", + world_size=world_size, + rank=rank, + timeout=timedelta(seconds=1800) + ) + + self.setup_2d_group() + return True + @abstractmethod - def get_backend_properties(self): - pass + def setup_2d_group(self): + # get dist info + self.world_size = dist.get_world_size() + self.rank = dist.get_rank() + self.ranks = range(0, self.world_size) + + # set device + torch.cuda.set_device(self.rank) + + # get original store_based_barrier + origin_store_based_barrier = dist_c10d._store_based_barrier + dist_c10d._store_based_barrier = lambda *a, **kw: None + group = dist.new_group(self.ranks) + if self.rank in self.ranks: + self.group = group + dist_c10d._store_based_barrier = origin_store_based_barrier + + # wait for all ranks finish group initializing + torch.barrier() @abstractmethod - def build_tensor(self, input_shapes: List[List[int]], dtype): - pass + def destroy_process_group(self): + dist.destroy_process_group() @abstractmethod - def _run_operation(self, operation, inputs): - pass + def barrier(self): + dist.barrier(self.group) @abstractmethod - def device_synchronize(self): - pass + def all_gather_object(self, obj): + if dist.is_initialized() and self.world_size is not None and self.group is not None: + gather_object_list = [None for _ in range(self.world_size)] + dist.all_gather_object( + object_list=gather_object_list, + obj=obj, + group=self.group + ) + return gather_object_list + @abstractmethod - def initialize_ccl(self, rank, world_size): - pass + def build_tensor(self, input_shapes: List[List[int]], dtype): + raise NotImplementedError @abstractmethod - def setup_2d_group(self): + def _run_operation(self, operation, inputs): + return operation(*inputs) + + + + # perf specify input_shape for + def perf(self, input_shapes: 
List[List[int]], dtype): + error = "" + + # create input tensors based on input_shapes and dtype + tensor_list, tensor_cnt, tensor_size_perc_cnt = self.build_tensor( + input_shapes, dtype) + + if tensor_cnt > 0: + try: + # warmup + num_warm_up = 10 + for _ in range(num_warm_up): + self._run_operation(self.op, tensor_list[0]) + + # test perf + num_test_perf = 10 + self.device_synchronize() + start_time = time.perf_counter_ns() + for i in range(num_test_perf): + self._run_operation(self.op, tensor_list[0]) + self.device_synchronize() + end_time = time.perf_counter_ns() + + prefer_iterations = self.iterations + max_perf_seconds = 10.0 + op_duration = (end_time - start_time) / num_test_perf / 1e9 + if op_duration > max_perf_seconds: + prefer_iterations = 5 + else: + prefer_iterations = min(max(int(max_perf_seconds // op_duration), 10), self.iterations) + + # ccl ops need barrier + if self.op_name in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p"]: + self.barrier() + + # perf + self.device_synchronize() + start_time = time.perf_counter_ns() + for i in range(prefer_iterations): + self._run_operation(self.op, tensor_list[i % tensor_cnt]) + self.device_synchronize() + end_time = time.perf_counter_ns() + + # time in us + total_exec_time = (end_time - start_time) / 1e3 + latency = round(total_exec_time / prefer_iterations, 2) + except Exception as e: + traceback.print_exc() + latency = 0 + error = "RUN_OP_ERROR" + else: + latency = 0 + error = "OOM" + + tensor_list = [] + + if self.op_name in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p"]: + report = dump_communication_ops_report( + self.op_name, + dtype, + input_shapes, + self.group.size(), + None, + latency, + error + ) + else: + report = dump_computation_ops_report( + self.op_name, + dtype, + input_shapes, + self.bandwidth_limit, + latency, + error + ) + + return report + + + + """ + gemm ops + """ + def gemm(self): pass - @abstractmethod - def destroy_process_group(self): + def gemv(self): pass - @abstractmethod - def barier(self): + def batch_gemm(self): + pass + + def group_gemm(self): pass - # communication ops + """ + communication ops + """ def host2device(self): pass @@ -196,97 +349,3 @@ def scatter(self): def gather(self): pass - - # gemm ops - def gemm(self): - pass - - def gemv(self): - pass - - def batch_gemm(self): - pass - - def group_gemm(self): - pass - - - # perf specify input_shape for - def perf(self, input_shapes: List[List[int]], dtype): - error = "" - - # create input tensors based on input_shapes and dtype - tensor_list, tensor_cnt, tensor_size_perc_cnt = self.build_tensor( - input_shapes, dtype - ) - - if tensor_cnt > 0: - try: - # warmup - num_warm_up = 10 - for _ in range(num_warm_up): - self._run_operation(self.op, tensor_list[0]) - - # test perf - num_test_perf = 10 - self.device_synchronize() - start_time = time.perf_counter_ns() - for i in range(num_test_perf): - self._run_operation(self.op, tensor_list[0]) - self.device_synchronize() - end_time = time.perf_counter_ns() - - prefer_iterations = self.iterations - max_perf_seconds = 10.0 - op_duration = (end_time - start_time) / num_test_perf / 1e9 - if op_duration > max_perf_seconds: - prefer_iterations = 5 - else: - prefer_iterations = min(max(int(max_perf_seconds // op_duration), 10), self.iterations) - - # ccl ops need barrier - if self.op_name in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p"]: - self.barier() - - # perf - self.device_synchronize() - start_time = 
time.perf_counter_ns() - for i in range(prefer_iterations): - self._run_operation(self.op, tensor_list[i % tensor_cnt]) - self.device_synchronize() - end_time = time.perf_counter_ns() - - # time in us - total_exec_time = (end_time - start_time) / 1e3 - latency = round(total_exec_time / prefer_iterations, 2) - except Exception as e: - traceback.print_exc() - latency = 0 - error = "RUN_OP_ERROR" - else: - latency = 0 - error = "OOM" - - tensor_list = [] - - if self.op_name in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast", "p2p"]: - report = dump_communication_ops_report( - self.op_name, - dtype, - input_shapes, - self.group.size(), - None, - latency, - error - ) - else: - report = dump_computation_ops_report( - self.op_name, - dtype, - input_shapes, - self.bandwidth_limit, - latency, - error - ) - return report - diff --git a/byte_micro_perf/backends/module_store.py b/byte_micro_perf/backends/module_store.py index 630d2db0..1655a8cb 100644 --- a/byte_micro_perf/backends/module_store.py +++ b/byte_micro_perf/backends/module_store.py @@ -14,7 +14,6 @@ import math import random -from typing import List import torch import torch.distributed as dist @@ -22,6 +21,9 @@ from .utils import get_dtype_bytes +""" +gemm ops +""" class GemmOp(torch.nn.Module): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -174,17 +176,21 @@ def forward(self, input_tensor_a, input_tensor_b): return output_tensor_list + +""" +communication ops +""" class Host2DeviceOp(torch.nn.Module): def __init__(self): super().__init__() def custom_create_tensors(self, input_shapes, torch_dtype, xpu_device): - host_tensor = torch.randn(input_shapes[0], dtype=torch_dtype, device="cpu") - device_tensor = torch.randn(input_shapes[0], dtype=torch_dtype, device=xpu_device) + host_tensor = torch.zeros(input_shapes[0], dtype=torch_dtype, device="cpu").pin_memory() + device_tensor = host_tensor.to(xpu_device) return [host_tensor, device_tensor] def forward(self, host_tensor, device_tensor): - device_tensor.copy_(host_tensor, non_blocking=True) + device_tensor.copy_(host_tensor) return device_tensor @@ -193,16 +199,15 @@ def __init__(self): super().__init__() def custom_create_tensors(self, input_shapes, torch_dtype, xpu_device): - device_tensor = torch.randn(input_shapes[0], dtype=torch_dtype, device=xpu_device) - host_tensor = torch.randn(input_shapes[0], dtype=torch_dtype, device="cpu") + host_tensor = torch.zeros(input_shapes[0], dtype=torch_dtype, device="cpu").pin_memory() + device_tensor = host_tensor.to(xpu_device) return [device_tensor, host_tensor] def forward(self, device_tensor, host_tensor): - host_tensor.copy_(device_tensor, non_blocking=True) + host_tensor.copy_(device_tensor) return host_tensor - class AllReduceOp(torch.nn.Module): def __init__(self, group): super().__init__() diff --git a/byte_micro_perf/core/perf_engine.py b/byte_micro_perf/core/perf_engine.py index d3b93213..d123f3ef 100644 --- a/byte_micro_perf/core/perf_engine.py +++ b/byte_micro_perf/core/perf_engine.py @@ -16,6 +16,7 @@ import sys import json import time +import datetime import signal import argparse import importlib @@ -31,39 +32,50 @@ import torch.multiprocessing as mp import virtualenv -CUR_DIR = pathlib.Path.cwd() -FILE_DIR = pathlib.Path(sys.path[0]) -BYTE_MLPERF_ROOT = FILE_DIR.parent -sys.path.insert(0, BYTE_MLPERF_ROOT.__str__()) +import torch -from backends.backend import Backend +# directory config +CUR_DIR = pathlib.Path.cwd().absolute() +FILE_DIR = pathlib.Path(__file__).parent.absolute() 
+BYTE_MLPERF_ROOT = FILE_DIR.parent +sys.path.insert(0, str(BYTE_MLPERF_ROOT)) +# logger config logging.basicConfig(level=logging.INFO) logger = logging.getLogger("PerfEngine") def get_args(): - """Parse commandline.""" parser = argparse.ArgumentParser() + + # hardware config parser.add_argument( - "--task", - default="gemm", - help="The task going to be evaluted, refs to workloads/", + "--hardware_type", + default="GPU", + help="The backend going to be evaluted, refs to backends/", + ) + parser.add_argument( + "--vendor_path", + help="The hardware configs need to be loaded, refs to vendor_zoo/NVIDIA/A100-PCIe.json", ) + + # task config parser.add_argument( "--task_dir", default=str(BYTE_MLPERF_ROOT.joinpath("workloads")), help="The direcotry of tasks going to be evaluted, e.g., set to workloads" ) parser.add_argument( - "--hardware_type", - default="GPU", - help="The backend going to be evaluted, refs to backends/", + "--task", + default="gemm", + help="The task going to be evaluted, refs to workloads/", ) + + # feature control parser.add_argument( - "--vendor_path", - required=False, - help="The hardware configs need to be loaded, refs to vendor_zoo/NVIDIA/A100-PCIe.json", + "--parallel", + type=int, default=1, + help="Run all tasks in parallel if available" ) parser.add_argument( "--activate_venv", @@ -253,8 +265,14 @@ def start_engine(self) -> None: ) # create output dir based on task - hardware_reports_dir = BYTE_MLPERF_ROOT.joinpath("reports", self.backend_type) - output_dir = BYTE_MLPERF_ROOT.joinpath("reports", self.backend_type, self.workload["operator"]) + # {BYTEMLPERF_ROOT}/byte_micro_perf/reports/{backend_type}/{task_name} + hardware_reports_dir = BYTE_MLPERF_ROOT.joinpath( + "reports", self.backend_type + ) + output_dir = BYTE_MLPERF_ROOT.joinpath( + "reports", self.backend_type, + self.workload["operator"] + ) output_dir.mkdir(parents=True, exist_ok=True) @@ -281,7 +299,7 @@ def start_engine(self) -> None: for shape in shape_list: test_list.append(ConfigInstance(dtype, shape, case_index)) case_index = case_index + 1 - + try: mp.set_start_method("spawn", force=True) except Exception as e: @@ -305,9 +323,8 @@ def signal_handler(signum, frame): # all operations will enter subprocess to test in parallel for group in group_list: - logger.info(f"Start to test group size: {group}") - # TODO: test allreduce/allgather in parallel - instance_num = device_count if group == 1 else group + logger.info(f"Start to test group size: {group}") + instance_num = min(device_count, max(1, self.args.parallel)) if group == 1 else group if self.workload["operator"] in ["device2host", "host2device"]: instance_num = 1 @@ -333,6 +350,9 @@ def signal_handler(signum, frame): assert "ready" == output_queues.get() logger.info("all ranks are ready and listening, init done") + + start_time = time.perf_counter_ns() + if group == 1: for test_instance in test_list: input_queues.put(test_instance, True) @@ -342,15 +362,32 @@ def signal_handler(signum, frame): for process in _subprocesses.processes: process.join() + + end_time = time.perf_counter_ns() + duration = (end_time - start_time) / 1e9 + duration = round(duration, 3) - with open(f"{hardware_reports_dir}/_run_report.log", "a") as f: - print(f"[success] {self.args.task}, group_size={group}", file=f) + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + ret_code = 0 + for process in _subprocesses.processes: + if process.exitcode != 0: + ret_code = process.exitcode + break + + if ret_code != 0: + with 
open(f"{hardware_reports_dir}/_run_report.log", "a") as f: + print(f"[failed] {self.args.task}, group_size={group}, {current_time}, {duration} s", file=f) + else: + with open(f"{hardware_reports_dir}/_run_report.log", "a") as f: + print(f"[success] {self.args.task}, group_size={group}, {current_time}, {duration} s", file=f) + except Exception as e: traceback.print_exc() logger.error(f"Execute task: {self.args.task} failed, group: {group}, error msg: {e}") with open(f"{hardware_reports_dir}/_run_report.log", "a") as f: - print(f"[error] {self.args.task}, group_size={group}", file=f) + print(f"[error] {self.args.task}, group_size={group}, {current_time}, {duration} s", file=f) subprocess_pids = [] @@ -388,7 +425,8 @@ def perf_func(self, rank: int, *args): if test_instance == "end": break - print(f"rank {rank}: {test_instance.index + 1} / {len(test_list)}") + + start_time = time.perf_counter_ns() test_dtype = test_instance.dtype test_shape = test_instance.tensor_shapes @@ -407,7 +445,12 @@ def perf_func(self, rank: int, *args): logger.error(f"Execute op: {op_name.lower()} failed, input_shape: {test_shape}, dtype: {test_dtype}, error msg: {e}") reports = {} - result_list.append(ResultItem(test_instance, reports)) + if reports and "Error" not in reports: + result_list.append(ResultItem(test_instance, reports)) + + duration = (time.perf_counter_ns() - start_time) / 1e9 + duration = round(duration, 3) + print(f"rank {rank}: {test_instance.index + 1} / {len(test_list)}, duration: {duration} s") output_result_list = [] if world_size > 1: @@ -422,8 +465,8 @@ def perf_func(self, rank: int, *args): elif group_size > 1: for i, test_instance in enumerate(test_list): - if rank == 0: - print(f"rank {rank}: {test_instance.index + 1} / {len(test_list)}") + + start_time = time.perf_counter_ns() test_dtype = test_instance.dtype test_shape = test_instance.tensor_shapes @@ -444,6 +487,13 @@ def perf_func(self, rank: int, *args): result_list.append(ResultItem(test_instance, reports)) + end_time = time.perf_counter_ns() + duration = (end_time - start_time) / 1e9 + duration = round(duration, 3) + + if rank == 0: + print(f"rank {rank}: {test_instance.index + 1} / {len(test_list)}, duration: {duration} s") + if rank == 0: print(f"{len(result_list)} tasks finished.") diff --git a/byte_micro_perf/launch.py b/byte_micro_perf/launch.py index 96c56bda..6f58927e 100644 --- a/byte_micro_perf/launch.py +++ b/byte_micro_perf/launch.py @@ -20,9 +20,13 @@ import subprocess import signal +# directory config CUR_DIR = pathlib.Path.cwd().absolute() -BYTE_MLPERF_ROOT = pathlib.Path(__file__).parent.absolute() -sys.path.insert(0, BYTE_MLPERF_ROOT) +FILE_DIR = pathlib.Path(__file__).parent.absolute() +BYTE_MLPERF_ROOT = FILE_DIR +sys.path.insert(0, str(BYTE_MLPERF_ROOT)) + +# logger config logging.basicConfig(level=logging.INFO) logger = logging.getLogger("lanuch") @@ -77,6 +81,11 @@ def parse_task(task_dir): ) # feature control + parser.add_argument( + "--parallel", + type=int, default=1, + help="Run all tasks in parallel if available" + ) parser.add_argument( "--install_requirements", action="store_true", help="Install all required packages" @@ -172,17 +181,14 @@ def parse_task(task_dir): if args.install_requirements: logger.info("******************* Pip Package Installing *******************") subprocess.run( - "python3 -m pip install --upgrade pip --quiet", - shell=True + ["python3", "-m", "pip", "install", "pip", "--upgrade", "--quiet"] ) subprocess.run( - "python3 -m pip install -r requirements.txt --quiet", - shell=True + 
["python3", "-m", "pip", "install", "-r", "requirements.txt", "--quiet"] ) - if not args.activate_venv and BYTE_MLPERF_ROOT.joinpath("backends", args.hardware_type, "requirements.txt").exists(): + if not args.activate_venv: subprocess.run( - f"python3 -m pip install -r backends/{args.hardware_type}/requirements.txt --quiet", - shell=True + ["python3", "-m", "pip", "install", "-r", f"backends/{hardware}/requirements.txt", "--quiet"] ) @@ -211,14 +217,11 @@ def signal_handler(signum, frame): cmds = [ "python3", "./core/perf_engine.py", - "--hardware_type", - args.hardware_type, - "--task", - task, - "--vendor_path", - str(args.vendor_path), - "--task_dir", - str(args.task_dir) + "--hardware_type", args.hardware_type, + "--vendor_path", str(args.vendor_path), + "--task", task, + "--task_dir", str(args.task_dir), + "--parallel", str(args.parallel) ] if args.activate_venv: cmds.append("--activate_venv") diff --git a/byte_micro_perf/workloads/gemm.json b/byte_micro_perf/workloads/gemm.json index 6fe110da..6c166655 100644 --- a/byte_micro_perf/workloads/gemm.json +++ b/byte_micro_perf/workloads/gemm.json @@ -3,7 +3,20 @@ "iterations": 100, "input_shape_groups": { "M": [4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072], - "KN": [[1024, 1024], [16384, 1024], [16384, 32], [32, 16384], [1024, 16384], [4096, 4096], [8192, 8192], [12288, 12288]] + "KN": [ + [1024, 1024], + [4096, 4096], + [8192, 8192], + [12288, 12288], + + [16384, 32], + [16384, 128], + [16384, 1024], + + [32, 16384], + [128, 16384], + [1024, 16384] + ] }, "dtype": [ "float32",
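
For reference, the measurement policy this patch introduces in backends/backend.py (Backend.perf) and backends/GPU/backend_gpu.py (build_tensor) can be illustrated by the standalone sketch below. The helper names choose_buffer_count and measure_latency_us, the lambda op, and the plain-list buffers are illustrative stand-ins rather than code from the patch, and the sketch deliberately omits the device synchronization and the CCL-op barrier that the real perf() performs; the constants (1 GB assumed L2 size, 10 s probe budget, 10 warm-up and probe calls) are taken from the diff above.

# Sketch of the refactored measurement scheme: probe the op to estimate its
# duration, derive a preferred iteration count bounded by the workload's
# "iterations" setting, then time the real loop while cycling through
# pre-built input buffers so a cache-resident buffer is not reused each call.
import math
import time
from typing import Any, Callable, List


def choose_buffer_count(bytes_per_cnt: int,
                        avail_bytes: int,
                        l2_cache_size: int = 1 * 1024**3) -> int:
    """Mirror the buffer-count policy: report OOM when nothing fits,
    otherwise allocate enough buffers to exceed the assumed L2 cache size."""
    if bytes_per_cnt > avail_bytes:
        return 0                      # not even one buffer fits -> OOM
    if 2 * bytes_per_cnt > avail_bytes:
        return 1                      # only a single buffer fits
    if bytes_per_cnt > l2_cache_size:
        return 2                      # two large buffers already defeat the cache
    return math.ceil(l2_cache_size / bytes_per_cnt)


def measure_latency_us(op: Callable[..., Any],
                       buffers: List[List[Any]],
                       max_iterations: int,
                       num_warm_up: int = 10,
                       num_probe: int = 10,
                       max_perf_seconds: float = 10.0) -> float:
    """Return the average latency of one op invocation in microseconds."""
    # warm up (allocator, autotuning, lazy initialization, ...)
    for _ in range(num_warm_up):
        op(*buffers[0])

    # probe run to estimate single-call duration
    start = time.perf_counter_ns()
    for _ in range(num_probe):
        op(*buffers[0])
    op_duration_s = (time.perf_counter_ns() - start) / num_probe / 1e9

    # pick an iteration count that keeps total runtime near max_perf_seconds
    if op_duration_s > max_perf_seconds:
        prefer_iterations = 5
    else:
        prefer_iterations = min(max(int(max_perf_seconds // op_duration_s), 10),
                                max_iterations)

    # timed loop, cycling through buffers to avoid repeated cache hits
    # (the real backend brackets this region with device_synchronize())
    start = time.perf_counter_ns()
    for i in range(prefer_iterations):
        op(*buffers[i % len(buffers)])
    total_us = (time.perf_counter_ns() - start) / 1e3
    return round(total_us / prefer_iterations, 2)


if __name__ == "__main__":
    # toy usage: the "op" is a plain Python sum over a list buffer
    bufs = [[list(range(10_000))] for _ in range(4)]
    print(choose_buffer_count(bytes_per_cnt=64 * 1024**2, avail_bytes=70 * 1024**3))
    print(measure_latency_us(lambda x: sum(x), bufs, max_iterations=100))

Rotating through a working set larger than the assumed cache is what keeps the timed loop from reporting cache-hit bandwidth instead of memory bandwidth, which is why the patch raises the assumed cache size to 1 GB and returns the full buffer list instead of only the first and last buffers.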