chainlink源码分析六

OCR流程分析

Posted by Emo on February 8, 2023

chainlink源码分析

EMO’s Blog

链下聚合DataFeed

简介

所谓的DataFeed服务,本质上是通过不断更新的方式在链上维护一个链下数据的镜像,方便链上合约访问。很明显这种方式的缺点是,维护成本高,数据的时效性差以及数据范围受限。主要是为了满足对于一些需求很大的特定数据的访问,如 Eth 的价格等。

目前提供两种实现方式

  • 链上聚合喂价
  • 链下聚合喂价 (应用OCR实现的DataFeed,本版块将要讲解的)

相较于链上聚合DataFeed,基于OCR的链下聚合DataFeed的优点是,gas的消耗更低,每次数据更新只需要发起一笔交易 (链上聚合DataFeed需要发起 2f+1 笔交易) 。

OCR简述

OCR全称Offchain Reporting是一种链下聚合共识方案。目前OCR1.0只服务于 DataFeed(喂价) 。

常规的区块链共识算法,一般是对于确定性操作进行共识,能够对一个确定的结果达成共识。

但是在链外数据访问方面,由于链外世界缺乏一个全局时钟,所以很难保证所有节点都访问到相同的结果。所以聚合共识是对于一个包含了所有节点的观察结果的报告达成共识。至于如何从这样的一个报告中得到最终答案,则由业务方去决定。

节点网络

节点间的网络采用开源的 Libp2p 搭建

DataFeed所需的服务

节点上创建的链下聚合DataFeed工作,实际上会在节点上运行起多个 Service (服务) ,不同的 Service 承担着不同的工作

  • 监听器服务 负责监听链上事件的工作
  • OCR服务 负责聚合共识以及报告提交的工作
  • Bootstrape服务 网络相关,承担节点发现的工作

后面的分析主要是OCR服务。

OCR模块解析

服务模块

当 configChanged 被触发后,会通过执行 RunOracle() 去创建一个 oracleState 实例,而在 oracleState 的启动过程中,又会通过 RunPacemaker() 和 RunTransmission() 方法去创建 pacemakerState 和 transmissionState 实例。 这三个 xxxState 实例,都具备一个 run() 方法,当他们被创建后,都会在各自的协程里运行 run() 方法。其中 pacemakerState 在每次视图切换的时候都会重新初始化创建 reportGenerationState ,用于管理该视图内的所有聚合共识处理。

  • 通信模块
    • oracleState 管理通信以及驱动ocr流程
  • 业务模块
    • pacemakerState 执行视图切换相关流程
    • reportGeneration 执行相应视图内的所有聚合相关的操作
    • transimisstion 交易提交排期相关处理

通信管理

网络消息模型

节点与节点之间传递的消息结构是包裹了 Message 的 MessageWithSender 。

type MessageWithSender struct {
    Msg    Message // 这是一个接口类型
    Sender commontypes.OracleID
}
type Message interface {
    process(o *oracleState, sender commontypes.OracleID) // 都需要实现 process 这个方法
}

实际上 chNet 这个 channal 里会接收到各种各样的实现了 Message 接口的消息类型。在 Message 接口的基础上,还扩展出了两种子类型接口

  • MessageToReportGeneration 聚合共识相关流程的触发
    • MessageObserveReq: Leader发出这个消息给所有节点,以收集他们的观察结果
    • MessageObserve: 节点将观察结果发送给leader,作为对收到MessageObserveReq后的回复
    • MessageReportReq: Leader将收集到的观察结果发给节点们签名
    • MessageReport: 节点们将带有观察结果和签名的消息发送给Leader
    • MessageFinal: Leader将最终的聚合签名发送给所有节点
    • MessageFinalEcho: 所有节点都会将这个消息发送给其他节点
  • MessageToPacemaker 视图切换相关流程的触发
    • MessageNewEpoch: 发送 newepoch(epoch_number) 消息,表明节点认为需要移动到这个epoch了

通信模型

每个模块负责不同的功能,他们之间通过相应的 channal 进行通信。

  • chNetToPacemaker: oracleState 通知 pacemakerState
  • chNetToReportGeneration: oracleState 通知 reportGeneration ,主要是共识流程相关的一些信息
  • chPacemakerToOracle: pacemakerState 通知 oracleState ,主要是通知切换 epoch
  • chReportGenerationToTransmission: oracleState 通知 reportGeneration
  • chNet: oracleState 接收来自其他节点的消息

