旷视研究院 · 2022年01月07日

技术的真相 | 基于 K8S Framework 框架实现多种扩展调度策略

01 前言

本文将分享旷视 Brain++ AI 生产力平台基于 K8S Framework 框架实现多种扩展调度策略的经历,希望对感兴趣的同学有所帮助。为了让更多的同学对 K8S 调度器有一个较为全面的了解,因此在讲述具体扩展前,会先对 K8S 的调度器进行介绍,其中包括设计,实现以及支持的扩展方式。在介绍调度相关内容之前,先简单介绍一下旷视 Brain++ AI 生产力平台。

备注:对于没有特别说明的代码,都是 V1.19.8。

旷视 Brain++ AI 生产力平台的底层以 Kubebrain 为核心。Kubebrain 以 kubenetes 为底座进行了功能扩展,除了提供原有平台的 workspace/rlaunch/rrun/rjob 等基础功能,还满足了多租户及网络隔离、多级权限管理等面向商业化上的核心诉求,并提供更好的系统扩展性。

下面介绍的调度器属于旷视 Brain++ AI 生产力平台的核心服务之一的计算服务中的资源调度部分。计算服务(以下简称 Compute)主要由资源调度,配额管理,workspace 和任务运行时组成。

02 调度器概览与设计

2.1 概览

2.1.1 Pod启动过程

image.png

step0

用户向镜像仓库推送准备好的镜像。

step1

用户编写所需创建的 Pod 的 Yaml 文件,这个请求会被提交给 Kube-APIServer 组件,该组件会把 Pod 相关信息写入 ETCD。

step2

Kube-Scheduler 会监听没有被调度的 Pod ,并更新 SchedulerCache 和 SchedulingQueue ,前者用于维护调度器视角下的集群情况,后者用于维护待调度 Pods。

step3

经过调度器调度,这里主要可以理解成就是过滤与打分,前者是过滤掉不符合要求的 Nodes ,比如 Label 不匹配,后者是针对过滤出来的 Nodes 进行优选打分,比如 Node 上的资源使用情况, Node 对 Pod 的偏好等都会作为打分的依据。

step4&5&6

当 Pod 完成调度之后,会在 Spec.NodeName 处填入对应的节点名字,此时 Kubelet 会监听到这个信息,并开始执行启动 Pod 的流程,其中包括给 Pod 拉取所需的镜像,建立 PodSandbox 等。

2.2 设计

2.2.1 发展历史

这里对于比较老的版本不再介绍,主要介绍出现 Framework 以后版本的发展情况。
image.png

  • 在 V1.15 版本,提出 Scheduling Framework 设计,期望将原先的过滤和打分过程插件化;
  • 在 V1.18 版本,除了抢占调度逻辑,其它过程基本插件化;
  • 在 V1.19 版本,支持 Multi Scheduling Profiles,同时将抢占调度逻辑插件化,Multi Scheduling Profiles 功能允许针对不同类型的 Pod 采用不同的调度器,类似 Linux 中的调度器,针对不同的 Workloads 采用不同的调度类实现调度。

2.2.2 设计目标

根据Design Proposal of the Scheduling Framework ,对基于 Framework 的调度器提出了以下几点设计目标:

  • 使调度器更具可扩展性;
  • 将调度器的一些特性移到插件中,使其核心更简单;
  • 在框架中提出扩展点;
  • 提出一种机制来接收插件结果,并根据收到的结果继续或中止插件;
  • 建议一种机制来处理错误并与插件进行沟通。

2.2.3 框架

image.png

Scheduling Cycle && Binding Cycle

调度一个 Pod 的分为两个阶段,调度周期和绑定周期。调度周期为 Pod 选择一个节点,绑定周期将该决策应用到集群。调度周期和绑定周期统称为调度上下文,前者要求串行,后者可以并发。如果 Pod 被确定为不可调度或存在内部错误,那么调度周期或绑定周期将被中止,并将 Pod 返回队列等待下一次重试,如果一个绑定周期被终止,那么它将触发 Reserve 插件中的 Unreserve 方法,释放预留的资源。下面对上图中的扩展点进行介绍。

QueueSort

该插件用于对调度队列中的 Pod 进行排序,需要特别注意的是一次只能启用一个排序插件。

PreFilter

此类插件用于预处理关于 Pod 的信息,如果预过滤器返回一个错误,那么调度周期将中止,这些插件完成后会在 CycleState中 填入 Filter 所需的信息。

Filter

