chainlink源码分析五

DataFeed链上聚合源码解析

Posted by Emo on October 13, 2022

chainlink源码分析

EMO’s Blog

简介

DataFeed 这边的核心功能主要涉及到对多节点提交的多结果进行根据聚合规则得到一个确定性的唯一结果。其他的主要就是一些经济模型相关的奖励机制了。DataFeed 主要分为两种,一种是链上聚合,一种是链下聚合。其中链上聚合,需要每个节点都直接将自己获取到的结果上传到聚合合约当中,缺点是会产生大量的 gas 消耗,优点是开发简单,节点间也不需要通信。链下聚合则是节点间先进行一次通信,将彼此获取到的结果聚合成一份 report ,然后由一个节点一次性提交到聚合合约当中,这种模式的优点是可以节省大量的 gas 消耗,而缺点则是开发成本变高。

角色

在聚合合约里面分为三种角色

  • oracle 为这个合约提交数据的节点的账户
  • owner 这个合约的所有人
  • requester 这个角色比较特殊,他在合约里面的功能是可以发起一个新的聚合轮次
  • validator 验证者合约,这是一个可选项。如果存在的话,则每一次聚合产生的新结果都需要通过 validator 验证才能生效,这个需要遵循 AggregatorValidatorInterface 接口来实现

聚合合约通用接口标准

这里的接口标准定义的方法主要是一些供外部访问的 view 方法。无论是链上聚合方式还是链下聚合方式,都需要提供这些接口定义的访问服务。

  • AggregatorInterface
      interface AggregatorInterface {
          // 获取最新的答案
          function latestAnswer() external view returns (int256);
    
          // 获取最新的时间戳
          function latestTimestamp() external view returns (uint256);
    
          // 获取最新的回合轮次
          function latestRound() external view returns (uint256);
    
          // 获取指定轮次的答案
          function getAnswer(uint256 roundId) external view returns (int256);
    
          // 获取指定轮次的时间戳
          function getTimestamp(uint256 roundId) external view returns (uint256);
    
          event AnswerUpdated(int256 indexed current, uint256 indexed roundId, uint256 updatedAt);
    
          event NewRound(uint256 indexed roundId, address indexed startedBy, uint256 startedAt);
      }
    
  • AggregatorV3Interface
      interface AggregatorV3Interface {
          // 获取 decimals
          function decimals() external view returns (uint8);
    
          // 获取该聚合合约的 description
          function description() external view returns (string memory);
    
          // 获取版本
          function version() external view returns (uint256);
    
          // 获取指定轮次的所有信息
          function getRoundData(uint80 _roundId)
          external
          view
          returns (
              uint80 roundId,
              int256 answer,
              uint256 startedAt,
              uint256 updatedAt,
              uint80 answeredInRound
          );
    
          // 获取最新轮次的所有信息
          function latestRoundData()
          external
          view
          returns (
              uint80 roundId,
              int256 answer,
              uint256 startedAt,
              uint256 updatedAt,
              uint80 answeredInRound
          );
      }
    
  • AggregatorV2V3Interface 上面两个的合并

  • AggregatorValidatorInterface 这是用于 validator 验证者的接口标准
      interface AggregatorValidatorInterface {
          function validate(
              uint256 previousRoundId,
              int256 previousAnswer,
              uint256 currentRoundId,
              int256 currentAnswer
          ) external returns (bool);
      }
    

    聚合合约代码分析

    这里会针对链上聚合模式的合约进行分析,这里分析的是官方的 chainlink/contracts/src/v0.6/FluxAggregator.sol 合约。

    结构体

    // 每轮聚合后的信息
    struct Round {
      int256 answer; // 这次聚合产生的结果
      uint64 startedAt; // 聚合开始的时间
      uint64 updatedAt; // 聚合完成的时间
      uint32 answeredInRound; // 轮次,当聚合完成时的_roundId
    }
      
    // 每轮聚合流程中的详细信息
    struct RoundDetails {
      int256[] submissions; // 存所有的提交的结果
      uint32 maxSubmissions; // 最多接收几个提交,满了后,就不会接受提交了
      uint32 minSubmissions; // 进行聚合所需要的最少提交数
      uint32 timeout; // 聚合轮次超时的时间限制
      uint128 paymentAmount; // 该轮次会付给提交了结果的 oracle 的金额
    }
    
    // Oracle 的状态
    struct OracleStatus {
      uint128 withdrawable; // oracle 账户中有多少钱可以取出
      uint32 startingRound; // oracle 从哪一轮开始提交
      uint32 endingRound; // oracle 从哪一轮结束提交
      uint32 lastReportedRound; // 最近一次提交结果的轮次号
      uint32 lastStartedRound; // 最近一次由该oracle进行轮次初始化的轮次号
      int256 latestSubmission; // 最近一次的提交结果
      uint16 index;
      address admin; // oracle 的admin地址,能发起取钱的地址
      address pendingAdmin; // 这个是在更替 admin 的时候会涉及到的,具体可以在后面 <oracle更改自己的 admin address> 这个板块看
    }
      
    // 请求者的结构,请求者可以发起新轮次
    struct Requester {
      bool authorized; // 是否授权
      uint32 delay; // 发起新轮次的延时
      uint32 lastStartedRound; // 最近发起新轮次的轮次号
    }
      
    // 该合约当中的link代币
    struct Funds {
      uint128 available; // 有多少可用于发放奖励的 link 币
      uint128 allocated; // 有多少已经发放出去的 link 币
    }
    

