SQLAlchemy完全入门

最近想要学习 SQLAlchemy, 发现网上的中文文档大多是机翻的, 读起来特别变扭, 因此对照着最新的英文文档梳理了一遍, 写下来记录一下
目前SQLAlchemy的版本为 1.4.x, 风格处于1.x过渡到2.0的时代. 为了尽量让这篇文章的兼容之后的版本, 本文将讲述1.x和2.0两种风格的接口(主要是查询的接口)

其实在2.0风格中, 主要受到影响的是ORM的查询方式, 详情见文档: 2.0 Migration – ORM Usage

安装

pip install sqlalchemy

检测 sqlalchemy版本:

>>>import sqlalchemy
>>>sqlalchemy.__version__
'1.4.27'

使用步骤

一般来说 SQLAlchemy的使用方式有两种: CoreORM
两种有什么不同呢?

  1. ORM是构建在 Core之上的
  2. Core更加底层, 可以执行直接执行SQL语句
  3. ORM类似于Django的ORM, 由于sqlalchemy提供了一套接口, 所以不需要我们直接写SQL语句 (1.x版本)
  4. 至于要用哪个, 等到你用到时, 你会知道的

组件依赖关系图:

SQLAlchemy完全入门

Core

一般来说, 使用步骤如下:

  1. 配置数据库连接
  2. 建立连接
  3. 创建表
  4. 执行SQL语句, 按需开启事件是否自动提交
  5. 拿到返回数据, 执行其他代码

数据库的连接的格式

我们在创建引擎(连接)时, 需要指定数据库的URL, URL格式, 见: Engine Configuration
, 总的来说, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]

  • dialect 数据库名称(方言): 如mysql
  • driver 连接数据库的库: 如: pymysql
  • user 用户名
  • password 密码
  • host 地址
  • dbname 数据库名称
  • key=value 指的是给数据库的参数

如下面的URL:

mysql+pymysql://root:passwd@127.0.0.1:3306/test_db?charset=utf8

建立连接

调用 sqlalchemy.create_engine方法, 为了兼容2.0风格的接口, 可以加上 future参数. 至于什么是2.0风格的接口, 可以看看官方文档: 2.0 style
create_engine有几个参数需要我们注意:

  • url 即数据库url, 其格式见上文:数据库的连接的格式
  • echo参数为 True时, 将会将engine的SQL记录到日志中 ( 默认输出到标准输出)
  • echo_poolTrue时,会将连接池的记录信息输出
  • future 使用2.0样式 EngineConnection API

更多参数见官方文档: sqlalchemy.create_engine

例子

from sqlalchemy import create_engine

兼容2.0的写法
返回对象不一样
engine1 = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
print(type(engine1))
#

engine2 = create_engine("sqlite+pysqlite:///:memory:", echo=True)
print(type(engine2))
#

注意, 由于 sqlalchemy使用 lazy initialization的策略连接数据库, 故此时还未真正地连接上数据库

创建表

我们想要让数据库创建一个表, 需要利用 MetaData对象, 关于一些常用的 MetaData方法, 见: MetaData
除了要 MetaData对象外, 我们还需要 Table对象, 用于定义一个表的结构
Table的一般使用

mytable = Table("mytable", metadata,
    Column('mytable_id', Integer, primary_key=True),
    Column('value', String(50))
    )

Table的参数:

  • name 表名称
  • metadata 该表所属的MetaData对象
  • 其他参数: 通过 Column指定一列数据, 格式见:Column定义

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    # 定义外键
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)
相当于执行 CREATE TABLE 语句
metadata_obj.create_all(engine)

"""
-- 相当于:
CREATE TABLE user_account (
    id INTEGER NOT NULL AUTO_INCREMENT,
    username VARCHAR(30),
    PRIMARY KEY (id)
);
CREATE TABLE address (
    id INTEGER NOT NULL AUTO_INCREMENT,
    uid INTEGER NOT NULL,
    email_address VARCHAR(32) NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY(uid) REFERENCES user_account (id)
)
"""

create_all方法, 默认会在创建表之间检测一下表是否存在, 不存在时才创建.

Table的一些属性

---------- 访问所有列
.c  => Column
print(user_table.c.keys())
['id', 'username']

---------- 访问某一列
print(repr(user_table.c.username))
Column('username', String(length=30), table=)

---------- 返回主键
print(user_table.primary_key)
隐式生成
PrimaryKeyConstraint(Column('id', Integer(), table=, primary_key=True, nullable=False))

在事务中执行SQL

通常, 我们通过调用 engine.connectengine.begin方法开始一个事件
sqlalchemy使用事务有两种风格 commit as you goBegin once, 前者需要我们手动提交, 后者会自动提交

手动提交

engine.connect方法符合python的上下文管理协议, 会返回一个 Connection对象, 该方法会在不手动提交的情况下回滚.举个例子:

from sqlalchemy import create_engine
from sqlalchemy import text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 执行
    result = conn.execute(text("select 'hello world'")) # text 可以使用SQL语句
    print(result.all())
    # conn.commit()
    # [('hello world',)]

    # 最后会ROLLBACK

上面的代码中, 相当于开启了事务, 由于最后没有调用 commit方法, 所以会回滚.

自动提交

engine.begin方法也符合python的上下文管理协议, 只要执行时不报错就会自动提交, 报错时会回滚.

from sqlalchemy import create_engine
from sqlalchemy import text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select 'hello world'"))
    print(result.all())
    # [('hello world',)]

    # COMMIT

绑定参数

上面在事务中执行SQL语句时, 我们用到了 sqlalchemy.text, 可以直接定义文本SQL字符串
为了避免被SQL注入, 故在需要传入参数的场景中需要根据 sqlalchemy的方式传入, 而不是直接拼接成字符串.

使用 :y的格式定义参数, 且将值以字典的形式传给 execute

from sqlalchemy import create_engine
from sqlalchemy import text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"})
    print(result.all())
    # [('lczmx',)]

    # COMMIT

多个参数时, 可以这样

with engine.connect() as conn:
    conn.execute(
        text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"),
        [{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}])
    conn.commit()

这种方式也可以

stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

with engine.connect() as conn:
    conn.execute(stmt)
    conn.commit()

增删改查

处理使用 text直接执行SQL外, 你还可以使用其他语法增删改查数据
假如表结构如下:

$show create table address;
+---------+-----------------------------------------+
| Table   | Create Table                            |
+---------+-----------------------------------------+
| address | CREATE TABLE address (
  id int NOT NULL AUTO_INCREMENT,
  uid int NOT NULL,
  email_address varchar(32) NOT NULL,
  PRIMARY KEY (id),
  KEY uid (uid),
  CONSTRAINT address_ibfk_1 FOREIGN KEY (uid) REFERENCES user_account (id)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=gbk |
+---------+------------------------------------------+

$show create table user_account;
+--------------+------------------------------------+
| Table        | Create Table                       |
+--------------+------------------------------------+
| user_account | CREATE TABLE user_account (
  id int NOT NULL AUTO_INCREMENT,
  username varchar(30) DEFAULT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=gbk |
+--------------+-------------------------------------+
1 row in set (0.00 sec)

插入数据

使用 insert(...).values(...)形式为数据库插入数据

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, insert

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 插入一条普通数据
    conn.execute(insert(user_table).values(id=1, username="lczmx"))
    # 插入外键等数据
    conn.execute(insert(address_table).values(uid=1, email_address="lczmx@foxmail.com"))

    # 自动生成value, 不需要我们手动指定

    conn.execute(insert(user_table),
                 [{"username": "张三"},
                  {"username": "李四"},
                  {"username": "王五"},
                  {"username": "赵六"},
                  ])

    conn.commit()

SQLAlchemy还提供了更复杂的用法, 见: Inserting Rows with Core

注意: 插入数据没有返回值

删除数据

使用 delete(...).where(...)的形式删除数据

目前的表数据:

select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join  address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | lczmx@foxmail.com |
|   2 | 张三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 赵六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, delete

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般删除
    # user_table.c 获取的是 列数据
    result1 = conn.execute(delete(user_table).where(user_table.c.id == 3))
    print(f"受影响行数: {result1.rowcount}")  # 受影响行数: 1

    # and 删除
    result2 = conn.execute(delete(user_table).where(user_table.c.username == "张三", user_table.c.id == 2))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    conn.commit()

.rowcount属性获取受影响的行数

更多见: The delete() SQL Expression Construct

更新数据

使用 update(...).where(...).values(...)的形式更新数据

select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join  address as a on u.id=a.uid;

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | lczmx@foxmail.com |
|   2 | 张三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 赵六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, update, bindparam, select

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".

                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般更新
    result1 = conn.execute(update(user_table).where(
        user_table.c.username == "王五").values(username="王老五"))
    print(f"受影响行数: {result1.rowcount}")  # 受影响行数: 1

    # 更新数据 加上 原来的数据
    result2 = conn.execute(
        update(user_table).where(user_table.c.username == "赵六").values(
            username=user_table.c.username + "一号"))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    # 以字典的形式, 替换更新多个值
    result3 = conn.execute(
        update(user_table).where(user_table.c.username == bindparam('old_name')).values(
            username=bindparam('new_name')),
        [
            {"old_name": "张三", "new_name": "新张三"},
            {"old_name": "李四", "new_name": "新李四"},
        ]
    )

    print(f"受影响行数: {result3.rowcount}")  # 受影响行数: 2

    # 以 子查询 的方式 更新数据
    scalar_subq = (
        select(address_table.c.email_address).

            where(address_table.c.uid == user_table.c.id).

            order_by(address_table.c.id).

            limit(1).

            scalar_subquery()
    )
    # 将email_address的值 赋给 username
    update(user_table).values(username=scalar_subq)

"""
    -- 以上查询, 相当于:
    UPDATE user_account SET username=(SELECT address.email_address
    FROM address
    WHERE address.uid = user_account.id ORDER BY address.id
    LIMIT :param_1)
"""
    conn.commit()

修改后的结果:

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | lczmx@foxmail.com |
|   2 | 新张三   | NULL | NULL              |
|   3 | 新李四   | NULL | NULL              |
|   4 | 王老五   | NULL | NULL              |
|   5 | 赵六一号 | NULL | NULL              |
+-----+----------+------+-------------------+

更多见: Updating and Deleting Rows with Core

查询数据

由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

处理查询返回的数据

我们执行 conn.execute方法的结果为: CursorResult对象
其本质上是继承与 Result对象, 其 使用方式见:Result

例子:
假如查询的表:

mysql> select * from user_account;
+----+----------+
| id | username |
+----+----------+
|  9 | lczmx    |
| 10 | jack     |
| 11 | tom      |
| 12 | mike     |
+----+----------+
4 rows in set (0.00 sec)

mysql>

利用SQLAlchemy获取数据:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 执行
    result = conn.execute(text("select * from user_account;"))

    for row in result.all():
        # 使用f-strings 格式化字符串
        print(f"id: {row.id:3}, username: {row.username:20}")
    # 打印的结果:
"""
    id:   9, username: lczmx
    id:  10, username: jack
    id:  11, username: tom
    id:  12, username: mike
"""
    conn.commit()

ORM

和Core一样, ORM也有一定的使用步骤:

  1. 配置数据库连接, 见上文:数据库的连接的格式
  2. 创建会话
  3. 创建表
  4. 使用接口, 增删改查数据
  5. 拿到返回数据, 执行其他代码

在学习SQLAlcehmy的ORM之前, 建议先了解一些概念, 以免后面会混淆

  1. 会话 Session
    会话是SQLAlchemy ORM与数据库的交互对象
    它可以管理建立连接engine, 并为通过会话加载或与会话关联的对象提供标识映射 (identity map)
    在使用时与 Connection非常相似, 你可以对比着使用
  2. Base
    通过 sqlalchemy.orm.declarative_base创建
    作为定义表的基类, 内部有包含 MetaData对象
    可以类似于 Django一样定义表

SQLAlchemy中, session是一个连接池, 的由其管理, 因此, 假如我们需要操作数据库的话, 需要在 session中拿到 Connection(连接)

创建会话

SQLAlchemy提供了两种创建会话的方法:

  1. sqlalchemy.orm.Session
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

创建引擎
engine = create_engine('postgresql://scott:tiger@localhost/')

创建会话
以下with可以简写成 with Session(engine) as session, session.begin():
with Session(engine) as session:
    # 开启自动提交
    with session.begin():
        # add方法 会将some_object 保存到数据库
        # session.add(some_object)
        # session.add(some_other_object)
        pass

  1. sqlalchemy.orm.sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

创建引擎
engine = create_engine('postgresql://scott:tiger@localhost/')

创建session
Session = sessionmaker(engine)

一般使用
with Session() as session:
    # session.add(some_object)
    # session.add(some_other_object)
    # 提交
    session.commit()

自动提交
with Session.begin() as session:
    # session.add(some_object)
    # session.add(some_other_object)
    pass

虽然有两种方法创建会话, 但我们一般使用 sessionmaker创建会话

另外补充一下 session的其它使用方式:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://scott:tiger@localhost/')

Session = sessionmaker(engine)

从连接指定到session
with engine.connect() as connection:
    with Session(bind=connection) as session:
        # 一些操作
        pass

下面列出 session的一些常用方法, 增删改查数据时要用到

方法 参数 描述 add instance

下次刷新操作时, 将 instance

保留到数据库中 delete instance

下次刷新操作时, 将 instance

从数据库中删除 begin subtransactions nested _subtrans

开始事务 rollback

无 回滚当前事务 commit

无 提交当前事务 close

无 关闭此Session execute statement params execution_option bind_arguments

等 执行SQL表达式构造 query *entities **kwargs

返回 Query

对象, 可用于查询数据 refresh instance attribute_names with_for_update instance

执行刷新操作

例子:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1)
with Session(engine) as session:
    result = session.execute(stmt)
    print(result.all())
    # [(2, 'name2'), (3, 'name2')]

    # ROLLBACK

在ORM中创建表

使用ORM时, 我们也需要 MetaData, 不同的是, 我们是通过 sqlalchemy.orm.registry构造的. 而且, 我们不需要像 Core那样直接声明 Table, 而是继承某个公共基类 (Base), 添加属性即可. 有两种方式定义基类.

方式一:

from sqlalchemy.orm import registry
mapper_registry = registry()
print(mapper_registry.metadata)  # MetaData对象
公共基类
Base = mapper_registry.generate_base()

方法二:

from sqlalchemy.orm import declarative_base

内部 return registry(...).generate_base(...)
Base = declarative_base()

现在你可以像在 Django ORM中一样, 定义表并在数据库中创建表, 每一个 Column表示一列数据, 关于 Column的写法, 见: Column定义

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年龄")
    gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")

class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")

class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教师名")

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

"""
-- 对于sql
CREATE TABLE student (
    sid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(32) NOT NULL COMMENT '姓名',
    age SMALLINT NOT NULL COMMENT '年龄',
    gender BOOL NOT NULL COMMENT '性别, True: 男, False: 女',
    PRIMARY KEY (sid)
)
CREATE TABLE teacher (
    tid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(10) NOT NULL COMMENT '教师名',
    PRIMARY KEY (tid)
)
CREATE TABLE course (
    cid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(10) NOT NULL COMMENT '科目名',
    tid INTEGER COMMENT '课程教师',
    PRIMARY KEY (cid),
    FOREIGN KEY(tid) REFERENCES teacher (tid)
)
CREATE TABLE score (
    sid INTEGER NOT NULL AUTO_INCREMENT,
    score SMALLINT NOT NULL COMMENT '成绩',
    student_id INTEGER COMMENT '成绩所属学生',
    course_id INTEGER COMMENT '成绩所属科目',
    PRIMARY KEY (sid),
    FOREIGN KEY(student_id) REFERENCES student (sid),
    FOREIGN KEY(course_id) REFERENCES course (cid)
)

"""

Base.metadataMetaData 对象, 常用的 MetaData方法见: MetaData

注: 你通过 Student.__table__属性可以查看 Table, 也可以通过 Student.name访问某一列
你也可以通过 __init__显示定义某些列

增删改查数据

插入数据

接上文 “在ORM中创建表” 中的表

1.x的接口与2.0的接口一样, 都是调用 session.add(instance)方法添加到数据库 (add方法下次刷新操作时, 将 instance保存到数据库)
注意: 自动生成的数据, 在未插入到数据库之前, 都为 None, 如: 自动生成的主键

你也可以调用 add_all(instance1, instance2, ...)方法, 区别只是插入一条和多条数据而已

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.orm import Session
from typing import Any

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年龄")
    gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")

class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")

class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教师名")

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

一般将 添加到数据库 封装成一个函数
def create_data(db: Session, target_cls: Any, **kwargs):
    try:
        cls_obj = target_cls(**kwargs)
        # 添加一个
        db.add(cls_obj)
        # 添加多个:
        # db.add_all([obj1, obj2, ...])

        db.commit()
        # 手动将 数据 刷新到数据库
        db.refresh(cls_obj)
        return cls_obj
    except Exception as e:
        # 别忘记发生错误时回滚
        db.rollback()
        raise e

session = SessionLocal()

-------------- 创建学生数据
student = create_data(session, Student, sid=1, name="张三", age=22, gender=True)

-------------- 创建教师数据
teacher = create_data(session, Teacher, tid=1, name="语文老师")

-------------- 创建课程数据
course = create_data(session, Course, cid=1, name="语文", tid=teacher.tid)

-------------- 创建成绩数据
score = create_data(session, Score, sid=1, score=89, student_id=student.sid, course_id=course.cid)

注意: 自动生成主键时, 只有在刷新到数据库中后, 才能获取主键

总的来说, 插入数据代码一般为:

1. 实例化一个表类
db_city = CityTable(....)

2. 调用session的add方法
session.add(db_city)

3. 调用session的commit方法 提交事务
session.commit()

4. 手动调用session的refresh方法 将数据刷新到数据库
session.refresh(db_city)

删除数据

1.x的方法

主要步骤是先查询再删除, 一般形式为: session.query(...).filter(...).delete()

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 方法一: 调用 session.delete 方法
    s = session.query(Score).filter(Score.score == 59).first()
    session.delete(s)

    # 方法二: 查询后直接删除
    session.query(Score).filter(Score.score == 59).delete()
    session.commit()

2.0的方法

像Core一样删除数据, 即 delte(...).where(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import select, delete

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    session.execute(
        delete(Score).where(Score.sid == 1)
    )
    session.commit()

修改数据

1.x的方法

主要步骤是先查询再更新, 即: session.query(...).filter(...).update(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    row = session.query(Score).filter(Score.score == 59).update({"score": 60})

    print(f"修改的行数: {row}")
    session.commit()

2.0的方法

同样和Core一样, 使用 update(...).where(...).values(...)的形式更新数据

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import update, bindparam

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 一般更新
    result1 = session.execute(update(Score).where(Score.score == 59).values(score=60))
    print(f"受影响行数: {result1.rowcount}")

    # # 更新数据 加上 原来的数据
    result2 = session.execute(
        update(Score).where(Score.score == 59).values(score=Score.score + 1))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    # 以字典的形式, 替换更新多个值
    result3 = session.execute(
        update(Score).where(Score.score == bindparam('old_score')).values(score=bindparam('new_score')),
        [
            {"old_score": 59, "new_score": 60},
        ]
    )

    print(f"受影响行数: {result3.rowcount}")

    session.commit()

同样 .rowcount属性获取受影响行数

查询数据

1.x的方法

在SQLAlchemy1.x查询方式中, 使用 Query对象进行查询, 类似于 Django ORM的管理器, 可以较为简单地查询数据
假如要查询的表如下:

class User(Base):
    __tablename__ = "User"  # 设置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '' % self.username

表的数据:

mysql> select * from User;
+-----+----------+-------------------+------+
| uid | username | email             | tags |
+-----+----------+-------------------+------+
|   1 | 张三     | zhangesan@xx.com  | 热情 |
|   2 | 李四     | lisi@xx.com       | 热情 |
|   3 | 王五     | wangwu@xx.com     | 开朗 |
|   4 | lczmx    | lczmx@foxmail.com | 热情 |
+-----+----------+-------------------+------+
4 rows in set (0.10 sec)

mysql>

使用例子:

from sqlalchemy import Column, Integer, create_engine, String
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import not_, or_, desc

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class User(Base):
    __tablename__ = "User"  # 设置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '' % self.username

Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # ------------------  查询所有User数据
    session.query(User).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
"""

    # ------------------  查询有多少条数据
    session.query(User).count()
"""
    对应SQL
    SELECT count(*) AS count_1 FROM (SELECT User.uid AS User_uid,
    User.username AS User_username, User.email AS User_email,
    User.tags AS User_tags FROM User) AS anon_1
"""

    # ------------------  查询第1条数据
    session.query(User).first()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    LIMIT 1
"""

    # ------------------  根据主键查询
    session.query(User).get(1)
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.uid = 1
"""

    # ------------------  简单查询, 使用 关键字实参 的形式来设置字段名
    session.query(User).filter_by(uid=1).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.uid = 1
"""

    # ------------------  复杂查询, 可以多个表一起,使用 恒等式'==' 等形式 来设置条件
    session.query(User).filter(User.uid == 1).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.uid = 1
"""

    # ------------------  filter 查询开头
    session.query(User).filter(User.username.startswith("l")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE (User.username LIKE concat('l', '%%'))
"""

    # ------------------  filter 查询结尾
    session.query(User).filter(User.username.endswith("x")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE (User.username LIKE concat('%%', 'x'))
"""

    # ------------------  filter 查询是否包含
    session.query(User).filter(User.username.contains("lcz")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE (User.username LIKE concat(concat('%%', "lcz", '%%')))
"""

    # ------------------  filter 模糊查询
    session.query(User).filter(User.username.like("%cz%")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.username LIKE "%cz%"
"""

    # ------------------  filter 条件取反 (not)
    session.query(User).filter(not_(User.username == "lczmx")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.username != "lczmx"
"""

    # ------------------  filter条件 或 (or), 默认为and
    session.query(User).filter(
        or_(User.uid == 1, User.uid == 3), ).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.uid = 1 OR User.uid = 3
"""

    # ------------------  filter条件 and or not 一起使用
    session.query(User).filter(or_(User.uid == 1, User.uid == 4), User.username == "lczmx",
                               not_(User.email == "wangwu@xx.com")).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE (User.uid = 1 OR User.uid = 4)
    AND User.username = "lczmx" AND User.email = "lczmx@foxmail.com"
"""

    # ------------------   filter 取反查询
    session.query(User).filter(User.username != "lczmx").all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.username != "lczmx";
"""

    # ------------------  查询uid为[1, 3, 5, 7, 9]的用户
    session.query(User).filter(User.uid.in_([1, 3, 5, 7, 9])).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    WHERE User.uid IN (1, 3, 5, 7, 9)
"""

    # ------------------  分组查询
    # !! 注意不是query(User), 因为Query(User)对应的SQL为:
    # SELECT User.uid AS User_uid, User.username AS User_username,
    # User.email AS User_email, User.tags AS User_tags
    session.query(User.tags).group_by(User.tags).all()
"""
    对应SQL
    SELECT User.tags AS User_tags FROM User GROUP BY User.tags
"""

    # ------------------  排序 顺序
    session.query(User).order_by(User.uid).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags
    FROM User ORDER BY User.uid
"""

    # ------------------  排序 倒序
    session.query(User).order_by(desc(User.uid)).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    ORDER BY User.uid DESC
"""

    # ------------------ 去重
    session.query(User.tags).distinct().all()
"""
    对应SQL:
    SELECT DISTINCT User.tags AS User_tags FROM User;
"""

    # ------------------ 取几条数据
    session.query(User).limit(2).all()
"""
    对应SQL:
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    LIMIT 2;
"""

    # ------------------ 跳过几条个数据
    session.query(User).offset(1).limit(2).all()
"""
    对应SQL
    SELECT User.uid AS User_uid, User.username AS User_username,
    User.email AS User_email, User.tags AS User_tags FROM User
    LIMIT 1, 2;
"""

关于query返回的对象
query的返回对象为 sqlalchemy.orm.query.Query对象, 你可以与 Result对象进行对比, 主要有以下的方法:

  • all()
    返回由表对象组成的列表
  • first()
    返回第一个结果 (表对象), 内部执行 limit SQL
  • one()
    只返回一行数据或引发异常 (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • one_or_none()
    最多返回一行数据或引发异常 (无数据时返回 None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • scalar()
    获取第一行的第一列数据. 如果没有要获取的行, 则返回 None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound

以上查询不包含一些连表操作, 见: relationship连表操作

2.0的方法

2.0的返回结果也是 Result对象, 关于 Result对象, 见: Result

注意: 由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

relationship连表操作

我们自定义外键时, 一般的步骤是:

  1. 子表使用 字段名= Column(Integer, ForeignKey('主表名.主键'))的格式定义
  2. 除此外, 还需要在主表中定义 relationship用于子表与主表之间的跨表查询

完整例子:

from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)
    # Article是类名 user是反向访问的属性名称
    article = relationship("Article", backref="user")
"""
    article = relationship("Article", backref="user")
    相当于:
    class User(Base):
        # Article是类名 user是反向访问的属性名称
        article = relationship("Article", back_populates="user")

    class Article(Base):
        # User是类名 addresses是反向访问的属性名称
        user = relationship("User", back_populates="addresses")

"""

    def __repr__(self):
        return "" % self.username

class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "" % self.title

Base.metadata.create_all(bind=engine)

关于relationship
relationship在定义外键时, 有非常重要的作用, 如: 1. 跨表操作 2. 设置删除主表数据时子表的值
relationship的参数很多, 这里只列出常用的几个, 全部参数见文档: relationship

  • back_populates
    指定反向访问的属性名称
  • backref
    快捷设置两个 relationship (设置 back_populates的话, 要设置两个表)
    SQLAlchemy完全入门
  • cascade
    用于控制修改数据时的选项值 说明 save-update 默认选项, 在添加一条数据的时候,会把其他和它相关联的数据都添加到数据库中。这种行为就是 save-update属性影响的 delete 表示当删除某一个模型中的数据的时候,是否也删除掉使用 relationship和它关联的数据 delete-orphan 表示当对一个ORM对象解除了父表中的关联对象的时候,自己便会被删除掉。当然如果表中的数据被删除,自己也会被删除。这个选项只能用在一对多上,不能用在多对多以及多对一上。并且还需要在子表中的 relationship中,增加一个 single_parent=True的参数 merge 默认选项, 当在使用 session.merge,合并一个对象的时候,会将使用了 relationship相关联的对象也进行 merge操作 expunge 移除操作的时候,会将相关联的对象也进行移除。这个操作 只是从 session 中移除,并不会真正的从数据库中删除 all 是对 save-updatemergerefresh-expireexpungedelete几种的填写 比如:
articles = relationship("Article",cascade="save-update,delete")
  • order_by
    子表列表的排序方式
倒序
article = relationship("Article", backref="user", order_by="Article.aid.desc()")
正序
article = relationship("Article", backref="user", order_by="Article.aid")

本部分包括Core与ORM的跨表增删改查操作

select user.uid as uid, user.username, user.password,
 (select GROUP_CONCAT(article.title) from article
   where article.author_id = user.uid) as article
 from user;

+-----+----------+----------+---------------+
| uid | username | password | article       |
+-----+----------+----------+---------------+
|   1 | 张三     | 12345    | C++入门,C入门 |
|   2 | 李四     | 12346    | python入门    |
+-----+----------+----------+---------------+

GROUP_CONCAT可以让多行数据拼接成一行数据, 以 ,链接

通过relationship双向访问

即直接通过 relationship访问主表或子表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表访问主表, 返回主表对象
"""
    实质上内部执行的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE user.uid = %(pk_1)s
"""
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  #
    # ----------- 2.0 方法
    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  #

    # +++++++++++++++++++++++++ 主表访问子表, 返回子表列表
    # 实质是 sqlalchemy.orm.collections.InstrumentedList 对象
    # 是List的子类
"""
    实质上内部执行的SQL
    SELECT article.aid AS article_aid, article.title AS article_title,
    article.content AS article_content, article.author_id AS article_author_id
    FROM article
    WHERE %(param_1)s = article.author_id ORDER BY article.aid
"""
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)
    # [, ]
    # ----------- 2.0方法
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)
    # [, ]

通过relationship修改关联关系

上面例子中说过, 主表.relationship字段InstrumentedList对象 (类似于 List), 我们可以修改它, 然后调用 commit方法即可. 子表.relationship字段是对应的主表, 可以修改为自己想要的, 同样调用 commit方法即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表修改属于的主表
"""
    对于SQL
    UPDATE article
    SET author_id=%(author_id)s
    WHERE article.aid = %(article_aid)s
"""
    # ----------- 1.x方法
    lisi = session.query(User).get(2)

    article_1x = session.query(Article).first()
    print(article_1x.user)  #
    article_1x.user = lisi  # 改为
    session.commit()  # 记得提交
    # ----------- 2.0 方法
    zhangsan = session.execute(select(User).where(User.uid == 1)).first()

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  #
    article_20.Article.user = zhangsan.User  # 改为
    session.commit()  # 记得提交

    # +++++++++++++++++++++++++ 主表修改子表列表

    # ----------- 1.x方法 增加
    user_1x = session.query(User).first()
    # 添加一个新的子表数据
    # 会在Article中插入一条新的数据
    user_1x.article.append(Article(title="javascript 入门", content="console.log(hello world)"))
    session.commit()  # 记得提交

    # ----------- 2.0 方法 移除
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)  # [, , ]

    article_js = session.execute(select(Article).where(Article.aid == 4)).scalar()
    # 从主表中移除与子表的关系
    user_20.User.article.remove(article_js)
    session.commit()  # 记得提交

通过relationship修改子/主表数据

同样非常简单, 找到对应的类, 然后修改数据并 commit即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 通过子表修改对应主表的数据
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  #

    article_1x.user.username = "张三二号"
    session.commit()  # 记得提交
    # ----------- 2.0 方法

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  #

    article_20.Article.user.username = "张三"
    session.commit()  # 记得提交

    # +++++++++++++++++++++++++ 通过主表修改对应子表的数据
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)  # [, , ]

    user_1x.article[-1].title = "js入门"
    session.commit()  # 记得提交
    # ----------- 2.0 方法
    user_20 = session.execute(select(User)).scalar()

    print(user_20.article)  # [, , ]

    user_20.article[-1].title = "javascript 入门"
    session.commit()  # 记得提交

通过relationship查询数据

正向查询, 子表利用主表的条件查询, 使用 has, 条件和普通查询的条件一样
反向查询, 主表利用子表的条件查询, 使用 any, 条件和普通查询的条件一样

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 反向查询
"""
    对应的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE EXISTS (SELECT 1
    FROM article
    WHERE user.uid = article.author_id AND article.title LIKE 'python%')
"""
    # ********* 查询有以python开头的Article的User
    # ----------- 1.x方法
    user_1x = session.query(User).filter(User.article.any(Article.title.like("python%")))
    print(user_1x.all())  # []

    # ----------- 2.0方法
    user_20 = session.execute(
        select(User).where(User.article.any(Article.title.like("python%"))))
    print(user_20.all())  # [(,)]

    # +++++++++++++++++++++++++ 正向查询
"""
    对应的SQL
    SELECT article.aid AS article_aid, article.title AS article_title,
     article.content AS article_content, article.author_id AS article_author_id
    FROM article
    WHERE EXISTS (SELECT 1
    FROM user
    WHERE user.uid = article.author_id AND (user.username LIKE concat(concat('%%', '四', '%%')))
"""
    # ********* 查询User表中username有 四 的Article
    # ----------- 1.x方法
    article_1x = session.query(Article).filter(Article.user.has(User.username.contains("四")))
    print(article_1x.all())  # []

    # ----------- 2.0方法
    article_20 = session.execute(
        select(Article).where(Article.user.has(User.username.contains("四"))))
    print(article_20.all())  # [(,)]

建立多对多关系

可以通过 relationship便捷使用多对多关系

from sqlalchemy import Table, Text, Column, ForeignKey

第三张表
post_keywords = Table("post_keywords", Base.metadata,
                      Column('post_id', ForeignKey('posts.id'), primary_key=True),
                      Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
                      )

class BlogPost(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # 多对多关系 BlogPostKeyword
    keywords = relationship('Keyword',
                            secondary=post_keywords,
                            back_populates='posts')

    def __init__(self, headline, body):
        self.headline = headline
        self.body = body

    def __repr__(self):
        return "BlogPost(%r, %r)" % (self.headline, self.body)

class Keyword(Base):
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)
    # 多对多关系 KeywordBlogPost
    posts = relationship('BlogPost',
                         secondary=post_keywords,
                         back_populates='keywords')

    def __init__(self, keyword):
        self.keyword = keyword

Base.metadata.create_all(bind=engine)

添加操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 比如添加一篇文章, 为文章添加多个keyword
    blog = BlogPost(headline="起飞!", body="我是文章的内容")
    # 获取子表列表 并 append
    blog.keywords.append(Keyword("新闻"))
    blog.keywords.append(Keyword("热门"))
    session.add(blog)
    session.commit()
    # ------- 添加第二篇文章
    keyword1 = session.execute(
        select(Keyword).filter_by(keyword="热门")
    ).scalar()
    new_blog = BlogPost(headline="震惊!", body="我是第二篇文章的内容")
    new_blog.keywords.append(keyword1)
    session.add(new_blog)
    session.commit()

查询操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 查询所有 "热门" 文章
    blog = session.execute(
        select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "热门"))
    ).all()

    print(blog)
    # [(BlogPost('起飞!', '我是文章的内容'),),
    # (BlogPost('震惊!', '我是第二篇文章的内容'),)]