模块和模块之前传递的具体消息如下图中所示

代码结构

oracleState 当中不断轮询 chNet ,并将其中收到的信息处理/转发到相应的处理模块中去。

func (o *oracleState) run() {
     
    ......
 
    for {
        select { // 监听和管理P2P消息处理流程
        case msg := <-chNet:
            if 0 <= int(msg.Sender) && int(msg.Sender) < o.Config.N() {
                msg.Msg.process(o, msg.Sender) // 这里本质上是调用 message 自身的 process 方法,将消息转发到相应的信道当中去
            } else {
                o.logger.Critical("msg.Sender out of bounds. This should *never* happen.", commontypes.LogFields{
                    "sender": msg.Sender,
                    "n":      o.Config.N(),
                })
            }
        case epoch := <-chPacemakerToOracle:
            o.epochChanged(epoch)
        case <-chDone:
        }
        ......
    }
}

oracleState 会从两个类 channal 中接收信息

  • chNet 来自其他节点的信息
  • chPacemakerToOracle 来自 pacemakerState 模块的信息 其中,chNet 这个 channal 接收的类型是 MessageWithSender ,其中主要包裹了 Message 和 Sender 两个信息。而对于其中 Message 的处理,其实就是调用 Message 本身的 process 方法。

其中 MessageToPacemaker 类型的消息的 process 方法,其实就是将 msg 转发到 chNetToPacemaker 这个 channal 当中,由 pacemakerState 自行接收并处理

func (msg MessageNewEpoch) process(o *oracleState, sender commontypes.OracleID) {
    o.chNetToPacemaker <- MessageToPacemakerWithSender{msg, sender}
}

而 MessageToReportGeneration 则是转入 reportGenerationMessage 方法处理

func (msg ...) process(o *oracleState, sender commontypes.OracleID) {
   o.reportGenerationMessage(msg, sender)
}
  
func (o *oracleState) reportGenerationMessage(msg MessageToReportGeneration, sender commontypes.OracleID) {
   msgEpoch := msg.epoch()
   if msgEpoch < o.epoch { // past 消息
      // drop
      o.logger.Debug("oracle: dropping message for past epoch", commontypes.LogFields{
         "epoch":  o.epoch,
         "sender": sender,
         "msg":    msg,
      })
   } else if msgEpoch == o.epoch { // current msg
      o.chNetToReportGeneration <- MessageToReportGenerationWithSender{msg, sender}
   } else { // future msg
      o.bufferedMessages[sender].Push(msg)
      o.logger.Trace("oracle: buffering message for future epoch", commontypes.LogFields{
         "epoch":  o.epoch,
         "sender": sender,
         "msg":    msg,
      })
   }
}

其实就是一个分类处理

聚合共识

三个大的阶段

  • Report 聚合阶段。Leader 向 Followers 收集至少 2f+1 个 (observe, signature)
  • Attest Report 压缩签名阶段。Leader 向 Followers 收集 f+1 个对 report 的签名,这个阶段将链上需要验证的签名从 2f+1 压缩到了 f+1 。
  • 提交前确认阶段。Leader 向 Followers 广播 Attest Report,Followers 广播转发 Attest Report 。 当 Followers 确认网络中至少 f+1 个节点收到 Attest Report ,就可以将 Attest Report 交给 Transmission 模块去提交了。

整体有六轮通信

  • leader 通过广播包含 epoch 和 round 的 MessageObserveReq 向 followers 请求 observation
  • followers 收到 MessageObserveReq 后
    • 如果其中的 round 超过上限了,则发出 EventChangeLeader 通知 pacemaker 进行 leader 切换,并关闭这个 reportGeneration 实例。
    • 执行自己的的 observation pipeline 去获取 observation
    • 构造 MessageObserve 向 leader 告知自己的 observation
  • leader 收集到超过 quorum 数量的 observation 后,再等待 Tgrace 秒,然后广播 MessageReportReq 告知 followers 这个 report 。(等待 Tgrace 秒,是为了让一些消息发送稍慢的节点的 observation 也能够包含到 report 当中去)
  • followers 通过发送 MessageReport 给 leader 确认自己已经收到 report 了
  • leader 收到超过 quorum 数量的确认后,将 report 提交到 transmission protocal 中去等待处理,同时广播 MessageFinal 给 followers 告知这次流程可以结束了
  • followers 收到告知后,会向其他 followers 广播 MessageFinalEcho 来告知其他节点自己被通知流程可以结束了
  • 当 followers 接收到超过 f 个流程结束的确认后,会将 report 提交到 transmission protocal 中去等待处理