状态变量

  LinkTokenInterface public linkToken; // Link Token 合约的地址
  AggregatorValidatorInterface public validator; // 验证者合约的地址

  // Round related params
  uint128 public paymentAmount; // 每次给 oracle 的奖励数额
  uint32 public maxSubmissionCount; // 每轮的最大提交数量
  uint32 public minSubmissionCount; // 每轮开始聚合的最少提交数量
  uint32 public restartDelay; // oracle发起轮次初始化的间隔延迟
  uint32 public timeout; // 每轮聚合的 timeout
  uint8 public override decimals; // 对结果要进多少位处理
  string public override description; // 描述

  // 提交的结果必须在下面设定的 min 和 max 之间
  int256 immutable public minSubmissionValue;
  int256 immutable public maxSubmissionValue;

  uint256 constant public override version = 3; // 版本号

  uint256 constant private RESERVE_ROUNDS = 2; // 这个合约里面必须还有至少留足够两个回合奖励的代币数量。防止owner取走太多钱,导致合约停摆
  uint256 constant private MAX_ORACLE_COUNT = 77; // 最多容纳 77 个 oracle
  uint32 constant private ROUND_MAX = 2**32-1; // 最多进行这么多个轮次
  uint256 private constant VALIDATOR_GAS_LIMIT = 100000; // 验证者合约最多消耗这么多gas
  // An error specific to the Aggregator V3 Interface, to prevent possible
  // confusion around accidentally reading unset values as reported values.
  string constant private V3_NO_DATA_ERROR = "No data present";

  uint32 private reportingRoundId; // 正在进行的轮次
  uint32 internal latestRoundId; // 最新的达成聚合共识的轮次
  mapping(address => OracleStatus) private oracles; // 所有oracles的账户信息
  mapping(uint32 => Round) internal rounds; // 所有轮次的信息
  mapping(uint32 => RoundDetails) internal details; // 所有轮次的详细信息
  mapping(address => Requester) internal requesters; // 所有的requester的账户信息
  address[] private oracleAddresses; // 所有oracle的oracle地址信息,不是admin地址
  Funds private recordedFunds; // 记录这个合约的资金池

构造函数

  constructor(
    address _link, // linkToken合约的地址
    uint128 _paymentAmount, // 每轮聚合需要支付给每个oracle多少钱
    uint32 _timeout, // 每一轮的超时时间
    address _validator, // 验证者的地址(可选)
    int256 _minSubmissionValue, // 提交答案值的最小值
    int256 _maxSubmissionValue, // 提交答案值得最大值
    uint8 _decimals, // 答案偏移的位数
    string memory _description // 该合约的描述
  ) public {
    linkToken = LinkTokenInterface(_link);
    updateFutureRounds(_paymentAmount, 0, 0, 0, _timeout);
    setValidator(_validator);
    minSubmissionValue = _minSubmissionValue;
    maxSubmissionValue = _maxSubmissionValue;
    decimals = _decimals;
    description = _description;
    rounds[0].updatedAt = uint64(block.timestamp.sub(uint256(_timeout)));
  }

功能函数

oracle相关操作