其他操作也像一般的 relationship一样操作

项目示范

一般来说, 我们使用SQLAlchemy的步骤大多相同, 下面
文件结构:

+--- database.py  # 用于 初始化session 和 公共基类
+--- models.py   # 定义表
+--- crud.py     # 封装增删改查的方法
+--- schemas.py  # 定义pydantic模型, 用于格式化已经取得的数据 [可选]
+--- main.py    # 执行主逻辑

database.py 初始化 session和 公共基类:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

数据库的URL
替换成自己的
SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'

创建引擎
engine = create_engine(
    # echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
    SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True
)

SessionLocal用于对数据的增删改查
flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)

创建基本映射类, 用于创建表
Base = declarative_base(bind=engine, name='Base')

models.py 定义表结构

from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
导入公共基类
from .database import Base

class City(Base):
    __tablename__ = 'city'

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    province = Column(String(100), unique=True, nullable=False, comment='省/直辖市')
    country = Column(String(100), nullable=False, comment='国家')
    country_code = Column(String(100), nullable=False, comment='国家代码')
    country_population = Column(BigInteger, nullable=False, comment='国家人口')
    data = relationship('Data', back_populates='city')  # 'Data'是关联的类名;back_populates来指定反向访问的属性名称

    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    __mapper_args__ = {"order_by": country_code}  # 默认是正序,倒序加上.desc()方法

    def __repr__(self):
        return f'{self.country}_{self.province}'

