# Comparing DDP and Horovod for Single-Node and Multi-Node Multi-GPU Training
## Introduction to ring-allreduce
#### Data parallelism
Data parallelism splits the data across multiple nodes and trains in parallel: different batches are processed on different nodes, each node runs backpropagation to obtain gradients, and the nodes then communicate to compute the average gradient, which is used to apply an identical update to every model replica.
The procedure simplifies to the following steps (a toy simulation follows the list):
+ Run several replicas of the training model; each replica:
  + reads a chunk of data
  + feeds it through the model
  + computes the gradients
+ Average the gradients of all replicas via communication
+ Apply the identical update to every model replica
+ Repeat the steps above
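To make these steps concrete, here is a toy, single-process NumPy simulation added for illustration (it is not part of this repository's code): four "replicas" fit a one-parameter linear model, each computes a gradient on its own data shard, the gradients are averaged (the "allreduce" step), and every replica applies the identical update, so all weight copies stay in sync.
``` python
import numpy as np

# Toy single-process simulation of data-parallel SGD on a 1-parameter linear
# model y = w * x. Each "replica" keeps its own copy of w and sees only its own
# shard of the data; gradients are averaged and the identical update is applied
# everywhere, so the weight copies never diverge.
def data_parallel_sgd(x, y, num_replicas=4, lr=1.0, steps=20):
    w = np.zeros(num_replicas)                      # one weight copy per replica
    shards_x = np.array_split(x, num_replicas)      # each replica's data chunk
    shards_y = np.array_split(y, num_replicas)
    for _ in range(steps):
        # 1) every replica computes the gradient of its local squared error
        grads = [np.mean(2.0 * (w[r] * sx - sy) * sx)
                 for r, (sx, sy) in enumerate(zip(shards_x, shards_y))]
        # 2) communicate: average the gradients across replicas
        mean_grad = np.mean(grads)
        # 3) identical update on every replica
        w -= lr * mean_grad
    return w

x = np.linspace(-1.0, 1.0, 64)
print(data_parallel_sgd(x, 3.0 * x))                # every replica converges to ~3.0
```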
#### ring-allreduce
The key to this kind of distributed data-parallel training is a good communication strategy. ring-allreduce is a stable one: the GPUs are organized into a logical ring in which each GPU has exactly one left neighbor and one right neighbor, and each GPU only sends data to its right neighbor and receives data from its left neighbor.
<img src="./doc/fig1.png" alt="image" style="zoom:40%;" />
ring-allreduce consists of two phases, **The Scatter-Reduce** and **The Allgather** (a toy simulation of both phases follows below).
+ The Scatter-Reduce
Suppose there are 4 GPUs. The gradient array is split into 4 chunks and the GPUs perform 4-1 Scatter-Reduce iterations. In each iteration, every GPU sends one chunk to its right neighbor and receives one chunk from its left neighbor, accumulating it into its own copy of that chunk. GPU n starts by sending chunk n and receiving chunk n-1, and in each later iteration it sends the chunk it received in the previous iteration.
![image](./doc/fig2.png)
+ The Allgather
After the Scatter-Reduce, each GPU holds one chunk whose value is final. The GPUs then exchange these chunks so that every GPU ends up with all of the required data; the exchange proceeds just like the Scatter-Reduce, except that received chunks overwrite rather than accumulate.
![image](./doc/fig3.png)
Afterwards, every GPU averages the gradients, updates its model replica, reads the next batch of data, and continues training.
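The following toy NumPy simulation (again an illustration added here, not production code) runs both phases for N workers, each holding an 8-element gradient vector split into N chunks, and checks that every worker ends up with the element-wise sum.
``` python
import numpy as np

# Toy simulation of ring-allreduce (scatter-reduce + allgather) for N workers.
# Pure NumPy, no real GPUs; it only illustrates which chunk moves where.
def ring_allreduce(grads):
    n = len(grads)                                       # number of workers
    chunks = [np.array_split(g.astype(float), n) for g in grads]

    # Scatter-Reduce: after n-1 steps, worker r holds the fully reduced chunk (r+1) % n.
    for step in range(n - 1):
        for r in range(n):
            send = (r - step) % n                        # chunk worker r sends this step
            chunks[(r + 1) % n][send] += chunks[r][send]

    # Allgather: circulate the reduced chunks so every worker has all of them.
    for step in range(n - 1):
        for r in range(n):
            send = (r + 1 - step) % n                    # chunk worker r sends this step
            chunks[(r + 1) % n][send] = chunks[r][send].copy()

    return [np.concatenate(c) for c in chunks]

grads = [np.arange(8) * (r + 1) for r in range(4)]       # 4 workers, 8-element gradients
out = ring_allreduce(grads)
assert all(np.allclose(o, sum(grads)) for o in out)      # every worker ends with the sum
print(out[0])
```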
## Using the two frameworks
### DDP (single node, multiple GPUs)
PyTorch's torch.distributed module wraps the ring-allreduce algorithm, and DistributedDataParallel is the officially recommended way to run distributed training. The typical usage pattern is:
``` python
import torch
import torch.distributed as dist
# Use CUDA
use_cuda = torch.cuda.is_available()
# Initialize the distributed process group. `backend` selects the communication
# backend (mpi, gloo or nccl); `init_method` is a URL that tells the processes
# how to find each other ("env://" reads the rendezvous information that
# torch.distributed.launch puts into the environment).
torch.distributed.init_process_group(backend='nccl', init_method="env://")
# Rank of the current process (on a single node this equals the local rank).
local_rank = dist.get_rank()
# Set the default CUDA device; each rank uses a different GPU.
torch.cuda.set_device(local_rank)

def main():
    # training set
    trainset = ...
    # distributed sampler
    sampler = torch.utils.data.distributed.DistributedSampler(trainset)
    # training loader
    trainloader = data.DataLoader(dataset=trainset, batch_size=args.train_batch * dist.get_world_size(), shuffle=False, sampler=sampler)
    # test set
    testset = ...
    # In distributed training a batch is split into equal shares, one per GPU,
    # so batch_size is multiplied by the number of GPUs.
    testloader = data.DataLoader(testset, batch_size=args.test_batch * dist.get_world_size(), shuffle=False, num_workers=args.workers)
    # build the model
    model = ...
    # Move the model to the CUDA device that corresponds to this rank;
    # cuda:[local_rank] is mapped to the matching visible device.
    device = torch.device('cuda', local_rank)
    model = model.to(device)
    # wrap the model for distributed data parallelism
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
    # loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
    # Train and val
    for epoch in range(start_epoch, args.epochs):
        train...
        eval...

if __name__ == '__main__':
    main()
```
At training time you assign GPUs and set the number of processes on the command line. A launch command looks like:
``` shell
CUDA_VISIBLE_DEVICES=0,1,2,3 python3.5 -m torch.distributed.launch --nproc_per_node=4 cifar.py [other arguments]
```
### Horovod
Horovod is a distributed training framework open-sourced by Uber that uses the ring-allreduce algorithm and works with multiple machine learning frameworks.
The pattern for using Horovod with PyTorch is as follows:
``` python
import torch
import horovod.torch as hvd
# initialize Horovod
hvd.init()
# local rank of this process on the current node
local_rank = hvd.local_rank()
torch.cuda.set_device(local_rank)

def main():
    # training set
    trainset = ...
    # distributed sampler
    sampler = torch.utils.data.distributed.DistributedSampler(trainset, num_replicas=hvd.size(), rank=hvd.rank())
    # training loader
    trainloader = data.DataLoader(dataset=trainset, batch_size=args.train_batch, shuffle=False, sampler=sampler)
    # test set
    testset = ...
    testloader = data.DataLoader(testset, batch_size=args.test_batch * hvd.size(), shuffle=False, num_workers=args.workers)
    # build the model
    model = ...
    # move the model to the CUDA device for this local rank
    device = torch.device('cuda', local_rank)
    model = model.to(device)
    # loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
    # wrap the optimizer with Horovod's distributed optimizer
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
    # Broadcast the initial parameters from rank 0 so that all workers start in sync.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    # Train and val
    for epoch in range(start_epoch, args.epochs):
        train...
        eval...

if __name__ == '__main__':
    main()
```
Training is likewise launched with a command that assigns GPUs and sets the number of processes:
``` shell
CUDA_VISIBLE_DEVICES=0,1,2,3 horovodrun -np 4 -H localhost:4 python3.5 cifar_horovod.py [other arguments]
```
### DDP (multiple nodes, multiple GPUs)
In the multi-node multi-GPU case, processes on different hosts have to communicate with each other, so the init_method argument of <code>dist.init_process_group()</code> must specify how the processes discover one another. init_method supports two initialization methods:
+ TCP initialization:
``` python
torch.distributed.init_process_group(backend='nccl',
                                     init_method="tcp://210.28.134.32:29998",
                                     rank=local_rank,
                                     world_size=world_size)
```
+ Shared-file-system initialization:
``` python
torch.distributed.init_process_group(backend='nccl',
                                     init_method="file:///data/share/sharedfile",
                                     rank=local_rank,
                                     world_size=world_size)
```
Note that both initialization methods require rank and world_size. With TCP initialization, the node with rank 0 acts as the master node, and the IP address in init_method is the master node's address. world_size is the total number of processes; <code>init_process_group</code> blocks until world_size processes have started and joined.
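For reference, the env:// method used in the single-node example is equivalent: instead of putting the address in the URL, each process reads MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK from environment variables. The sketch below is illustrative only; it reuses the address from the TCP example above and sets the variables in-process, whereas in practice they are exported by torch.distributed.launch or by a launch script.
``` python
import os
import torch.distributed as dist

# Sketch: env:// initialization driven by environment variables.
# RANK must differ for every process; the other three are shared by all processes.
os.environ['MASTER_ADDR'] = '210.28.134.32'   # address of the rank-0 (master) node
os.environ['MASTER_PORT'] = '29998'           # a free port on the master node
os.environ['WORLD_SIZE'] = '4'                # total number of processes
os.environ['RANK'] = '0'                      # this process's global rank

# Blocks until WORLD_SIZE processes have joined.
dist.init_process_group(backend='nccl', init_method='env://')
print(dist.get_rank(), dist.get_world_size())
```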
Code pattern:
``` python
import torch
import torch.distributed as dist

def my_init():
    # GPU allocation
    global local_rank
    global gpu_id
    # When launching with torch.distributed.launch, gpu_id is simply the rank,
    # but in the multi-node case it has to be set by hand.
    world_size = args.wz
    gpu_id = args.gpu_id
    local_rank = args.local_rank
    torch.cuda.set_device(gpu_id)  # set the default CUDA device; different for every rank
    torch.distributed.init_process_group(backend='nccl', init_method="tcp://210.28.134.32:29998", rank=local_rank, world_size=world_size)

def main():
    trainset = ...
    sampler = torch.utils.data.distributed.DistributedSampler(trainset)
    trainloader = data.DataLoader(dataset=trainset, batch_size=args.train_batch * dist.get_world_size(), shuffle=False, sampler=sampler)
    testset = ...
    testloader = data.DataLoader(testset, batch_size=args.test_batch * dist.get_world_size(), shuffle=False, num_workers=args.workers)
    # Model
    model = ...
    device = torch.device('cuda', gpu_id)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu_id], output_device=gpu_id)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
    # Train and val
    for epoch in range(start_epoch, args.epochs):
        train...
        eval...

if __name__ == '__main__':
    my_init()
    main()
```
Launching training:
Launching multi-node multi-GPU training with DDP is fairly tedious; every process has to be started by hand, one by one.
``` shell
#host1
python cifar_multi_nodes.py --local-rank 0 --world-size 4 --gpu-id 2 [other args]
python cifar_multi_nodes.py --local-rank 1 --world-size 4 --gpu-id 3 [other args]
#host2
python cifar_multi_nodes.py --local-rank 2 --world-size 4 --gpu-id 7 [other args]
python cifar_multi_nodes.py --local-rank 3 --world-size 4 --gpu-id 8 [other args]
```
### Horovod (multiple nodes, multiple GPUs)
Multi-node multi-GPU training with Horovod requires no code changes; at launch time you simply specify the hosts and the number of processes per host:
``` shell
CUDA_VISIBLE_DEVICES=0,1 horovodrun -np 4 -H host1:2,host2:2 python cifar_horovod.py [other args]
```
## Performance comparison
![image](./doc/fig4.png)
Our tests show that training across multiple machines is very slow; for models with many parameters it is even slower than training on a single machine, most likely because it is limited by network transfer speed.
## CIFAR-10
#### AlexNet
```
python cifar.py -a alexnet --epochs 164 --schedule 81 122 --gamma 0.1
```
#### VGG19 (BN)
```
python cifar.py -a vgg19_bn --epochs 164 --schedule 81 122 --gamma 0.1
```
#### ResNet-110
```
python cifar.py -a resnet --depth 110 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### ResNet-1202
```
python cifar.py -a resnet --depth 1202 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### PreResNet-110
```
python cifar.py -a preresnet --depth 110 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### ResNeXt-29, 8x64d
```
python cifar.py -a resnext --depth 29 --cardinality 8 --widen-factor 4 --schedule 150 225 --wd 5e-4 --gamma 0.1
```
#### ResNeXt-29, 16x64d
```
python cifar.py -a resnext --depth 29 --cardinality 16 --widen-factor 4 --schedule 150 225 --wd 5e-4 --gamma 0.1
```
#### WRN-28-10-drop
```
python cifar.py -a wrn --depth 28 --widen-factor 10 --drop 0.3 --epochs 200 --schedule 60 120 160 --wd 5e-4 --gamma 0.2
```
#### DenseNet-BC (L=100, k=12)
**Note**:
* DenseNet uses weight decay `1e-4`. A larger weight decay (`5e-4`) is harmful to the accuracy (95.46 vs. 94.05).
* The official batch size is 64, but there is no big difference between batch size 64 and 128 (95.46 vs. 95.11).
```
python cifar.py -a densenet --depth 100 --growthRate 12 --train-batch 64 --epochs 300 --schedule 150 225 --wd 1e-4 --gamma 0.1
```
#### DenseNet-BC (L=190, k=40)
```
python cifar.py -a densenet --depth 190 --growthRate 40 --train-batch 64 --epochs 300 --schedule 150 225 --wd 1e-4 --gamma 0.1
```
## CIFAR-100
#### AlexNet
```
python cifar.py -a alexnet --dataset cifar100 --epochs 164 --schedule 81 122 --gamma 0.1
```
#### VGG19 (BN)
```
python cifar.py -a vgg19_bn --dataset cifar100 --epochs 164 --schedule 81 122 --gamma 0.1
```
#### ResNet-110
```
python cifar.py -a resnet --dataset cifar100 --depth 110 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### ResNet-1202
```
python cifar.py -a resnet --dataset cifar100 --depth 1202 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### PreResNet-110
```
python cifar.py -a preresnet --dataset cifar100 --depth 110 --epochs 164 --schedule 81 122 --gamma 0.1 --wd 1e-4
```
#### ResNeXt-29, 8x64d
```
python cifar.py -a resnext --dataset cifar100 --depth 29 --cardinality 8 --widen-factor 4 --schedule 150 225 --wd 5e-4 --gamma 0.1
```
#### ResNeXt-29, 16x64d
```
python cifar.py -a resnext --dataset cifar100 --depth 29 --cardinality 16 --widen-factor 4 --schedule 150 225 --wd 5e-4 --gamma 0.1
```
#### WRN-28-10-drop
```
python cifar.py -a wrn --dataset cifar100 --depth 28 --widen-factor 10 --drop 0.3 --epochs 200 --schedule 60 120 160 --wd 5e-4 --gamma 0.2
```
#### DenseNet-BC (L=100, k=12)
```
python cifar.py -a densenet --dataset cifar100 --depth 100 --growthRate 12 --train-batch 64 --epochs 300 --schedule 150 225 --wd 1e-4 --gamma 0.1
```
#### DenseNet-BC (L=190, k=40)
```
python cifar.py -a densenet --dataset cifar100 --depth 190 --growthRate 40 --train-batch 64 --epochs 300 --schedule 150 225 --wd 1e-4 --gamma 0.1
```
'''
Training script for CIFAR-10/100
Copyright (c) Wei YANG, 2017
'''
from __future__ import print_function
import argparse
import os
import shutil
import time
import datetime
import random
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.distributed as dist
from torch.multiprocessing import Process
import models.cifar as models
from utils import AverageMeter, accuracy, DataPartitioner
model_names = sorted(name for name in models.__dict__
                     if name.islower() and not name.startswith("__")
                     and callable(models.__dict__[name]))
parser = argparse.ArgumentParser(description='PyTorch CIFAR10/100 Training')
# Datasets
parser.add_argument('-d', '--dataset', default='cifar10', type=str)
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
                    help='number of data loading workers (default: 4)')
# Optimization options
parser.add_argument('--epochs', default=300, type=int, metavar='N',
                    help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
                    help='manual epoch number (useful on restarts)')
parser.add_argument('--train-batch', default=256, type=int, metavar='N',
                    help='train batchsize')
parser.add_argument('--test-batch', default=100, type=int, metavar='N',
                    help='test batchsize')
parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
                    metavar='LR', help='initial learning rate')
parser.add_argument('--drop', '--dropout', default=0, type=float,
                    metavar='Dropout', help='Dropout ratio')
parser.add_argument('--schedule', type=int, nargs='+', default=[150, 225],
                    help='Decrease learning rate at these epochs.')
parser.add_argument('--gamma', type=float, default=0.1, help='LR is multiplied by gamma on schedule.')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
                    help='momentum')
parser.add_argument('--weight-decay', '--wd', default=5e-4, type=float,
                    metavar='W', help='weight decay (default: 5e-4)')
# Checkpoints
parser.add_argument('-c', '--checkpoint', default='checkpoint', type=str, metavar='PATH',
                    help='path to save checkpoint (default: checkpoint)')
parser.add_argument('--resume', default='', type=str, metavar='PATH',
                    help='path to latest checkpoint (default: none)')
# Architecture
parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet20',
                    choices=model_names,
                    help='model architecture: ' +
                         ' | '.join(model_names) +
                         ' (default: resnet20)')
parser.add_argument('--depth', type=int, default=56, help='Model depth.')
parser.add_argument('--block-name', type=str, default='BasicBlock',
                    help='the building block for Resnet and Preresnet: BasicBlock, Bottleneck (default: Basicblock for cifar10/cifar100)')
parser.add_argument('--cardinality', type=int, default=8, help='Model cardinality (group).')
parser.add_argument('--widen-factor', type=int, default=4, help='Widen factor. 4 -> 64, 8 -> 128, ...')
parser.add_argument('--growthRate', type=int, default=12, help='Growth rate for DenseNet.')
parser.add_argument('--compressionRate', type=int, default=2, help='Compression Rate (theta) for DenseNet.')
# Miscs
parser.add_argument('--manualSeed', type=int, help='manual seed')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
                    help='evaluate model on validation set')
# GPU allocation
parser.add_argument('--local_rank', type=int, default=0, help='Local rank')
args = parser.parse_args()
state = {k: v for k, v in args._get_kwargs()}
# Validate dataset
assert args.dataset == 'cifar10' or args.dataset == 'cifar100', 'Dataset can only be cifar10 or cifar100.'
# Use CUDA
use_cuda = torch.cuda.is_available()
# Random seed
if args.manualSeed is None:
    args.manualSeed = random.randint(1, 10000)
random.seed(args.manualSeed)
torch.manual_seed(args.manualSeed)
if use_cuda:
    torch.cuda.manual_seed_all(args.manualSeed)
# GPU allocation
local_rank = args.local_rank
torch.cuda.set_device(local_rank)  # set the default CUDA device; different for every rank
torch.distributed.init_process_group(backend='nccl', init_method="env://")
def main():
    start_epoch = args.start_epoch  # start from epoch 0 or last checkpoint epoch
    # Data
    print('==> Preparing dataset %s' % args.dataset)
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    if args.dataset == 'cifar10':
        dataloader = datasets.CIFAR10
        num_classes = 10
    else:
        dataloader = datasets.CIFAR100
        num_classes = 100
    trainset = dataloader(root='/data/share/cifar', train=True, download=True, transform=transform_train)
    sampler = torch.utils.data.distributed.DistributedSampler(trainset)
    trainloader = data.DataLoader(dataset=trainset, batch_size=args.train_batch, shuffle=False, sampler=sampler)
    testset = dataloader(root='/data/share/cifar', train=False, download=False, transform=transform_test)
    testloader = data.DataLoader(testset, batch_size=args.test_batch, shuffle=False, num_workers=args.workers)
    # Model
    print("==> creating model '{}'".format(args.arch))
    if args.arch.startswith('resnext'):
        model = models.__dict__[args.arch](
            cardinality=args.cardinality,
            num_classes=num_classes,
            depth=args.depth,
            widen_factor=args.widen_factor,
            dropRate=args.drop,
        )
    elif args.arch.startswith('densenet'):
        model = models.__dict__[args.arch](
            num_classes=num_classes,
            depth=args.depth,
            growthRate=args.growthRate,
            compressionRate=args.compressionRate,
            dropRate=args.drop,
        )
    elif args.arch.startswith('wrn'):
        model = models.__dict__[args.arch](
            num_classes=num_classes,
            depth=args.depth,
            widen_factor=args.widen_factor,
            dropRate=args.drop,
        )
    elif args.arch.endswith('resnet'):
        model = models.__dict__[args.arch](
            num_classes=num_classes,
            depth=args.depth,
            block_name=args.block_name,
        )
    else:
        model = models.__dict__[args.arch](num_classes=num_classes)
    device = torch.device('cuda', local_rank)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
    print('Model on cuda:%d' % local_rank)
    cudnn.benchmark = True
    print('    Total params: %.2fM' % (sum(p.numel() for p in model.parameters()) / 1000000.0))
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
    # Train and val
    for epoch in range(start_epoch, args.epochs):
        adjust_learning_rate(optimizer, epoch)
        train_loss, train_acc = train(trainloader, model, criterion, optimizer, epoch, use_cuda)
        test_loss, test_acc = test(testloader, model, criterion, epoch, use_cuda)
        print('Rank:{} Epoch[{}/{}]: LR: {:.3f}, Train loss: {:.5f}, Test loss: {:.5f}, Train acc: {:.2f}, Test acc: {:.2f}.'.format(
            dist.get_rank(), epoch + 1, args.epochs, state['lr'], train_loss, test_loss, train_acc, test_acc))
        # print('Rank:{} Epoch[{}/{}]: LR: {:.3f}, Train loss: {:.5f}, Train acc: {:.2f}'.format(dist.get_rank(), epoch+1, args.epochs, state['lr'], train_loss, train_acc))
def train(trainloader, model, criterion, optimizer, epoch, use_cuda):
    # switch to train mode
    model.train()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        if use_cuda:
            inputs, targets = inputs.cuda(local_rank), targets.cuda(local_rank, non_blocking=True)
        inputs, targets = torch.autograd.Variable(inputs), torch.autograd.Variable(targets)
        # compute output
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        # measure accuracy and record loss
        prec1, prec5 = accuracy(outputs.data, targets.data, topk=(1, 5))
        losses.update(loss.data.item(), inputs.size(0))
        top1.update(prec1.item(), inputs.size(0))
        top5.update(prec5.item(), inputs.size(0))
        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return (losses.avg, top1.avg)
def test(testloader, model, criterion, epoch, use_cuda):
    global best_acc
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()