此类插件用于过滤掉不能运行 Pod 的节点,对于每个节点,调度器将按照它们配置的顺序调用过滤器插件,若任何过滤器插件将该节点标记为不可用,那么将不会为该节点调用其余的插件。

PostFilter

此类插件在过滤阶段之后被调用,目前只有在经过普通调度流程之后没有为 Pod 找到可行节点时才会调用,即抢占调度。

PreScore

此类插件作用类似 PreFilter,是为了 Score 插件做数据准备。

Score

将根据打分原则,为通过过滤的 Node 进行打分。

特别注意:Normalize Score 不是一个可以单独扩展的插件,在 K8S 的源码实现中,这个插件是通过调用 runScoreExtension 时,调用注册的 Score 插件来实现调用的,此类插件的作用顾名思义就是归一化分数。

Reserve

这类插件支持两个动作,分别是 Reserve 和 UnReserve ,前者是指在为 Pod 确定好 Node 之后,需要在 Pod 执行 Permit 前为其预留资源,因为 Permit 可能会让该 Pod 处于 Waiting 状态,此时在对新的 Pod 进行调度的时候,需要将 Waiting Pod 占用的资源保留下来;后者就是释放对应的资源。

Permit

此类插件可以在为该 Pod 执行绑定前做一些控制操作,比如延迟 Pod 与 Node 绑定。这里支持三个操作:

  • Approve:一旦所有的 Permit 都批准了这个 Pod,那么这个 Pod 就会进入绑定周期;
  • Deny:如果有任何插件拒绝 Pod,那么该 Pod 将被返回到调度队列,重新进行调度,在此之前会调用 UnReserve 来释放资源;
  • Wait:这个操作可以带上等待时间,对于超过等待时间的 Pod 会被返回到调度队列,或者直接等到被批准,同样在返回到调度队列之前会调用 UnReserve 释放资源。

PreBind

此类插件用于执行绑定 Pod 之前所需要的准备的操作,比如等待 Pod 需要的 PVC 处于 Bound 状态。

Bind

此类插件将 Pod 绑定到节点,默认实现就是向 Kube-APIServer 发起将当前 Node 和 Pod 进行绑定的请求,多个被配置的插件会按照顺序执行,只要一个插件成功绑定 Pod ,那么其它的绑定插件都会被跳过。

PostBind

此类插件在 Pod 被成功绑定后调用,这是调度周期的结束,可以用于清理关联的资源。

03 调度器实现

3.1 核心数据结构

3.1.1 Queue

作用

存储待调度或调度失败的 Pod ,并按照一定的要求对其进行排序。

设计

三级队列:

  • 活动队列:根据优先级存储等待调度的 Pods;
  • Backoff 队列:如果 Pod 调度反复失败,则会增长等待调度时间,降低重试效率,从而避免反复失败浪费调度资源,针对调度失败的 Pod 会优先存储在 Backoff 队列中,等待后续重试;
  • 不可调度队列:出于某些原因导致 Pod 不可被调度(比如:资源不足),该 Pod 会被加入该队列。

接口


// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
    // The podSchedulingCycle represents the current scheduling cycle number which can be
    // returned by calling SchedulingCycle().
    // 如何理解 IfNotPresent?表示这个 Pod 在三种队列中都不存在,在加入队列前会先检查该 Pod 是否存在
    // case1: moveRequestCycle < podSchedulingCycle,放入 UnschedulableQ
    // case2: moveRequestCycle >= podSchedulingCycle,放入 BackoffQ
    // schedulingCycle 记录的是调度器循环调度的次数(从队列中拿到 Pod,该数就增加1)
    // moveRequestCycle 是指收到 move 请求(move Pods from UnschedulableQ to ActiveQ/BackoffQ )时对应的 schedulingCycle
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop removes the head of the queue and returns it. It blocks if the
    // queue is empty and waits until a new item is added to the queue.
    Pop() (*framework.QueuedPodInfo, error)
    // case1: ActiveQ/BackoffQ 中有 oldPod,那么在 ActiveQ/BackoffQ 中更新这个 Pod
    // case2: UnschedulableQ 中有 newPod,那么把对应的 Pod 删除,并且将 newPod 加入 ActiveQ
    // case3: 该 Pod 不在任何一个 queue 中,那么将 newPod 加入 ActiveQ
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    // 把 UnschedulableQ 中的 Pods 移动到 ActiveQ 或者 BackoffQ
    // to ActiveQ:超过 backoff 时长
    // to BackoffQ,未超过 backoff 时长
    MoveAllToActiveOrBackoffQueue(event string)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
    NumUnschedulablePods() int
    // Run starts the goroutines managing the queue.
    Run()
}