聚合轮次开始

在 reportGenerationState 刚被初始化时,会立马执行一次 startRound() 方法。而后 Leader 每隔 Tround 时间会执行一次 startRound() 方法。 startRound流程如下

  1. 检查 round 是否超过限制,是否溢出
  2. 准备接收 observes 和 reports 的列表
  3. 更新阶段到 phaseObserve
  4. 广播 MessageObserveReq
  5. 重置 tRound ,leader 何时触发 new round 的 heartbeat

Followers 处理 messageObserveReq

当 followers接收到这个消息后的处理步骤和细节

  1. 消息检查 (只接受来自 leader 的,处于当前 epoch 且大于我记录的最新 round 的消息)
    • 消息是不是属于当前 epoch 的
    • 消息发送者是不是 leader
    • 消息的 round 是不是大于我记录的最新 round
    • 消息的 round 是否超过上限 RMax + 1 (+1 是为了能够在后面触发 epoch change)
  2. 更新自身 round ,表明自己进入这个 round 的流程中了
  3. 如果 round > Rmax (起始就是指 round == Rmax + 1) 时,发起 epoch change ,切换 epoch 和 leader
  4. 初始化 followerState
  5. 执行 pipeline 去获取 observation
  6. 对 observation 签名并检查签名
  7. 发送 MessageObservation 给 leader
func (repgen *reportGenerationState) messageObserveReq(msg MessageObserveReq, sender commontypes.OracleID) {
     
    // 检查 message validation
    dropPrefix := "messageObserveReq: dropping MessageObserveReq from "
 
    if msg.Epoch != repgen.e {
        repgen.logger.Debug(dropPrefix+"wrong epoch",
            commontypes.LogFields{"round": repgen.followerState.r, "msgEpoch": msg.Epoch},
        )
        return
    }
    if sender != repgen.l {
        repgen.logger.Warn(dropPrefix+"non-leader",
            commontypes.LogFields{"round": repgen.followerState.r, "sender": sender})
        return
    }
    if msg.Round <= repgen.followerState.r {
        repgen.logger.Debug(dropPrefix+"earlier round",
            commontypes.LogFields{"round": repgen.followerState.r, "msgRound": msg.Round})
        return
    }
    if int64(repgen.config.RMax)+1 < int64(msg.Round) {
        repgen.logger.Warn(dropPrefix+"out of bounds round",
            commontypes.LogFields{"round": repgen.followerState.r, "rMax": repgen.config.RMax, "msgRound": msg.Round})
        return
    }
 
    // 更新 followerState 中记录的当前 round
    repgen.followerState.r = msg.Round
 
    // 判断时候发起 epoch change
    if repgen.followerState.r > repgen.config.RMax {
        repgen.logger.Debug(
            "messageReportReq: leader sent MessageObserveReq past its expiration "+
                "round. Time to change leader",
            commontypes.LogFields{
                "round":        repgen.followerState.r,
                "messageRound": msg.Round,
                "roundMax":     repgen.config.RMax,
            })
        select {
        case repgen.chReportGenerationToPacemaker <- EventChangeLeader{}:
        case <-repgen.ctx.Done():
        }
 
        return
    }
     
    // 初始化 followerState
    repgen.followerState.sentEcho = nil
    repgen.followerState.sentReport = false
    repgen.followerState.completedRound = false
    repgen.followerState.receivedEcho = make([]bool, repgen.config.N())
 
    repgen.telemetrySender.RoundStarted(
        repgen.config.ConfigDigest,
        repgen.e,
        repgen.followerState.r,
        repgen.l,
    )
 
    // 获取 observation
    value := repgen.observeValue()
 
    ......
 
    // 签名
    so, err := MakeSignedObservation(value, repgen.followerReportContext(), repgen.privateKeys.SignOffChain)
 
    ......
 
    // 发送 MessageObservation 给 leader
    repgen.netSender.SendTo(MessageObserve{
        repgen.e,
        repgen.followerState.r,
        so,
    }, repgen.l)
}

