十二 · 2023年08月30日

pipeline高端玩法(三)——一个pipeline是如何建立起来的

✎ 编 者 按 

续接上文,这一次我们来详细了解下一个简单的pipeline是如何构建起来的

最简单的流水线

书接上文,一个最简单的流水线例子,这里对data_in打两拍做输出:

image.png

我们逐行分析pipeline里每一行代码都干了什么。

Stage分析

在第八行我们声明了一个Stageable变量,如前文所述,Stageable变量并不会立即产生电路对象。

代码第9行,我们创建一个Stage,Stage中的下面的代码会通过调用Pipeline中的addStage函数向Pipeline中的StageSet添加当前Stage:

if(_pip != null) {
    _pip.addStage(this)
}

在stage0中,第11行用io.data_in.valid驱动stage0中的internals.input.valid。而在第12行,从左向右看,this(payload)函数会调用Stage的apply函数:

def apply[T <: Data](key : Stageable[T]) : T = {
    apply(StageableKey(key.asInstanceOf[Stageable[Data]], null)).asInstanceOf[T]
}

其进一步调用:

def apply(key : StageableKey) : Data = {
    internals.stageableToData.getOrElseUpdate(key, ContextSwapper.outsideCondScope{
      key.stageable()//.setCompositeName(this, s"${key}")
    })
}

可以看到,这里的主要作用是一StageableKey作为Key查询internals.stageableToData中是否包含该key,如果有则返回其value,否则创建该key-value,并将value返回。最终这里会返回一个UInt(8 bits)电路对象。并将io.data_in.payload赋值给该电路对象。此时,stage0中的internals.stageableToData中包含一个元素。

在第14行,我们创建了stage1,其例化时传入了Connection,其会调用:

if(_pip != null) {
    _pip.addStage(this)
  }

  def chainConnect(connection: ConnectionLogic): Unit ={
    _pip.connect(_pip.stagesSet.takeWhile(_ != this).last, this)(connection)
  }

  def this(connection: ConnectionLogic)(implicit _pip: Pipeline) {
    this()
    chainConnect(connection)
  }       

这里的的调用关系是:

  1. 当前Pipeline的StageSet添加当前Stage元素
  2. 通过调用pipeline的connect函数和StageSet中的最后一个元素建立连接关系:
def connect(m : Stage, s : Stage)(logics : ConnectionLogic*) = {
    val c = new ConnectionModel
    connections += c
    c.m = m
    c.s = s
    c.logics ++= logics
    c
}

在connection里,创建了一个连接关系ConnectionModel,用于连接stage0中的output接口和stage1的input接口,采用M2S()方式进行连接。

同样,15行同样stage2的创建亦如此。

代码第16行则是用stage2的output.valid驱动data_out的valid输出。而代码第17行通过同样的方式创建了一个UInt(8 bits)电路对象,用来驱动data_out.payload。此时stage2中的internals.stageableToData中包含一个元素。

最后调用pipeline的build函数进行流水线的搭建。

build分析

我们按片段来分析build中的代码:

image.png

首先,上面第一行代码将connection中的stage添加到stageSet。在执行该行之前,stageSet中包含了stage0、stage1、stage2三个元素,而connection按照描述进行map后得到两个元素:(stage0,stage1),(stage1,stage2)。由于stageSet为LinkedHashSet,故执行完后依然是(stage0,stage1,stage2)。

第二行代码则是获取带有slave驱动的stage,这里即stage0(驱动stage1)、stage1(驱动stage2)。

第三行则是获取没有slave驱动的stage,这里只有stage2。

第7~8行则是分别建立每个stage都是由谁驱动的映射存储至stageMaster、而stage input的驱动逻辑则存储至stageDriver。同时这里也限制了每个stage,最多只能由一个驱动逻辑。处理完后,stageMaster、stageDriver中存储的元素分别为:

stageMaster:
  stage0->ArrayBuffer()
  stage1->ArrayBuffer(stage0)
  stage2->ArrayBuffer(stage2)
stageDriver:
  stage1->Connection(m=stage0,s=stage1,logic=M2S)
  stage2->Connection(m=stage1,s=stage2,logic=M2S)

代码13~16行由于我们并没有使用到stageableResultingToData,这里可以暂时忽略,等后续章节再进行讨论。

val clFlush = mutable.LinkedHashMap[ConnectionLogic, Bool]()
val clFlushNext = mutable.LinkedHashMap[ConnectionLogic, Bool]()
val clFlushNextHit = mutable.LinkedHashMap[ConnectionLogic, Bool]()
val clThrowOne = mutable.LinkedHashMap[ConnectionLogic, Bool]()
val clThrowOneHit = mutable.LinkedHashMap[ConnectionLogic, Bool]()

这里声明的变量我们这里都是用不上,可以暂时先不进行关注。