oracle提交结果(最核心的函数)

  /**
   * @notice called by oracles when they have witnessed a need to update
   * @param _roundId 当前共识/聚合的轮次
   * @param _submission 提交的答案
   */
  function submit(uint256 _roundId, int256 _submission)
    external
  {
    // 验证该oracle汇报的轮次是不是合法的
    // _roundId必须在oracle设定的 startingRound 和 endingRound 之间
    bytes memory error = validateOracleRound(msg.sender, uint32(_roundId));
    
    // 验证提交的答案是否满足范围要求
    require(_submission >= minSubmissionValue, "value below minSubmissionValue");
    require(_submission <= maxSubmissionValue, "value above maxSubmissionValue");
    require(error.length == 0, string(error));

    oracleInitializeNewRound(uint32(_roundId));
    recordSubmission(_submission, uint32(_roundId));
    (bool updated, int256 newAnswer) = updateRoundAnswer(uint32(_roundId));
    payOracle(uint32(_roundId));
    deleteRoundDetails(uint32(_roundId));
    if (updated) {
      validateAnswer(uint32(_roundId), newAnswer);
    }
  }

上面函数大量的调用了各种函数,我们按照这个函数制定的流程,来逐步分析其中调用的函数。

  1. 传进提交的答案和该次提交对用的聚合轮次
  2. 验证该oracle是否具备在该轮次提交结果的资格
     function validateOracleRound(address _oracle, uint32 _roundId)
     private
     view
     returns (bytes memory)
     {
         // 每个oracle都有startingRound和endingRound,这个是其的生存周期,其提交只在start->end之间的轮次中有效
         // cache storage reads
         uint32 startingRound = oracles[_oracle].startingRound;
         uint32 rrId = reportingRoundId;
    
         if (startingRound == 0) return "not enabled oracle";
         if (startingRound > _roundId) return "not yet enabled oracle";
         if (oracles[_oracle].endingRound < _roundId) return "no longer allowed oracle";
            
         // 看这个oracle在该轮次是否已经提交过了
         if (oracles[_oracle].lastReportedRound >= _roundId) return "cannot report on previous rounds";
            
         // 只能对 [reportingRoundId-1, reportingRoundId+1] 区间内的轮次投票 
         if (_roundId != rrId && _roundId != rrId.add(1) && !previousAndCurrentUnanswered(_roundId, rrId)) return "invalid round to report";
         // 上一轮必须已经完成聚合,或者上一轮聚合超时了。这一轮聚合才能开始
         if (_roundId != 1 && !supersedable(_roundId.sub(1))) return "previous round not supersedable";
     }
    
    • oracle是否在该轮次具备提交权
    • oracle是否在该轮次提交过
    • oracle的提交和最新进行中轮次reportingRoundId相比,是否在 [reportingRoundId-1, reportingRoundId+1] 区间内
    • 该轮次的上一轮次是否完成聚合或者是否超时了,调用 supersedable
  3. 验证提交结果是否符合设定的范围
     require(_submission >= minSubmissionValue, "value below minSubmissionValue");
     require(_submission <= maxSubmissionValue, "value above maxSubmissionValue");
    
  4. 初始化新轮次(每轮只初始化一次)
     function oracleInitializeNewRound(uint32 _roundId)
     private
     {
         // 是否是新回合
         if (!newRound(_roundId)) return;
         uint256 lastStarted = oracles[msg.sender].lastStartedRound; // cache storage reads
    
         // restartDelay 是一个oracle进行轮次初始化需要等待的轮次数量
         // 所以如果 (当前轮次 < 上一次进行初始化的轮次 + 延迟) 时无法发起初始化轮次
         if (_roundId <= lastStarted + restartDelay && lastStarted != 0) return;
    
         initializeNewRound(_roundId);
    
         oracles[msg.sender].lastStartedRound = _roundId; // 这一轮的轮次是我初始化的
     }
    
     function newRound(uint32 _roundId)
     private
     view
     returns (bool)
     {
         // 轮次是不是刚好是记录的最新轮次的下一轮
         return _roundId == reportingRoundId.add(1);
     }
    
     function initializeNewRound(uint32 _roundId)
     private
     {   // 记录轮次聚合超时信息,如果超时的话,里面会进行判断
         updateTimedOutRoundInfo(_roundId.sub(1));
    
         // 新轮次开始初始化
         reportingRoundId = _roundId; // 更新最新轮次
         RoundDetails memory nextDetails = RoundDetails(
             new int256[](0),
             maxSubmissionCount,
             minSubmissionCount,
             timeout,
             paymentAmount
         );
         details[_roundId] = nextDetails; // 更新该轮次相关detail
         rounds[_roundId].startedAt = uint64(block.timestamp); // 记录该轮次的开始时间戳
         emit NewRound(_roundId, msg.sender, rounds[_roundId].startedAt); // 抛出event
     }
    
  5. 记录提交结果,并记录oracle此次提交
     function recordSubmission(int256 _submission, uint32 _roundId)
     private
     {
         // 检查提交是不是已经到达上限
         require(acceptingSubmissions(_roundId), "round not accepting submissions");
    
         // 记录调提交教结果
         details[_roundId].submissions.push(_submission);
         // 记录oracle的最新提交内容
         oracles[msg.sender].lastReportedRound = _roundId;
         oracles[msg.sender].latestSubmission = _submission;
    
         emit SubmissionReceived(_submission, _roundId, msg.sender);
     }
    
  6. 判断是否可以更新答案,如果可以就更新
     function updateRoundAnswer(uint32 _roundId)
     internal
     returns (bool, int256)
     {
         // 是否满足最小提交要求
         if (details[_roundId].submissions.length < details[_roundId].   minSubmissions) {
             return (false, 0);
         }
         // 计算最新 answer ,这里使用的中位数聚合规则
         int256 newAnswer = Median.calculateInplace(details[_roundId].submissions);
         // 更新聚合信息
         rounds[_roundId].answer = newAnswer;
         rounds[_roundId].updatedAt = uint64(block.timestamp);
         rounds[_roundId].answeredInRound = _roundId;
         latestRoundId = _roundId;
    
         emit AnswerUpdated(newAnswer, _roundId, now);
    
         return (true, newAnswer);
     }
    
  7. 发钱给oracle
     function payOracle(uint32 _roundId)
     private
     {
         // 账户发生相应的金额变动即可
         uint128 payment = details[_roundId].paymentAmount; // 需要给的钱
         Funds memory funds = recordedFunds;
         // funds记录总体金额变动的地方,发生相应的加减
         funds.available = funds.available.sub(payment);
         funds.allocated = funds.allocated.add(payment);
         recordedFunds = funds;
            
         // 给 oracle 发钱
         oracles[msg.sender].withdrawable = oracles[msg.sender].withdrawable.add(payment);
    
         emit AvailableFundsUpdated(funds.available);
     }
    
  8. 如果该轮次已经达到最大提交则删除该轮次信息
     function deleteRoundDetails(uint32 _roundId)
     private
     {
         // 如果这个轮次收到的提交已经达到了最大提交数
         if (details[_roundId].submissions.length < details[_roundId].maxSubmissions) return;
         // 删除该轮次的信息
         delete details[_roundId];
     }
    
  9. 验证答案,如果需要的话
    function validateAnswer(
    uint32 _roundId,
    int256 _newAnswer
    )
    private
    {
        // 验证者验证,这是可选的
        AggregatorValidatorInterface av = validator; // cache storage reads
        if (address(av) == address(0)) return;
    
        // 把答案交给外部的验证者合约去验证
        uint32 prevRound = _roundId.sub(1);
        uint32 prevAnswerRoundId = rounds[prevRound].answeredInRound;
        int256 prevRoundAnswer = rounds[prevRound].answer;
        // We do not want the validator to ever prevent reporting, so we limit its
        // gas usage and catch any errors that may arise.
        try av.validate{gas: VALIDATOR_GAS_LIMIT}(
            prevAnswerRoundId,
            _roundId,
            prevRoundAnswer,
            _newAnswer
        ) {} catch {}
    }
    