Leader 处理 MessageObservation

  1. 检查 leader,epoch 和 round
  2. 检查 leaderState 当前所处的状态阶段,如果已经过了 phaseGrace 阶段,则不再接受 MessageObserve (因为已经打包好 report 了)
  3. 检查 leaderState.observe[sender] ,判断该 sender 是否发过 observation 了
  4. 检查签名
  5. 将 SignedObservation 加入缓存 leaderState.observe
  6. 状态切换
    • 如果处于 phaseObserve 状态,且收到超过 2F 的 observe,则进入到 phaseGrace 状态,并开始 TGrace 计时 (计时结束,将生成 report 且不再接收 MessageObserve)
    • pharseGrace 状态阶段依旧可以接收 observe

进入 eventTGraceTimeout 中构建report并发出 MessageRportReq

上面当 Leader 接收到 2F 的合法 MessageObserve 后,会进入 TGrace 计时,当计时结束后,就需要去构建 report 发出 MessageReportReq ,并进入到 phaseReport 阶段了。

TGrace 超时信号是在 reportGenerationState.run() 中接收到的

func (repgen *reportGenerationState) run() {
    for {
        select {
        ......
        case <-repgen.leaderState.tGrace: // tGrace 计时结束,要去发 MessageReport 了
            repgen.eventTGraceTimeout()
        }
    }
}
  1. 在 enentTGraceTimeout 中的流程如下
  2. 检查当前状态是否是 phaseGrace
  3. 遍历所有的 follower ,筛选出合法的 observation observation 排序,从小到大
  4. 构建并广播 MessageReport
func (repgen *reportGenerationState) eventTGraceTimeout() {
    // 验证当前状态
    if repgen.leaderState.phase != phaseGrace {
        repgen.logger.Error("leader's phase conflicts tGrace timeout", commontypes.LogFields{
            "round": repgen.leaderState.r,
            "phase": englishPhase[repgen.leaderState.phase],
        })
        return
    }
    // 构建 signed observation 集
    asos := []AttributedSignedObservation{}
    for oid, so := range repgen.leaderState.observe {
        if so != nil {
            asos = append(asos, AttributedSignedObservation{
                *so,
                commontypes.OracleID(oid),
            })
        }
    }
    // observation从小到大排序
    sort.Slice(asos, func(i, j int) bool {
        return asos[i].SignedObservation.Observation.Less(asos[j].SignedObservation.Observation)
    })
    // 广播 MessageReportReq
    repgen.netSender.Broadcast(MessageReportReq{
        repgen.e,
        repgen.leaderState.r,
        asos,
    })
    // 当前 round 的 state 移动到 pharseReport
    repgen.leaderState.phase = phaseReport
}

Followers 处理 MessageReportReq

  1. 检查 leader,epoch 和 round
  2. 如果 follower 标记为已经发送了 report 或者当前 round 已完结,则不会接受这个消息
  3. 验证 report
    • 检查 observation 是否排序
    • 检查每个 observation 的签名,并检查是否有重复 observation
    • 检查 signed observation 数量是否大于 2F
  4. 是否应该 report
    • 第一次 report
    • report 间隔时间满足
    • observation 中位数达到偏差值要求
    • 如果eth节点访问失败,默认应该 report
    • 链上发起了快速聚合请求
  5. 根据 shouldReport 结果
    • 如果 false,则标记该 round 结束
    • 如果 true ,则
      • 将 signed observations 压缩成 observations ,并对整个 observations 签名 -验证自己的签名后,发送 MessageRequest 给 leader

shouldReport 当中,流程正常的情况下,可以总结为三种返回 true

  • 合约的第一次report,即 (epoch, round) = (0, 0)
  • 到了设定的提交间隔时间,一般为 5min ~ 24h
  • 这次 report 的结果和合约上的最新结果相比,差值比例达到了设定的提交阙值,比如 0.2% ~ 5%

Leader 处理 MessageRequest

  1. 检查 leader,epoch 和 round
  2. 检查 leader 是否处于 phaseReport 状态
  3. 重复检查
  4. 签名验证
  5. 缓存 report ,包含 observations 和一个 signature
  6. 如果回签的 report 数量大于 F ,则会广播 MessageFinal 并进入 phaseFinal 阶段

