Flask Mysql分布式设计(读写分离)

Mysql分布式设计

作用

  • 对数据备份, 实现高可用 HA (主要)
  • 通过读写分离, 提高吞吐量, 实现高性能

原理

Flask Mysql分布式设计(读写分离)
  • Mysql的复制 是一个异步的复制过程
  • 过程本质为 Slave 从 Master 端获取 Binary Log, 然后再在自己身上完全顺序的执行日志中所记录的各种操作
  • MySQL 复制的基本过程如下:
    1)Slave 上面的 IO 线程连接上 Master, 并请求从指定日志文件的指定位置之后的日志内容;
    2)Master 接收到来自 Slave 的 IO 线程的请求后, 通过负责复制的IO线程 根据请求信息读取日志信息,返回给 Slave 端的 IO 线程。
    3)Slave 的 IO 线程接收到信息后,将接收到的日志内容依次写入到 Slave 端的 Relay Log文件
    4)Slave 的 SQL 线程检测到 Relay Log 中新增加了内容后,会马上解析该文件中的内容, 并在自身执行这些 原始SQL语句。

注:
I/O thread从Binary log中读取到的数据并不是读一条执行一条而是先写入到Relay log中,否则效率会很慢

这里有几点需要注意:

  • 主从复制不是强一致性,只能保证最终一致
  • master配合binlog复制会影响性能,所以尽量不要在master上挂太多的slave,如果对时间要求不高,可以在slave上挂slave

; 常用架构

主从架构

  • 性能
    一主多从, 读写分离, 提高吞吐量
  • 可用性
    主库单点, 一旦挂了, 无法写入
    从库高可用

Flask Mysql分布式设计(读写分离)

主备架构

  • 性能
    单库读写, 性能一般
  • 可用性
    高可用, 一旦主库挂了, 就启用备库
  • 这种方案被阿里云、美团等企业广泛使用

Flask Mysql分布式设计(读写分离)

主备架构搭建除了配置双主同步, 还需要搭配第三方故障转移/高可用方案, 属于DBA和运维专业领域

MySQL + Keepalived 双主热备高可用操作记录
我们通常说的双机热备是指两台机器都在运行,但并不是两台机器都同时在提供服务。当提供服务的一台出现故障的时候,另外一台会马上自动接管并且提供服务,而且切换的时间非常短。MySQL双主复制,即互为Master-Slave(只有一个Master提供写操作),可以实现数据库服务器的热备,但是一个Master宕机后不能实现动态切换。使用Keepalived,可以通过虚拟IP,实现双主对外的统一接口以及自动检查、失败切换机制,从而实现MySQL数据库的高可用方案

1)先实施Master->Slave的主主同步。主主是数据双向同步,主从是数据单向同步。一般情况下,主库宕机后,需要手动将连接切换到从库上。(但是用keepalived就可以自动切换)
2)再结合Keepalived的使用,通过VIP实现Mysql双主对外连接的统一接口。即客户端通过Vip连接数据库;当其中一台宕机后,VIP会漂移到另一台上,这个过程对于客户端的数据连接来说几乎无感觉,从而实现高可用。

Flask Mysql分布式设计(读写分离)
如果我们基于代码层面而不考虑去安装部署keepalive,只需要在配置访问数据库地址时设置为VIP虚拟IP即可

问题: 既然主备互为备份, 为什么不采用双主方案, 提供两台主进行负载均衡?

  • 原因是为了避免数据的冲突,防止造成数据的不一致性。 虽然在两边执行的修改有先后顺序,但由于 Replication 是异步的实现机制,同样可能会导致 晚做的修改被早做的修改所覆盖

Flask Mysql分布式设计(读写分离)
  • 不仅B库数据错误, 且A&B库数据不一致

高可用复合架构

  • 性能
    读写分离, 提高吞吐量
  • 可用性
    高可用, 一旦主库挂了, 就启用备库

Flask Mysql分布式设计(读写分离)

Flask Mysql分布式设计(读写分离)
Flask Mysql分布式设计(读写分离)

读写分离

  • sqlalchemy 并没有像 django-orm 一样内置完善的读写分离方案, 但是提供了可以自定义的接口:官方参考文档, 我们可以借此对 flask-sqlalchemy 进行二次开发, 实现读写分离
engines = {
    'leader':create_engine("sqlite:///leader.db"),
    'other':create_engine("sqlite:///other.db"),
    'follower1':create_engine("sqlite:///follower1.db"),
    'follower2':create_engine("sqlite:///follower2.db"),
}

from sqlalchemy.sql import Update, Delete
from sqlalchemy.orm import Session, sessionmaker
import random

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None):

        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines['other']

        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines['leader']
        else:

            return engines[
                random.choice(['follower1','follower2'])
            ]

通过class_ 这个属性指定自定义的session类
Session = sessionmaker(class_=RoutingSession)

基本实现思路:
实现自定义的 session类, 继承 SignallingSession类

  • 重写 get_bind方法, 根据读写需求选择对应的数据库地址

实现自定义的 SQLAlchemy类, 继承 SQLAlchemy类

  • 重写 create_session方法, 在内部使用自定义的 Session类

import random
from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase

app = Flask(__name__)

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.140:3306/test31'

app.config['SQLALCHEMY_BINDS'] = {
    'master': 'mysql://root:mysql@192.168.105.140:3306/test31',
    'slave1': 'mysql://root:mysql@192.168.105.140:8306/test31',
    'slave2': 'mysql://root:mysql@192.168.105.140:3306/test31'
}

