PyTorch Distributed Training (Single-Machine, Multi-GPU)

There are two main options: DataParallel and DistributedDataParallel.
DataParallel is the simplest to set up (a single extra line), but it is slower and suffers from load imbalance, since it runs in one process and gathers all outputs onto GPU 0.
DistributedDataParallel was designed for multi-machine, multi-GPU training, but it works just as well on a single machine with multiple GPUs; the setup is a bit more involved. Demos of both follow.

DataParallel

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size, shuffle=True)

class Model(nn.Module):
    # Our model

    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

if torch.cuda.is_available():
    model.cuda()

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # wrapping the model in DataParallel is the only change needed
    model = nn.DataParallel(model)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data.cuda()
    else:
        input_var = data
    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

DistributedDataParallel

Run with: CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 distributedDataParallel.py
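
torch.distributed.launch starts one process per GPU and tells each process its local rank, either as a --local_rank command-line argument or, when launched with --use_env (or with the newer torchrun), via the LOCAL_RANK environment variable. The demo below takes a shortcut and derives it from torch.distributed.get_rank(), which coincides with the local rank on a single machine; a sketch of the more explicit variant looks like this (not part of the original script):

import argparse
import os

# torch.distributed.launch appends --local_rank=<n> to each spawned process's argv;
# fall back to the LOCAL_RANK environment variable for --use_env / torchrun
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=int(os.environ.get("LOCAL_RANK", 0)))
args = parser.parse_args()
local_rank = args.local_rank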

# distributedDataParallel.py
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
from apex import amp  # NVIDIA Apex, used here for mixed-precision training
# 1) initialize the default process group (NCCL backend for GPU training)
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2) bind each process to its own GPU
# on a single machine the global rank equals the local rank, so get_rank() suffices here
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
world_size = torch.distributed.get_world_size()  # or int(os.environ["WORLD_SIZE"])

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3) use a DistributedSampler so each process sees its own shard of the dataset
sampler = DistributedSampler(dataset, num_replicas=world_size, shuffle=True)
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size, 
                         sampler=sampler)

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) move the model to its GPU before wrapping it
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)  # placeholder values for args.base_lr / args.weight_decay
# with Apex mixed precision, amp.initialize() goes after the model and optimizer are created and before the DDP wrapper
model, optimizer = amp.initialize(model, optimizer, opt_level="O1")

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) wrap the model with DistributedDataParallel
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data.to(device)
    else:
        input_var = data

    output = model(input_var)
    # print("Outside: input size", input_var.size(), "output_size", output.size())
    loss = criterion(output, ground_truth)  # criterion and targets are omitted in this demo
    optimizer.zero_grad()
    # with Apex mixed precision, replace loss.backward() with backward on the scaled loss
    with amp.scale_loss(loss, optimizer) as scaled_loss:
        scaled_loss.backward()
    optimizer.step()
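
The demo above makes a single pass over the data. In a real training loop you would normally call sampler.set_epoch(epoch) at the start of every epoch so that DistributedSampler reshuffles differently each time, and save checkpoints from rank 0 only so the processes do not race on the same file. A rough sketch under those assumptions (the epoch count, inner step, and checkpoint path are illustrative, not from the original):

num_epochs = 10  # illustrative
for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # makes DistributedSampler reshuffle each epoch
    for data in rand_loader:
        ...  # forward / loss / backward / step exactly as in the demo above
    if torch.distributed.get_rank() == 0:
        # only rank 0 writes the checkpoint; model.module is the unwrapped network inside DDP
        torch.save(model.module.state_dict(), "ddp_checkpoint.pt")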
    

Reference: 【分布式训练】单机多卡的正确打开方式