Follower 处理 MessageFinal

  1. 检查 leader,epoch 和 round
  2. 检查自己是否发出过 MessageFinalEcho
  3. 如果没有发出过
    • 检查签名集数量和签名合法性
    • 将 MessageFinal 中的内容重新包装成 MessageFinalEcho 并广播

所有人处理 MessageFinalEcho

  1. 检查 epoch 和 round
  2. 重复性检查
  3. 检查签名集数量和签名合法性
  4. 缓存消息
  5. 如果自己没有发过 MessageFinalEcho ,则直接把收到的 MessageFinalEcho 转发广播出去
  6. 如果收到的合法 MessageFinalEcho 超过了 F 个,则
    • 将 report 转交给 transmission 处理
    • 标记该 round 结束

交易提交算法

当 report 通过了 reportGeneration 流程过后,就会进入到 transmission 流程中。我们把这个 report 叫做 O 。 transmission 中还维护了一个 times 最小堆,里面存放的是 pendingRequest 的,排序按照这个 pendingRequest 的 time 来决定。这个 time 是这个 pendingRequest 什么时候会执行。也就是说,最先需要执行的 pendingRequest 会在最上面。 同时,每当有 pendingRequest 进入到 times 堆里,transmission 都会将其记录为最新的 pendingRequest ,这里把他叫做 L 除此之外,我们将聚合合约上提交成功的 report 叫做 C 。 当 O 进入到 transmission 后,首先需要通过以下验证

  • L 为空,或者
  • L.epoch <= C.epoch and L.round <= C.round,或者
  • (O.medium - L.medium) / L.medium > α (α 为偏差阙值,medium为report中的观察结果的中位数)

通过上面筛选后,将会计算其应该什么时候执行,我们叫做 time 。这个是用来控制谁能成为提交这个 report 的 transmitter 的。在我们 n 个节点都将 report 转发到 transmission 中后,每个人都去提交这个 report ,那么必然只会有一个节点能提交成功,其他节点都白白浪费 gas 。但是如果只选出一个节点来提交 report ,那么又容易发生单点故障,导致 report 丢失。所以 chainlink 对此设计了一个机制。 chainlink 将提交 report 分为了批次提交,也就是每隔 ∆stage 时间会有一批节点去尝试提交这个 report ,如果 report 在这个批次中提交成功了,那么后续批次就不会尝试提交了。也就是说,哪怕最坏情况下有 f 个恶意节点都在提交顺序的最前面,我们只需要当提交的总节点达到了 f+1 就能保证一定有一个正确节点提交了 report 。至于节点的提交顺序,为了公平,我们会根据一个 (sharedkey,epoch,round) 去伪随机打乱排序出一个序列,然后节点按照这个排序去在自己所属的批次尝试提交即可。也就是说,stage 1 的节点提交该 report 的延时为 0 ,而 stage n 的节点提交的 report 的延时为 ((n - 1) * ∆stage) 。 这个规则是一种弱约束,也就是说有节点完全可以不遵守这个规则,去直接提交。chainlink 没有在代码层面对这种行为进行限制,而是通过经济惩罚的层面去限制。合约的拥有者需要自己去监控提交 report 的节点有没有遵循提交的批次规则,如果违背了规则 (提交得太早或太晚) 可以将其踢出局。 所以,按照排序应该在 stage k 提交结果的节点,会将 pendingRequest 构建为 Time((now() + (k-1)*∆stage) , O) 。然后将其放入按 Time 排序的最小堆中。并按照最小堆堆顶的 pendingRequest 的 Time 重设一个 tTransmit(Time) 超时触发器。 当 tTransmit(Time) 计时到了后,会触发对堆顶的 pendingRequest 的提交流程,将其从堆中取出来,并从数据库中删除,叫做 P ,并且判断

  • C.epoch <= P.epoch and C.round <= P.round (代表自己这个report还没过期),并且
  • configDigest 是否还和合约中的一致

满足上面条件后,就会将 report 提交到合约了。

视图切换

整个视图切换的流程是由

  • 计时器 Tprocess 和
  • 事件 EventChangeView 驱动的。

其中,计时器触发事件切换,属于异常切换,设计如下

  • Followers 开启计时
  • 如果计时结束没有收到 MessageFinal ,则广播 MessageNewEpoch 申请视图切换
  • 如果这轮聚合共识完成,重置 Tprocess 计时