3.1.2 Cache

作用

收集 Pods 的信息,并提供 Node 级别的聚合信息,用以提高调度效率。

设计

  • 数据感知:通过 Reflector ListAndWatch APIServer 实现;
    image.png
  • Snapshot 机制:每个调度流程启动时都会去获取当下的缓存快照;
    image.png
  • 节点打散:使 Pod 可以均匀分布在集群中;
    image.png
  • 过期删除:当一个 Pod 出于 Assumed 状态后,经过一段时间没有获取到 Add 事件,那么会被删除;
    image.png
  • Pod 状态机:根据不同状态执行不同的动作
  • Initial:从 APIServer 监听到一个(可能完成分配的)Pod;
  • Assumed:在 Scheduler 中完成调度已经绑定的Pod(未实际分配);
  • Added:Pod 被加入到了缓存中;
  • Deleted:Pod 在缓存中被删除;
  • Expired:由于长时间没有感知到真正分配事件而被超时删除。

接口

type Cache interface {
    // PodCount returns the number of pods in the cache (including those from deleted nodes).
    PodCount() (int, error)

    // AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
    // The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
    // After expiration, its information would be subtracted.
    AssumePod(pod *v1.Pod) error

    // FinishBinding signals that cache for assumed pod can be expired
    FinishBinding(pod *v1.Pod) error

    // ForgetPod removes an assumed pod from cache.
    ForgetPod(pod *v1.Pod) error

    // AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
    // If added back, the pod's information would be added again.
    AddPod(pod *v1.Pod) error

    // UpdatePod removes oldPod's information and adds newPod's information.
    UpdatePod(oldPod, newPod *v1.Pod) error

    // RemovePod removes a pod. The pod's information would be subtracted from assigned node.
    // 与 Forget 不同的是,remove 的前提是 added,其它都一样,这样也就意味着 remove 不需要从 assumed中去删除。
    RemovePod(pod *v1.Pod) error

    // GetPod returns the pod from the cache with the same namespace and the
    // same name of the specified pod.
    GetPod(pod *v1.Pod) (*v1.Pod, error)

    // IsAssumedPod returns true if the pod is assumed and not expired.
    IsAssumedPod(pod *v1.Pod) (bool, error)

    // AddNode adds overall information about node.
    AddNode(node *v1.Node) error

    // UpdateNode updates overall information about node.
    UpdateNode(oldNode, newNode *v1.Node) error

    // RemoveNode removes overall information about node.
    RemoveNode(node *v1.Node) error

    // UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
    // The node info contains aggregated information of pods scheduled (including assumed to be)
    // on this node.
    // The snapshot only includes Nodes that are not deleted at the time this function is called.
    // nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
    UpdateSnapshot(nodeSnapshot *Snapshot) error

    // Dump produces a dump of the current cache.
    Dump() *Dump
}

3.2 普通调度流程分析

image.png
普通调度流程和框架给出插件图基本一致,也比较好理解,这里就不再赘述了,这里需要注意的一点是在跑完 Score 插件后如果没有当前 Pod 没有合适的 Node,此时会启动抢占调度逻辑(见下文),这里会调用 PostFilter 插件,其它红色部分表示在该插件上调用返回错误就出调用Unreserve 用于释放资源。根据源码可知,不同的插件点都默认注册了下面的插件,彼此的关系如下图。
image.png
由图 10 可以发现,同一个插件可以注册在不同的插件点,比如volumebinding 既可以在 FilterPlugins 中注册,也可以在 PreBindPlugins 中注册。由于注册的插件繁多,这里不会对这些插件的功能进行一一的介绍,感兴趣的同学可以下载源码分析或者阅读这里的系列文档(文档不定期更新)。

3.3 抢占调度流程分析

在普通调度完,检查该 Pod 的调度结果为空会触发抢占调度,简化流程如图 11。
image.png

3.3.1 podEligibleToPreemptOthers

这一步需要确定 Pod 是否能够走抢占调度逻辑,这类 Pods 需要满足以下条件。

满足的方式

cond1 && cond2 || cond1 && cond3 || cond1 && cond4。

