InfoQ · 2020年07月20日

Uber正式开源分布式机器学习平台:Fiber

Uber 开发了 POET、Go-Explore 和 GTN 等算法,这些算法利用大量的计算来训练神经网络模型。为了使未来几代类似算法的大规模计算成为可能,Uber 进而开发了一种新的分布式计算库 Fiber,它可以帮助用户轻松地将本地计算方法扩展到成百上千台机器上。Fiber 可以使使用 Python 的大规模计算项目变得快速、简单和资源高效,从而简化 ML 模型训练过程,并获得更优的结果。

本文最初发布于 Uber 工程博客,由 InfoQ 中文站翻译并分享。

1.png

Fiber:Uber 的开源分布式机器学习平台,图片由 Flat UI Kit 提供,项目地址

在过去的几年中,计算机不断增强的处理能力推动了机器学习的进步。算法越来越多地利用并行性,并依赖分布式训练来处理大量数据。然而,随之而来的是增加数据和训练的需求,这对管理和利用大规模计算资源的软件提出了巨大的挑战。

在 Uber,我们开发了 POET、Go-Explore 和 GTN 等算法,这些算法利用大量的计算来训练神经网络模型。为了使未来几代类似算法的大规模计算成为可能,我们开发了一种新的分布式计算库 Fiber,它可以帮助用户轻松地将本地计算方法扩展到成百上千台机器上。Fiber 可以使使用 Python 的大规模计算项目变得快速、简单和资源高效,从而简化 ML 模型训练过程,并获得更优的结果。

大规模分布式计算的挑战

在理想情况下,将运行在一台机器上的应用程序扩展为运行在一批机器上的应用程序应该很容易,只需更改命令行参数即可。然而,在现实世界中,这并不容易。

我们每天都与许多运行大规模分布式计算任务的人一起工作,我们发现,现在很难利用分布式计算的原因有以下几个:

  • 在笔记本或台式机本地运行代码与在生产集群上运行代码之间存在着巨大的差距。 你可以让 MPI 在本地运行,但在计算机集群上运行它是完全不同的过程。
  • 不能动态扩展。如果你启动了一个需要大量资源的作业,那么你很可能需要等待,直到所有资源都分配好了才可以运行该作业。这个等待降低了扩展的效率。
  • 错误处理缺失。在运行时,有些作业可能会失败。你可能不得不还原部分结果或整个地放弃本次运行。
  • 学习成本很高。每个系统都有不同的 API 和编程约定。要使用新系统启动作业,用户必须学习一套全新的约定。

新的 Fiber 平台专门解决了这些问题。它为更广泛的用户群体提供了无缝使用大规模分布式计算的可能。

Fiber 简介

Fiber 是一个用于现代计算机集群的基于 Python 的分布式计算库。用户可以利用这个系统针对整个计算机集群进行编程,而不是只针对单个台式机或笔记本电脑。它最初是为了支持像 POET 这样的大规模并行科学计算项目而开发的,Uber 也已经用它来支持类似的项目。Fiber 的功能非常强大,这样主要是因为:

  • 易于使用。Fiber 允许用户编写在计算机集群上运行的程序,而不需要深入研究计算机集群的细节。
  • 易于学习。Fiber 提供了与 Python 标准 多处理 库相同的 API。知道如何使用多处理库的工程师可以很容易地用 Fiber 编写计算机集群程序。
  • 快速可靠。Fiber 的通信中枢基于 Nanomsg 构建,这是一个高性能异步消息传递库,可以提供快速、可靠的通信。
  • 不需要部署。Fiber 在计算机集群上的运行方式与普通应用程序相同。它会自动为用户处理资源分配和通信。
  • 提供了可靠的计算。Fiber 内置的错误处理功能让用户可以专注于编写实际的应用程序代码,而不是处理崩溃问题。当运行一个工作进程池时,这尤其有价值。