class Data(Base):
    __tablename__ = 'data'

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    city_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市')  # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名
    date = Column(Date, nullable=False, comment='数据日期')
    confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量')
    deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量')
    recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量')
    city = relationship('City', back_populates='data')  # 'City'是关联的类名;back_populates来指定反向访问的属性名称

    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    __mapper_args__ = {"order_by": date.desc()}  # 按日期降序排列

    def __repr__(self):
        return f'{repr(self.date)}:确诊{self.confirmed}例'

crud.py 封装增删改查的方法:

from sqlalchemy.orm import Session

import models
import schemas

def get_city(db: Session, city_id: int):
    return db.query(models.City).filter(models.City.id == city_id).first()

def get_city_by_name(db: Session, name: str):
    return db.query(models.City).filter(models.City.province == name).first()

def get_cities(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.City).offset(skip).limit(limit).all()

def create_city(db: Session, city: schemas.CreateCity):
"""
    创建City数据
"""
    db_city = models.City(**city.dict())
    db.add(db_city)
    db.commit()
    db.refresh(db_city)
    return db_city

def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
    if city:
        # 外键关联查询,这里不是像Django ORM那样Data.city.province
        return db.query(models.Data).filter(models.Data.city.has(province=city))
    return db.query(models.Data).offset(skip).limit(limit).all()

def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
"""
    创建Data数据
"""
    db_data = models.Data(**data.dict(), city_id=city_id)
    db.add(db_data)
    db.commit()
    db.refresh(db_data)
    return db_data

schemas.py 定义pydantic模型, 用于格式化已经取得的数据:

from datetime import date as date_
from datetime import datetime

from pydantic import BaseModel

class CreateData(BaseModel):
    date: date_
    confirmed: int = 0
    deaths: int = 0
    recovered: int = 0

class CreateCity(BaseModel):
    province: str
    country: str
    country_code: str
    country_population: int

class ReadData(CreateData):
    id: int
    city_id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True

class ReadCity(CreateCity):
    id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True

main.py 执行主逻辑:

from sqlalchemy.orm import Session

import crud
import schemas

from database import engine, Base, SessionLocal
from models import City, Data

创建表, 已经存在的将被忽略
Base.metadata.create_all(bind=engine)
db = SessionLocal()
调用 crud
db_city = crud.get_city_by_name(db, name="广东省")
if db_city:
    raise Exception("City already registered")

创建数据
city = City(...)
crud.create_city(db=db, city=city)

查询数据详解

只有涉及SQL 的查询数据, 必然绕不开 FROM WHERE SELECT GROUP BY HAVING ORDER BY LIMIT INNER JOIN ... ON LEFT JOIN ... ON UNION这些SQL查询语法, 那么它们在SQLAlchemy中是如何表示的呢? 实际上和SQL语句一样, SQLAlchemy的语法也有 select where join order_by group_by having 等 …

注意: 这些方法在Core与ORM中都适用

刚接触可能会觉得比较复杂, 但是假如有SQL基础的话, 用起来比较简单.

要查询的表结构为:

+++++++++++++++++++ 使用Core定义 +++++++++++++++++++
from sqlalchemy import MetaData, create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text

 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".

                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user",
    metadata_obj,
    Column("uid", Integer, primary_key=True, autoincrement=True),
    Column("username", String(16), nullable=False),
    Column("password", String(32), nullable=False),
)

article_table = Table(
    'article',
    metadata_obj,
    Column("aid", Integer, primary_key=True, autoincrement=True),
    Column("title", String(36), nullable=False),
    Column("content", Text, nullable=False),
    Column("author_id", Integer, ForeignKey("user.uid"))
)