def propagateData(key : StageableKey, stage : Stage): Boolean ={
  if(stage.internals.stageableTerminal.contains(key)) return false
  stage.stageableToData.get(key) match {
    case None => {
      val hits = ArrayBuffer[Stage]()
      for(m <- stageMasters(stage)){
        if(propagateData(key, m)){
          stage.apply(key) //Force creation
          hits += m
        }
      }
      hits.size match {
        case 0 => false
        case 1 => true
        case 2 => PendingError(s"$key at $stage has multiple drivers : ${hits.mkString(",")}"); false
      }
    }
    case Some(x) => true
  }
}

for(stage <- stagesSet){
  for(key <- stage.stageableToData.keys){
    for(m <- stageMasters(stage)) {
      propagateData(key, m);
    }
  }
}

这里就有点儿意思了对于StageSet中的每个stage中stageableToData中的每个元素,都会调用propgateData函数进行处理。我们不妨先来看看此时各stage中的stageableToData中的元素:

stage0:
  StageableKey(payload,null)->UInt(8)
stage1:
  null
stage2:
  StageableKey(payload,null)->UInt(8)

由于stage0没有master,无需考虑,stage1中stageableToData为空,也可以跳过。那么来看stage2,此时调用propagateData传入的参数为:

key=StageableKey(payload,null)->UInt(8)
m=stage1

进入propagateData函数,由于stageableTerminal我们并未使用,继续往下看,由于stage1中的stageableToData为空,故这里match匹配为None,此时会看stage1的master端口,此时嵌套调用propagateData,传入参数为:

key=StageableKey(payload,null)->UInt(8)
m=stage0

由于stage0中的stageableToData包含该key,那么此时返回true退出,再次回到头次调用的处理上。

由于嵌套返回true,那么此时会在stage1上调用apply函数,为stage1插入一个stageableKey。最终,各stage中stageableToData的结果为:

stage0:
  StageableKey(payload,null)->UInt(8)
stage1:
  StageableKey(payload,null)->UInt(8)
stage2:
  StageableKey(payload,null)->UInt(8)

看到这里,是不是明白了一些门道了呢?pipeline自动帮我们补齐了stage1中所依赖的元素,完成了payload从stage0到stage2中的传输元素补齐。

对于propagateRequirements函数,这里我们并用不上,这里我们先无需关注。

接下来看时internal connections:

image.png

这里我们近会用到s.output.valid:=s.input.valid,即将每个stage中的internal.output.valid由internal.input.valid进行驱动。

剩下的就是stage之间的连接了:

for(c <- connections){
  val stageables = (c.m.stageableToData.keys).filter(key => c.s.stageableToData.contains(key) && !c.m.stageableTerminal.contains(key))
  var m = ConnectionPoint(c.m.output.valid, c.m.output.ready, stageables.map(c.m.outputOf(_)).toList)
  for((l, id) <- c.logics.zipWithIndex){
    val s = if(l == c.logics.last)
      ConnectionPoint(c.s.input.valid, c.s.input.ready, stageables.map(c.s.stageableToData(_)).toList)
    else {
      ConnectionPoint(Bool(), (m.ready != null) generate Bool(), stageables.map(_.stageable.craft()).toList)
    }
    val area = l.on(m, s, clFlush(l), clFlushNext(l), clFlushNextHit(l), clThrowOne(l), clThrowOneHit(l))
    if(c.logics.size != 1)
      area.setCompositeName(c, s"level_$id", true)
    else
      area.setCompositeName(c, true)
    m = s
  }
}

对于每个connection,首先是将master端和slave端stage共有的stageableToData给筛选到stageables中去,这里对应的为:

Connection(m=stage0,s=stage1,logic=M2S):
  StageableKey(payload,null)
Connection(m=stage1,s=stage2,logic=M2S):
  StageableKey(payload,null)

接着创建ConnectionPoint,对应的paylaod即为StageabelKey所对应的电路对象,也就意味着:

Connection(m=stage0,s=stage1,logic=M2S):
    m=ConenctionPoint(stage0.out.valid,stage0.out.ready,stage0.stageableToData(StageableKey(payload,null)))
    s=ConenctionPoint(stage1.in.valid,stage1.in.ready,stage1.stageableToData(StageableKey(payload,null)))
Connection(m=stage1,s=stage2,logic=M2S):
    m=ConenctionPoint(stage1.out.valid,stage1.out.ready,stage1.stageableToData(StageableKey(payload,null)))
    s=ConenctionPoint(stage2.in.valid,stage2.in.ready,stage2.stageableToData(StageableKey(payload,null)))

最终,调用M2S.on创建stage之间的连接关系:

image.png

这里我们只用到6~7行,建立起各stage之间的连接关系。

至此!完成整个流水线的创建。

写在最后

通过本篇,分析了一个简单的流水线在Pipeline中的创建实现,后续将陆续进行更加复杂流水线的Demo及其背后自动实现原理。

☆ END ☆

作者:玉骐
原文链接:Spinal FPGA
微信公众号:
 title=

推荐阅读

更多SpinalHDL技术干货请关注[Spinal FPGA]欢迎添加极术小姐姐微信(id:aijishu20)加入技术交流群,请备注研究方向。
推荐阅读
关注数
1578
内容数
131
用SpinalHDL提升生产力
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息