除了这些好处之外,Fiber 还可以在特别关注性能的领域与其他专用框架搭配使用。例如,对于 随机梯度下降(SGD),Fiber 的 Ring 特性 可以帮助我们在计算机集群上建立分布式训练作业,并允许它与 Horovod 或 torch.distributed 协同。

2.png

<center>图 1:Fiber 启动许多不同的作业支持(job-backed)进程,然后在其中运行不同的 Fiber 组件和用户进程。Fiber Master 是管理所有其他进程的主进程。有些进程(如 Ring Node)保持成员之间的通信。</center/>

Fiber 可以帮助从事大规模分布式计算的用户减少从产生想法到在计算集群上实际运行分布式作业的时间。它还可以帮助用户屏蔽配置和资源分配任务的繁琐细节,并且可以缩短调试周期,简化从本地开发到集群开发的转换。

架 构

Fiber 让我们可以灵活地为经典的多处理 API 选择可以在不同集群管理系统上运行的后端。为了实现这种集成,Fiber 被分为三个不同的层:API 层、后端层和集群层。API 层为 Fiber 提供了进程、队列、池和管理器等基本构建块。它们具有与多处理相同的语义,但是我们对它们进行扩展了,使它们可以在分布式环境中工作。后端层处理在不同集群管理器上创建或终止作业的任务。当用户新增一个后端时,所有其他 Fiber 组件(队列、池等)都不需要更改。最后,集群层由不同的集群管理器组成。尽管它们不是 Fiber 本身的一部分,但是它们帮助 Fiber 管理资源并跟踪不同的作业,减少了 Fiber 所需要跟踪的项的数量。图 2 是总体架构图:

3.png

<center>图 2:Fiber 的架构包括 API 层、后端层和集群层,这让它可以在不同的集群管理系统上运行。</center/>

作业支持进程

Fiber 引入了一个新的概念,称为 作业支持过程(也称为 Fiber 进程)。这些进程与 Python 多处理库中的进程类似,但是更灵活:多处理库中的进程只在本地机器上运行,但 Fiber 进程可以在不同的机器上远程运行,也可以在同一机器上本地运行。当新的 Fiber 进程启动时,Fiber 会在当前计算机集群上创建一个具有适当 Fiber 后端的新作业。

4.png

<center>图 3:Fiber 中的每个作业支持进程都是在计算机集群上运行的一个容器化作业。每个作业支持进程也有自己的 CPU、GPU 和其他计算资源。在容器内运行的代码是自包含的。</center/>

Fiber 使用容器来封装当前进程的运行环境(如上图 3 所示),其中包括所有必需的文件、输入数据和其他依赖的程序包,而且要保证每个元素都是自包含的。所有子进程都以与父进程相同的容器镜像启动,以确保运行环境的一致性。因为每个进程都是一个集群作业,所以它的生命周期与集群上的任何作业相同。为了方便用户,Fiber 被设计成直接与计算机集群管理器交互。因此,不像 Apache Spark 或 ipyparallel,Fiber 不需要在多台机器上设置,也不需要通过任何其他机制引导。它只需要作为一个普通的 Python pip 包安装在一台机器上。

组 件

Fiber 基于 Fiber 进程实现了大多数多处理 API,包括管道、队列、池和管理器。

Fiber 中队列和管道的行为方式与多处理相同。不同之处在于,Fiber 中的队列和管道由运行在不同机器上的多个进程共享。两个进程可以从同一个管道读取和写入数据。此外,队列可以在不同机器上的多个进程之间共享,每个进程可以同时向同一队列发送或从同一队列接收信息。Fiber 队列是用高性能异步消息队列系统 Nanomsg 实现的。

5.png

<center>图 4:Fiber 可以在不同的 Fiber 进程之间共享队列。在本例中,一个 Fiber 进程与队列位于同一台机器上,另外两个进程位于另一台机器上。一个进程写入队列,另外两个进程读取队列。</center/>

Fiber 也支持 ,如下图 5 所示。它们让用户可以管理工作进程池。Fiber 使用 作业支持进程 扩展池,以便每个池可以管理数千个(远程)工作进程。用户还可以同时创建多个池。