更改oracles

  function changeOracles(
    address[] calldata _removed, // 要移除的oracles
    address[] calldata _added, // 要新增加的oracles
    address[] calldata _addedAdmins, // 要新增的oracle的地址
    uint32 _minSubmissions, // 最小提交数量
    uint32 _maxSubmissions, // 最大提交数量
    uint32 _restartDelay // 可以初始化轮次的延迟
  )
    external
    onlyOwner()
  {
    for (uint256 i = 0; i < _removed.length; i++) {
      removeOracle(_removed[i]);
    }

    require(_added.length == _addedAdmins.length, "need same oracle and admin count");
    require(uint256(oracleCount()).add(_added.length) <= MAX_ORACLE_COUNT, "max oracles allowed");

    for (uint256 i = 0; i < _added.length; i++) {
      addOracle(_added[i], _addedAdmins[i]);
    }

    updateFutureRounds(paymentAmount, _minSubmissions, _maxSubmissions, _restartDelay, timeout);
  }

oracle取钱

  function withdrawPayment(address _oracle, address _recipient, uint256 _amount)
    external
  {
    require(oracles[_oracle].admin == msg.sender, "only callable by admin");

    // Safe to downcast _amount because the total amount of LINK is less than 2^128.
    uint128 amount = uint128(_amount);
    uint128 available = oracles[_oracle].withdrawable;
    require(available >= amount, "insufficient withdrawable funds");

    oracles[_oracle].withdrawable = available.sub(amount);
    recordedFunds.allocated = recordedFunds.allocated.sub(amount);

    assert(linkToken.transfer(_recipient, uint256(amount)));
  }