(主要是防止节点出错导致流程卡住,所以要求每轮聚合共识必须在 Tprocess 事件内完成,不然就视图切换)

而事件驱动视图切换属于视图正常切换,当一个 Leader 主持的聚合轮次数量达到上限后,需要正常切换到下一个 Leader 来主持

  • 每个 Leader 最后主持 RMAX 回聚合共识
  • 当 Follower 检测到 Leader 发送的 MessageObserveReq 中的 Round > RMAX ,则广播 MessageNewEpoch 去申请视图切换

(这个设计主要是让每一个节点都有机会成为 Leader ,同时也是为了防止 Leader 不按照 Tround 轮次间隔规则,快速发起大量共识轮次) 对于 MessageNewEpoch 的具体处理细节见代码

func (pace *pacemakerState) messageNewepoch(msg MessageNewEpoch, sender commontypes.OracleID) {
    // 检查sender的id范围
    if int(sender) < 0 || int(sender) >= len(pace.newepoch) {
        pace.logger.Error("Pacemaker: dropping NewEpoch message from invalid sender", commontypes.LogFields{
            "sender": sender,
            "N":      len(pace.newepoch),
        })
        return
    }
    // 该 sender 发过来的最高 epoch < 这个消息的 epoch
    if pace.newepoch[sender] < msg.Epoch {
        pace.newepoch[sender] = msg.Epoch // 更新
        pace.persist() // 持久化
    } else {
        // 说明是 past epoch 不用处理了
        return
    }
 
    // upon |{p_j ∈ P | newepoch[j] > ne}| > f do
    {   // 这里的功能是,计算我这个节点是否需要向p2p网络中广播newepoch消息
        // pace.ne记录的是我发出的newepoch消息中最高的epoch
        // 如果我收到的周围节点发给我的最高epoch比我高,且人数达到了 F 。
        // 我就需要发出newepoch消息,并更新我的pace.ne
        // 至于我发出的newepoch的epoch number是多少,则选取我收到的满足 F 人的前提下最高的epoch number
        candidateEpochs := sortedGreaterThan(pace.newepoch, pace.ne)
        if len(candidateEpochs) > pace.config.F {
            // ē ← max {e' | {p_j ∈ P | newepoch[j] ≥ e' } > f}
            newEpoch := candidateEpochs[len(candidateEpochs)-(pace.config.F+1)]
            pace.sendNewepoch(newEpoch) // 更新并持久化pace.ne,然后广播newepoch消息
        }
    }
 
    // upon |{p_j ∈ P | newepoch[j] > e}| > 2f do
    {
        // pace.e是当前所处的epoch(即最高的完成聚合共识的epoch)
        candidateEpochs := sortedGreaterThan(pace.newepoch, pace.e)
        if len(candidateEpochs) > 2*pace.config.F {
            // ē ← max {e' | {p_j ∈ P | newepoch[j] ≥ e' } > 2f}
            //
            // since candidateEpochs contains, in increasing order, the epochs from
            // the received newepoch messages, this value of newEpoch was sent by at
            // least 2F+1 processes
            // 2f+1的人都同意的epoch,这个epoch算达成共识了
            newEpoch := candidateEpochs[len(candidateEpochs)-(2*pace.config.F+1)]
            pace.logger.Debug("Moving to epoch, based on candidateEpochs", commontypes.LogFields{
                "newEpoch":        newEpoch,
                "candidateEpochs": candidateEpochs,
            })
            // 计算 leader 的 id 。
            // 将 (epoch number, 共享密钥) 组合后进行hash后求和再根据总人数N取余
            l := Leader(newEpoch, pace.config.N(), pace.config.LeaderSelectionKey())
            // 更新pace.e(达成共识的最高epoch)和pace.l(最高共识epoch的leader)
            pace.e, pace.l = newEpoch, l // (e, l) ← (ē, leader(ē))
            if pace.ne < pace.e {        // ne ← max{ne, e}
                pace.ne = pace.e
            }
            pace.persist() // 持久化
 
            // abort instance [...], initialize instance (e,l) of report generation
            pace.spawnReportGeneration() // 生成 reportGeneration
 
            // 通知 NewEpoch 产生了
            pace.notifyOracleOfNewEpoch = true
 
            // 重置超时计时
            pace.tProgress = time.After(pace.config.DeltaProgress) // restart timer T_{progress}
        }
    }
}