6.png

<center>图 5:在具有三个工作进程的池中,如本例所示,两个工作进程位于一台机器上,另一个位于另一台机器上。它们共同处理提交到主进程中任务队列的任务,并将结果发送到结果队列。</center/>

管理器代理对象 使 Fiber 能够支持共享存储,这在分布式系统中至关重要。通常,这个功能由计算机集群外部存储系统如 Cassandra 和 Redis 提供。相反,Fiber 为应用程序提供了内置的内存存储。该接口与多处理系统中的管理器类型接口相同。

Ring 是对多处理 API 的扩展,可以用于分布式计算设置。在 Fiber 中,Ring 指的是一组共同工作的、相对平等的进程。与池不同,Ring 没有主进程和辅助进程的概念。Ring 内的所有成员承担大致相同的责任。Fiber 的 Ring 模型拓扑(如下图 6 所示)在机器学习分布式 SGD 中非常常见,torch.distributed 和 Horovod 就是例子。一般来说,在一个计算机集群上启动这种工作负载是非常具有挑战性的;Fiber 提供 Ring 特性就是为了帮助建立这样的拓扑。

7.png

<center>图 6:在一个有四个节点的 Fiber Ring 中,Ring 节点 0 和 Ring 节点 3 运行在同一台机器上,但在两个不同的容器中。Ring 节点 1 和节点 2 都在单独的机器上运行。所有这些进程共同运行同一函数的副本,并在运行期间相互通信。</center/>

应用程序

借助上述灵活的组件,我们现在可以使用 Fiber 构建应用程序了。在这一节中,我们将展示两种使用 Fiber 帮助用户构建分布式应用程序的方式。

赋能新应用程序

在下面的例子中,我们将展示工程师如何运用 Fiber 来实现大规模分布式计算。这个例子演示的是一个 强化学习(RL)算法。通常,分布式 RL 的通信模式涉及在机器之间发送不同类型的数据,包括动作、神经网络参数、梯度、per-step/episode 观察及奖励。

Fiber 实现了管道和池来传输这些数据。在底层,池是普通的 Unix 套接字,为使用 Fiber 的应用程序提供接近线路速度的通信。现代计算机网络的带宽通常高达每秒几百千兆。通过网络传输少量数据 通常速度很快。

此外,如果有许多不同的进程向一个进程发送数据,进程间通信延迟也不会增加太多,因为数据传输可以并行进行。事实证明,Fiber 池可以作为许多 RL 算法的基础,因为模拟器可以在各个池工作进程中运行,并且结果可以并行回传。

下面的示例显示了使用 Fiber 实现的简化 RL 代码:

# fiber.BaseManager is a manager that runs remotely
class RemoteEnvManager(fiber.managers.AsyncManager):
pass
class Env(gym.env):
# gym env
pass
RemoteEnvManager.register(‘Env’, Env)
def build_model():
# create a new policy model
return model
def update_model(model, observations):
# update model with observed data
return new_model
def train():
model = build_model()
manager = RemoteEnvManager()
num_envs = 10
envs = [manager.Env() for i in range(num_envs)]
handles = [envs[i].reset() for i in num_envs]
obs = [handle.get() for handle in handles]
for i in range(1000):
actions = model(obs)
handles = [env.step() for action in actions]
obs = [handle.get() for handle in handles]
model = update_model(model, obs)

赋能现有的多处理应用程序

许多 Python 用户利用了多处理。Fiber 为此类应用程序提供了更多的机会,通过这种系统,只需更改几行代码,就可以在类似于 Kubernetes 的计算机集群上的分布式设置中运行。

例如,OpenAI Baselines 是一个非常流行的 RL 库,它有许多参考算法,比如 DQN 和 PPO。它的缺点是只能在一台机器上工作。如果希望大规模地训练 PPO,就必须创建自己的基于 MPI 的系统并手动设置集群。