app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

class RoutingSession(SignallingSession):
    def __init__(self, db, autocommit=False, autoflush=True, **options):

        super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)

        self.slave = random.choice(['slave1', 'slave2'])

    def get_bind(self, mapper=None, clause=None):
        """每次数据库操作(增删改查及事务操作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""

        state = get_state(self.app)

        if mapper is not None:
            try:

                persist_selectable = mapper.persist_selectable
            except AttributeError:

                persist_selectable = mapper.mapped_table

            info = getattr(persist_selectable, 'info', {})
            bind_key = info.get('bind_key')

            if bind_key is not None:
                return state.db.get_engine(self.app, bind=bind_key)

        if self._flushing or isinstance(clause, UpdateBase):
            print('写操作')
            return state.db.get_engine(self.app, bind='master')
        else:

            print('读操作: ', self.slave)
            return state.db.get_engine(self.app, bind=self.slave)

class RoutingSQLAlchemy(SQLAlchemy):
    def create_session(self, options):
        return orm.sessionmaker(class_=RoutingSession, db=self, **options)

db = RoutingSQLAlchemy(app)

class User(db.Model):
    __tablename__ = 't_user'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column('username', db.String(20), unique=True)
    age = db.Column(db.Integer, default=0, index=True)

@app.route('/')
def index():
    """增加数据"""

    print('---读-----------')

    users = User.query.all()
    for user in users:
        print(user.id, user.name, user.age)

    print('---读-----------')

    users = User.query.all()
    for user in users:
        print(user.id, user.name, user.age)

    return "index"

if __name__ == '__main__':

    db.drop_all()
    db.create_all()
    app.run(debug=True, host='0.0.0.0')

项目集成

  • 将工具包routing_db 导入 common/models中 , 其中的 routing_sqlalchemy.py文件实现了读写分离

import random
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase

class RoutingSession(SignallingSession):
    """自定义Session类, 继承SignallingSession"""

    def __init__(self, db, autocommit=False, autoflush=True, **options):
        super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)

        self.slave = random.choice(['slave1', 'slave2'])

    def get_bind(self, mapper=None, clause=None):
        """每次数据库操作(增删改查及事务操作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""

        state = get_state(self.app)

        if mapper is not None:
            try:

                persist_selectable = mapper.persist_selectable
            except AttributeError:

                persist_selectable = mapper.mapped_table

            info = getattr(persist_selectable, 'info', {})
            bind_key = info.get('bind_key')

            if bind_key is not None:
                return state.db.get_engine(self.app, bind=bind_key)

        if self._flushing or isinstance(clause, UpdateBase):
            print('写操作')
            return state.db.get_engine(self.app, bind='master')
        else:

            print('读操作: ', self.slave)
            return state.db.get_engine(self.app, bind=self.slave)

class RoutingSQLAlchemy(SQLAlchemy):
    """自定义SQLALchemy类"""
    def create_session(self, options):
        """重写create_session方法: 使用自定义Session类"""
        return orm.sessionmaker(class_=RoutingSession, db=self, **options)
  • 在 app/settings/config.py文件中 设置主从数据库的URI地址

class DefaultConfig:
    """默认配置"""

    ...

    SQLALCHEMY_BINDS = {
        "master": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
        "slave1": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
        "slave2": 'mysql://root:mysql@192.168.105.140:8306/hm_topnews'
    }

    ...

  • 在 app/ init.py文件 中使用自定义SQLAlchemy类

...

from models.routing_db.routing_sqlalchemy import RoutingSQLAlchemy

db = RoutingSQLAlchemy()

...

优化
修改前

class LoginResource(Resource):
    """注册登录"""
    def post(self):

        parser = RequestParser()
        parser.add_argument('mobile', required=True, location='json', type=mobile_type)
        parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
        args = parser.parse_args()
        mobile = args.mobile
        code = args.code

        key = 'app:code:{}'.format(mobile)
        real_code = redis_client.get(key)

        if not real_code or real_code != code:
            return {'message': 'Invalid Code', 'data': None}, 400

        user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()

        if user:
             user.last_login = datetime.now()

        else:
            user = User(mobile=mobile, name=mobile, last_login=datetime.now())
            db.session.add(user)

        db.session.commit()

        token = generate_jwt({'userid': user.id},
                             expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))

        return {'token': token}, 201

修改后

class LoginResource(Resource):
    """注册登录"""
    def post(self):

        parser = RequestParser()
        parser.add_argument('mobile', required=True, location='json', type=mobile_type)
        parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
        args = parser.parse_args()
        mobile = args.mobile
        code = args.code

        key = 'app:code:{}'.format(mobile)
        real_code = redis_client.get(key)

        if not real_code or real_code != code:
            return {'message': 'Invalid Code', 'data': None}, 400

        user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()

        if user:
             user.last_login = datetime.now()

        else:
            user = User(mobile=mobile, name=mobile, last_login=datetime.now())
            db.session.add(user)
            db.session.flush()
        userid = user.id

        db.session.commit()

        token = generate_jwt({'userid': userid},
                             expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))

        return {'token': token}, 201

Original: https://blog.csdn.net/weixin_47906106/article/details/124376426
Author: 季布,
Title: Flask Mysql分布式设计(读写分离)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/745179/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球