
Tutorial docs: Distributed Training Topics #481

strint opened this issue Apr 25, 2022 · 3 comments


strint commented Apr 25, 2022

Maintainer List

@doombeaker @clackhan @strint @leaves-zwx

These colleagues are responsible for the long-term correctness and professional quality of this tutorial.

Purpose

Publish a distributed-training tutorial so that developers can follow it and build distributed models on their own, self-service.

Audience

All users and developers; to be completed within the 0.8 release cycle.

Stability

Stability follows the stability of the specific features covered.

Completeness

1. Quick start @doombeaker
Launch a test in 5 minutes

  • Run a hybrid data/model parallel example
  • Explain how to launch it

2. 1D parallelism @clackhan

  • Data parallelism
  • Model parallelism
  • Model saving and loading

3. Pipeline parallelism @strint

  • Pipeline parallelism example

4. 2D hybrid parallelism + pipeline parallelism @leaves-zwx

  • Example + recommend using libai (a minimal 2D sbp sketch follows below)
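A minimal sketch of the 2D sbp idea in item 4, assuming 4 GPUs arranged as a 2x2 hierarchy; the tensor shapes here are illustrative only, and the actual tutorial is expected to point readers to libai:

import oneflow as flow

# 2x2 device hierarchy: the first dimension groups ranks for data parallelism,
# the second dimension groups ranks for tensor (model) parallelism
placement_2d = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])

# 2D sbp: split the batch along the first hierarchy dimension,
# split the weight columns along the second
x = flow.randn(16, 10, placement=placement_2d,
               sbp=[flow.sbp.split(0), flow.sbp.broadcast])
w = flow.randn(10, 20, placement=placement_2d,
               sbp=[flow.sbp.broadcast, flow.sbp.split(1)])
y = flow.matmul(x, w)  # each rank computes its local shard of the result
print(y.sbp)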

Testing

All examples must be runnable.

clackhan commented

2. 1D parallelism @clackhan

Global tensors can easily support any form of parallelism, including data parallelism and model parallelism, and can run across multiple machines.

Note: the code in this tutorial runs on a 2-GPU server, but it can easily be generalized to other environments.

  • Data parallelism

  • Model construction

In data parallel mode, every GPU holds a complete copy of the model parameters, the parameters on all devices are identical, and each rank receives different input data. Next we use the global mode to train a data parallel network. The first step is to create the model: the code below defines a network with two fully connected layers and extends it to two devices.

Note: when the single-device model is extended to two devices via to_global, the parameters on rank 0 are broadcast to the other ranks, so there is no need to worry about the initial parameter values differing across processes.

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

      
placement = flow.placement(type="cuda", ranks=[0, 1])
sbp = flow.sbp.broadcast

model = ToyModel()
model.to_global(placement, sbp) # extend the model to both devices

  • Model training

The training loop for the data parallel model is no different from single-machine, single-device training:

optimizer = optim.SGD(model.parameters(), lr=0.001)
max_iter = 20

for i in range(max_iter):
    data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
    labels = flow.randn(20, 5, placement=placement, sbp=flow.sbp.split(0))
    loss = model(data, labels)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

  • Model saving and loading

Compared with local mode, saving and loading a model in global mode requires an extra argument (global_dst_rank for saving, global_src_rank for loading) that specifies on which rank the model is actually persisted or loaded.

flow.save(model.state_dict(), 'model_weights.pth', global_dst_rank=0)

model2 = ToyModel()
model2.to_global(placement, sbp)
model2.load_state_dict(flow.load('model_weights.pth', global_src_rank=0))

⚠️ In global mode, every rank must execute the flow.save(...)/flow.load(...) calls; in other words, it is not allowed to run flow.save(...)/flow.load(...) only inside an if rank == 0: block.
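For illustration, a minimal sketch of the right and wrong patterns (flow.env.get_rank() appears here only to show the anti-pattern):

# Correct: every rank executes the save call; global_dst_rank decides
# which rank actually writes the file to disk.
flow.save(model.state_dict(), 'model_weights.pth', global_dst_rank=0)

# Incorrect: guarding the call with a rank check makes the other ranks skip
# the collective operation, and the program will hang.
# if flow.env.get_rank() == 0:
#     flow.save(model.state_dict(), 'model_weights.pth', global_dst_rank=0)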

  • Complete code (can be collapsed by default on the web page) and how to launch it

OneFlow provides the oneflow.distributed.launch module (link to the distributed.launch tutorial/docs) to help users launch distributed training more conveniently.

Distributed training can be launched in the following form:

python3 -m oneflow.distributed.launch [launch options] training_script.py

Run the following command to start data parallel training on a single machine with two devices:

python3 -m oneflow.distributed.launch --nproc_per_node 2 data_parallel.py
# data_parallel.py

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 20)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(20, 16)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])
    sbp = flow.sbp.broadcast

    model = ToyModel()
    model.to_global(placement, sbp) # extend the model to both devices
    
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))
    
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    
    for i in range(max_iter):
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.split(0))
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)

if __name__ == "__main__":
    demo_basic(10)

  • Model parallelism

Model parallelism is also easy to implement with the global mode. The overall flow is largely the same as data parallelism; the difference lies in how sbp is configured when defining the model. Unlike data parallelism, in model parallelism the model parameters are evenly distributed across the GPUs, while every rank instead holds the complete input data.

Note: when to_global(...) performs the local-to-global conversion and the sbp contains flow.sbp.split(x), the local tensors on each rank are concatenated along dimension x. So that the linear module's parameters (in_features, out_features) keep the same semantics as in data parallelism, the ModelParallelLinear defined below creates a local linear layer with only out_features // parallel_num output features on each rank; after the conversion, the concatenated global parameter recovers the full out_features.

In model parallel mode, the script launch method and model saving/loading are the same as in data parallelism.

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ModelParallelLinear(nn.Module):
    def __init__(self, in_features: int, out_features: int, placement: flow.placement, bias: bool = True):
        super(ModelParallelLinear, self).__init__()
        assert out_features % placement.ranks.size == 0, "out_features must be divisible by parallel num"
        # each rank holds only out_features // parallel_num output features;
        # to_global with flow.sbp.split(0) concatenates them into the full out_features
        self.linear = nn.Linear(in_features, out_features // placement.ranks.size, bias)
        self.linear.to_global(placement, flow.sbp.split(0))

    def forward(self, x):
        return self.linear(x)

class ToyModel(nn.Module):
    def __init__(self, placement):
        super(ToyModel, self).__init__()
        self.net1 = ModelParallelLinear(10, 20, placement)
        self.relu = nn.ReLU()
        self.net2 = ModelParallelLinear(20, 16, placement)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])

    model = ToyModel(placement)
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))
    
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    
    for i in range(max_iter):
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.broadcast)
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.broadcast)
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)

if __name__ == "__main__":
    demo_basic(10)

  • Model saving and loading
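As noted above, this works the same way as in data parallelism; a minimal sketch (the file name mp_model_weights.pth is just an example):

# every rank calls save/load; global_dst_rank / global_src_rank select the rank
# that actually touches the filesystem
flow.save(model.state_dict(), 'mp_model_weights.pth', global_dst_rank=0)

model2 = ToyModel(placement)
model2.load_state_dict(flow.load('mp_model_weights.pth', global_src_rank=0))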


strint commented Jul 27, 2022

Second article:

Using Global Tensor for multi-machine, multi-device programming: data parallelism, model parallelism, pipeline parallelism

One runnable example plus explanation for each (a minimal pipeline parallel sketch follows after the list below).

References:

Execution process

  • Title + outline
  • Get the examples running and fill them in
  • Then supplement the content
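A minimal eager-mode pipeline parallel sketch, assuming a single machine with 2 GPUs launched via oneflow.distributed.launch; the class name PipelineToyModel and the layer sizes are illustrative, not taken from the planned tutorial:

import oneflow as flow
import oneflow.nn as nn

P0 = flow.placement(type="cuda", ranks=[0])  # stage 0 lives on rank 0
P1 = flow.placement(type="cuda", ranks=[1])  # stage 1 lives on rank 1
BROADCAST = flow.sbp.broadcast

class PipelineToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.stage0 = nn.Linear(10, 20)
        self.stage1 = nn.Linear(20, 16)
        self.stage0.to_global(P0, BROADCAST)
        self.stage1.to_global(P1, BROADCAST)

    def forward(self, x):
        out0 = self.stage0(x)                                 # computed on rank 0
        out0 = out0.to_global(placement=P1, sbp=BROADCAST)    # move the activation to rank 1
        return self.stage1(out0)

model = PipelineToyModel()
x = flow.randn(8, 10, placement=P0, sbp=BROADCAST)
y = model(x)
loss = y.sum()
loss.backward()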


strint commented Jul 27, 2022

Third article: global tensor backward (autograd)
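A minimal sketch of what this article could demonstrate, assuming two GPUs; the shapes and the split(0) sbp are illustrative only:

import oneflow as flow

placement = flow.placement(type="cuda", ranks=[0, 1])
x = flow.randn(4, 3, placement=placement, sbp=flow.sbp.split(0), requires_grad=True)
y = (x * 2).sum()
y.backward()
# the gradient is itself a global tensor on the same placement as x
print(x.grad.placement, x.grad.sbp)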