具体条件说明

  • cond1: pod.Spec.PreemptionPolicy == PreemptLowerPriority,该 Pod 本身需要支持抢占调度;
  • cond2: len(pod.Status.NominatedNodeName) == 0,NominatedNodeName 是指当前 Pod 通过抢占被分配到的 Node,表示当前 Pod 还没有通过抢占获得过 Node,这里要注意的是当 Pod 通过抢占指定 Node 后,并不会立马和这个 Node 进行绑定,而是会再次进入调度流程;
  • cond3: len(pod.Status.NominatedNodeName) > 0 && nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable,表示当前 Pod 曾经已经执行过一次抢占了,但是在调度过程中可能被其它更高优先级的 Pod 占用了被 Nominated 的 Node,导致在抢占后的新的一轮调度中出现了 NominatedNode 不再适合的情况,比如不满足 PodAffinity,这表示新调上去的 Pod 不想要当前的这个 Pod 调度上去,所以此时会出现 NominatedNodeName  不为空的情况,在这种情况下,结合 NominatedNode 是因为 UnschedulableAndUnresolvable 导致的也允许再次执行抢占;
  • cond4: len(pod.Status.NominatedNodeName) > 0 && nominatedNodeStatus.Code() != framework.UnschedulableAndUnresolvable && 该 Node 上所有比该 Pod 优先级低的 Pod 的deletetimestamp 均为 nil,这个条件已经比较清楚了,不再解释。

3.3.2 nodeWherePreemptionMightHelp

这一步找出通过抢占可能可以为该 Pod 完成调度的 Nodes,这类 Nodes 只需要满足一个条件 status.Code != UnschedulableAndUnresolvable,

这里需要和上面讲的 

nominatedNodeStatus.Code()==framework.UnschedulableAndUnresolvable 进行区分,这个 Node 其实也会在这一步被过滤掉。

3.3.3 selectVictimsOnNode

step1

计算当前 Node 上所有优先级比 Pod 低的Pods 全部删除是否可行,并且将 Filter 扩展点的插件再跑一遍;

step2

将这类 Victims 分为两类:ViolatingVictims和 NonviolatingVictims;

step3

先遍历 ViolatingVictims,尽可能保证这些 Pods可以不被抢占掉,做法就是把这个 Pod 加入 NodeInfo,并且将 Filter 扩展点的插件再跑一遍;

step4

在遍历 NonviolatingVictims,做法同3。
image.png

3.3.4 callExtender

默认没有,所以没有逻辑。

3.3.5 SelectCandidate

选择原则如下:

  • 最少违反 PDB,关于 PDB 大家可以参考官方文档;
  • 最高优先级最小优先;
  • 优先级总和最低优先;
  • 最少抢占数量优先;
  • 最近更新节点优先;
  • 随机选一个。

越前面的原则可以区分开这些 Nodes 后就会结束不再往下走。

04 调度器扩展方式

4.1 扩展方案

image.png
本文选用的是 Framework 方案。

4.2 扩展策略 1:批量调整

4.2.1 背景

研究员在训练模型时有五个模块需要进行工作,其中数据蒸馏和训练需要同步,因此这两个模块的任务需要保证同时运行,那么同时运行的前提其实就是实现类似批量调度的策略,不能让一类任务空跑,得蒸馏和训练同时跑。

4.2.2 设计

批量调度的设计参考了 Coscheduling based on PodGroup CRD,在此基础上,结合内部研究员经常使用的 Rlaunch 工具,实现了基于 Rlaunch 向研究员提供了批量调度的功能。

使用方式

在使用上,研究员只需提交一个 Yaml 文件,用以申明批量调度相关的要求,具体如下:


apiVersion: workspace.brainpp.cn/v1alpha1
kind: BatchProcess
metadata:
  name: podgroup1
  namespace: megvii-brain
spec:
  podGroup:
    replicas: 4                                 # 批量调度的任务总数
    minAvailable: 2                             # 满足批量调度的最小任务总数
    timeoutSeconds: 10                          # 用户为达到最小任务总数愿意等待的时间,超过之后,如果没有达到那么会所有 Pods 都会调度失败
  simpleProcesses:
  - command:
    - bash
    - -c
    - 'tail -f /etc/hosts'
    replicas: 2                                 # 类型1的任务需要批量调度的任务总数
    minAvailable: 1                             # 类型1的任务满足批量调度的最小任务总数
    resources:                                  # 当前每个任务的资源配置
      limits:
        cpu: 2
        memory: 2048
  - command:                                   
    - bash
    - -c
    - 'tail -f /etc/hosts'
    replicas: 2                                 # 类型2的任务需要批量调度的任务总数
    minAvailable: 1                             # 类型2的任务满足批量调度的最小任务总数
    resources:
      limits:
        cpu: 2
        memory: 2048

