Group Replication 是一种 Shared-Nothing 的架构,每个节点都会保留一份数据。
虽然支持多点写入,但实际上系统的吞吐量是由处理能力最弱的那个节点决定的。
如果各个节点的处理能力参差不齐,那处理能力慢的节点就会出现事务堆积。
在事务堆积的时候,如果处理能力快的节点出现了故障,这个时候能否让处理能力慢的节点(存在事务堆积)接受业务流量呢?
为了避免出现上述两难场景,Group Replication 引入了流控机制。
在实现上,Group Replication 的流控模块会定期检查各个节点的事务堆积情况,如果超过一定值,则会触发流控。
流控会基于上一周期各个节点的事务认证情况和事务应用情况,决定当前节点(注意是当前节点,不是其它节点)下个周期的写入配额。
超过写入配额的事务操作会被阻塞,等到下个周期才能执行。
接下来,我们通过源码分析下流控的实现原理。
本文主要包括以下几部分:
流控触发的条件
默认情况下,节点的状态信息是每秒发送一次(节点的状态信息是在 flow_control_step 中发送的,发送周期由 group_replication_flow_control_period 决定)。
当接受到其它节点的状态信息时,会调用 Flow_control_module::handle_stats_data 来处理。
下面我们看看 Flow_control_module::handle_stats_data 函数的处理逻辑。
int Flow_control_module::handle_stats_data(const uchar *data, size_t len,<br>                                           const std::string &member_id) {<br>  DBUG_TRACE;<br>  int error = 0;<br>  Pipeline_stats_member_message message(data, len);<br><br>  m_flow_control_module_info_lock->wrlock();<br>  // m_info 是个字典,定义是 std::map<std::string, pipeline_member_stats><br>  // 其中,key 是节点的地址,value 是节点的状态信息。<br>  Flow_control_module_info::iterator it = m_info.find(member_id);<br>  // 如果 member_id 对应节点的状态信息在 m_info 中不存在,则插入。<br>  if (it == m_info.end()) {<br>    Pipeline_member_stats stats;<br><br>    std::pair<flow_control_module_info::iterator, bool> ret = m_info.insert(<br>        std::pair<std::string, pipeline_member_stats>(member_id, stats));<br>    error = !ret.second;<br>    it = ret.first;<br>  }<br>  // 更新节点的统计信息<br>  it->second.update_member_stats(message, m_stamp);<br><br>  // 检查是否需要流控<br>  if (it->second.is_flow_control_needed()) {<br>    ++m_holds_in_period;<br>#ifndef NDEBUG<br>    it->second.debug(it->first.c_str(), m_quota_size.load(),<br>                     m_quota_used.load());<br>#endif<br>  }<br><br>  m_flow_control_module_info_lock->unlock();<br>  return error;<br>}<br></std::string, pipeline_member_stats></flow_control_module_info::iterator, bool></std::string, pipeline_member_stats>
首先判断节点的状态信息是否在 m_info 中存在。如果不存在,则插入。
接着通过 update_member_stats 更新节点的统计信息。
更新后的统计信息包括以下两部分:
- 当前数据:如 m_transactions_waiting_certification(当前等待认证的事务数),m_transactions_waiting_apply(当前等待应用的事务数)。
- 上一周期的增量数据:如 m_delta_transactions_certified(上一周期进行认证的事务数)。 m_delta_transactions_certified 等于 m_transactions_certified (这一次的采集数据) – previous_transactions_certified (上一次的采集数据)
最后会通过is_flow_control_needed判断是否需要流控。如果需要流控,则会将 m_holds_in_period 自增加 1。
如果是 Debug 版本,且将 log_error_verbosity 设置为 3。当需要流控时,会在错误日志中打印以下信息。
[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33071 stats certifier_queue 0, applier_queue 20 certified 387797 (308), applied 387786 (289), local 0 (0), quota 400 (274) mode=1'
什么时候会触发流控呢?
接下来我们看看 is_flow_control_needed 函数的处理逻辑。
bool Pipeline_member_stats::is_flow_control_needed() {<br>  return (m_flow_control_mode == FCM_QUOTA) &&<br>         (m_transactions_waiting_certification ><br>              get_flow_control_certifier_threshold_var() ||<br>          m_transactions_waiting_apply ><br>              get_flow_control_applier_threshold_var());<br>}
由此来看,触发流控需满足以下条件:
除了条件 1,条件 2,3 满足其一即可。
当需要流控时,会将 m_holds_in_period 自增加 1。
m_holds_in_period 这个变量会用在 Flow_control_module::flow_control_step 中。
而 Flow_control_module::flow_control_step 是在 Certifier_broadcast_thread::dispatcher() 中调用的,每秒执行一次。
void Certifier_broadcast_thread::dispatcher() {<br>  ...<br>  while (!aborted) {<br>    ...<br>    applier_module->run_flow_control_step();<br>    ...<br>    struct timespec abstime;<br>    // 定义超时时长 1s。<br>    set_timespec(&abstime, 1);<br>    mysql_cond_timedwait(&broadcast_dispatcher_cond, &broadcast_dispatcher_lock,<br>                         &abstime);<br>    mysql_mutex_unlock(&broadcast_dispatcher_lock);<br><br>    broadcast_counter++;<br>  }<br>}<br><br>void run_flow_control_step() override {<br>  flow_control_module.flow_control_step(&pipeline_stats_member_collector);<br>}
配额的计算逻辑
接下来我们重点分析下 flow_control_step 函数的处理逻辑。
这个函数非常关键,它是整个流控模块的核心。
它主要是用来计算 m_quota_size 和 m_quota_used。
其中,m_quota_size 决定了下个周期允许提交的事务数,即我们所说的配额。
m_quota_used 用来统计下个周期已经提交的事务数,在该函数中会重置为 0。
void Flow_control_module::flow_control_step(<br>    Pipeline_stats_member_collector *member) {<br>  // 这里的 seconds_to_skip 实际上就是 group_replication_flow_control_period,后面会有定义。<br>  // 虽然 flow_control_step 是一秒调用一次,但实际起作用的还是 group_replication_flow_control_period。<br>  if (--seconds_to_skip > 0) return;<br>  <br>  // holds 即 m_holds_in_period<br>  int32 holds = m_holds_in_period.exchange(0);<br>  // get_flow_control_mode_var() 即 group_replication_flow_control_mode<br>  Flow_control_mode fcm =<br>      static_cast<flow_control_mode>(get_flow_control_mode_var());<br>  // get_flow_control_period_var() 即 group_replication_flow_control_period<br>  seconds_to_skip = get_flow_control_period_var();<br>  // 计数器<br>  m_stamp++;<br>  // 发送当前节点的状态信息<br>  member->send_stats_member_message(fcm);<br><br>  switch (fcm) {<br>    case FCM_QUOTA: {<br>      // get_flow_control_hold_percent_var() 即 group_replication_flow_control_hold_percent,默认是 10<br>      // 所以 HOLD_FACTOR 默认是 0.9<br>      double HOLD_FACTOR =<br>          1.0 -<br>          static_cast<double>(get_flow_control_hold_percent_var()) / 100.0;<br>      // get_flow_control_release_percent_var() 即 group_replication_flow_control_release_percent,默认是 50<br>      // 所以 RELEASE_FACTOR 默认是 1.5<br>      double RELEASE_FACTOR =<br>          1.0 +<br>          static_cast<double>(get_flow_control_release_percent_var()) / 100.0;<br>      // get_flow_control_member_quota_percent_var() 即 group_replication_flow_control_member_quota_percent,默认是 0<br>      // 所以 TARGET_FACTOR 默认是 0<br>      double TARGET_FACTOR =<br>          static_cast<double>(get_flow_control_member_quota_percent_var()) /<br>          100.0;<br>      // get_flow_control_max_quota_var() 即 group_replication_flow_control_max_quota,默认是 0<br>      int64 max_quota = static_cast<int64>(get_flow_control_max_quota_var());<br><br>      // 将上一个周期的 m_quota_size,m_quota_used 赋值给 quota_size,quota_used,同时自身重置为 0<br>      int64 quota_size = m_quota_size.exchange(0);<br>      int64 quota_used = m_quota_used.exchange(0);<br>      int64 extra_quota = (quota_size > 0 && quota_used > quota_size)<br>                              ? quota_used - quota_size<br>                              : 0;<br><br>      if (extra_quota > 0) {<br>        mysql_mutex_lock(&m_flow_control_lock);<br>        // 发送一个信号,释放 do_wait() 处等待的事务<br>        mysql_cond_broadcast(&m_flow_control_cond);<br>        mysql_mutex_unlock(&m_flow_control_lock);<br>      }<br>      // m_holds_in_period 大于 0,则意味着需要进行流控<br>      if (holds > 0) {<br>        uint num_writing_members = 0, num_non_recovering_members = 0;<br>        // MAXTPS 是 INT 的最大值,即 2147483647<br>        int64 min_certifier_capacity = MAXTPS, min_applier_capacity = MAXTPS,<br>              safe_capacity = MAXTPS;<br><br>        m_flow_control_module_info_lock->rdlock();<br>        Flow_control_module_info::iterator it = m_info.begin();<br>        // 循环遍历所有节点的状态信息<br>        while (it != m_info.end()) {<br>            // 这一段源码中没有,加到这里可以直观的看到触发流控时,每个节点的状态信息。<br>#ifndef NDEBUG<br>            it->second.debug(it->first.c_str(), quota_size,<br>                     quota_used);<br>#endif<br>          if (it->second.get_stamp() < (m_stamp - 10)) {<br>            // 如果节点的状态信息在最近 10 个周期内都没有更新,则清掉<br>            m_info.erase(it++);<br>          } else {<br>            if (it->second.get_flow_control_mode() == FCM_QUOTA) {<br>              // 如果 group_replication_flow_control_certifier_threshold 大于 0,<br>              // 且上一个周期进行认证的事务数大于 0,<br>              // 且当前等待认证的事务数大于 group_replication_flow_control_certifier_threshold,<br>              // 且上一个周期进行认证的事务数小于 min_certifier_capacity<br>              // 则会将上一个周期进行认证的事务数赋予 min_certifier_capacity<br>              if (get_flow_control_certifier_threshold_var() > 0 &&<br>                  it->second.get_delta_transactions_certified() > 0 &&<br>                  it->second.get_transactions_waiting_certification() -<br>                          get_flow_control_certifier_threshold_var() ><br>                      0 &&<br>                  min_certifier_capacity ><br>                      it->second.get_delta_transactions_certified()) {<br>                min_certifier_capacity =<br>                    it->second.get_delta_transactions_certified();<br>              }<br><br>              if (it->second.get_delta_transactions_certified() > 0)<br>                // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_certified() 中的较小值<br>                safe_capacity =<br>                    std::min(safe_capacity,<br>                             it->second.get_delta_transactions_certified());<br><br><br>              // 针对的是 applier,逻辑同 certifier 一样<br>              if (get_flow_control_applier_threshold_var() > 0 &&<br>                  it->second.get_delta_transactions_applied() > 0 &&<br>                  it->second.get_transactions_waiting_apply() -<br>                          get_flow_control_applier_threshold_var() ><br>                      0) {<br>                if (min_applier_capacity ><br>                    it->second.get_delta_transactions_applied())<br>                  min_applier_capacity =<br>                      it->second.get_delta_transactions_applied();<br><br>                if (it->second.get_delta_transactions_applied() > 0)<br>                  // 如果上一个周期有事务应用,说明该节点不是 recovering 节点<br>                  num_non_recovering_members++;<br>              }<br><br>              if (it->second.get_delta_transactions_applied() > 0)<br>                // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_applied() 中的较小值<br>                safe_capacity = std::min(<br>                    safe_capacity, it->second.get_delta_transactions_applied());<br><br>              if (it->second.get_delta_transactions_local() > 0)<br>                // 如果上一个周期有本地事务,则意味着该节点存在写入<br>                num_writing_members++;<br>            }<br>            ++it;<br>          }<br>        }<br>        m_flow_control_module_info_lock->unlock();<br><br>        num_writing_members = num_writing_members > 0 ? num_writing_members : 1;<br>        // min_capacity 取 min_certifier_capacity 和 min_applier_capacity 的较小值<br>        int64 min_capacity = (min_certifier_capacity > 0 &&<br>                              min_certifier_capacity < min_applier_capacity)<br>                                 ? min_certifier_capacity<br>                                 : min_applier_capacity;<br><br>        // lim_throttle 是最小配额<br>        int64 lim_throttle = static_cast<int64>(<br>            0.05 * std::min(get_flow_control_certifier_threshold_var(),<br>                            get_flow_control_applier_threshold_var()));<br>        // get_flow_control_min_recovery_quota_var() 即 group_replication_flow_control_min_recovery_quota<br>        if (get_flow_control_min_recovery_quota_var() > 0 &&<br>            num_non_recovering_members == 0)<br>          lim_throttle = get_flow_control_min_recovery_quota_var();<br>        // get_flow_control_min_quota_var() 即 group_replication_flow_control_min_quota<br>        if (get_flow_control_min_quota_var() > 0)<br>          lim_throttle = get_flow_control_min_quota_var();<br><br>        // min_capacity 不能太小,不能低于 lim_throttle<br>        min_capacity =<br>            std::max(std::min(min_capacity, safe_capacity), lim_throttle);<br><br>        // HOLD_FACTOR 默认是 0.9<br>        quota_size = static_cast<int64>(min_capacity * HOLD_FACTOR);<br><br>        // max_quota 是由 group_replication_flow_control_max_quota 定义的,即 quota_size 不能超过 max_quota<br>        if (max_quota > 0) quota_size = std::min(quota_size, max_quota);<br>        <br>        // num_writing_members 是有实际写操作的节点数<br>        if (num_writing_members > 1) {<br>          // 如果没有设置 group_replication_flow_control_member_quota_percent,则按照节点数平分 quota_size<br>          if (get_flow_control_member_quota_percent_var() == 0)<br>            quota_size /= num_writing_members;<br>          else<br>          // 如果有设置,则当前节点的 quota_size 等于 quota_size * group_replication_flow_control_member_quota_percent / 100<br>            quota_size = static_cast<int64>(static_cast<double>(quota_size) *<br>                                            TARGET_FACTOR);<br>        }<br>        // quota_size 还会减去上个周期超额使用的 quota<br>        quota_size =<br>            (quota_size - extra_quota > 1) ? quota_size - extra_quota : 1;<br>#ifndef NDEBUG<br>        LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_FLOW_CONTROL_STATS,<br>                     quota_size, get_flow_control_period_var(),<br>                     num_writing_members, num_non_recovering_members,<br>                     min_capacity, lim_throttle);<br>#endif<br>      } else {<br>        // 对应 m_holds_in_period = 0 的场景,RELEASE_FACTOR 默认是 1.5<br>        if (quota_size > 0 && get_flow_control_release_percent_var() > 0 &&<br>            (quota_size * RELEASE_FACTOR) < maxtps) {<br>          // 当流控结束后,quota_size = 上一个周期的 quota_size * 1.5<br>          int64 quota_size_next =<br>              static_cast<int64>(quota_size * RELEASE_FACTOR);<br>          quota_size =<br>              quota_size_next > quota_size ? quota_size_next : quota_size + 1;<br>        } else<br>          quota_size = 0;<br>      }<br><br>      if (max_quota > 0)<br>        // quota_size 会取 quota_size 和 max_quota 中的较小值<br>        quota_size =<br>            std::min(quota_size > 0 ? quota_size : max_quota, max_quota);<br>      // 最后,将 quota_size 赋值给 m_quota_size,m_quota_used 重置为 0<br>      m_quota_size.store(quota_size);<br>      m_quota_used.store(0);<br>      break;<br>    }<br><br>    // 如果 group_replication_flow_control_mode 为 DISABLED,<br>    // 则会将 m_quota_size 和 m_quota_used 置为 0,这个时候会禁用流控。<br>    case FCM_DISABLED:<br>      m_quota_size.store(0);<br>      m_quota_used.store(0);<br>      break;<br><br>    default:<br>      assert(0);<br>  }<br><br>  if (local_member_info->get_recovery_status() ==<br>      Group_member_info::MEMBER_IN_RECOVERY) {<br>    applier_module->get_pipeline_stats_member_collector()<br>        ->compute_transactions_deltas_during_recovery();<br>  }<br>}<br></int64></ maxtps) {<br></double></int64></int64></int64></ min_applier_capacity)<br></ (m_stamp - 10)) {<br></int64></double></double></double></flow_control_mode>
代码的逻辑看上去有点复杂。
接下来,我们通过一个具体的示例看看 flow_control_step 函数的实现逻辑。
基于案例定量分析
测试集群有三个节点组成:127.0.0.1:33061,127.0.0.1:33071 和 127.0.0.1:33081。
运行在多主模式下。
使用 sysbench 对 127.0.0.1:33061 进行插入测试(oltp_insert)。
为了更容易触发流控,这里将 127.0.0.1:33061 节点的 group_replication_flow_control_applier_threshold 设置为了 10。
以下是触发流控时 127.0.0.1:33061 的日志信息。
[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33061 stats certifier_queue 0, applier_queue 0 certified 7841 (177), applied 0 (0), local 7851 (177), quota 146 (156) mode=1'<br>[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33071 stats certifier_queue 0, applier_queue 0 certified 7997 (186), applied 8000 (218), local 0 (0), quota 146 (156) mode=1'<br>[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33081 stats certifier_queue 0, applier_queue 15 certified 7911 (177), applied 7897 (195), local 0 (0), quota 146 (156) mode=1'<br>[Note] [MY-011727] [Repl] Plugin group_replication reported: 'Flow control: throttling to 149 commits per 1 sec, with 1 writing and 1 non-recovering members, min capacity 177, lim throttle 0'
以 127.0.0.1:33081 的状态数据为例,我们看看输出中各项的具体含义:
- certifier_queue 0:认证队列的长度。
- applier_queue 15:应用队列的长度。
- certified 7911 (177):7911 是已经认证的总事务数,177 是上一周期进行认证的事务数(m_delta_transactions_certified)。
- applied 7897 (195):7897 是已经应用的总事务数,195 是上一周期应用的事务数(m_delta_transactions_applied)。
- local 0 (0):本地事务数。括号中的 0 是上一周期的本地事务数(m_delta_transactions_local)。
- quota 146 (156):146 是上一周期的 quota_size,156 是上一周期的 quota_used。
- mode=1:mode 等于 1 是开启流控。
因为 127.0.0.1:33081 中 applier_queue 的长度(15)超过 127.0.0.1:33061 中的 group_replication_flow_control_applier_threshold 的设置(10),所以会触发流控。
触发流控后,会调用 flow_control_step 计算下一周期的 m_quota_size。
-
循环遍历各节点的状态信息。集群的吞吐量(min_capacity)取各个节点 m_delta_transactions_certified 和 m_delta_transactions_applied 的最小值。具体在本例中, min_capacity = min(177, 186, 218, 177, 195) = 177。
-
min_capacity 不能太小,不能低于 lim_throttle。im_throttle 的取值逻辑如下:
-
初始值是 0.05 * min (group_replication_flow_control_applier_threshold, group_replication_flow_control_certifier_threshold)。 具体在本例中,min_capacity = 0.05 * min(10, 25000) = 0.5。
- 如果设置了 group_replication_flow_control_min_recovery_quota 且 num_non_recovering_members 为 0,则会将 group_replication_flow_control_min_recovery_quota 赋值给 min_capacity。 num_non_recovering_members 什么时候会为 0 呢?在新节点加入时,因为认证队列中积压的事务过多而触发的流控。
-
如果设置了 group_replication_flow_control_min_quota,则会将 group_replication_flow_control_min_quota 赋值给 min_capacity。
-
quota_size = min_capacity * 0.9 = 177 * 0.9 = 159。这里的 0.9 是 1 – group_replication_flow_control_hold_percent /100。之所以要预留部分配额,主要是为了处理积压事务。
-
quota_size 不能太大,不能超过 group_replication_flow_control_max_quota。
-
注意,这里计算的 quota_size 是集群的吞吐量,不是单个节点的吞吐量。如果要计算当前节点的吞吐量,最简单的办法是将 quota_size / 有实际写操作的节点数(num_writing_members)。怎么判断一个节点是否进行了实际的写操作呢?很简单,上一周期有本地事务提交,即 m_delta_transactions_local > 0。具体在本例中,只有一个写节点,所以,当前节点的 quota_size 就等于集群的 quota_size,即 159。除了均分这个简单粗暴的方法,如果希望某些节点比其它节点承担更多的写操作,也可通过 group_replication_flow_control_member_quota_percent 设置权重。这个时候,当前节点的吞吐量就等于 quota_size * group_replication_flow_control_member_quota_percent / 100。
-
最后,当前节点的 quota_size 还会减去上个周期超额使用的 quota(extra_quota)。上个周期的 extra_quota 等于上个周期的 quota_used – quota_size = 156 – 146 = 10。所以,当前节点的 quota_size 就等于 159 – 10 = 149,和日志中的输出完全一致。为什么会出现 quota 超额使用的情况呢?这个后面会提到。
-
当 m_holds_in_period 又恢复为 0 时,就意味着流控结束。流控结束后,MGR 不会完全放开 quota 的限制,否则写入量太大,容易出现突刺。MGR 采取的是一种渐进式的恢复策略,即下一周期的 quota_size = 上一周期的 quota_size * (1 + group_replication_flow_control_release_percent / 100)。
-
group_replication_flow_control_mode 是 DISABLED ,则会将 m_quota_size 和 m_quota_used 置为 0。m_quota_size 置为 0,实际上会禁用流控。为什么会禁用流控,这个后面会提到。
配额的作用时机
既然我们已经计算出下一周期的 m_quota_size,什么时候使用它呢?事务提交之后,GCS 广播事务消息之前。
int group_replication_trans_before_commit(Trans_param *param) {<br>  ...<br>  // 判断事务是否需要等待<br>  applier_module->get_flow_control_module()->do_wait();<br><br>  // 广播事务消息<br>  send_error = gcs_module->send_transaction_message(*transaction_msg);<br>  ...<br>}
接下来,我们看看 do_wait 函数的处理逻辑。
int32 Flow_control_module::do_wait() {<br>  DBUG_TRACE;<br>  // 首先加载 m_quota_size<br>  int64 quota_size = m_quota_size.load();<br>  // m_quota_used 自增加 1。<br>  int64 quota_used = ++m_quota_used;<br><br>  if (quota_used > quota_size && quota_size != 0) {<br>    struct timespec delay;<br>    set_timespec(&delay, 1);<br><br>    mysql_mutex_lock(&m_flow_control_lock);<br>    mysql_cond_timedwait(&m_flow_control_cond, &m_flow_control_lock, &delay);<br>    mysql_mutex_unlock(&m_flow_control_lock);<br>  }<br><br>  return 0;<br>}
可以看到,如果 quota_size 等于 0,do_wait 会直接返回,不会执行任何等待操作。这也就是为什么当 m_quota_size 等于 0 时,会禁用流控操作。
如果 quota_used 大于 quota_size 且 quota_size 不等于 0,则意味着当前周期的配额用完了。这个时候,会调用 mysql_cond_timedwait 触发等待。
这里的 mysql_cond_timedwait 会在两种情况下退出:
需要注意的是,m_quota_used 是自增在前,然后才进行判断,这也就是为什么 quota 会出现超额使用的情况。
在等待的过程中,如果客户端是多线程并发写入(且单个线程的下个操作会等待上个操作完成),这里会等待多个事务,并且超额使用的事务数不会多于客户端并发线程数。
所以,在上面的示例中,为什么 quota_used(156) 比 quota_size(146)多 10,这个实际上是 sysbench 并发线程数的数量。
接下来,我们看看示例中这 156 个事务在 do_wait 处的等待时间。
...<br>0.000020<br>0.000017<br>0.000023<br>0.000073<br>0.000023<br>0.000018<br>0.570180<br>0.567999<br>0.561916<br>0.561162<br>0.558930<br>0.557714<br>0.556683<br>0.550581<br>0.548102<br>0.547176
前 146 个事务的平均等待时间是 0.000035s,后 10 个事务的平均等待时间是 0.558044s。
流控的相关参数
group_replication_flow_control_mode
是否开启流控。默认是 QUOTA,基于配额进行流控。如果设置为 DISABLED ,则关闭流控。
group_replication_flow_control_period
流控周期。有效值 1 – 60,单位秒。默认是 1。注意,各个节点的流控周期应保持一致,否则的话,就会将周期较短的节点配额作为集群配额。
看下面这个示例,127.0.0.1:33061 这个节点的 group_replication_flow_control_period 是 10,而其它两个节点的 group_replication_flow_control_period 是 1。
2022-08-27T19:01:50.699939+08:00 63 [Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33061 stats certifier_queue 0, applier_queue 0 certified 217069 (1860), applied 1 (0), local 217070 (1861), quota 28566 (1857) mode=1'<br>2022-08-27T19:01:50.699955+08:00 63 [Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33071 stats certifier_queue 0, applier_queue 2 certified 218744 (157), applied 218746 (165), local 0 (0), quota 28566 (1857) mode=1'<br>2022-08-27T19:01:50.699967+08:00 63 [Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33081 stats certifier_queue 16383, applier_queue 0 certified 0 (0), applied 0 (0), local 0 (0), quota 28566 (1857) mode=1'<br>2022-08-27T19:01:50.699979+08:00 63 [Note] [MY-011727] [Repl] Plugin group_replication reported: 'Flow control: throttling to 141 commits per 10 sec, with 1 writing and 0 non-recovering members, min capacity 157, lim throttle 100'
最后,会将 127.0.0.1:33071 这个节点 1s 的配额(157 * 0.9)当作 127.0.0.1:33061 10s 的配额。
所以,我们会观察到下面这个现象:
执行时间   TPS<br>19:01:50   49<br>19:01:51   93<br>19:01:52    1<br>19:01:53    1<br>19:01:54    1<br>19:01:55    1<br>19:01:56    1<br>19:01:57    1<br>19:01:58    1<br>19:01:59    1<br>19:02:00    1
127.0.0.1:33061 在头两秒就使用完了所有配额,导致后面的事务会等待 1s(mysql_cond_timedwait 的超时时长)才处理。因为模拟时指定的并发线程数是 1,所以这里的 TPS 会是 1。
为什么不是被 flow_control_step 中的m_flow_control_cond 信号释放呢?因为127.0.0.1:33061 这个节点的 group_replication_flow_control_period 是 10,所以 flow_control_step 10s 才会执行一次。
group_replication_flow_control_applier_threshold
待应用的事务数如果超过 group_replication_flow_control_applier_threshold 的设置,则会触发流控,该参数默认是 25000。
group_replication_flow_control_certifier_threshold
待认证的事务数如果超过 group_replication_flow_control_certifier_threshold 的设置,则会触发流控,该参数默认是 25000。
group_replication_flow_control_min_quota
group_replication_flow_control_min_recovery_quota
两个参数都会决定当前节点下个周期的最小配额,只不过 group_replication_flow_control_min_recovery_quota 适用于新节点加入时的分布式恢复阶段。group_replication_flow_control_min_quota 则适用于所有场景。如果两者同时设置了, group_replication_flow_control_min_quota 的优先级更高。两者默认都为 0,即不限制。
group_replication_flow_control_max_quota
当前节点下个周期的最大配额。默认是 0,即不限制。
group_replication_flow_control_member_quota_percent
分配给当前成员的配额比例。有效值 0 – 100。默认为 0,此时,节点配额 = 集群配额 / 上个周期写节点的数量。
注意,这里的写节点指的是有实际写操作的节点,不是仅指 PRIMARY 节点。毕竟不是所有的 PRIMARY 节点都会有写操作。
另外,设置配额比例时,不要求所有节点的配额比例加起来等于 100。
group_replication_flow_control_hold_percent
预留配额的比例。有效值 0 – 100,默认是 10。预留的配额可用来处理落后节点积压的事务。
group_replication_flow_control_release_percent
当流控结束后,会逐渐增加吞吐量以避免出现突刺。
下一周期的 quota_size = 上一周期的 quota_size * (1 + group_replication_flow_control_release_percent / 100)。有效值 0 – 1000,默认是 50。
-
从可用性的角度出发,不建议线上关闭流控。虽然主节点出现故障的概率很小,但墨菲定律告诉我们,任何有可能发生的事情最后一定会发生。在线上还是不要心存侥幸。
-
流控限制的是当前节点的流量,不是其它节点的。
-
流控参数在各节点应保持一致,尤其是 group_replication_flow_control_period。
参考资料
[1] WL#9838: Group Replication: Flow-control fine tuning: https://dev.mysql.com/worklog/task/?id=9838
[2] MySQL Group Replication流控实现分析: https://zhuanlan.zhihu.com/p/39541394
Original: https://www.cnblogs.com/ivictor/p/16797830.html
Author: iVictor
Title: 从源码分析 MGR 的流控机制
相关阅读
Title: 关于账本数据库:你想知道的这里都有
💕前言:十二月份出个openGuass集合专栏,带领大家浅浅的认识一下国产数据库吧💕

1. 什么是账本数据库
区块链一定是大家都熟悉的。比特币、以太坊甚至狗币等代币作为区块链的代名词,不仅影响到一些人账户的盈亏,甚至影响到市场上显卡和硬盘的价格。但作为数据库相关的技术人员或爱好者,对于这项新技术,我们更多的是与其核心技术相关。
[En]
Blockchain must be familiar to everyone. Tokens such as Bitcoin, Ethernet Square and even Dog Coin, as synonyms for blockchain, affect not only the profits and losses of some people’s accounts, but even the price of graphics cards and hard drives in the market. But as database-related technicians or enthusiasts, for this new technology, we are more related to its core technology.
区块链作为一种分布式账本技术,克服了传统集中式账本存储效率低、可信度低、易受单点攻击的缺点,在技术上保证了其具有分布式共享、多方共识、不可篡改和可追溯性等特点。
[En]
As a distributed ledger technology, blockchain overcomes the disadvantages of low storage efficiency, low credibility and vulnerability to a single point of attack of the traditional centralized ledger, and technically ensures that it has the characteristics of distributed sharing, multi-party consensus, untamper and traceability.
那么区块链这么好,我们能用它来替代数据库吗?答案当然是NO!因为区块链往往有着交易性能低下,查询不便等诸多弊端。比特币系统仅支持每秒处理7笔交易,如果用它来承担主要的金融交易,效率自然是十分低下的。业界往往采用数据库来提高区块链的数据存储、检索能力。我们不妨换个角度,利用openGauss数据库天然具有的高性能、高可靠、高安全等优势,从openGauss出发,融入一些区块链的密码学防篡改、多方共识等技术,来提高数据库自身的防篡改、可追溯能力。防篡改账本数据库的idea应运而生。
在基础设施模式方面,区块链通常分为七层:
[En]
In terms of the infrastructure model, blockchain is usually divided into seven layers:
-
应用层
-
查询层
-
合约层
-
激励层
-
共识层
-
网络层
-
数据层
各层的详细技术要点如下图所示:
[En]
The detailed technical points for each layer are shown in the following figure:

图 1 区块链基础架构模型
数据库吸纳区块链防篡改的能力,首先想到的就是从区块链技术的最底层:数据层出发,让数据库提供数据的校验信息记录以及数据的篡改校验的能力,保证数据库在处理敏感信息时能够忠实的记录每一笔交易造成的数据更改,形成一个忠实、完整的数据变更”账本”。我们本次要介绍的openGauss账本数据库,即是在openGauss内核中植入了在数据修改时,对数据的变更操作进行记录这一功能,保证整个数据链路可查询、可溯源;同时提供高效的篡改校验接口,提供给上层的应用系统或者多个参与方之间互相校验数据的一致性。之后,我们将详细介绍账本数据库的实现原理以及其对openGauss的改造。
2. openGauss账本数据库原理剖析

图 2 账本数据库新增模块
客户端发送SQL对数据库中数据进行修改时,要经过通信模块的接收,解析模块的处理,转成解析树,然后经过优化生成执行计划。执行模块拿到执行计划,会调用存储层接口对数据进行修改。如上图所示,我们在数据的修改过程中,增加了篡改校验信息的记录;同时,提供了篡改校验模块,供用户调用接口执行校验。篡改信息记录和篡改校验的基础是我们针对数据库增、删、改操作设计的篡改校验信息。下面我们针对新增的篡改校验信息进行介绍。
2.1 防篡改用户表

图 3 防篡改用户表结构
在账本数据库特性中,我们使用schema级别进行防篡改表和普通表的隔离。在防篡改schema中的表,具有校验信息,且每次涉及到增、删、改的操作均会记录相应的数据变化以及操作的语句,我们称这些表为防篡改表。而普通的schema中的表,我们称其为普通表。
防篡改表有如图 3所示的结构。在创建防篡改表时,系统会增加一行hash列,该列在发生数据插入或者数据修改时,都会实时计算数据的摘要。数据与摘要存在一个tuple中,密不可分。由hash函数的单向性,我们将每一行的摘要,作为该行数据在摘要空间的逻辑表示。
2.2 用户历史表

图 4 用户历史表结构
用户历史表结构见上图,主要包含四列:xid、hash_ins、hash_del、pre_hash。用户历史表的每一行对应着用户表的每一次行级数据更改,其中xid记录数据更改时的xid号,代表着操作进行的逻辑时间顺序。hash_ins记录INSERT或者UPDATE操作插入的数据行的hash值,hash_del记录着DELETE或者UPDATE删除数据行的hash值。同时,hash_ins和hash_del是否为空,代表着INSERT、DELETE、UPDATE三种不同的操作类型,其对应关系如下表。
hash_ins
hash_del
Insert
√(插入数据hash)
—
Delete
—
√(删除数据hash)
Update
√(新数据hash)
√(删除前数据hash)
pre_hash将历史表的当前行数据和上一行的pre_hash数据进行拼接,生成当前用户历史表的数据整体摘要,计算公式如下:

这里i代表用户历史表的第i行,rowdatai为第i行xid||hash_ins||hash_del拼接的数据。
在校验用户历史表的完整性时,通过使用rowdata数据从前往后依次计算pre_hash值,并与表中的pre_hash进行比对,如果数据不一致,则说明用户历史表的完整性被破坏。
2.3 全局区块表结构

图 5 全局区块表结构
全局区块表结构见上图,表中每一行对应一次防篡改表修改行为,作为一个区块保存。全局区块表主要包括三部分内容:区块信息主要保存了区块相关的标记信息,包括区块号、时间戳。操作信息包括了用户对防篡改数据表的操作信息,包括数据库名、用户名、表名等标识信息,以及对应的SQL语句。校验信息保存用于一致性或完整性校验的hash信息,包括表级hash(rel_hash)、全局hash(global_hash)。
2.4 篡改校验算法

当用户调用防篡改检查接口时,系统可以使用防篡改用户表并行生成表级总检查信息;使用该用户表对应的历史表中的记录来生成变更记录的整体检查信息。然后通过比较生成的两个检查信息的一致性来判断数据和操作之间的一致性。如果不一致,则发生了绕过系统记录修改数据的行为,即篡改。
[En]
When the user invokes the tamper check interface, the system can use the tamper-proof user table to generate table-level total check information in parallel; use the records in the history table corresponding to the user table to generate the overall check information of the change record. Then the consistency between the data and the operation is judged by comparing the consistency of the two check information generated. If it is inconsistent, the behavior of modifying data that bypasses the system record, that is, tampering, has occurred.
图6显示了从防篡改用户表中的行级检查信息生成表级验证的过程。在检查过程中,扫描表中的数据,获取每行的检查信息,并使用行检查信息对行数据进行验证。在扫描整行和校对信息的过程中,我们可以通过内置的可交换支票信息聚合算法不断生成扫描数据的整体奇偶信息。由于信息聚合算法的互换性,这一过程可以完全并行执行。
[En]
The process of generating table-level verification from the row-level check information in the tamper-proof user table is shown in figure 6. During the check, the data in the table is scanned, the check information in each row is obtained, and the row data is verified using the row check information. In the process of scanning the overall row and proofreading information, we can constantly generate the overall parity information of the scanned data through the built-in exchangeable check information aggregation algorithm. Because of the exchangeability of the information aggregation algorithm, this process can be executed completely in parallel.
通过用户历史表生成变更记录总体的校验信息如图 7 所示。通过我们设计的用户历史表的结构,其hash_ins列中的非空元素代表了所有操作导致的数据校验信息的增加,hash_del列中的非空元素则代表了校验数据减少。我们通过对两列元素做差集,得到剩余的校验信息的集合。然后利用可交换校验信息聚合算法得到用户历史表中记录操作造成的变更记录整体的校验信息。这一过程,由于聚合算法的可交换性,可以对每行先进行hash_ins – hash_del,然后在扫描的时候不断叠加生成。这里,变更记录整体校验信息的生成也是完全可以并行的。

图 7 用户历史表校验信息生成
3. openGauss账本数据库发展展望
账本数据库作为openGauss防篡改数据的基础,目前支持了数据库内校验信息的记录以及提供高性能校验接口。提供了区块链技术层次中存储层的部分功能。为了实现防篡改, 我们还需要增加多个数据库间的高性能远程执行能力,以及提供可插拔的高性能多方共识协议,这样才能形成完整的openGauss多方可信防篡改能力。在数据库融合区块链的领域,openGauss会不断进化,为大家带来更加易用、更加高效的防篡改数据库。
公众实时更新:叶秋学长
下一篇解密openGauss DB4AI框架的内部机理
Original: https://blog.csdn.net/m0_63722685/article/details/128124881
Author: 叶秋学长
Title: 关于账本数据库:你想知道的这里都有
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/363377/
转载文章受原作者版权保护。转载请注明原作者出处!