metadata_obj.create_all(bind=engine)

+++++++++++++++++++ 使用ORM定义  +++++++++++++++++++
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".

                       format(**DATABASE_CONFIG), echo=True, future=True)

class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)

    # Article是类名 user是反向访问的属性名称
    article = relationship("Article", backref="user")

    def __repr__(self):
        return "" % self.username

class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "" % self.title

Base.metadata.create_all(bind=engine)

表数据为:

mysql> select  * from article, user where user.uid=article.author_id;
+-----+------------+--------------------+-----------+-----+----------+----------+
| aid | title      | content            | author_id | uid | username | password |
+-----+------------+--------------------+-----------+-----+----------+----------+
|   1 | C++入门    | c++ hello world    |         1 |   1 | 张三     | 12345    |
|   2 | C入门      | c hello world      |         1 |   1 | 张三     | 12345    |
|   3 | python入门 | print(hello world) |         2 |   2 | 李四     | 12346    |
+-----+------------+--------------------+-----------+-----+----------+----------+
3 rows in set (0.12 sec)

select

该函数可以指定要查询的表、列及添加别名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ 一般查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password FROM user
WHERE user.uid = 2
"""
-------------- Core
result_core_1 = conn.execute(select(user_table).where(user_table.c.uid == 2))
print(result_core_1.all())  # [(2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid == 2))
print(result_orm_1.all())  # [(,)]

2 ++++++++++++++++++ 查询全部列 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
"""
-------------- Core
result_core_2 = conn.execute(select(user_table))
print(result_core_2.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_2 = session.execute(select(User))
print(result_orm_2.all())  # [(,), (,)]

3 ++++++++++++++++++ 查询指定的列 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
"""
-------------- Core
result_core_3 = conn.execute(select(user_table.c.username))
print(result_core_3.all())  # [('张三',), ('李四',)]
-------------- ORM
result_orm_3 = session.execute(select(User.username))
print(result_orm_3.all())  # [('张三',), ('李四',)]

4 ++++++++++++++++++ 两个表 联合查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user, article
WHERE user.uid = article.author_id
"""
-------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).where(
        user_table.c.uid == article_table.c.author_id))