相比之下,有了 Fiber,事情就简单多了。它可以无缝地扩展像 PPO 这样的 RL 算法,从而利用分布式环境的数百个工作进程。Fiber 提供了与多处理相同的 API,OpenAI Baselines 就是使用这些 API 在本地获取多核 CPU 的处理能力。要让 OpenAI Baselines 使用 Fiber,只需要修改 一行代码:

8.png

修改完这行代码,OpenAI Baselines 就可以在 Kubernetes 上运行了。我们在 这里 提供了在 Kubernetes 上运行 OpenAI Baselines 的完整指南。

错误处理

Fiber 实现了基于池的错误处理。在创建新池时,还将创建关联的任务队列、结果队列和挂起表。然后,用户可以将新创建的任务添加到任务队列中。该任务队列由主进程和工作进程共享。每个工作进程从任务队列中获取一个任务,然后在该任务中运行任务函数。每当用户从任务队列中删除一个任务时,Fiber 就会在挂起表中添加一个条目。工作进程完成该任务后会将结果放入结果队列中。然后,Fiber 从挂起表中删除与该任务相关的条目。

9.png

10.png

<center>图 7:上图是一个包含四个工作进程的普通 Fiber 池。在下图,Worker 3 出现故障,因此 Fiber 启动一个新的工作进程(Worker 5),然后准备将其添加到池中。</center/>

如果池里有一个工作进程在处理过程中失败,如上图 7 所示,父池作为所有工作进程的进程管理器将会检测到该失败。然后,如果这个失败的进程有挂起任务,则父池会将挂起表中的挂起任务放回到任务队列中。接下来,它启动一个新的工作进程来替换之前失败的进程,并将新创建的工作进程绑定到任务队列和结果队列。

性 能

Fiber 最重要的应用之一是扩展计算算法(如 RL) 和基于群体的方法(如 ES)。在这些应用程序中,延迟非常关键。RL 和基于群体的方法经常应用于需要与模拟器(如 ALE、Gym 和 Mujoco)频繁交互以评估策略和收集经验的设置中。等待模拟器结果所带来的延迟严重影响了整体的训练性能。

为了测试 Fiber,我们将其性能与其他框架进行了比较。我们还在框架开销测试中增加了 Ray,以提供一些初步结果,并希望在将来添加更详细的结果。

通常有两种方法可以减少 RL 算法和基于群体的方法的延迟。要么我们可以减少需要传输的数据量,要么我们可以提升不同进程之间通信通道的速度。为了加快通信处理,Fiber 使用 Nanomsg 实现了管道和池。此外,用户还可以使用 speedus 这样的库进一步提高性能。

框架开销

通常,框架中的组件会影响计算资源,因此,我们测试了 Fiber 的开销。我们比较了 Fiber、Python 多处理库、Apache Spark、Ray 和 ipyparallel。在测试过程中,我们创建了一批工作负载,完成这些任务所需的总时间是固定的。每个任务的持续时间从 1 秒到 1 毫秒不等。

对于每个框架,我们在本地运行了 5 个工作进程,并通过调整批次的大小来确保每个框架的总耗时大约为 1 秒(即 1 毫秒的任务,我们运行了 5000 个)。我们假设,Fiber 的性能应该和多处理差不多,因为 Fiber 和多处理都不依赖于复杂的调度机制。相反,我们认为 Apache Spark、Ray 和 ipyparallel 会比 Fiber 慢,因为它们中间依赖于调度器。

11.png

<center>图 8:在测试 Fiber、Python 多处理库、Apache Spark 和 ipyprallel 的框架开销时,我们在本地运行了 5 个工作进程,并调整批次大小,使每个框架在大约 1 秒钟内完成任务。</center/>

当任务持续时间为 100 毫秒或更多时,Fiber 几乎没有表现出任何差异,当任务持续时间降至 10 毫秒或 1 毫秒时,它比其他框架更接近多处理库。

