本文最后更新于:2024年1月21日 上午
单机多卡DDP tutorial
1 什么是DDP
当数据足够多的时候,一张显卡装不下很大的batch_size,我们需要将数据分摊到多个显卡上去。
DDP的思路很简单,我们在每个显卡上面都创建一个模型的复制以及相同的Optimizer,数据被Distributed Sampler平分,并送到每张显卡上完成Forward和Backward过程。
但如果只做以上步骤的话,我们得到的是四个不同参数的模型。因为每张显卡的数据不同,那么计算图得到的梯度也不同,直接更新参数会导致每张显卡的模型不一样。
DDP则利用Synchronize与All Reduce技术来同步所有显卡的训练,而这个技术好的地方在于它将Backward过程与显卡间通信同时进行,而不是等到所有梯度都计算出来之后再进行显卡间Synchornize,这样可以最大程度上保证显卡不会空闲。
2 Code Walkthrough
要使用DDP,至少需要这些东西
1 2 3 4 5 6 7 8
| import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
|
接下来,我们要写一个函数来初始化分布式训练
1
| def ddp_setup(rank, world_size)
|
rank
是分给多个进程(在这里就是GPU)的单独标识符,例如第一个GPU是rank0, 第二个GPU是rank1。world_size
是指整个机器上有多少进程(在这里也是GPU)。现在我们来实现这个函数
1 2 3 4 5 6 7 8 9 10
| def ddp_setup(rank, world_size) os.environ['MASTER_ADDR'] = 'localhost' # 使用一台机器多张卡,所以主节点设在本地IP上 os.environ['MASTER_PORT'] = '12345' # 主节点IP的端口,由于在本机上,随便填写一个 init_process_group( backend='nccl', rank=rank, world_size=world_size )
|
为了方便,这里写一个toy trainer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class Trainer(): def __init__( self, model, train_data, optimizer, gpu_id, save_every ): self.gpu_id = gpu_id self.model = model.to(gpu_id) self.train_data = train_data self.optimizer = optimizer self.save_every = save_every self.model = DDP( self.model, device_ids=[self.gpu_id] )
def save_ckpt(self): ckpt = self.model.module.state_dict() torch.save(ckpt, "ckpt.pth")
def train(self, epochs): for epoch in range(epochs): self.train_one_epoch() if self.gpu_id == 0 and epoch % self.save_every ==0: self.save_ckpt()
|
另外,还需要在data loader中传入分布式采样器
1 2 3 4 5 6 7 8
| def prepare_dataloader(dataset, batch_size): return DataLoader( dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset) )
|
最后配置主函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| def main( rank, world_size, total_epochs, save_every ): ddp_setup(rank, world_size) dataset, model, optimizer = init_train_objs() train_data = prepare_dataloader(dataset, batch_size=32) trainer = Trainer( model, train_data, optimizer, rank, save_every ) trainer.train(total_epochs) destroy_process_group()
|
最后是程序入口
1 2 3 4 5 6 7 8 9 10 11 12 13
| if __name__ == "__main__": total_epochs = 10, save_every = 2 world_size = torch.cuda.device_count() mp.spawn( main, args=(world_size, total_epochs, save_every), nprocs=world_size )
|
所以PyTorch的DDP是一个多进程设计思路,每个GPU看作一个进程,因此我们需要知道可以开多少进程(world_size),以及程序运行在哪个进程(rank)。我们在很多函数中都要传入rank
参数,就是因为DDP会告诉每个进程处在哪个rank上,虽然每个rank上运行的都是相同的代码,但有些时候判断是否在rank0
是有用的。另外,数据需要分布式采样器来为每个GPU分配数据。
更大型的项目中会使用到更多的分布式训练技巧,Meta的开源项目就写了很好的example,将在下一篇文章中解读。