V · 2024年08月07日

PyTorch中的多进程并行处理

PyTorch是一个流行的深度学习框架,一般情况下使用单个GPU进行计算时是十分方便的。但是当涉及到处理大规模数据和并行处理时,需要利用多个GPU。这时PyTorch就显得不那么方便,所以这篇文章我们将介绍如何利用torch.multiprocessing模块,在PyTorch中实现高效的多进程处理。

image.png

多进程是一种允许多个进程并发运行的方法,利用多个CPU内核和GPU进行并行计算。这可以大大提高数据加载、模型训练和推理等任务的性能。PyTorch提供了torch.multiprocessing模块来解决这个问题。

导入库

 import torch
 import torch.multiprocessing as mp
 from torch import nn, optim

对于多进程的问题,我们主要要解决2方面的问题:1、数据的加载;2分布式的训练

数据加载

加载和预处理大型数据集可能是一个瓶颈。使用torch.utils.data.DataLoader和多个worker可以缓解这个问题。

 from torch.utils.data import DataLoader, Dataset
 class CustomDataset(Dataset):
     def __init__(self, data):
         self.data = data
     def __len__(self):
         return len(self.data)
     def __getitem__(self, idx):
         return self.data[idx]
 data = [i for i in range(1000)]
 dataset = CustomDataset(data)
 dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
 for batch in dataloader:
     print(batch)

num_workers=4意味着四个子进程将并行加载数据。这个方法可以在单个GPU时使用,通过增加数据读取进程可以加快数据读取的速度,提高训练效率。

分布式训练

分布式训练包括将训练过程分散到多个设备上。torch.multiprocessing可以用来实现这一点。

我们一般的训练流程是这样的

 class SimpleModel(nn.Module):
     def __init__(self):
         super(SimpleModel, self).__init__()
         self.fc = nn.Linear(10, 1)
 def forward(self, x):
         return self.fc(x)
 def train(rank, model, data, target, optimizer, criterion, epochs):
     for epoch in range(epochs):
         optimizer.zero_grad()
         output = model(data)
         loss = criterion(output, target)
         loss.backward()
         optimizer.step()
         print(f"Process {rank}, Epoch {epoch}, Loss: {loss.item()}")

要修改这个流程,我们首先需要初始和共享模型

 def main():
     num_processes = 4
     data = torch.randn(100, 10)
     target = torch.randn(100, 1)
     model = SimpleModel()
     model.share_memory()  # Share the model parameters among processes
     optimizer = optim.SGD(model.parameters(), lr=0.01)
     criterion = nn.MSELoss()
     processes = []
     for rank in range(num_processes):
         p = mp.Process(target=train, args=(rank, model, data, target, optimizer, criterion, 10))
         p.start()
         processes.append(p)
     for p in processes:
         p.join()
 if __name__ == '__main__':
     main()

上面的例子中四个进程同时运行训练函数,共享模型参数。

多GPU的话则可以使用分布式数据并行(DDP)训练

对于大规模的分布式训练,PyTorch的torch.nn.parallel.DistributedDataParallel(DDP)是非常高效的。DDP可以封装模块并将其分布在多个进程和gpu上,为训练大型模型提供近线性缩放。

 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel as DDP

修改train函数初始化流程组并使用DDP包装模型。

 def train(rank, world_size, data, target, epochs):
     dist.init_process_group("gloo", rank=rank, world_size=world_size)
     
     model = SimpleModel().to(rank)
     ddp_model = DDP(model, device_ids=[rank])
     
     optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
     criterion = nn.MSELoss()
 
     for epoch in range(epochs):
         optimizer.zero_grad()
         output = ddp_model(data.to(rank))
         loss = criterion(output, target.to(rank))
         loss.backward()
         optimizer.step()
         print(f"Process {rank}, Epoch {epoch}, Loss: {loss.item()}")
 
     dist.destroy_process_group()

修改main函数增加world_size参数并调整进程初始化以传递world_size。

 def main():
     num_processes = 4
     world_size = num_processes
     data = torch.randn(100, 10)
     target = torch.randn(100, 1)
     mp.spawn(train, args=(world_size, data, target, 10), nprocs=num_processes, join=True)
 if __name__ == '__main__':
     mp.set_start_method('spawn')
     main()

这样,就可以在多个GPU上进行训练了

常见问题及解决

1、避免死锁

在脚本的开头使用mp.set_start_method('spawn')来避免死锁。

 if __name__ == '__main__':
     mp.set_start_method('spawn')
     main()

因为多线程需要自己管理资源,所以请确保清理资源,防止内存泄漏。

2、异步执行

异步执行允许进程独立并发地运行,通常用于非阻塞操作。

 def async_task(rank):
     print(f"Starting task in process {rank}")
     # Simulate some work with sleep
     torch.sleep(1)
     print(f"Ending task in process {rank}")
 def main_async():
     num_processes = 4
     processes = []
     
     for rank in range(num_processes):
         p = mp.Process(target=async_task, args=(rank,))
         p.start()
         processes.append(p)
     
     for p in processes:
         p.join()
 if __name__ == '__main__':
     main_async()

3、共享内存管理

使用共享内存允许不同的进程在不复制数据的情况下处理相同的数据,从而减少内存开销并提高性能。

 def shared_memory_task(shared_tensor, rank):
     shared_tensor[rank] = shared_tensor[rank] + rank
 def main_shared_memory():
     shared_tensor = torch.zeros(4, 4).share_memory_()
     processes = []
     
     for rank in range(4):
         p = mp.Process(target=shared_memory_task, args=(shared_tensor, rank))
         p.start()
         processes.append(p)
     
     for p in processes:
         p.join()
     print(shared_tensor)
 if __name__ == '__main__':
     main_shared_memory()

共享张量shared_tensor可以被多个进程修改

总结

PyTorch中的多线程处理可以显著提高性能,特别是在数据加载和分布式训练时使用torch.multiprocessing模块,可以有效地利用多个cpu,从而实现更快、更高效的计算。无论您是在处理大型数据集还是训练复杂模型,理解和利用多处理技术对于优化PyTorch中的性能都是必不可少的。使用分布式数据并行(DDP)进一步增强了跨多个gpu扩展训练的能力,使其成为大规模深度学习任务的强大工具。

https://avoid.overfit.cn/post/a68990d2d9d14d26a4641bbaf265671e

推荐阅读
关注数
4197
内容数
904
SegmentFault 思否旗下人工智能领域产业媒体,专注技术与产业,一起探索人工智能。
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息