我们以多处理作为参考,因为它非常轻量级,除了创建新进程和并行运行任务外没有实现任何其他特性。此外,它还利用了仅在本地可用的通信机制(例如共享内存、Unix 域套接字等)。这使得支持分布式资源管理系统的其他框架难以超越多处理,因为这些系统无法利用类似的机制。

12.png

<center>图 9:我们的开销测试显示,Fiber 的执行情况与 Python 多处理库类似,在 1 毫秒处,ipyparallel 和 Apache Spark 处理任务的耗时更长。最佳完成时间为 1 秒。</center/>

与 Fiber 相比,ipyparallel 和 Apache Spark 在每个任务持续时间上都落后很多。当任务持续时间为 1 毫秒时,ipyparallel 花费的时间几乎是 Fiber 的 24 倍,Apache Spark 花费的时间是后者的 38 倍。显然,当任务持续时间较短时,ipyparallel 和 Apache Spark 都引入了相当大的开销,而且,对于 RL 和基于群体的方法,它们不如 Fiber 合适,后者使用了模拟器,响应时间只有几毫秒。我们还可以看到,在运行 1 毫秒的任务时,Ray 花费的时间大约是 Fiber 的 2.5 倍。

分布式任务测试

为了探究 Fiber 的可伸缩性和效率,我们将其与 ipyparallel 进行了比较,由于之前的性能测试结果,我们没有考虑 Apache Spark。我们也排除了 Python 多处理库,因为它不能扩展到一台机器之外。我们运行了 50 次 进化策略迭代(ES),根据它们的耗时对比了 Fiber 和 ipyparallel 的可伸缩性和效率。

在工作负载相同的情况下,我们预计 Fiber 可以完成得更快,因为前面已测试过,它的开销比 ipyparallel 小得多。对于 Fiber 和 ipyparallel,我们使用的群体大小为 2048,因此,无论工作进程的数量多少,总计算量都是固定的。我们还在两者中实现了相同的 共享噪音表,每八个工作进程共享一个噪音表。这项工作的实验域是 OpenAI Gym Bipedal Walker Hardcore 环境的一个修改版本,这里 对修改进行了描述。

13.png

<center>图 10:当 ES 迭代 50 次以上时,使用不同数量的工作进程运行 ES,Fiber 的扩展性均优于 ipyparallel。每个工作进程在单个 CPU 上运行。</center/>

主要结果是,Fiber 的扩展性比 ipyparallel 更好,并且完成每次测试的速度明显更快。随着工作进程数从 32 增加到 1024,Fiber 的运行时间逐渐缩短。相比之下,当工作进程数从从 256 增加到 512 时,ipyparallel 的运行时间逐渐变长。在使用 1024 个工作进程时,由于进程之间的通信错误,ipyparallel 未能完成运行。这个失败削弱了 ipyparallel 运行大规模并行计算的能力。根据 Amdahl 定律,我们看到,当工作进程数增加到 512 以上时,Fiber 的收益会减少。在这种情况下,主进程处理数据的速度就会成为瓶颈。

总的来说,在所有工作进程数的测试中,Fiber 的性能都超过了 ipyparallel。此外,与 ipyparallel 不同的是,Fiber 在运行 1024 个工作进程时也完成了这项工作。这个结果更能显示出 Fiber 与 ipyparallel 相比具有更好的可伸缩性。

结 论

Fiber 是一个新的 Python 分布式库,现已 开源。我们设计它是为了让用户能够在一个计算机集群上轻松地实现大规模计算。实验表明,Fiber 实现了我们的许多目标,包括有效地利用大量的异构计算硬件,动态地伸缩算法以提高资源使用效率,以及减少在计算机集群上运行复杂算法所需的工程负担。

我们希望,Fiber 将进一步加快解决工程难题的进展,使开发方法并大规模地运行以了解其好处变得更容易。

本文转自 公众号:AI前线 ,作者Jiale Zhi 等,点击阅读原文
推荐阅读
关注数
12504
内容数
151
跟踪科技公司、科研机构和IT产业在AI方向的最新动态,挖掘AI技术应用场景和商业化落地案例。
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息