[('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_core_4.all())
-------------- ORM
result_orm_4 = session.execute(select(User.username, Article.title).where(
    User.uid == Article.author_id))
[('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_orm_4.all())

5 ++++++++++++++++++ 为列添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user
WHERE user.uid = 1
"""
-------------- Core
result_core_5 = conn.execute(
    select((user_table.c.username).label("name")).where(
        user_table.c.uid == 1))
print(result_core_5.all())  # [('张三',)]
-------------- ORM
result_orm_5 = session.execute(
    select((User.username).label("name")).where(
        User.uid == 1))
print(result_orm_5.all())  # [('张三',)]

利用别名 取值
print(result_core_5.first().name)  # 张三
print(result_orm_5.first().name)  # 张三

6 ++++++++++++++++++ 为表添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user_1.username
FROM user AS user_1
WHERE user_1.uid = 1
"""
-------------- Core
user_table_core_alias = user_table.alias()
result_core_6 = conn.execute(
    select(user_table_core_alias.c.username).where(
        user_table_core_alias.c.uid == 1))
print(result_core_6.all())  # [('张三',)]
-------------- ORM
from sqlalchemy.orm import aliased

user_table_orm_alias = aliased(User)
result_orm_6 = session.execute(
    select(user_table_orm_alias.username).where(
        user_table_orm_alias.uid == 1))
print(result_orm_6.all())  # [('张三',)]

7 ++++++++++++++++++ 与text 结合, 添加额外列 ++++++++++++++++++
"""
对应SQL
SELECT now() AS now, user.username, '自定义字符'
FROM user
"""

from sqlalchemy import literal_column, text

literal_column表示一列数据
text可以转化成SQL

-------------- Core
result_core_7 = conn.execute(
    select(literal_column("now()").label("now"), user_table.c.username, text("'自定义字符'")))
print(result_core_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 54, 44), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定义字符')]
"""
-------------- ORM
result_orm_7 = session.execute(
    select(literal_column("now()").label("now"), User.username, text("'自定义字符'")))
print(result_orm_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 59, 42), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定义字符')]
"""

where

过滤数据, Coretable.c.xxx获取行, 假如是ORM的话为: 类名.属性名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ 条件默认为and ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid < 3 AND user.uid > 1
"""
-------------- Core
result_core_1 = conn.execute(
    select(user_table).where(user_table.c.uid < 3, user_table.c.uid > 1))
print(result_core_1.all())  # [(2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1))
print(result_orm_1.all())  # [(,)]

2 ++++++++++++++++++ 修改条件为not or ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四"
"""
from sqlalchemy import or_, not_

-------------- Core
result_core_2 = conn.execute(
    select(user_table).where(
        or_(user_table.c.uid == 1, user_table.c.uid == 2),
        not_(user_table.c.username == "李四"),
    ))
print(result_core_2.all())  # [(1, '张三', '12345')]
-------------- ORM
result_orm_2 = session.execute(select(User).where(
    or_(User.uid == 1, User.uid == 2),
    not_(User.username == "李四")))
print(result_orm_2.all())  # [(,)]

3 ++++++++++++++++++ startswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('张', '%%'))
"""

-------------- Core
result_core_3 = conn.execute(
    select(user_table).where(
        user_table.c.username.startswith("张")))
print(result_core_3.all())  # [(1, '张三', '12345')]
-------------- ORM
result_orm_3 = session.execute(select(User).where(
    User.username.startswith("张")))
print(result_orm_3.all())  # [(,)]

4 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('%%', '三'))
"""

-------------- Core
result_core_4 = conn.execute(
    select(user_table).where(
        user_table.c.username.endswith("三")))
print(result_core_4.all())  # [(1, '张三', '12345')]
-------------- ORM
result_orm_4 = session.execute(select(User).where(
    User.username.endswith("三")))
print(result_orm_4.all())  # [(,)]

5 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat(concat('%%', '三', '%%'))
"""

-------------- Core
result_core_5 = conn.execute(
    select(user_table).where(
        user_table.c.username.contains("三")))
print(result_core_5.all())  # [(1, '张三', '12345')]
-------------- ORM
result_orm_5 = session.execute(select(User).where(
    User.username.contains("三")))
print(result_orm_5.all())  # [(,)]

6 ++++++++++++++++++ like ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.username LIKE '%三'
"""

-------------- Core
result_core_6 = conn.execute(
    select(user_table).where(
        user_table.c.username.like("%三")))
print(result_core_6.all())  # [(1, '张三', '12345')]
-------------- ORM
result_orm_6 = session.execute(select(User).where(
    User.username.like("%三")))
print(result_orm_6.all())  # [(,)]

7 ++++++++++++++++++ in ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IN (1, 2)
"""

-------------- Core
result_core_7 = conn.execute(
    select(user_table).where(
        user_table.c.uid.in_((1, 2))
    ))
print(result_core_7.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_7 = session.execute(
    select(User).where(
        User.uid.in_((1, 2))
    ))
print(result_orm_7.all())  # [(,), (,)]

8 ++++++++++++++++++ between ... and ... ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid BETWEEN 1 AND 2
"""

-------------- Core
result_core_8 = conn.execute(
    select(user_table).where(
        user_table.c.uid.between(1, 2)
    ))
print(result_core_8.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_8 = session.execute(
    select(User).where(
        User.uid.between(1, 2)
    ))
print(result_orm_8.all())  # [(,), (,)]

9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IS NOT NULL
"""

from sqlalchemy import not_

-------------- Core
result_core_9 = conn.execute(
    select(user_table).where(
        # user_table.c.uid.is_(None),  # IS NULL
        not_(user_table.c.uid.is_(None)),  # IS NOT NULL
    ))
print(result_core_9.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_9 = session.execute(
    select(User).where(
        # User.uid.is_(None),  # IS NULL
        not_(User.uid.is_(None)),  # IS NOT NULL
    ))
print(result_orm_9.all())  # [(,), (,)]

除此之外, 你还可以使用一些其他运算符, 详情见: Operator Reference

order_by

order_by用于排序

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ 根据某一列排序 (默认升序) ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid
"""

-------------- Core
result_core_1 = conn.execute(
    select(user_table).order_by(user_table.c.uid))
print(result_core_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(
    select(User).order_by(User.uid))
print(result_orm_1.all())  # [(,), (,)]

2 ++++++++++++++++++ 手动指定升序/降序 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid ASC/DESC
"""

-------------- Core
result_core_2 = conn.execute(
    # select(user_table).order_by(user_table.c.uid.asc())  # 升序
    select(user_table).order_by(user_table.c.uid.desc())  # 降序
)
print(result_core_2.all())
[(1, '张三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '张三', '12345')]
-------------- ORM
result_orm_2 = session.execute(
    # select(User).order_by(User.uid.asc())  # 升序
    select(User).order_by(User.uid.desc())  # 降序
)
print(result_orm_2.all())
[(,), (,)] / [(,), (,)]

3 ++++++++++++++++++ 根据别名排序 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user ORDER BY name DESC
"""
from sqlalchemy import desc, asc

-------------- Core
result_core_3 = conn.execute(
    select((user_table.c.username).label("name")).order_by(desc("name"))
)
print(result_core_3.all())  # [('张三',), ('李四',)]

-------------- ORM
result_orm_3 = session.execute(
    select((User.username).label("name")).order_by(desc("name"))
)
print(result_orm_3.all())  # [('张三',), ('李四',)]

group_by和having

group_by用于分组, having类似于 where, 但可以对已分组数据使用聚合函数

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ group_by一般使用 ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
"""

func 内有很多内置函数

-------------- Core
result_core_1 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(article_table.c.author_id)
)
print(result_core_1.all())  # [(2,), (1,)]

-------------- ORM
result_orm_1 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(Article.author_id)
)
print(result_orm_1.all())  # [(2,), (1,)]

2 ++++++++++++++++++ group by + having ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
HAVING count(article.author_id) > 1
"""

func 内有很多内置函数

-------------- Core
result_core_2 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(
        article_table.c.author_id).having(func.count(article_table.c.author_id) > 1)
)
print(result_core_2.all())  # [(2,)]

-------------- ORM
result_orm_2 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(
        Article.author_id).having(func.count(Article.author_id) > 1)
)
print(result_orm_2.all())  # [(2,)]

除了 count外, 还有其他的方法, 详情见: 内置函数

limit和offset

limit: 表示取几条数据, offset: 表示要跳过多少条数据

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ 仅使用LIMIT ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1
"""

-------------- Core
result_core_1 = conn.execute(select(user_table).limit(1))
print(result_core_1.all())  # [(1, '张三', '12345')]

-------------- ORM
result_orm_1 = session.execute(select(User).limit(1))
print(result_orm_1.all())  # [(,)]

2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1, 1
"""

-------------- Core
result_core_2 = conn.execute(select(user_table).limit(1).offset(1))
print(result_core_2.all())  # [(2, '李四', '12346')]

-------------- ORM
result_orm_2 = session.execute(select(User).limit(1).offset(1))
print(result_orm_2.all())  # [(,)]

去重

你可以在查询的时候使用SQL进行去重, 亦可以在取到数据后进行去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++
"""
对应SQL
SELECT DISTINCT article.author_id
FROM article
"""

-------------- Core
result_core_1 = conn.execute(
    select(article_table.c.author_id).distinct()
)
print(result_core_1.all())  # [(1,), (2,)]
-------------- ORM
result_orm_1 = session.execute(
    select(Article.author_id).distinct())
print(result_orm_1.all())  # [(1,), (2,)]

2 ++++++++++++++++++ 在结果中去重 ++++++++++++++++++
"""
对应SQL
SELECT article.author_id
FROM article
"""

-------------- Core
result_core_2 = conn.execute(select(article_table.c.author_id))
print(result_core_2.unique().all())  # [(1,), (2,)]

-------------- ORM
result_orm_2 = session.execute(select(Article.author_id))
print(result_orm_2.unique().all())  # [(1,), (2,)]

连接查询

&#x5185;&#x8FDE;&#x63A5;&#x67E5;&#x8BE2; &#x5916;&#x8FDE;&#x63A5;&#x67E5;&#x8BE2; &#x5B8C;&#x5168;&#x8FDE;&#x63A5;&#x67E5;&#x8BE2;三种连接查询方式, 在SQLAlchemy中分别对应着 join(...) join(..., isouter=True) join(..., full=True)

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式一 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_1 = conn.execute(
    select(user_table.c.username, article_table.c.title).join_from(user_table, article_table))
print(result_core_1.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_1 = session.execute(
    select(User.username, Article.title).join_from(User, Article))
print(result_orm_1.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

2 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式二 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_2 = conn.execute(
    select(user_table.c.username, article_table.c.title).join(article_table))
print(result_core_2.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_2 = session.execute(select(User.username, Article.title).join(Article))
print(result_orm_2.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

3 ++++++++++++++++++ 内连接查询 (手动指定join的表) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_3 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(user_table).join(article_table))
print(result_core_3.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_3 = session.execute(
    select(User.username, Article.title).select_from(User).join(Article))
print(result_orm_3.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

4 ++++++++++++++++++ 内连接查询 (手动指定on的条件) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(
        user_table).join(article_table, user_table.c.uid == article_table.c.author_id))
print(result_core_4.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_4 = session.execute(
    select(User.username, Article.title).select_from(
        User).join(Article, User.uid == Article.author_id))
print(result_orm_4.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

5 ++++++++++++++++++ 外连接查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_5 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, isouter=True))
print(result_core_5.all())
[(1, '张三', '12345', 'C入门'), (1, '张三', '12345', 'C++入门'), (2, '李四', '12346', 'python入门')]
-------------- ORM
result_orm_5 = session.execute(
    select(User, Article.title).join(Article, isouter=True))
print(result_orm_5.all())  # [(, 'C入门'), (, 'C++入门'), (, 'python入门')]

6 ++++++++++++++++++ 完全连接查询 ++++++++++++++++++
# !!! 注意: MYSQL 中没有  FULL OUTER JOIN, 执行时会报错
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

-------------- Core
result_core_6 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, full=True))
print(result_core_6.all())

-------------- ORM
result_orm_6 = session.execute(
    select(User, Article.title).join(Article, full=True))
print(result_orm_6.all())

注意: 使用ORM的方式进行连表操作时, 可以通过 relationship, 即不需要直接指定第二张表, 指定 relationship

比如:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

result_orm = session.execute(
    select(User.username, Article.title).select_from(User).join(User.article))
print(result_orm.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

UNION和UNION ALL

UNION ALL: 合并两个或多个 SELECT语句的结果, 结果不会去重
UNION: 合并两个或多个 SELECT语句的结果, 结果会去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, union_all, union

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ UNION ALL ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION ALL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 2
"""

-------------- Core
result_core_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_core_1_u = union_all(result_core_1_stmt1, result_core_1_stmt2)

result_core_1 = conn.execute(result_core_1_u)
print(result_core_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]

-------------- ORM
result_orm_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_orm_1_u = union_all(result_orm_1_stmt1, result_orm_1_stmt2)

result_orm_1 = session.execute(result_orm_1_u)
print(result_orm_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]

2 ++++++++++++++++++ UNION ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
"""

-------------- Core
result_core_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_core_2_u = union(result_core_2_stmt1, result_core_2_stmt2)

result_core_2 = conn.execute(result_core_2_u)
print(result_core_2.all())  # [(1, '张三', '12345')]

-------------- ORM
result_orm_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_u = union(result_orm_2_stmt1, result_orm_2_stmt2)

result_orm_2 = session.execute(result_orm_2_u)
print(result_orm_2.all())  # [(1, '张三', '12345')]

子查询

即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
SELECT * FROM data WHERE EXISTS (SELECT name FROM user);的查询
或使用 WITH temp AS (...) 作为临时表
注意: 在使用子查询后,SQL语句的查询性能变得非常糟糕, 至于如何取舍看个人权衡了

关于子查询的优化, 见: 深入理解MySql子查询IN的执行和优化

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

1 ++++++++++++++++++ EXISTS 子查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
WHERE EXISTS (SELECT count(article.author_id) AS count_1
FROM article GROUP BY article.author_id
HAVING count(article.author_id) >= 2)
"""

-------------- Core
subquery_core_1 = (select(func.count(article_table.c.author_id)).group_by(
    article_table.c.author_id).having(
    func.count(article_table.c.author_id) >= 2)).exists()

result_core_1 = conn.execute(select(user_table.c.username).where(subquery_core_1))
print(result_core_1.all())  # [('张三',), ('李四',)]

-------------- ORM
subquery_orm_1 = (select(func.count(Article.author_id)).group_by(
    Article.author_id).having(
    func.count(Article.author_id) >= 2)).exists()

result_orm_1 = session.execute(select(User.username).where(subquery_orm_1))
print(result_orm_1.all())  # [('张三',), ('李四',)]

2 ++++++++++++++++++ 其它 子查询 ++++++++++++++++++
"""
对应SQL
SELECT article.title, anon_1.username
FROM article, (SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1) AS anon_1
WHERE article.author_id = anon_1.uid
"""

-------------- Core
subquery_core_2 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).subquery()

result_core_2 = conn.execute(
    select(article_table.c.title, subquery_core_2.c.username).where(
        article_table.c.author_id == subquery_core_2.c.uid))
print(result_core_2.all())  # [('C++入门', '张三'), ('C入门', '张三')]

# -------------- ORM
subquery_orm_2 = select(User.uid, User.username).where(User.uid == 1).subquery()

result_orm_2 = session.execute(
    select(Article.title, subquery_orm_2.c.username).where(
        Article.author_id == subquery_orm_2.c.uid))
print(result_orm_2.all())  # [('C++入门', '张三'), ('C入门', '张三')]

3 ++++++++++++++++++ with 添加临时表 ++++++++++++++++++
"""
对应SQL
WITH anon_1 AS
(SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1)
 SELECT article.title, anon_1.username
FROM article, anon_1
WHERE article.author_id = anon_1.uid
"""

-------------- Core
subquery_core_3 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).cte()

result_core_3 = conn.execute(
    select(article_table.c.title, subquery_core_3.c.username).where(
        article_table.c.author_id == subquery_core_3.c.uid))
print(result_core_3.all())  # [('C++入门', '张三'), ('C入门', '张三')]

-------------- ORM
subquery_orm_3 = select(User.uid, User.username).where(User.uid == 1).cte()

result_orm_3 = session.execute(
    select(Article.title, subquery_orm_3.c.username).where(
        Article.author_id == subquery_orm_3.c.uid))
print(result_orm_3.all())  # [('C++入门', '张三'), ('C入门', '张三')]

从1.x迁移到2.0的接口

一些 Query的接口用起来还是特别好用的, 因此在2.0风格的接口中, 一些接口仍然可以使用

get根据主键查询

注意: 这是session的方法

---------------- 1.x
session.query(User).get(42)
---------------- 2.0
session.get(User, 42)

filter_by简单查询

注意: 这是select的方法

result = session.execute(
    select(User).filter_by(username="张三")
)
print(result.all())  # [(,)]

filter复杂查询

注意: 这是select的方法

result = session.execute(
    select(User).filter(User.username == "张三")
)
print(result.all())  # [(,)]

1.x与2.0的ORM接口对比

以下表格来自于: 2.0 Migration – ORM Usage

SQLAlchemy完全入门
SQLAlchemy完全入门

一些类的介绍

Result

表示从数据库中返回的结果, 一行数据使用 Row对象表示, 关于Row, 见: Row

从SQLAlchemy1.4开始, Core和ORM的结果(Result), 使用的接口相同

from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    #
    print(type(session_res))

with engine.connect() as conn:
    conn_res = conn.execute(
        text("select tid, name from teacher;")
    )
    #
    print(type(conn_res))

注意: 例子中的 teacher表的数据为:

+-----+----------+
| tid | name     |
+-----+----------+
|   1 | 语文老师 |
|   2 | 英语老师 |
|   3 | 数学老师 |
+-----+----------+

Result的全部方法

  • unique(strategy=None)
    去重, 但 需要注意何时调用, 应该在调用如 .all()这种生成 Row的方法之前调用, 否则返回的对象都没有 unique这个方法
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher;")
    )
    session_res2 = session.execute(
        text("select tid, name from teacher;")
    )

    session_res = session_res1.merge(session_res2)

    # [1, 2, 3, 1, 2, 3] ==> [1, 2, 3]
    print(session_res.scalars().unique().all())

  • all
    返回所有Row数据的列表, 之后的调用将返回一个空列表
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
   "username": "root",
   "password": "123456",
   "host": "localhost",
   "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                      echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
   session_res = session.execute(
       text("select tid, name from teacher;")
   )

   # ---------------------------------- 第一次调用all
   first_all = session_res.all()
   print(type(session_res.all()))  #
   for row in first_all:
       print(type(row))  #
       print(f"{row.tid}-{row.name}")

   # ---------------------------------- 第二次调用all
   print(session_res.all())  # []

  • fetchall()
    all方法一样
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    first_fetchall = session_res.fetchall()
    second_fetchall = session_res.fetchall()
    # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
    print(first_fetchall)
    # []
    print(second_fetchall)

  • fetchmany(size=None)
    取多行数据, size表示取多少行数据 , 当所有行都用完时, 返回一个空列表
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    # 一共3行数据

    # 取两行数据: [(1, '语文老师'), (2, '英语老师')]
    print(session_res.fetchmany(2))

    # 取一行数据: [(3, '数学老师')]
    print(session_res.fetchmany(1))

    # 没有数据了, 返回空列表: []
    print(session_res.fetchmany(1))
  • fetchone()
    取一行数据, 当所有行都用完时,返回 None
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
   "username": "root",
   "password": "123456",
   "host": "localhost",
   "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                      echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
   session_res = session.execute(
       text("select tid, name from teacher;")
   )
   while True:
       row = session_res.fetchone()
       if not row:
           break
       print(row)
"""
       (1, '语文老师')
       (2, '英语老师')
       (3, '数学老师')
"""

  • first()
    获取第一行数据, 关闭Result并丢弃其余行, 如果没有行,则不获取 (即返回值为 None)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )

    # (1, '语文老师')
    print(session_res.first())

    # !!! 由于Result已经关闭, 继续操作会报错:
    # sqlalchemy.exc.ResourceClosedError: This result object is closed.

    print(session_res.first())

  • one()
    只返回一行数据或引发异常, 并 关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher where tid=1;")
    )

    # (1, '语文老师')
    print(session_res.one())

  • one_or_none()
    最多返回一行数据或引发异常, 并 关闭Result (无数据时返回 None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher where tid < 1;")
    )
    # None
    print(session_res1.one_or_none())

    session_res2 = session.execute(
        text("select tid, name from teacher where tid = 1;")
    )
    # (1, '语文老师')
    print(session_res2.one_or_none())

  • columns(*col_expressions)
    限制返回列, 也可以对列进行重新排序
    即假如结果的列为(a, b, c, d), 但我只需要a和b, 那么只需要 result.columns("a", "b"), 你还可以调整它们的顺序, 以方便解包
    注意: 这会修改Result的列, 且该方法的返回值就是修改后的Result对象
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher;")
    )
    session_res2 = session.execute(
        text("select tid, name from teacher;")
    )

    for row in session_res1.columns("tid"):
        # 返回值时修改后的Result

        # 获取字段名元组
        print(row._fields)  # ('tid',)

    # 会修改原Result
    session_res2.columns("name")
    for row in session_res2:
        # 获取字段名元组
        print(row._fields)  # ('name',)

  • scalar()
    获取第一行的第一列数据, 并 关闭Result. 如果没有要获取的行, 则返回 None
    如: [(1, "lczmx"), (2, "jack")], 返回 1
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    print(session_res.scalar())  # 1
  • scalar_one()
    只返回一行数据的第一列或引发异常, 并 关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one() + Result.scalar()
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher where tid=1;")
    )

    print(session_res.scalar_one())  # 1

  • scalar_one_or_none()
    最多返回一行数据的第一列或引发异常, 并 关闭Result (无数据时返回 None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one_or_none() + Result.scalar()
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher where tid
  • scalars(index=0)
    返回一个 ScalarResult对象, 该对象以每行数据的 第 index列 元素作为数据 (而不是 ResultRow)
    该对象的方法有: unique partitions fetchall fetchmany all first one_or_none one
    具体使用与 Result类似
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )

    # [1, 2, 3]
    print(session_res.scalars().all())

  • mappings()
    返回一个 MappingResult对象, MappingResult对象与 Result对象类似, 但是一行数据使用 RowMapping对象表示, RowMapping对象类似于字典对象, 简而言之: 调用该方法, 你可以将一行数据由类元组变为类字典
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )

    for d in session_res.mappings():
        # 像操作字典一样操作 d 即可
        print(d.get("tid"), d.get("name"))

  • keys()
    从SQLAlchemy1.4起 (之前的版本返回一个列表), 该方法将返回一个 RMKeyView对象, 该对象可迭代, 其 _keys属性存放列的名称, 由于实现的 __contains__方法, 因此也可以使用 in运算符作判断.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
from collections import Iterable

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )

    keys = session_res.keys()
    print(keys)  # RMKeyView(['tid', 'name'])

    #
    print(type(keys))

    # 可迭代的
    print(isinstance(keys, Iterable))  # True

    if "name" in keys:
        print("name in keys")

  • freeze()
    可以对 Result进行缓存, 见官方文档: Re-Executing Statements
  • merge(*others)
    该方法合并其他 Result, 返回一个 MergedResult对象, 你可以像一般的 Result一样操作它, 但是取值的时候注意游标的位置(MergedResult关闭, Result也关闭; MergedResult取完了值, Result的值也被取完)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher where tid=1;")
    )
    session_res2 = session.execute(
        text("select tid, name from teacher where tid=2;")
    )

    session_res = session_res2.merge(session_res1)

    # 注意 先session_res2再session_res1

    # (2, '英语老师')
    print(session_res.fetchone())

    # [(1, '语文老师')]
    print(session_res1.all())

    # session_res已经取过一次
    # 所以返回: []
    print(session_res2.all())

  • partitions(size=None)
    迭代生成 size大小的行的子列表, sizeNone时调用 Result.yield_per(), 否则调用 Result.fetchmany(size)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res1 = session.execute(
        text("select tid, name from teacher;")
    )
    # 每次迭代 取 1行 数据
    for i in session_res1.partitions():
        print(i)
"""
        [(1, '语文老师')]
        [(2, '英语老师')]
        [(3, '数学老师')]
"""

    session_res2 = session.execute(
        text("select tid, name from teacher;")
    )
    # 每次迭代 取 2行 数据
    for i in session_res2.partitions(2):
        print(i)
"""
        [(1, '语文老师'), (2, '英语老师')]
        [(3, '数学老师')]
"""
    # 已经迭代完了, 就没有值了
    print(list(session_res2.partitions()))  # []

  • yield_per(num)
    迭代 num行数据, 返回的是 Result对象
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
    print(session_res.yield_per(1).all())

  • close
    关闭此 Result, 再操作的话会抛出异常: sqlalchemy.exc.ResourceClosedError: This result object is closed.

Row

一般来说, 一行数据是一个 Row对象
常用的属性或方法:

属性/方法 属性或方法 描述 _asdict

方法 返回字段名与值的字典数据 并添加到 _mapping

属性 _fields

属性 返回字符名的元组 _mapping

属性 返回字段名与值的字典数据

一般使用:
注意: 我们每一次迭代Result对象, 得到的是Row对象

通过属性取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    y = row.y

    # illustrate use with Python f-strings
    print(f"Row: {row.x} {row.y}")

在ORM中, select(Article)的属性是 Article, select(Article.title)的属性是 title

通过元组解包

result = conn.execute(text("select x, y from some_table"))

for x, y in result:
    # ...

通过索引取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    x = row[0]

MetaData 是包含 TableEngine 的对象, 也就是说它主要是用来管理 Table (表)的
下面列出 MetaData对象的一些方法

方法 参数 描述 clear

无 清除此元数据中的所有表对象 create_all(bind=None, tables=None, checkfirst=True) bind

: 数据库Engine tables Table

对象列表 checkfirst

: 是否 仅不存在表时 创建 在数据库中创建 元数据中的所有表 drop_all(bind=None, tables=None, checkfirst=True) bind

: 数据库Engine tables Table

对象列表 checkfirst

: 是否 仅存在表时 删除 在数据库中删除 元数据中存储的所有表 remove(table) table

: 表对象 从此元数据中删除给定的表对象 tables tables

是属性, 无参数 返回 Table

的字段对象

内置函数

常用的SQL函数:

函数名 对应的SQL函数 描述 max

MAX 返回一组值中的最大值 min

MIN 返回一组值中的最小值 count

COUNT 返回匹配指定条件的行数, 没有参数时为: COUNT(*) sum

SUM 计算一组值的总和 rank

RAND 产生 0 至 1 之间的随机数 concat

CONCAT 多个字符串连接为一个字符串 char_length

CHAR_LENGTH 计算字符串字符数 coalesce

COALESCE 接受一系列的表达式或列, 返回第一个非空的值 session_user

SESSION_USER 返回当前连接的当前用户名和主机名, 形如: root@localhost user

USER 返回连接的当前用户名和主机名, 形如: root@localhost current_user

CURRENT_USER 返回用户名和主机名, 形如: root@localhost current_date

CURRENT_DATE 函数返回当前日期, 格式: YYYY-MM-DD current_time

CURRENT_TIME 返回当前时间, 格式: HH-MM-SS current_timestamp

CURRENT_TIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS localtime

LOCALTIME 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS localtimestamp

LOCALTIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS now

NOW 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS sysdate

SYSDATE 返回当前日期和时间, 格式: YYYY-MM-DD HH&#xFF1A;MM&#xFF1A;SS array_agg

ARRAY_AGG PostgreSql可用, 把表达式变成一个数组 dense_rank

DENSE_RANK 用于排名, 见:
MySQL DENSE_RANK() 函数 percent_rank

PERCENT_RANK 计算分区或结果集中行的百分位数排名 Function

描述一个SQL函数, 见:
Function GenericFunction

见:
GenericFunction grouping_sets

GROUPING SETS 定义分组集, 见:
SQL Grouping Sets运算符 rollup

ROLLUP 生成小计和总计, 见:
MySQL ROLLUP cube

见:
cube cume_dist

见:
cume_dist

全部函数见: SQL and Generic Functions

使用例子:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, func

导入公共基类
Base = declarative_base()
 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

class Teacher(Base):
    __tablename__ = "teacher"

    tid = Column("tid", Integer, primary_key=True, autoincrement=True)

    name = Column("name", String(10), nullable=False, comment="教师名")

with Session() as session:
    session_res = session.execute(
        select(func.count()).select_from(Teacher)
    )
    # (3,)
    print(session_res.one())

Column定义

一个 Column即表的一列数据, 和我们用SQL语句定义一列数据一样, 参数主要包括: 字段名、字段类型、约束条件, 比如:

from sqlalchemy import Column, String

Column("name", String(30), unique=True, nullable=False, comment='姓名')

字段类型, 一般你可以用直接指定数据库的字段类型, 也可以让SQLAlchemy的DDL自动选择字段类型
直接使用数据库的字段类型

字段类型 描述 ARRAY(item_type, ...)

数组类型, 目前只支持 PostgreSQL

, 因此建议用: sqlalchemy.dialects.postgresql.ARRAY BIGINT

SQL BIGINT类型 BINARY(length)

SQL BINARY类型 BLOB

SQL BLOB类型 BOOLEAN

SQL布尔类型 CHAR

SQLCHAR类型 CLOB

SQL CLOB型 DATE

SQL DATE期类型 DATETIME

SQL DATETIME类型 DECIMAL

SQL DECIMAL类型 FLOAT

SQL FLOAT类型 INT sqlalchemy.sql.sqltypes.INTEGER

的别名 INTEGER

SQL INT或INTEGER类型 JSON

SQL JSON类型 NCHAR

SQL NChar类型 NUMERIC

SQL NUMERIC类型 NVARCHAR

SQL NVARCHAR类型 REAL

SQL REAL类型 SMALLINT

SQL SMALLINT类型 TEXT

SQL TEXT类型 TIME

SQL TIME类型 TIMESTAMP

SQL TIMESTAMP类型 VARBINARY

SQLVARBINARY类型 VARCHAR

SQL VARCHAR类型

关于SQL的数据类型, 见: SQL 数据类型

自动转化的字段类型

字段类型 描述 通常对应的字段类型 Boolean

布尔数据类型 Boolean或SMALLINT String

所有字符串和字符类型的基 VARCHAR Text

大小可变的字符串类型 CLOB或TEXT LargeBinary

大的二进制字节数据类型 BLOB或BYTEA Unicode

长度可变的Unicode字符串类型 UnicodeText

无限长的Unicode字符串类型 SmallInteger

较小的一种 int 整数 SMALLINT Integer

int类型 BigInteger

BIGINT数据类型 BIGINT Numeric

用于固定精度数字的类型 NUMERIC或DECIMAL Float

浮点类型 FLOAT 或 REAL . Date datetime.date

类型 Time datetime.time

类型 DateTime datetime.datetime

类型 Interval datetime.timedelta

类型 Enum

枚举类型 ENUM或VARCHAR PickleType

保存使用pickle序列化的python对象 二进制类型 SchemaType

将类型标记为可能需要架构级DDL才能使用 MatchType

引用match运算符的返回类型 MySQL的是浮点型

约束条件

约束条件和其他参数 描述 autoincrement

是否自增 default

设置默认参数, 可以是可调用对象 index

是否创建索引 info SchemaItem.info

的属性 nullable

是否非空, False not null primary_key

是否为主键 unique

是否唯一 comment

注释字段, 会写入SQL 中的COMMENT onupdate

调用更新数据时传入的值, 如: updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) server_default

为SQLAlchemy的DDL设置默认值, 可以是 str unicode text()

, 如: sqlalchemy.func.now()

注 : sqlalchemy.func, 用于生成SQL函数表达式, 详情见: 内置函数

Column的例子:

from sqlalchemy import DateTime, func

class Data(Base):
    __tablename__ = 'data'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

Original: https://www.cnblogs.com/lczmx/p/15781883.html
Author: 403·Forbidden
Title: SQLAlchemy完全入门

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/581343/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 设计模式 — Flyweight(享元模式)

    享元模式(Flyweight) 运用共享技术有效地支持大量的细粒度对象 在软件系统采用纯粹对象方案的问题在于大量细粒度的对象会很快充斥在系统中,从而带来很高的运行是代价——主要指内…

    Java 2023年6月16日
    068
  • 给项目配置一个公共线程池

    /** * 公共线程池 * @return */ @Bean public ThreadPoolExecutor commonTreadPool(){ int poolSize =…

    Java 2023年5月30日
    070
  • Windows安装jdk1.8和配置环境变量

    Original: https://www.cnblogs.com/qtiger/p/15986266.htmlAuthor: 搬砖滴Title: Windows安装jdk1.8和…

    Java 2023年5月30日
    062
  • Mybatis系列全解(二):Mybatis简介与环境搭建

    封面:洛小汐 作者:潘潘 Mybatis 是一套持久层框架,灵活易用,特别流行。 ; 前言 Mybatis系列全解,我们预计准备10+篇文章,让我们了解到 Mybatis 的基本全…

    Java 2023年6月13日
    050
  • Redis安装(CentOS 8.5 64位)

    Redis安装 1. 准备工作 1.1 下载安装包 官网下载地址:https://redis.io/ 1.2 传输文件到服务器 使用ssh工具连接到服务器,把下载好的文件上传到服务…

    Java 2023年6月5日
    090
  • 后端基于方法的权限控制–Spirng-Security

    后端基于方法的权限控制–Spirng-Security默认情况下, Spring Security 并不启用方法级的安全管控. 启用方法级的管控后, 可以针对不同的方法…

    Java 2023年6月10日
    095
  • 【Java开发基础】生成两个正数之间的随机数

    java;gutter:true; int rangeStart = 99; int rangeEnd = 180;</p> <pre><code&g…

    Java 2023年5月29日
    0100
  • SLF4J 快速入门 / 绑定原理

    官网: http://www.slf4j.org/GitHub: https://github.com/qos-ch/slf4j 一、简介 SLF4J(Simple Logging…

    Java 2023年6月6日
    076
  • 并发编程之:深入解析线程池

    大家好,我是小黑,一个在互联网苟且偷生的农民工。 本期带来线程池的第二期内容,如果对线程池的基本概念还不是很清楚,可以先看我上一篇文章。 面试官:谈谈你对线程池的理解 本期内容会从…

    Java 2023年6月7日
    080
  • java中static{}语句块详解

    一、在程序的一次执行过程中,static{}语句块中的内容只被执行一次,看下面的示例: 示例一 结果:你会发现虽然执行了两条Class.forName(“Test&#8…

    Java 2023年5月29日
    033
  • 清理忽略springboot控制台启动的banner和启动日志

    清理忽略springboot控制台启动的banner和启动日志 1、springboot的banner spring: main: banner-mode: off 2、mybat…

    Java 2023年6月9日
    067
  • Java如何仅将字符串中的数字部分转成数字类型

    需要将字符串中的数字部分转成数字类型,看到网上很多都是利用Integer.parseInt(string)用过循环的方式来判断单个字符能否转换成功再进行拼接转换 其实可以使用Str…

    Java 2023年6月7日
    074
  • TCP 和 UDP 协议简介

    一、TCP TCP(Transmission Control Protocol),传输控制协议,对”传输、发送、通信”进行”控制”的…

    Java 2023年6月7日
    068
  • unity游戏中存档和读档的方法

    参考文章: https://blog.csdn.net/a1728351227/article/details/103638106 实现游戏的存读档有三个方式二进制方法XMLJSO…

    Java 2023年5月29日
    068
  • Java中的重载和重写

    关于Java中的重载与重写,每一个java人肯定都学习过,这里就再梳理一遍,加深一下印象,忘记的时候拿出来看一看就好了 重载与重写的区别 重载发生在 同一个类里面,同一个方法, 拥…

    Java 2023年6月5日
    074
  • 部署-gitlab克隆地址踩坑

    gitlab克隆地址踩坑 gitlab中的web界面的默认端口是80,ssh端口为22端口。而一般情况下,我们的服务器或者本地电脑 已经占用了这俩个端口,那么我们就需要进行端口映射…

    Java 2023年6月7日
    079
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球