单机多卡DDP tutorial

本文最后更新于:2024年1月21日 上午

单机多卡DDP tutorial

1 什么是DDP

当数据足够多的时候,一张显卡装不下很大的batch_size,我们需要将数据分摊到多个显卡上去。

DDP的思路很简单,我们在每个显卡上面都创建一个模型的复制以及相同的Optimizer,数据被Distributed Sampler平分,并送到每张显卡上完成Forward和Backward过程。

但如果只做以上步骤的话,我们得到的是四个不同参数的模型。因为每张显卡的数据不同,那么计算图得到的梯度也不同,直接更新参数会导致每张显卡的模型不一样。

DDP则利用Synchronize与All Reduce技术来同步所有显卡的训练,而这个技术好的地方在于它将Backward过程与显卡间通信同时进行,而不是等到所有梯度都计算出来之后再进行显卡间Synchornize,这样可以最大程度上保证显卡不会空闲。

2 Code Walkthrough

要使用DDP,至少需要这些东西

1
2
3
4
5
6
7
8
import torch.multiprocessing as mp
# Python多进程分配器
from torch.utils.data.distributed import DistributedSampler
# 分布式训练的采样器,用来平分数据到不同显卡上
from torch.nn.parallel import DistributedDataParallel as DDP
# 模型的DDP Wrapper
from torch.distributed import init_process_group, destroy_process_group
# 创建多进程的函数以及摧毁多进程的函数

接下来,我们要写一个函数来初始化分布式训练

1
def ddp_setup(rank, world_size)

rank是分给多个进程(在这里就是GPU)的单独标识符,例如第一个GPU是rank0, 第二个GPU是rank1。world_size是指整个机器上有多少进程(在这里也是GPU)。现在我们来实现这个函数

1
2
3
4
5
6
7
8
9
10
def ddp_setup(rank, world_size)
os.environ['MASTER_ADDR'] = 'localhost'
# 使用一台机器多张卡,所以主节点设在本地IP
os.environ['MASTER_PORT'] = '12345'
# 主节点IP的端口,由于在本机上,随便填写一个
init_process_group(
backend='nccl', # CUDA计算后端库
rank=rank, # 传入目前在哪个进程上
world_size=world_size # 传入总进程个数
)

为了方便,这里写一个toy trainer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Trainer():
def __init__(
self,
model,
train_data,
optimizer,
gpu_id,
save_every
):
self.gpu_id = gpu_id # 目前处在的GPU
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.model = DDP( # DDP Wrapper
self.model,
device_ids=[self.gpu_id] # 传入目前处在的GPU
)

def save_ckpt(self):
ckpt = self.model.module.state_dict()
# 当使用DDP Wrap模型之后,需要访问module才能访问到真正的state_dict
torch.save(ckpt, "ckpt.pth")

def train(self, epochs):
for epoch in range(epochs):
self.train_one_epoch()
if self.gpu_id == 0 and epoch % self.save_every ==0:
# 多判断一下是不是在主进程上,否则将在所有进程里保存模型,这样就造成了浪费
self.save_ckpt()

另外,还需要在data loader中传入分布式采样器

1
2
3
4
5
6
7
8
def prepare_dataloader(dataset, batch_size):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False, # 使用分布式采样器时不需要shuffle
sampler=DistributedSampler(dataset)
)

最后配置主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def main(
rank,
world_size,
total_epochs,
save_every
):
ddp_setup(rank, world_size)
# 初始化这个进程
dataset, model, optimizer = init_train_objs()
train_data = prepare_dataloader(dataset, batch_size=32)
trainer = Trainer(
model,
train_data,
optimizer,
rank,
save_every
)
trainer.train(total_epochs)
destroy_process_group()
# 完成训练,摧毁这个进程

最后是程序入口

1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == "__main__":
total_epochs = 10,
save_every = 2
world_size = torch.cuda.device_count()
# 自动传回有多少GPU可以用
mp.spawn(
main,
args=(world_size, total_epochs, save_every),
nprocs=world_size
)
# 开启多进程,传入主函数,并把主函数需要的参数传入args
# 注意这里并没有传rank,因为spawn会自动为每个进程分配rank
# nprocs传入有多少进程,即world_size

所以PyTorch的DDP是一个多进程设计思路,每个GPU看作一个进程,因此我们需要知道可以开多少进程(world_size),以及程序运行在哪个进程(rank)。我们在很多函数中都要传入rank参数,就是因为DDP会告诉每个进程处在哪个rank上,虽然每个rank上运行的都是相同的代码,但有些时候判断是否在rank0是有用的。另外,数据需要分布式采样器来为每个GPU分配数据。

更大型的项目中会使用到更多的分布式训练技巧,Meta的开源项目就写了很好的example,将在下一篇文章中解读。


单机多卡DDP tutorial
https://jesseprince.github.io/2024/01/20/pytorch/ddp1_overview/
作者
林正
发布于
2024年1月20日
许可协议