oracle更改自己的 admin address

  function transferAdmin(address _oracle, address _newAdmin)
    external
  {
    require(oracles[_oracle].admin == msg.sender, "only callable by admin");
    oracles[_oracle].pendingAdmin = _newAdmin;

    emit OracleAdminUpdateRequested(_oracle, msg.sender, _newAdmin);
  }

先将 address 预转交,然后需要接手的 address 自己来接受一下。也就是admin address 的转移,不仅需要我自己同意,还需要接收者同意。

  function acceptAdmin(address _oracle)
    external
  {
    require(oracles[_oracle].pendingAdmin == msg.sender, "only callable by pending admin");
    oracles[_oracle].pendingAdmin = address(0);
    oracles[_oracle].admin = msg.sender;

    emit OracleAdminUpdated(_oracle, msg.sender);
  }

owner操作

给合约充钱

充 link 币到此合约,这些钱是用来给 oracle 发奖励的。

  function onTokenTransfer(address, uint256, bytes calldata _data)
    external
  {
    require(_data.length == 0, "transfer doesn't accept calldata");
    updateAvailableFunds();
  }
  function updateAvailableFunds()
    public
  {
    Funds memory funds = recordedFunds;

    uint256 nowAvailable = linkToken.balanceOf(address(this)).sub(funds.allocated);

    if (funds.available != nowAvailable) {
      recordedFunds.available = uint128(nowAvailable);
      emit AvailableFundsUpdated(nowAvailable);
    }
  }

owner取钱

  function withdrawFunds(address _recipient, uint256 _amount)
    external
    onlyOwner()
  {
    uint256 available = uint256(recordedFunds.available);
    require(available.sub(requiredReserve(paymentAmount)) >= _amount, "insufficient reserve funds");
    require(linkToken.transfer(_recipient, _amount), "token transfer failed");
    updateAvailableFunds();
  }

requester相关操作

设置requester许可

  function setRequesterPermissions(address _requester, bool _authorized, uint32 _delay)
    external
    onlyOwner()
  {
    if (requesters[_requester].authorized == _authorized) return;

    if (_authorized) {
      requesters[_requester].authorized = _authorized;
      requesters[_requester].delay = _delay;
    } else {
      delete requesters[_requester];
    }

    emit RequesterPermissionsSet(_requester, _authorized, _delay);
  }

requester可以去发起一个新的轮次

  function requestNewRound()
    external
    returns (uint80)
  {
    require(requesters[msg.sender].authorized, "not authorized requester");

    uint32 current = reportingRoundId;
    require(rounds[current].updatedAt > 0 || timedOut(current), "prev round must be supersedable");

    uint32 newRoundId = current.add(1);
    requesterInitializeNewRound(newRoundId);
    return newRoundId;
  }

验证者相关操作

设置验证者(onlyOwner)

  function setValidator(address _newValidator)
    public
    onlyOwner()
  {
    address previous = address(validator);

    if (previous != _newValidator) {
      validator = AggregatorValidatorInterface(_newValidator);

      emit ValidatorUpdated(previous, _newValidator);
    }
  }

{ % if page.mermaid % } { % endif % }