当 pacemakerState 模块中完成了对于视图切换的确认后,需要通知到其他模块,

  • 其中 Transmission 模块的运行和视图无关,所以不需要通知
  • 会向 chPacemakerToOracle 这个 channal 发送新的视图号来通知 OracleState 。主要做的事情就是将缓存中的 future msg 取出来重新进行 past,current,future 判断并相应处理。
func (o *oracleState) run() {
    ......
    for {
        select {
        ......
        case epoch := <-chPacemakerToOracle: // epoch更新后,会进入到这下面
            o.epochChanged(epoch)
        ......
        }
    }
    ......
}
  
func (o *oracleState) epochChanged(e uint32) {
   // 清理缓存中的消息
   o.epoch = e
   o.logger.Trace("epochChanged: getting messages for new epoch", commontypes.LogFields{
      "epoch": e,
   })
   // id其实就是sender的id,buffer就是这个sender发过来的所有MessageToReportGeneration
   for id, buffer := range o.bufferedMessages {
      for {
         msg := buffer.Peek() // 取出 MessageToReportGeneration
         if msg == nil {
            // no messages left in buffer
            break
         }
         msgEpoch := (*msg).epoch() // 获取这个 msg 的 epoch
         if msgEpoch < e {
            // 我们当前变更到的epoch要低,就是past msg
            // 从 buffer 移出就行了,不用管
            buffer.Pop()
            o.logger.Debug("epochChanged: unbuffered and dropped message", commontypes.LogFields{
               "remoteOracleID": id,
               "epoch":          e,
               "message":        *msg,
            })
         } else if msgEpoch == e {
            // 如果这是我这个epoch的msg,说明是current msg需要处理
            // remove from buffer
            buffer.Pop()
 
            o.logger.Trace("epochChanged: unbuffered messages for new epoch", commontypes.LogFields{
               "remoteOracleID": id,
               "epoch":          e,
               "message":        *msg,
            })
            // 将消息转发到 chNetToReportGeneration channal 中
            o.chNetToReportGeneration <- MessageToReportGenerationWithSender{
               *msg,
               commontypes.OracleID(id),
            }
         } else {
            // 说明依旧还是future msg,保持原样
            // this and all subsequent messages are for future epochs
            // leave them in the buffer
            break
         }
      }
   }
   o.logger.Trace("epochChanged: done getting messages for new epoch", commontypes.LogFields{
      "epoch": e,
   })
}
  • 每一轮视图都会重新重新初始化一个 ReportGenerationState 模块,用于该视图中,每轮聚合的处理
// 收到的 MessageNewEpoch 满足了 2f+1 要求,视图切换确认
if len(candidateEpochs) > 2*pace.config.F {
......
    pace.spawnReportGeneration() // 重新初始化一个新的 reportGenerationState 模块,用于该新视图中的聚合轮次处理
    ......
}

选举规则

每个 Epoch 会有相应的 Leader ,选举的规则会根据一个公认的 leaderSelectionkey ,当前的 epoch 以及 oracles的数量去计算 Sum(Hash(leaderSelectionKey, Epoch)) % numer of oracles

func Leader(epoch uint32, n int, key [16]byte) (leader commontypes.OracleID) {
   // No need for HMAC. Since we use Keccak256, prepending
   // with key gives us a PRF already.
   h := sha3.NewLegacyKeccak256()
   h.Write(key[:])
   b := make([]byte, 8)
   binary.LittleEndian.PutUint64(b, uint64(epoch))
   h.Write(b)
 
   result := big.NewInt(0)
   r := big.NewInt(0).SetBytes(h.Sum(nil))
   // This is biased, but we don't care because the prob of us hitting the bias are
   // less than 2**5/2**256 = 2**-251.
   result.Mod(r, big.NewInt(int64(n)))
   return commontypes.OracleID(result.Int64())
}

视图恢复

除了视图切换会涉及到广播 MessageNewEpoch 外,每个节点都会按 TResend 时间间隔去不同的重复发送一个包含自己目前所处 Epoch 的 MessageNewEpoch(current epoch)。 这个行为的目的是,通知那些因为某些原因导致视图落后的节点能够快速更新到最新 epoch 。


{ % if page.mermaid % } { % endif % }