这里需要注意的一点是,目前还没有实现在批量调度组内针对不同任务类型保证最小启动个数这样的操作,这个特性后续有可能会实现。

4.2.3 实现

image.png
我们扩展了四个扩展点,分别是 QueueSort,PreFilter,Permit 和 PostBind,图13中 Kubebrain-Scheduler 是指增加了批量调度功能的 Kube-Scheduler,下面对这四个扩展点进行介绍。

QueueSort

原则就是将属于同一个批量调度组的 Pods 尽可能连续排在一起,这里会依据 Pods 的优先级和创建 PodGroup 的时间。在用户提交批量任务的客户端会保证同一个组的 Pods 优先级是一样的,因为如果区分优先级导致无法调度成功,与批量调度的初衷就不一致了。

备注:PodGroup 是用于描述一组批量调度的 Pods 的自定义资源,其中包括超时,最小调度数以及组调度状态等。

PreFilter

目前的功能就是更新当前 Pod 所属的 PodGroup 的状态,如果该 Pod 不需要被批量调度,那么就会略过,在实现中通过 Label 来确定 Pod 所属的 PodGroup。

Permit

这里实现了关键逻辑,具体可见图13。当一个 Pod 进入 Permit 扩展点时,会先找到对应的 PodGroup,并根据配置的等待时间, PodGroup 创建时间和当前时间确定是否超时,具体会有两种情况:

  • 超时,拒绝当前的 Pod,后台有一个 Controller 会定期更新 PodGroup 的状态,告知用户此时批量调度已经失败,请取消该批量任务;
  • 没有超时,统计集群中属于这个 PodGroup 的处于 Scheduled 的 Pods 和 位于 Waiting 队列的 Pods,并加上当前的 Pod,计算是否超过最小要求的任务起动数。

这里需要解释一下的就是为什么还要计算 Scheduled 的 Pods,这是处于这样场景的考虑,比如用户设置了比较长的等待时间,此时这一组的 Pods 都已经正常跑起来了,然后由于优先级比较低或者设置的内存资源太小等导致其中部分 Pods 被杀了,后台 Controller 此时会发现这个事情,并为该 PodGroup 重新创建对应的 Pods,那么此时就需要计算 Scheduled 的 Pods。

PostBind

这个插件做的工作就是更新 PodGroup 的状态,比如 Scheduled。

4.3 扩展策略 2:支持 GPU 卡类型调度

4.3.1 背景

研究员在使用 Rlaunch 进行工作时,可以通过指定不同的 GPU 类型,来要求当前的任务运行在具有此类 GPU 的节点上,这类要求有两类策略,可以是强制的也可以是尽量满足。这里哪些用户可以使用哪些 GPU,这与 Kubebrain 平台的配额管理和组织管理方式有关,这里不再赘述。

4.3.2 设计与实现

这个设计要求给含有不同类型卡的 Node 打上 Label,将用户在 Rlaunch 中对不同类型的卡的需求转换为 Pod 上的 NodeAffinity,但是此时有可能将含有稀有卡的 Node 在经过 Filter 之后仍作为普通 Pod (没有指定卡类型)的备选 Node,因此需要将其过滤掉,这个逻辑是通过扩展 Filter 插件实现的。

4.4 扩展策略 3:支持按机器组类型调整

4.4.1 背景

部分研究小组希望保证任务运行环境的独立性,并且独占稀有资源,那么相当于要把集群中的部分机器作为这些小组独享的机器。

4.4.2 设计与实现

这个设计比较简单,在研究员启动的 Pod 上标记对应的标签表示想要使用的私有机器,底层可以通过将标签翻译成 NodeAffinity 来做,另一种方式就是选择扩展 Filter 插件,将对应的 Node 进行过滤,只保留含有私有机器标签的机器。目前我们选择了使用后者,原因是前者需要给那些没有指定私有机器的 Pod 做反亲和性设置,这需要在 Webhook 中统一实现,相比后者,这种效率较低,因为 Webhook 会走一遍网络请求。

首发:旷视研究院
作者:应泽雯

专栏文章推荐

欢迎关注旷视研究院极术社区专栏,定期更新最新旷视研究院成果
加入旷视:career@megvii.com
推荐阅读
关注数
7707
内容数
164
专注旷视研究院学术论文解读推送,涵盖计算机视觉,文字识别等
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息