最近想要学习
SQLAlchemy
, 发现网上的中文文档大多是机翻的, 读起来特别变扭, 因此对照着最新的英文文档梳理了一遍, 写下来记录一下
目前SQLAlchemy的版本为1.4.x
, 风格处于1.x过渡到2.0的时代. 为了尽量让这篇文章的兼容之后的版本, 本文将讲述1.x和2.0两种风格的接口(主要是查询的接口)
其实在2.0风格中, 主要受到影响的是ORM的查询方式, 详情见文档: 2.0 Migration – ORM Usage
安装
pip install sqlalchemy
检测 sqlalchemy
版本:
>>>import sqlalchemy
>>>sqlalchemy.__version__
'1.4.27'
使用步骤
一般来说 SQLAlchemy
的使用方式有两种: Core
和 ORM
两种有什么不同呢?
ORM
是构建在Core
之上的Core
更加底层, 可以执行直接执行SQL语句ORM
类似于Django的ORM, 由于sqlalchemy提供了一套接口, 所以不需要我们直接写SQL语句 (1.x版本)- 至于要用哪个, 等到你用到时, 你会知道的
组件依赖关系图:
Core
一般来说, 使用步骤如下:
- 配置数据库连接
- 建立连接
- 创建表
- 执行SQL语句, 按需开启事件是否自动提交
- 拿到返回数据, 执行其他代码
数据库的连接的格式
我们在创建引擎(连接)时, 需要指定数据库的URL, URL格式, 见: Engine Configuration
, 总的来说, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]
dialect
数据库名称(方言): 如mysqldriver
连接数据库的库: 如: pymysqluser
用户名password
密码host
地址dbname
数据库名称key=value
指的是给数据库的参数
如下面的URL:
mysql+pymysql://root:passwd@127.0.0.1:3306/test_db?charset=utf8
建立连接
调用 sqlalchemy.create_engine
方法, 为了兼容2.0风格的接口, 可以加上 future
参数. 至于什么是2.0风格的接口, 可以看看官方文档: 2.0 style
create_engine
有几个参数需要我们注意:
url
即数据库url, 其格式见上文:数据库的连接的格式echo
参数为True
时, 将会将engine的SQL记录到日志中 ( 默认输出到标准输出)echo_pool
为True
时,会将连接池的记录信息输出future
使用2.0样式Engine
和Connection API
更多参数见官方文档: sqlalchemy.create_engine
例子
from sqlalchemy import create_engine
兼容2.0的写法
返回对象不一样
engine1 = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
print(type(engine1))
#
engine2 = create_engine("sqlite+pysqlite:///:memory:", echo=True)
print(type(engine2))
#
注意, 由于
sqlalchemy
使用lazy initialization
的策略连接数据库, 故此时还未真正地连接上数据库
创建表
我们想要让数据库创建一个表, 需要利用 MetaData
对象, 关于一些常用的 MetaData
方法, 见: MetaData
除了要 MetaData
对象外, 我们还需要 Table
对象, 用于定义一个表的结构
Table
的一般使用
mytable = Table("mytable", metadata,
Column('mytable_id', Integer, primary_key=True),
Column('value', String(50))
)
Table
的参数:
name
表名称metadata
该表所属的MetaData对象- 其他参数: 通过
Column
指定一列数据, 格式见:Column定义
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
# 定义外键
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
相当于执行 CREATE TABLE 语句
metadata_obj.create_all(engine)
"""
-- 相当于:
CREATE TABLE user_account (
id INTEGER NOT NULL AUTO_INCREMENT,
username VARCHAR(30),
PRIMARY KEY (id)
);
CREATE TABLE address (
id INTEGER NOT NULL AUTO_INCREMENT,
uid INTEGER NOT NULL,
email_address VARCHAR(32) NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(uid) REFERENCES user_account (id)
)
"""
create_all
方法, 默认会在创建表之间检测一下表是否存在, 不存在时才创建.
Table
的一些属性
---------- 访问所有列
.c => Column
print(user_table.c.keys())
['id', 'username']
---------- 访问某一列
print(repr(user_table.c.username))
Column('username', String(length=30), table=)
---------- 返回主键
print(user_table.primary_key)
隐式生成
PrimaryKeyConstraint(Column('id', Integer(), table=, primary_key=True, nullable=False))
在事务中执行SQL
通常, 我们通过调用 engine.connect
和 engine.begin
方法开始一个事件
sqlalchemy
使用事务有两种风格 commit as you go
和 Begin once
, 前者需要我们手动提交, 后者会自动提交
手动提交
engine.connect
方法符合python的上下文管理协议, 会返回一个 Connection
对象, 该方法会在不手动提交的情况下回滚.举个例子:
from sqlalchemy import create_engine
from sqlalchemy import text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.connect() as conn:
# 执行
result = conn.execute(text("select 'hello world'")) # text 可以使用SQL语句
print(result.all())
# conn.commit()
# [('hello world',)]
# 最后会ROLLBACK
上面的代码中, 相当于开启了事务, 由于最后没有调用 commit
方法, 所以会回滚.
自动提交
engine.begin
方法也符合python的上下文管理协议, 只要执行时不报错就会自动提交, 报错时会回滚.
from sqlalchemy import create_engine
from sqlalchemy import text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.begin() as conn:
result = conn.execute(text("select 'hello world'"))
print(result.all())
# [('hello world',)]
# COMMIT
绑定参数
上面在事务中执行SQL语句时, 我们用到了 sqlalchemy.text
, 可以直接定义文本SQL字符串
为了避免被SQL注入, 故在需要传入参数的场景中需要根据 sqlalchemy
的方式传入, 而不是直接拼接成字符串.
使用 :y
的格式定义参数, 且将值以字典的形式传给 execute
from sqlalchemy import create_engine
from sqlalchemy import text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.begin() as conn:
result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"})
print(result.all())
# [('lczmx',)]
# COMMIT
多个参数时, 可以这样
with engine.connect() as conn:
conn.execute(
text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"),
[{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}])
conn.commit()
这种方式也可以
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
增删改查
处理使用 text
直接执行SQL外, 你还可以使用其他语法增删改查数据
假如表结构如下:
$show create table address;
+---------+-----------------------------------------+
| Table | Create Table |
+---------+-----------------------------------------+
| address | CREATE TABLE address
(
id
int NOT NULL AUTO_INCREMENT,
uid
int NOT NULL,
email_address
varchar(32) NOT NULL,
PRIMARY KEY (id
),
KEY uid
(uid
),
CONSTRAINT address_ibfk_1
FOREIGN KEY (uid
) REFERENCES user_account
(id
)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=gbk |
+---------+------------------------------------------+
$show create table user_account;
+--------------+------------------------------------+
| Table | Create Table |
+--------------+------------------------------------+
| user_account | CREATE TABLE user_account
(
id
int NOT NULL AUTO_INCREMENT,
username
varchar(30) DEFAULT NULL,
PRIMARY KEY (id
)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=gbk |
+--------------+-------------------------------------+
1 row in set (0.00 sec)
插入数据
使用 insert(...).values(...)
形式为数据库插入数据
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, insert
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 插入一条普通数据
conn.execute(insert(user_table).values(id=1, username="lczmx"))
# 插入外键等数据
conn.execute(insert(address_table).values(uid=1, email_address="lczmx@foxmail.com"))
# 自动生成value, 不需要我们手动指定
conn.execute(insert(user_table),
[{"username": "张三"},
{"username": "李四"},
{"username": "王五"},
{"username": "赵六"},
])
conn.commit()
SQLAlchemy还提供了更复杂的用法, 见: Inserting Rows with Core
注意: 插入数据没有返回值
删除数据
使用 delete(...).where(...)
的形式删除数据
目前的表数据:
select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | lczmx@foxmail.com |
| 2 | 张三 | NULL | NULL |
| 3 | 李四 | NULL | NULL |
| 4 | 王五 | NULL | NULL |
| 5 | 赵六 | NULL | NULL |
+-----+----------+------+-------------------+
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, delete
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 一般删除
# user_table.c 获取的是 列数据
result1 = conn.execute(delete(user_table).where(user_table.c.id == 3))
print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1
# and 删除
result2 = conn.execute(delete(user_table).where(user_table.c.username == "张三", user_table.c.id == 2))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
conn.commit()
.rowcount
属性获取受影响的行数
更多见: The delete() SQL Expression Construct
更新数据
使用 update(...).where(...).values(...)
的形式更新数据
select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | lczmx@foxmail.com |
| 2 | 张三 | NULL | NULL |
| 3 | 李四 | NULL | NULL |
| 4 | 王五 | NULL | NULL |
| 5 | 赵六 | NULL | NULL |
+-----+----------+------+-------------------+
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, update, bindparam, select
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 一般更新
result1 = conn.execute(update(user_table).where(
user_table.c.username == "王五").values(username="王老五"))
print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1
# 更新数据 加上 原来的数据
result2 = conn.execute(
update(user_table).where(user_table.c.username == "赵六").values(
username=user_table.c.username + "一号"))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
# 以字典的形式, 替换更新多个值
result3 = conn.execute(
update(user_table).where(user_table.c.username == bindparam('old_name')).values(
username=bindparam('new_name')),
[
{"old_name": "张三", "new_name": "新张三"},
{"old_name": "李四", "new_name": "新李四"},
]
)
print(f"受影响行数: {result3.rowcount}") # 受影响行数: 2
# 以 子查询 的方式 更新数据
scalar_subq = (
select(address_table.c.email_address).
where(address_table.c.uid == user_table.c.id).
order_by(address_table.c.id).
limit(1).
scalar_subquery()
)
# 将email_address的值 赋给 username
update(user_table).values(username=scalar_subq)
"""
-- 以上查询, 相当于:
UPDATE user_account SET username=(SELECT address.email_address
FROM address
WHERE address.uid = user_account.id ORDER BY address.id
LIMIT :param_1)
"""
conn.commit()
修改后的结果:
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | lczmx@foxmail.com |
| 2 | 新张三 | NULL | NULL |
| 3 | 新李四 | NULL | NULL |
| 4 | 王老五 | NULL | NULL |
| 5 | 赵六一号 | NULL | NULL |
+-----+----------+------+-------------------+
更多见: Updating and Deleting Rows with Core
查询数据
由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解
处理查询返回的数据
我们执行 conn.execute
方法的结果为: CursorResult
对象
其本质上是继承与 Result
对象, 其 使用方式见:Result
例子:
假如查询的表:
mysql> select * from user_account;
+----+----------+
| id | username |
+----+----------+
| 9 | lczmx |
| 10 | jack |
| 11 | tom |
| 12 | mike |
+----+----------+
4 rows in set (0.00 sec)
mysql>
利用SQLAlchemy获取数据:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.connect() as conn:
# 执行
result = conn.execute(text("select * from user_account;"))
for row in result.all():
# 使用f-strings 格式化字符串
print(f"id: {row.id:3}, username: {row.username:20}")
# 打印的结果:
"""
id: 9, username: lczmx
id: 10, username: jack
id: 11, username: tom
id: 12, username: mike
"""
conn.commit()
ORM
和Core一样, ORM也有一定的使用步骤:
- 配置数据库连接, 见上文:数据库的连接的格式
- 创建会话
- 创建表
- 使用接口, 增删改查数据
- 拿到返回数据, 执行其他代码
在学习SQLAlcehmy的ORM之前, 建议先了解一些概念, 以免后面会混淆
- 会话
Session
会话是SQLAlchemy ORM与数据库的交互对象
它可以管理建立连接中engine
, 并为通过会话加载或与会话关联的对象提供标识映射 (identity map
)
在使用时与Connection
非常相似, 你可以对比着使用 Base
通过sqlalchemy.orm.declarative_base
创建
作为定义表的基类, 内部有包含MetaData
对象
可以类似于Django
一样定义表
在 SQLAlchemy
中, session
是一个连接池, 的由其管理, 因此, 假如我们需要操作数据库的话, 需要在 session
中拿到 Connection
(连接)
创建会话
SQLAlchemy
提供了两种创建会话的方法:
sqlalchemy.orm.Session
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
创建引擎
engine = create_engine('postgresql://scott:tiger@localhost/')
创建会话
以下with可以简写成 with Session(engine) as session, session.begin():
with Session(engine) as session:
# 开启自动提交
with session.begin():
# add方法 会将some_object 保存到数据库
# session.add(some_object)
# session.add(some_other_object)
pass
sqlalchemy.orm.sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
创建引擎
engine = create_engine('postgresql://scott:tiger@localhost/')
创建session
Session = sessionmaker(engine)
一般使用
with Session() as session:
# session.add(some_object)
# session.add(some_other_object)
# 提交
session.commit()
自动提交
with Session.begin() as session:
# session.add(some_object)
# session.add(some_other_object)
pass
虽然有两种方法创建会话, 但我们一般使用 sessionmaker
创建会话
另外补充一下 session
的其它使用方式:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql://scott:tiger@localhost/')
Session = sessionmaker(engine)
从连接指定到session
with engine.connect() as connection:
with Session(bind=connection) as session:
# 一些操作
pass
下面列出 session
的一些常用方法, 增删改查数据时要用到
方法 参数 描述 add
instance
下次刷新操作时, 将 instance
保留到数据库中 delete
instance
下次刷新操作时, 将 instance
从数据库中删除 begin
subtransactions
nested
_subtrans
开始事务 rollback
无 回滚当前事务 commit
无 提交当前事务 close
无 关闭此Session execute
statement
params
execution_option
bind_arguments
等 执行SQL表达式构造 query
*entities
**kwargs
返回 Query
对象, 可用于查询数据 refresh
instance
attribute_names
with_for_update
instance
执行刷新操作
例子:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1)
with Session(engine) as session:
result = session.execute(stmt)
print(result.all())
# [(2, 'name2'), (3, 'name2')]
# ROLLBACK
在ORM中创建表
使用ORM时, 我们也需要 MetaData
, 不同的是, 我们是通过 sqlalchemy.orm.registry
构造的. 而且, 我们不需要像 Core
那样直接声明 Table
, 而是继承某个公共基类 (Base
), 添加属性即可. 有两种方式定义基类.
方式一:
from sqlalchemy.orm import registry
mapper_registry = registry()
print(mapper_registry.metadata) # MetaData对象
公共基类
Base = mapper_registry.generate_base()
方法二:
from sqlalchemy.orm import declarative_base
内部 return registry(...).generate_base(...)
Base = declarative_base()
现在你可以像在 Django ORM
中一样, 定义表并在数据库中创建表, 每一个 Column
表示一列数据, 关于 Column
的写法, 见: Column定义
from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Student(Base):
__tablename__ = "student"
sid = Column("sid", Integer, primary_key=True)
name = Column("name", String(32), nullable=False, index=True, comment="姓名")
age = Column("age", SMALLINT, nullable=False, comment="年龄")
gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")
class Course(Base):
__tablename__ = "course"
cid = Column("cid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="科目名")
tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="教师名")
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
"""
-- 对于sql
CREATE TABLE student (
sid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(32) NOT NULL COMMENT '姓名',
age SMALLINT NOT NULL COMMENT '年龄',
gender BOOL NOT NULL COMMENT '性别, True: 男, False: 女',
PRIMARY KEY (sid)
)
CREATE TABLE teacher (
tid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(10) NOT NULL COMMENT '教师名',
PRIMARY KEY (tid)
)
CREATE TABLE course (
cid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(10) NOT NULL COMMENT '科目名',
tid INTEGER COMMENT '课程教师',
PRIMARY KEY (cid),
FOREIGN KEY(tid) REFERENCES teacher (tid)
)
CREATE TABLE score (
sid INTEGER NOT NULL AUTO_INCREMENT,
score SMALLINT NOT NULL COMMENT '成绩',
student_id INTEGER COMMENT '成绩所属学生',
course_id INTEGER COMMENT '成绩所属科目',
PRIMARY KEY (sid),
FOREIGN KEY(student_id) REFERENCES student (sid),
FOREIGN KEY(course_id) REFERENCES course (cid)
)
"""
Base.metadata
是MetaData
对象, 常用的MetaData
方法见: MetaData注: 你通过
Student.__table__
属性可以查看Table
, 也可以通过Student.name
访问某一列
你也可以通过__init__
显示定义某些列
增删改查数据
插入数据
接上文 “在ORM中创建表” 中的表
1.x的接口与2.0的接口一样, 都是调用 session.add(instance)
方法添加到数据库 (add
方法下次刷新操作时, 将 instance
保存到数据库)
注意: 自动生成的数据, 在未插入到数据库之前, 都为 None
, 如: 自动生成的主键
你也可以调用
add_all(instance1, instance2, ...)
方法, 区别只是插入一条和多条数据而已
from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.orm import Session
from typing import Any
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Student(Base):
__tablename__ = "student"
sid = Column("sid", Integer, primary_key=True)
name = Column("name", String(32), nullable=False, index=True, comment="姓名")
age = Column("age", SMALLINT, nullable=False, comment="年龄")
gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")
class Course(Base):
__tablename__ = "course"
cid = Column("cid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="科目名")
tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="教师名")
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
一般将 添加到数据库 封装成一个函数
def create_data(db: Session, target_cls: Any, **kwargs):
try:
cls_obj = target_cls(**kwargs)
# 添加一个
db.add(cls_obj)
# 添加多个:
# db.add_all([obj1, obj2, ...])
db.commit()
# 手动将 数据 刷新到数据库
db.refresh(cls_obj)
return cls_obj
except Exception as e:
# 别忘记发生错误时回滚
db.rollback()
raise e
session = SessionLocal()
-------------- 创建学生数据
student = create_data(session, Student, sid=1, name="张三", age=22, gender=True)
-------------- 创建教师数据
teacher = create_data(session, Teacher, tid=1, name="语文老师")
-------------- 创建课程数据
course = create_data(session, Course, cid=1, name="语文", tid=teacher.tid)
-------------- 创建成绩数据
score = create_data(session, Score, sid=1, score=89, student_id=student.sid, course_id=course.cid)
注意: 自动生成主键时, 只有在刷新到数据库中后, 才能获取主键
总的来说, 插入数据代码一般为:
1. 实例化一个表类
db_city = CityTable(....)
2. 调用session的add方法
session.add(db_city)
3. 调用session的commit方法 提交事务
session.commit()
4. 手动调用session的refresh方法 将数据刷新到数据库
session.refresh(db_city)
删除数据
1.x的方法
主要步骤是先查询再删除, 一般形式为: session.query(...).filter(...).delete()
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 方法一: 调用 session.delete 方法
s = session.query(Score).filter(Score.score == 59).first()
session.delete(s)
# 方法二: 查询后直接删除
session.query(Score).filter(Score.score == 59).delete()
session.commit()
2.0的方法
像Core一样删除数据, 即 delte(...).where(...)
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import select, delete
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
session.execute(
delete(Score).where(Score.sid == 1)
)
session.commit()
修改数据
1.x的方法
主要步骤是先查询再更新, 即: session.query(...).filter(...).update(...)
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
row = session.query(Score).filter(Score.score == 59).update({"score": 60})
print(f"修改的行数: {row}")
session.commit()
2.0的方法
同样和Core一样, 使用 update(...).where(...).values(...)
的形式更新数据
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import update, bindparam
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 一般更新
result1 = session.execute(update(Score).where(Score.score == 59).values(score=60))
print(f"受影响行数: {result1.rowcount}")
# # 更新数据 加上 原来的数据
result2 = session.execute(
update(Score).where(Score.score == 59).values(score=Score.score + 1))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
# 以字典的形式, 替换更新多个值
result3 = session.execute(
update(Score).where(Score.score == bindparam('old_score')).values(score=bindparam('new_score')),
[
{"old_score": 59, "new_score": 60},
]
)
print(f"受影响行数: {result3.rowcount}")
session.commit()
同样
.rowcount
属性获取受影响行数
查询数据
1.x的方法
在SQLAlchemy1.x查询方式中, 使用 Query
对象进行查询, 类似于 Django ORM
的管理器, 可以较为简单地查询数据
假如要查询的表如下:
class User(Base):
__tablename__ = "User" # 设置表名
uid = Column(Integer, primary_key=True)
username = Column(String(80), unique=True)
email = Column(String(120), unique=True)
tags = Column(String(120))
def __repr__(self):
return '' % self.username
表的数据:
mysql> select * from User;
+-----+----------+-------------------+------+
| uid | username | email | tags |
+-----+----------+-------------------+------+
| 1 | 张三 | zhangesan@xx.com | 热情 |
| 2 | 李四 | lisi@xx.com | 热情 |
| 3 | 王五 | wangwu@xx.com | 开朗 |
| 4 | lczmx | lczmx@foxmail.com | 热情 |
+-----+----------+-------------------+------+
4 rows in set (0.10 sec)
mysql>
使用例子:
from sqlalchemy import Column, Integer, create_engine, String
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import not_, or_, desc
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class User(Base):
__tablename__ = "User" # 设置表名
uid = Column(Integer, primary_key=True)
username = Column(String(80), unique=True)
email = Column(String(120), unique=True)
tags = Column(String(120))
def __repr__(self):
return '' % self.username
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# ------------------ 查询所有User数据
session.query(User).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
"""
# ------------------ 查询有多少条数据
session.query(User).count()
"""
对应SQL
SELECT count(*) AS count_1 FROM (SELECT User
.uid AS User_uid
,
User
.username AS User_username
, User
.email AS User_email
,
User
.tags AS User_tags
FROM User
) AS anon_1
"""
# ------------------ 查询第1条数据
session.query(User).first()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
LIMIT 1
"""
# ------------------ 根据主键查询
session.query(User).get(1)
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.uid = 1
"""
# ------------------ 简单查询, 使用 关键字实参 的形式来设置字段名
session.query(User).filter_by(uid=1).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.uid = 1
"""
# ------------------ 复杂查询, 可以多个表一起,使用 恒等式'==' 等形式 来设置条件
session.query(User).filter(User.uid == 1).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.uid = 1
"""
# ------------------ filter 查询开头
session.query(User).filter(User.username.startswith("l")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE (User
.username LIKE concat('l', '%%'))
"""
# ------------------ filter 查询结尾
session.query(User).filter(User.username.endswith("x")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE (User
.username LIKE concat('%%', 'x'))
"""
# ------------------ filter 查询是否包含
session.query(User).filter(User.username.contains("lcz")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE (User
.username LIKE concat(concat('%%', "lcz", '%%')))
"""
# ------------------ filter 模糊查询
session.query(User).filter(User.username.like("%cz%")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.username LIKE "%cz%"
"""
# ------------------ filter 条件取反 (not)
session.query(User).filter(not_(User.username == "lczmx")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.username != "lczmx"
"""
# ------------------ filter条件 或 (or), 默认为and
session.query(User).filter(
or_(User.uid == 1, User.uid == 3), ).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.uid = 1 OR User
.uid = 3
"""
# ------------------ filter条件 and or not 一起使用
session.query(User).filter(or_(User.uid == 1, User.uid == 4), User.username == "lczmx",
not_(User.email == "wangwu@xx.com")).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE (User
.uid = 1 OR User
.uid = 4)
AND User
.username = "lczmx" AND User
.email = "lczmx@foxmail.com"
"""
# ------------------ filter 取反查询
session.query(User).filter(User.username != "lczmx").all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.username != "lczmx";
"""
# ------------------ 查询uid为[1, 3, 5, 7, 9]的用户
session.query(User).filter(User.uid.in_([1, 3, 5, 7, 9])).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
WHERE User
.uid IN (1, 3, 5, 7, 9)
"""
# ------------------ 分组查询
# !! 注意不是query(User), 因为Query(User)对应的SQL为:
# SELECT User
.uid AS User_uid
, User
.username AS User_username
,
# User
.email AS User_email
, User
.tags AS User_tags
session.query(User.tags).group_by(User.tags).all()
"""
对应SQL
SELECT User
.tags AS User_tags
FROM User
GROUP BY User
.tags
"""
# ------------------ 排序 顺序
session.query(User).order_by(User.uid).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
ORDER BY User
.uid
"""
# ------------------ 排序 倒序
session.query(User).order_by(desc(User.uid)).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
ORDER BY User
.uid DESC
"""
# ------------------ 去重
session.query(User.tags).distinct().all()
"""
对应SQL:
SELECT DISTINCT User
.tags AS User_tags
FROM User
;
"""
# ------------------ 取几条数据
session.query(User).limit(2).all()
"""
对应SQL:
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
LIMIT 2;
"""
# ------------------ 跳过几条个数据
session.query(User).offset(1).limit(2).all()
"""
对应SQL
SELECT User
.uid AS User_uid
, User
.username AS User_username
,
User
.email AS User_email
, User
.tags AS User_tags
FROM User
LIMIT 1, 2;
"""
关于query返回的对象
query
的返回对象为 sqlalchemy.orm.query.Query
对象, 你可以与 Result
对象进行对比, 主要有以下的方法:
all()
返回由表对象组成的列表first()
返回第一个结果 (表对象), 内部执行limit
SQLone()
只返回一行数据或引发异常 (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)one_or_none()
最多返回一行数据或引发异常 (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)scalar()
获取第一行的第一列数据. 如果没有要获取的行, 则返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
以上查询不包含一些连表操作, 见: relationship连表操作
2.0的方法
2.0的返回结果也是 Result
对象, 关于 Result
对象, 见: Result
注意: 由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解
relationship连表操作
我们自定义外键时, 一般的步骤是:
- 子表使用
字段名= Column(Integer, ForeignKey('主表名.主键'))
的格式定义 - 除此外, 还需要在主表中定义
relationship
用于子表与主表之间的跨表查询
完整例子:
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class User(Base):
__tablename__ = 'user'
uid = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(16), nullable=False)
password = Column(String(32), nullable=False)
# Article是类名 user是反向访问的属性名称
article = relationship("Article", backref="user")
"""
article = relationship("Article", backref="user")
相当于:
class User(Base):
# Article是类名 user是反向访问的属性名称
article = relationship("Article", back_populates="user")
class Article(Base):
# User是类名 addresses是反向访问的属性名称
user = relationship("User", back_populates="addresses")
"""
def __repr__(self):
return "" % self.username
class Article(Base):
__tablename__ = 'article'
aid = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(36), nullable=False)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("user.uid"))
def __repr__(self):
return "" % self.title
Base.metadata.create_all(bind=engine)
关于relationship
relationship
在定义外键时, 有非常重要的作用, 如: 1. 跨表操作 2. 设置删除主表数据时子表的值
relationship
的参数很多, 这里只列出常用的几个, 全部参数见文档: relationship
back_populates
指定反向访问的属性名称backref
快捷设置两个relationship
(设置back_populates
的话, 要设置两个表)
cascade
用于控制修改数据时的选项值 说明save-update
默认选项, 在添加一条数据的时候,会把其他和它相关联的数据都添加到数据库中。这种行为就是save-update
属性影响的delete
表示当删除某一个模型中的数据的时候,是否也删除掉使用relationship
和它关联的数据delete-orphan
表示当对一个ORM对象解除了父表中的关联对象的时候,自己便会被删除掉。当然如果表中的数据被删除,自己也会被删除。这个选项只能用在一对多上,不能用在多对多以及多对一上。并且还需要在子表中的relationship
中,增加一个single_parent=True
的参数merge
默认选项, 当在使用session.merge
,合并一个对象的时候,会将使用了relationship
相关联的对象也进行merge
操作expunge
移除操作的时候,会将相关联的对象也进行移除。这个操作 只是从session
中移除,并不会真正的从数据库中删除all
是对save-update
,merge
,refresh-expire
,expunge
,delete
几种的填写 比如:
articles = relationship("Article",cascade="save-update,delete")
order_by
子表列表的排序方式
倒序
article = relationship("Article", backref="user", order_by="Article.aid.desc()")
正序
article = relationship("Article", backref="user", order_by="Article.aid")
本部分包括Core与ORM的跨表增删改查操作
select user
.uid as uid, user
.username, user
.password,
(select GROUP_CONCAT(article
.title) from article
where article
.author_id = user
.uid) as article
from user;
+-----+----------+----------+---------------+
| uid | username | password | article |
+-----+----------+----------+---------------+
| 1 | 张三 | 12345 | C++入门,C入门 |
| 2 | 李四 | 12346 | python入门 |
+-----+----------+----------+---------------+
GROUP_CONCAT
可以让多行数据拼接成一行数据, 以,
链接
通过relationship双向访问
即直接通过 relationship
访问主表或子表
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 子表访问主表, 返回主表对象
"""
实质上内部执行的SQL
SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
FROM user
WHERE user.uid = %(pk_1)s
"""
# ----------- 1.x方法
article_1x = session.query(Article).first()
print(article_1x.user) #
# ----------- 2.0 方法
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) #
# +++++++++++++++++++++++++ 主表访问子表, 返回子表列表
# 实质是 sqlalchemy.orm.collections.InstrumentedList 对象
# 是List的子类
"""
实质上内部执行的SQL
SELECT article.aid AS article_aid, article.title AS article_title,
article.content AS article_content, article.author_id AS article_author_id
FROM article
WHERE %(param_1)s = article.author_id ORDER BY article.aid
"""
# ----------- 1.x方法
user_1x = session.query(User).first()
print(user_1x.article)
# [, ]
# ----------- 2.0方法
user_20 = session.execute(select(User)).first()
print(user_20.User.article)
# [, ]
通过relationship修改关联关系
上面例子中说过, 主表.relationship字段
是 InstrumentedList
对象 (类似于 List
), 我们可以修改它, 然后调用 commit
方法即可. 子表.relationship字段
是对应的主表, 可以修改为自己想要的, 同样调用 commit
方法即可
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 子表修改属于的主表
"""
对于SQL
UPDATE article
SET author_id=%(author_id)s
WHERE article.aid = %(article_aid)s
"""
# ----------- 1.x方法
lisi = session.query(User).get(2)
article_1x = session.query(Article).first()
print(article_1x.user) #
article_1x.user = lisi # 改为
session.commit() # 记得提交
# ----------- 2.0 方法
zhangsan = session.execute(select(User).where(User.uid == 1)).first()
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) #
article_20.Article.user = zhangsan.User # 改为
session.commit() # 记得提交
# +++++++++++++++++++++++++ 主表修改子表列表
# ----------- 1.x方法 增加
user_1x = session.query(User).first()
# 添加一个新的子表数据
# 会在Article中插入一条新的数据
user_1x.article.append(Article(title="javascript 入门", content="console.log(hello world)"))
session.commit() # 记得提交
# ----------- 2.0 方法 移除
user_20 = session.execute(select(User)).first()
print(user_20.User.article) # [, , ]
article_js = session.execute(select(Article).where(Article.aid == 4)).scalar()
# 从主表中移除与子表的关系
user_20.User.article.remove(article_js)
session.commit() # 记得提交
通过relationship修改子/主表数据
同样非常简单, 找到对应的类, 然后修改数据并 commit
即可
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 通过子表修改对应主表的数据
# ----------- 1.x方法
article_1x = session.query(Article).first()
print(article_1x.user) #
article_1x.user.username = "张三二号"
session.commit() # 记得提交
# ----------- 2.0 方法
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) #
article_20.Article.user.username = "张三"
session.commit() # 记得提交
# +++++++++++++++++++++++++ 通过主表修改对应子表的数据
# ----------- 1.x方法
user_1x = session.query(User).first()
print(user_1x.article) # [, , ]
user_1x.article[-1].title = "js入门"
session.commit() # 记得提交
# ----------- 2.0 方法
user_20 = session.execute(select(User)).scalar()
print(user_20.article) # [, , ]
user_20.article[-1].title = "javascript 入门"
session.commit() # 记得提交
通过relationship查询数据
正向查询, 子表利用主表的条件查询, 使用 has
, 条件和普通查询的条件一样
反向查询, 主表利用子表的条件查询, 使用 any
, 条件和普通查询的条件一样
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 反向查询
"""
对应的SQL
SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
FROM user
WHERE EXISTS (SELECT 1
FROM article
WHERE user.uid = article.author_id AND article.title LIKE 'python%')
"""
# ********* 查询有以python开头的Article的User
# ----------- 1.x方法
user_1x = session.query(User).filter(User.article.any(Article.title.like("python%")))
print(user_1x.all()) # []
# ----------- 2.0方法
user_20 = session.execute(
select(User).where(User.article.any(Article.title.like("python%"))))
print(user_20.all()) # [(,)]
# +++++++++++++++++++++++++ 正向查询
"""
对应的SQL
SELECT article.aid AS article_aid, article.title AS article_title,
article.content AS article_content, article.author_id AS article_author_id
FROM article
WHERE EXISTS (SELECT 1
FROM user
WHERE user.uid = article.author_id AND (user.username LIKE concat(concat('%%', '四', '%%')))
"""
# ********* 查询User表中username有 四 的Article
# ----------- 1.x方法
article_1x = session.query(Article).filter(Article.user.has(User.username.contains("四")))
print(article_1x.all()) # []
# ----------- 2.0方法
article_20 = session.execute(
select(Article).where(Article.user.has(User.username.contains("四"))))
print(article_20.all()) # [(,)]
建立多对多关系
可以通过 relationship
便捷使用多对多关系
from sqlalchemy import Table, Text, Column, ForeignKey
第三张表
post_keywords = Table("post_keywords", Base.metadata,
Column('post_id', ForeignKey('posts.id'), primary_key=True),
Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
)
class BlogPost(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
headline = Column(String(255), nullable=False)
body = Column(Text)
# 多对多关系 BlogPostKeyword
keywords = relationship('Keyword',
secondary=post_keywords,
back_populates='posts')
def __init__(self, headline, body):
self.headline = headline
self.body = body
def __repr__(self):
return "BlogPost(%r, %r)" % (self.headline, self.body)
class Keyword(Base):
__tablename__ = 'keywords'
id = Column(Integer, primary_key=True)
keyword = Column(String(50), nullable=False, unique=True)
# 多对多关系 KeywordBlogPost
posts = relationship('BlogPost',
secondary=post_keywords,
back_populates='keywords')
def __init__(self, keyword):
self.keyword = keyword
Base.metadata.create_all(bind=engine)
添加操作例子:
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 比如添加一篇文章, 为文章添加多个keyword
blog = BlogPost(headline="起飞!", body="我是文章的内容")
# 获取子表列表 并 append
blog.keywords.append(Keyword("新闻"))
blog.keywords.append(Keyword("热门"))
session.add(blog)
session.commit()
# ------- 添加第二篇文章
keyword1 = session.execute(
select(Keyword).filter_by(keyword="热门")
).scalar()
new_blog = BlogPost(headline="震惊!", body="我是第二篇文章的内容")
new_blog.keywords.append(keyword1)
session.add(new_blog)
session.commit()
查询操作例子:
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 查询所有 "热门" 文章
blog = session.execute(
select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "热门"))
).all()
print(blog)
# [(BlogPost('起飞!', '我是文章的内容'),),
# (BlogPost('震惊!', '我是第二篇文章的内容'),)]
其他操作也像一般的
relationship
一样操作
项目示范
一般来说, 我们使用SQLAlchemy的步骤大多相同, 下面
文件结构:
+--- database.py # 用于 初始化session 和 公共基类
+--- models.py # 定义表
+--- crud.py # 封装增删改查的方法
+--- schemas.py # 定义pydantic模型, 用于格式化已经取得的数据 [可选]
+--- main.py # 执行主逻辑
database.py
初始化 session
和 公共基类:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
数据库的URL
替换成自己的
SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'
创建引擎
engine = create_engine(
# echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True
)
SessionLocal用于对数据的增删改查
flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
创建基本映射类, 用于创建表
Base = declarative_base(bind=engine, name='Base')
models.py
定义表结构
from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
导入公共基类
from .database import Base
class City(Base):
__tablename__ = 'city'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
province = Column(String(100), unique=True, nullable=False, comment='省/直辖市')
country = Column(String(100), nullable=False, comment='国家')
country_code = Column(String(100), nullable=False, comment='国家代码')
country_population = Column(BigInteger, nullable=False, comment='国家人口')
data = relationship('Data', back_populates='city') # 'Data'是关联的类名;back_populates来指定反向访问的属性名称
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')
__mapper_args__ = {"order_by": country_code} # 默认是正序,倒序加上.desc()方法
def __repr__(self):
return f'{self.country}_{self.province}'
class Data(Base):
__tablename__ = 'data'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
city_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市') # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名
date = Column(Date, nullable=False, comment='数据日期')
confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量')
deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量')
recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量')
city = relationship('City', back_populates='data') # 'City'是关联的类名;back_populates来指定反向访问的属性名称
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')
__mapper_args__ = {"order_by": date.desc()} # 按日期降序排列
def __repr__(self):
return f'{repr(self.date)}:确诊{self.confirmed}例'
crud.py
封装增删改查的方法:
from sqlalchemy.orm import Session
import models
import schemas
def get_city(db: Session, city_id: int):
return db.query(models.City).filter(models.City.id == city_id).first()
def get_city_by_name(db: Session, name: str):
return db.query(models.City).filter(models.City.province == name).first()
def get_cities(db: Session, skip: int = 0, limit: int = 10):
return db.query(models.City).offset(skip).limit(limit).all()
def create_city(db: Session, city: schemas.CreateCity):
"""
创建City数据
"""
db_city = models.City(**city.dict())
db.add(db_city)
db.commit()
db.refresh(db_city)
return db_city
def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
if city:
# 外键关联查询,这里不是像Django ORM那样Data.city.province
return db.query(models.Data).filter(models.Data.city.has(province=city))
return db.query(models.Data).offset(skip).limit(limit).all()
def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
"""
创建Data数据
"""
db_data = models.Data(**data.dict(), city_id=city_id)
db.add(db_data)
db.commit()
db.refresh(db_data)
return db_data
schemas.py
定义pydantic模型, 用于格式化已经取得的数据:
from datetime import date as date_
from datetime import datetime
from pydantic import BaseModel
class CreateData(BaseModel):
date: date_
confirmed: int = 0
deaths: int = 0
recovered: int = 0
class CreateCity(BaseModel):
province: str
country: str
country_code: str
country_population: int
class ReadData(CreateData):
id: int
city_id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
class ReadCity(CreateCity):
id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
main.py
执行主逻辑:
from sqlalchemy.orm import Session
import crud
import schemas
from database import engine, Base, SessionLocal
from models import City, Data
创建表, 已经存在的将被忽略
Base.metadata.create_all(bind=engine)
db = SessionLocal()
调用 crud
db_city = crud.get_city_by_name(db, name="广东省")
if db_city:
raise Exception("City already registered")
创建数据
city = City(...)
crud.create_city(db=db, city=city)
查询数据详解
只有涉及SQL 的查询数据, 必然绕不开 FROM
WHERE
SELECT
GROUP BY
HAVING
ORDER BY
LIMIT
INNER JOIN ... ON
LEFT JOIN ... ON
UNION
这些SQL查询语法, 那么它们在SQLAlchemy中是如何表示的呢? 实际上和SQL语句一样, SQLAlchemy的语法也有 select
where
join
order_by
group_by
having
等 …
注意: 这些方法在Core与ORM中都适用
刚接触可能会觉得比较复杂, 但是假如有SQL基础的话, 用起来比较简单.
要查询的表结构为:
+++++++++++++++++++ 使用Core定义 +++++++++++++++++++
from sqlalchemy import MetaData, create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user",
metadata_obj,
Column("uid", Integer, primary_key=True, autoincrement=True),
Column("username", String(16), nullable=False),
Column("password", String(32), nullable=False),
)
article_table = Table(
'article',
metadata_obj,
Column("aid", Integer, primary_key=True, autoincrement=True),
Column("title", String(36), nullable=False),
Column("content", Text, nullable=False),
Column("author_id", Integer, ForeignKey("user.uid"))
)
metadata_obj.create_all(bind=engine)
+++++++++++++++++++ 使用ORM定义 +++++++++++++++++++
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".
format(**DATABASE_CONFIG), echo=True, future=True)
class User(Base):
__tablename__ = 'user'
uid = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(16), nullable=False)
password = Column(String(32), nullable=False)
# Article是类名 user是反向访问的属性名称
article = relationship("Article", backref="user")
def __repr__(self):
return "" % self.username
class Article(Base):
__tablename__ = 'article'
aid = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(36), nullable=False)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("user.uid"))
def __repr__(self):
return "" % self.title
Base.metadata.create_all(bind=engine)
表数据为:
mysql> select * from article, user where user.uid=article.author_id;
+-----+------------+--------------------+-----------+-----+----------+----------+
| aid | title | content | author_id | uid | username | password |
+-----+------------+--------------------+-----------+-----+----------+----------+
| 1 | C++入门 | c++ hello world | 1 | 1 | 张三 | 12345 |
| 2 | C入门 | c hello world | 1 | 1 | 张三 | 12345 |
| 3 | python入门 | print(hello world) | 2 | 2 | 李四 | 12346 |
+-----+------------+--------------------+-----------+-----+----------+----------+
3 rows in set (0.12 sec)
select
该函数可以指定要查询的表、列及添加别名
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 一般查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password FROM user
WHERE user.uid = 2
"""
-------------- Core
result_core_1 = conn.execute(select(user_table).where(user_table.c.uid == 2))
print(result_core_1.all()) # [(2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid == 2))
print(result_orm_1.all()) # [(,)]
2 ++++++++++++++++++ 查询全部列 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
"""
-------------- Core
result_core_2 = conn.execute(select(user_table))
print(result_core_2.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_2 = session.execute(select(User))
print(result_orm_2.all()) # [(,), (,)]
3 ++++++++++++++++++ 查询指定的列 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
"""
-------------- Core
result_core_3 = conn.execute(select(user_table.c.username))
print(result_core_3.all()) # [('张三',), ('李四',)]
-------------- ORM
result_orm_3 = session.execute(select(User.username))
print(result_orm_3.all()) # [('张三',), ('李四',)]
4 ++++++++++++++++++ 两个表 联合查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user, article
WHERE user.uid = article.author_id
"""
-------------- Core
result_core_4 = conn.execute(
select(user_table.c.username, article_table.c.title).where(
user_table.c.uid == article_table.c.author_id))
[('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_core_4.all())
-------------- ORM
result_orm_4 = session.execute(select(User.username, Article.title).where(
User.uid == Article.author_id))
[('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_orm_4.all())
5 ++++++++++++++++++ 为列添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user
WHERE user.uid = 1
"""
-------------- Core
result_core_5 = conn.execute(
select((user_table.c.username).label("name")).where(
user_table.c.uid == 1))
print(result_core_5.all()) # [('张三',)]
-------------- ORM
result_orm_5 = session.execute(
select((User.username).label("name")).where(
User.uid == 1))
print(result_orm_5.all()) # [('张三',)]
利用别名 取值
print(result_core_5.first().name) # 张三
print(result_orm_5.first().name) # 张三
6 ++++++++++++++++++ 为表添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user_1.username
FROM user AS user_1
WHERE user_1.uid = 1
"""
-------------- Core
user_table_core_alias = user_table.alias()
result_core_6 = conn.execute(
select(user_table_core_alias.c.username).where(
user_table_core_alias.c.uid == 1))
print(result_core_6.all()) # [('张三',)]
-------------- ORM
from sqlalchemy.orm import aliased
user_table_orm_alias = aliased(User)
result_orm_6 = session.execute(
select(user_table_orm_alias.username).where(
user_table_orm_alias.uid == 1))
print(result_orm_6.all()) # [('张三',)]
7 ++++++++++++++++++ 与text 结合, 添加额外列 ++++++++++++++++++
"""
对应SQL
SELECT now() AS now, user.username, '自定义字符'
FROM user
"""
from sqlalchemy import literal_column, text
literal_column表示一列数据
text可以转化成SQL
-------------- Core
result_core_7 = conn.execute(
select(literal_column("now()").label("now"), user_table.c.username, text("'自定义字符'")))
print(result_core_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 54, 44), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定义字符')]
"""
-------------- ORM
result_orm_7 = session.execute(
select(literal_column("now()").label("now"), User.username, text("'自定义字符'")))
print(result_orm_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 59, 42), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定义字符')]
"""
where
过滤数据, Core
中 table.c.xxx
获取行, 假如是ORM的话为: 类名.属性名
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 条件默认为and ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid < 3 AND user.uid > 1
"""
-------------- Core
result_core_1 = conn.execute(
select(user_table).where(user_table.c.uid < 3, user_table.c.uid > 1))
print(result_core_1.all()) # [(2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1))
print(result_orm_1.all()) # [(,)]
2 ++++++++++++++++++ 修改条件为not or ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四"
"""
from sqlalchemy import or_, not_
-------------- Core
result_core_2 = conn.execute(
select(user_table).where(
or_(user_table.c.uid == 1, user_table.c.uid == 2),
not_(user_table.c.username == "李四"),
))
print(result_core_2.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_2 = session.execute(select(User).where(
or_(User.uid == 1, User.uid == 2),
not_(User.username == "李四")))
print(result_orm_2.all()) # [(,)]
3 ++++++++++++++++++ startswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('张', '%%'))
"""
-------------- Core
result_core_3 = conn.execute(
select(user_table).where(
user_table.c.username.startswith("张")))
print(result_core_3.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_3 = session.execute(select(User).where(
User.username.startswith("张")))
print(result_orm_3.all()) # [(,)]
4 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('%%', '三'))
"""
-------------- Core
result_core_4 = conn.execute(
select(user_table).where(
user_table.c.username.endswith("三")))
print(result_core_4.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_4 = session.execute(select(User).where(
User.username.endswith("三")))
print(result_orm_4.all()) # [(,)]
5 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat(concat('%%', '三', '%%'))
"""
-------------- Core
result_core_5 = conn.execute(
select(user_table).where(
user_table.c.username.contains("三")))
print(result_core_5.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_5 = session.execute(select(User).where(
User.username.contains("三")))
print(result_orm_5.all()) # [(,)]
6 ++++++++++++++++++ like ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.username LIKE '%三'
"""
-------------- Core
result_core_6 = conn.execute(
select(user_table).where(
user_table.c.username.like("%三")))
print(result_core_6.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_6 = session.execute(select(User).where(
User.username.like("%三")))
print(result_orm_6.all()) # [(,)]
7 ++++++++++++++++++ in ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IN (1, 2)
"""
-------------- Core
result_core_7 = conn.execute(
select(user_table).where(
user_table.c.uid.in_((1, 2))
))
print(result_core_7.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_7 = session.execute(
select(User).where(
User.uid.in_((1, 2))
))
print(result_orm_7.all()) # [(,), (,)]
8 ++++++++++++++++++ between ... and ... ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid BETWEEN 1 AND 2
"""
-------------- Core
result_core_8 = conn.execute(
select(user_table).where(
user_table.c.uid.between(1, 2)
))
print(result_core_8.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_8 = session.execute(
select(User).where(
User.uid.between(1, 2)
))
print(result_orm_8.all()) # [(,), (,)]
9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IS NOT NULL
"""
from sqlalchemy import not_
-------------- Core
result_core_9 = conn.execute(
select(user_table).where(
# user_table.c.uid.is_(None), # IS NULL
not_(user_table.c.uid.is_(None)), # IS NOT NULL
))
print(result_core_9.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_9 = session.execute(
select(User).where(
# User.uid.is_(None), # IS NULL
not_(User.uid.is_(None)), # IS NOT NULL
))
print(result_orm_9.all()) # [(,), (,)]
除此之外, 你还可以使用一些其他运算符, 详情见: Operator Reference
order_by
order_by用于排序
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 根据某一列排序 (默认升序) ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid
"""
-------------- Core
result_core_1 = conn.execute(
select(user_table).order_by(user_table.c.uid))
print(result_core_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_1 = session.execute(
select(User).order_by(User.uid))
print(result_orm_1.all()) # [(,), (,)]
2 ++++++++++++++++++ 手动指定升序/降序 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid ASC/DESC
"""
-------------- Core
result_core_2 = conn.execute(
# select(user_table).order_by(user_table.c.uid.asc()) # 升序
select(user_table).order_by(user_table.c.uid.desc()) # 降序
)
print(result_core_2.all())
[(1, '张三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '张三', '12345')]
-------------- ORM
result_orm_2 = session.execute(
# select(User).order_by(User.uid.asc()) # 升序
select(User).order_by(User.uid.desc()) # 降序
)
print(result_orm_2.all())
[(,), (,)] / [(,), (,)]
3 ++++++++++++++++++ 根据别名排序 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user ORDER BY name DESC
"""
from sqlalchemy import desc, asc
-------------- Core
result_core_3 = conn.execute(
select((user_table.c.username).label("name")).order_by(desc("name"))
)
print(result_core_3.all()) # [('张三',), ('李四',)]
-------------- ORM
result_orm_3 = session.execute(
select((User.username).label("name")).order_by(desc("name"))
)
print(result_orm_3.all()) # [('张三',), ('李四',)]
group_by和having
group_by
用于分组, having
类似于 where
, 但可以对已分组数据使用聚合函数
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ group_by一般使用 ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
"""
func 内有很多内置函数
-------------- Core
result_core_1 = conn.execute(
select(func.count(article_table.c.author_id).label("count")).group_by(article_table.c.author_id)
)
print(result_core_1.all()) # [(2,), (1,)]
-------------- ORM
result_orm_1 = session.execute(
select(func.count(Article.author_id).label("count")).group_by(Article.author_id)
)
print(result_orm_1.all()) # [(2,), (1,)]
2 ++++++++++++++++++ group by + having ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
HAVING count(article.author_id) > 1
"""
func 内有很多内置函数
-------------- Core
result_core_2 = conn.execute(
select(func.count(article_table.c.author_id).label("count")).group_by(
article_table.c.author_id).having(func.count(article_table.c.author_id) > 1)
)
print(result_core_2.all()) # [(2,)]
-------------- ORM
result_orm_2 = session.execute(
select(func.count(Article.author_id).label("count")).group_by(
Article.author_id).having(func.count(Article.author_id) > 1)
)
print(result_orm_2.all()) # [(2,)]
除了
count
外, 还有其他的方法, 详情见: 内置函数
limit和offset
limit
: 表示取几条数据, offset
: 表示要跳过多少条数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 仅使用LIMIT ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1
"""
-------------- Core
result_core_1 = conn.execute(select(user_table).limit(1))
print(result_core_1.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_1 = session.execute(select(User).limit(1))
print(result_orm_1.all()) # [(,)]
2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1, 1
"""
-------------- Core
result_core_2 = conn.execute(select(user_table).limit(1).offset(1))
print(result_core_2.all()) # [(2, '李四', '12346')]
-------------- ORM
result_orm_2 = session.execute(select(User).limit(1).offset(1))
print(result_orm_2.all()) # [(,)]
去重
你可以在查询的时候使用SQL进行去重, 亦可以在取到数据后进行去重
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++
"""
对应SQL
SELECT DISTINCT article.author_id
FROM article
"""
-------------- Core
result_core_1 = conn.execute(
select(article_table.c.author_id).distinct()
)
print(result_core_1.all()) # [(1,), (2,)]
-------------- ORM
result_orm_1 = session.execute(
select(Article.author_id).distinct())
print(result_orm_1.all()) # [(1,), (2,)]
2 ++++++++++++++++++ 在结果中去重 ++++++++++++++++++
"""
对应SQL
SELECT article.author_id
FROM article
"""
-------------- Core
result_core_2 = conn.execute(select(article_table.c.author_id))
print(result_core_2.unique().all()) # [(1,), (2,)]
-------------- ORM
result_orm_2 = session.execute(select(Article.author_id))
print(result_orm_2.unique().all()) # [(1,), (2,)]
连接查询
有 内连接查询
外连接查询
完全连接查询
三种连接查询方式, 在SQLAlchemy中分别对应着 join(...)
join(..., isouter=True)
join(..., full=True)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式一 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_1 = conn.execute(
select(user_table.c.username, article_table.c.title).join_from(user_table, article_table))
print(result_core_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_1 = session.execute(
select(User.username, Article.title).join_from(User, Article))
print(result_orm_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
2 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式二 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_2 = conn.execute(
select(user_table.c.username, article_table.c.title).join(article_table))
print(result_core_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_2 = session.execute(select(User.username, Article.title).join(Article))
print(result_orm_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
3 ++++++++++++++++++ 内连接查询 (手动指定join的表) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_3 = conn.execute(
select(user_table.c.username, article_table.c.title).select_from(user_table).join(article_table))
print(result_core_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_3 = session.execute(
select(User.username, Article.title).select_from(User).join(Article))
print(result_orm_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
4 ++++++++++++++++++ 内连接查询 (手动指定on的条件) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_4 = conn.execute(
select(user_table.c.username, article_table.c.title).select_from(
user_table).join(article_table, user_table.c.uid == article_table.c.author_id))
print(result_core_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
-------------- ORM
result_orm_4 = session.execute(
select(User.username, Article.title).select_from(
User).join(Article, User.uid == Article.author_id))
print(result_orm_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
5 ++++++++++++++++++ 外连接查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_5 = conn.execute(
select(user_table, article_table.c.title).join(article_table, isouter=True))
print(result_core_5.all())
[(1, '张三', '12345', 'C入门'), (1, '张三', '12345', 'C++入门'), (2, '李四', '12346', 'python入门')]
-------------- ORM
result_orm_5 = session.execute(
select(User, Article.title).join(Article, isouter=True))
print(result_orm_5.all()) # [(, 'C入门'), (, 'C++入门'), (, 'python入门')]
6 ++++++++++++++++++ 完全连接查询 ++++++++++++++++++
# !!! 注意: MYSQL 中没有 FULL OUTER JOIN, 执行时会报错
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""
-------------- Core
result_core_6 = conn.execute(
select(user_table, article_table.c.title).join(article_table, full=True))
print(result_core_6.all())
-------------- ORM
result_orm_6 = session.execute(
select(User, Article.title).join(Article, full=True))
print(result_orm_6.all())
注意: 使用ORM的方式进行连表操作时, 可以通过
relationship
, 即不需要直接指定第二张表, 指定relationship
比如:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
result_orm = session.execute(
select(User.username, Article.title).select_from(User).join(User.article))
print(result_orm.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
UNION和UNION ALL
UNION ALL
: 合并两个或多个 SELECT
语句的结果, 结果不会去重
UNION
: 合并两个或多个 SELECT
语句的结果, 结果会去重
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, union_all, union
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ UNION ALL ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION ALL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 2
"""
-------------- Core
result_core_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_core_1_u = union_all(result_core_1_stmt1, result_core_1_stmt2)
result_core_1 = conn.execute(result_core_1_u)
print(result_core_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
-------------- ORM
result_orm_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_orm_1_u = union_all(result_orm_1_stmt1, result_orm_1_stmt2)
result_orm_1 = session.execute(result_orm_1_u)
print(result_orm_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
2 ++++++++++++++++++ UNION ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
"""
-------------- Core
result_core_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_core_2_u = union(result_core_2_stmt1, result_core_2_stmt2)
result_core_2 = conn.execute(result_core_2_u)
print(result_core_2.all()) # [(1, '张三', '12345')]
-------------- ORM
result_orm_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_u = union(result_orm_2_stmt1, result_orm_2_stmt2)
result_orm_2 = session.execute(result_orm_2_u)
print(result_orm_2.all()) # [(1, '张三', '12345')]
子查询
即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
和 SELECT * FROM data WHERE EXISTS (SELECT name FROM user);
的查询
或使用 WITH temp AS (...)
作为临时表
注意: 在使用子查询后,SQL语句的查询性能变得非常糟糕, 至于如何取舍看个人权衡了
关于子查询的优化, 见: 深入理解MySql子查询IN的执行和优化
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
1 ++++++++++++++++++ EXISTS 子查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
WHERE EXISTS (SELECT count(article.author_id) AS count_1
FROM article GROUP BY article.author_id
HAVING count(article.author_id) >= 2)
"""
-------------- Core
subquery_core_1 = (select(func.count(article_table.c.author_id)).group_by(
article_table.c.author_id).having(
func.count(article_table.c.author_id) >= 2)).exists()
result_core_1 = conn.execute(select(user_table.c.username).where(subquery_core_1))
print(result_core_1.all()) # [('张三',), ('李四',)]
-------------- ORM
subquery_orm_1 = (select(func.count(Article.author_id)).group_by(
Article.author_id).having(
func.count(Article.author_id) >= 2)).exists()
result_orm_1 = session.execute(select(User.username).where(subquery_orm_1))
print(result_orm_1.all()) # [('张三',), ('李四',)]
2 ++++++++++++++++++ 其它 子查询 ++++++++++++++++++
"""
对应SQL
SELECT article.title, anon_1.username
FROM article, (SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1) AS anon_1
WHERE article.author_id = anon_1.uid
"""
-------------- Core
subquery_core_2 = select(user_table.c.uid, user_table.c.username).where(
user_table.c.uid == 1).subquery()
result_core_2 = conn.execute(
select(article_table.c.title, subquery_core_2.c.username).where(
article_table.c.author_id == subquery_core_2.c.uid))
print(result_core_2.all()) # [('C++入门', '张三'), ('C入门', '张三')]
# -------------- ORM
subquery_orm_2 = select(User.uid, User.username).where(User.uid == 1).subquery()
result_orm_2 = session.execute(
select(Article.title, subquery_orm_2.c.username).where(
Article.author_id == subquery_orm_2.c.uid))
print(result_orm_2.all()) # [('C++入门', '张三'), ('C入门', '张三')]
3 ++++++++++++++++++ with 添加临时表 ++++++++++++++++++
"""
对应SQL
WITH anon_1 AS
(SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1)
SELECT article.title, anon_1.username
FROM article, anon_1
WHERE article.author_id = anon_1.uid
"""
-------------- Core
subquery_core_3 = select(user_table.c.uid, user_table.c.username).where(
user_table.c.uid == 1).cte()
result_core_3 = conn.execute(
select(article_table.c.title, subquery_core_3.c.username).where(
article_table.c.author_id == subquery_core_3.c.uid))
print(result_core_3.all()) # [('C++入门', '张三'), ('C入门', '张三')]
-------------- ORM
subquery_orm_3 = select(User.uid, User.username).where(User.uid == 1).cte()
result_orm_3 = session.execute(
select(Article.title, subquery_orm_3.c.username).where(
Article.author_id == subquery_orm_3.c.uid))
print(result_orm_3.all()) # [('C++入门', '张三'), ('C入门', '张三')]
从1.x迁移到2.0的接口
一些 Query
的接口用起来还是特别好用的, 因此在2.0风格的接口中, 一些接口仍然可以使用
get根据主键查询
注意: 这是session的方法
---------------- 1.x
session.query(User).get(42)
---------------- 2.0
session.get(User, 42)
filter_by简单查询
注意: 这是select的方法
result = session.execute(
select(User).filter_by(username="张三")
)
print(result.all()) # [(,)]
filter复杂查询
注意: 这是select的方法
result = session.execute(
select(User).filter(User.username == "张三")
)
print(result.all()) # [(,)]
1.x与2.0的ORM接口对比
以下表格来自于: 2.0 Migration – ORM Usage
一些类的介绍
Result
表示从数据库中返回的结果, 一行数据使用 Row
对象表示, 关于Row, 见: Row
从SQLAlchemy1.4开始, Core和ORM的结果(
Result
), 使用的接口相同
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
#
print(type(session_res))
with engine.connect() as conn:
conn_res = conn.execute(
text("select tid, name from teacher;")
)
#
print(type(conn_res))
注意: 例子中的 teacher
表的数据为:
+-----+----------+
| tid | name |
+-----+----------+
| 1 | 语文老师 |
| 2 | 英语老师 |
| 3 | 数学老师 |
+-----+----------+
Result的全部方法
unique(strategy=None)
去重, 但 需要注意何时调用, 应该在调用如.all()
这种生成Row
的方法之前调用, 否则返回的对象都没有unique
这个方法
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher;")
)
session_res2 = session.execute(
text("select tid, name from teacher;")
)
session_res = session_res1.merge(session_res2)
# [1, 2, 3, 1, 2, 3] ==> [1, 2, 3]
print(session_res.scalars().unique().all())
all
返回所有Row数据的列表, 之后的调用将返回一个空列表
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# ---------------------------------- 第一次调用all
first_all = session_res.all()
print(type(session_res.all())) #
for row in first_all:
print(type(row)) #
print(f"{row.tid}-{row.name}")
# ---------------------------------- 第二次调用all
print(session_res.all()) # []
fetchall()
与all
方法一样
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
first_fetchall = session_res.fetchall()
second_fetchall = session_res.fetchall()
# [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
print(first_fetchall)
# []
print(second_fetchall)
fetchmany(size=None)
取多行数据,size
表示取多少行数据 , 当所有行都用完时, 返回一个空列表
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# 一共3行数据
# 取两行数据: [(1, '语文老师'), (2, '英语老师')]
print(session_res.fetchmany(2))
# 取一行数据: [(3, '数学老师')]
print(session_res.fetchmany(1))
# 没有数据了, 返回空列表: []
print(session_res.fetchmany(1))
fetchone()
取一行数据, 当所有行都用完时,返回None
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
while True:
row = session_res.fetchone()
if not row:
break
print(row)
"""
(1, '语文老师')
(2, '英语老师')
(3, '数学老师')
"""
first()
获取第一行数据, 关闭Result并丢弃其余行, 如果没有行,则不获取 (即返回值为None
)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# (1, '语文老师')
print(session_res.first())
# !!! 由于Result已经关闭, 继续操作会报错:
# sqlalchemy.exc.ResourceClosedError: This result object is closed.
print(session_res.first())
one()
只返回一行数据或引发异常, 并 关闭Result (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher where tid=1;")
)
# (1, '语文老师')
print(session_res.one())
one_or_none()
最多返回一行数据或引发异常, 并 关闭Result (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher where tid < 1;")
)
# None
print(session_res1.one_or_none())
session_res2 = session.execute(
text("select tid, name from teacher where tid = 1;")
)
# (1, '语文老师')
print(session_res2.one_or_none())
columns(*col_expressions)
限制返回列, 也可以对列进行重新排序
即假如结果的列为(a, b, c, d), 但我只需要a和b, 那么只需要result.columns("a", "b")
, 你还可以调整它们的顺序, 以方便解包
注意: 这会修改Result的列, 且该方法的返回值就是修改后的Result对象
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher;")
)
session_res2 = session.execute(
text("select tid, name from teacher;")
)
for row in session_res1.columns("tid"):
# 返回值时修改后的Result
# 获取字段名元组
print(row._fields) # ('tid',)
# 会修改原Result
session_res2.columns("name")
for row in session_res2:
# 获取字段名元组
print(row._fields) # ('name',)
scalar()
获取第一行的第一列数据, 并 关闭Result. 如果没有要获取的行, 则返回None
如:[(1, "lczmx"), (2, "jack")]
, 返回1
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
print(session_res.scalar()) # 1
scalar_one()
只返回一行数据的第一列或引发异常, 并 关闭Result (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
即Result.one()
+Result.scalar()
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher where tid=1;")
)
print(session_res.scalar_one()) # 1
scalar_one_or_none()
最多返回一行数据的第一列或引发异常, 并 关闭Result (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
即Result.one_or_none()
+Result.scalar()
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher where tid
scalars(index=0)
返回一个ScalarResult
对象, 该对象以每行数据的 第index
列 元素作为数据 (而不是Result
的Row
)
该对象的方法有:unique
partitions
fetchall
fetchmany
all
first
one_or_none
one
具体使用与Result
类似
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# [1, 2, 3]
print(session_res.scalars().all())
mappings()
返回一个MappingResult
对象,MappingResult
对象与Result
对象类似, 但是一行数据使用RowMapping
对象表示,RowMapping
对象类似于字典对象, 简而言之: 调用该方法, 你可以将一行数据由类元组变为类字典
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
for d in session_res.mappings():
# 像操作字典一样操作 d 即可
print(d.get("tid"), d.get("name"))
keys()
从SQLAlchemy1.4起 (之前的版本返回一个列表), 该方法将返回一个RMKeyView
对象, 该对象可迭代, 其_keys
属性存放列的名称, 由于实现的__contains__
方法, 因此也可以使用in
运算符作判断.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
from collections import Iterable
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
keys = session_res.keys()
print(keys) # RMKeyView(['tid', 'name'])
#
print(type(keys))
# 可迭代的
print(isinstance(keys, Iterable)) # True
if "name" in keys:
print("name in keys")
freeze()
可以对Result
进行缓存, 见官方文档: Re-Executing Statementsmerge(*others)
该方法合并其他Result
, 返回一个MergedResult
对象, 你可以像一般的Result
一样操作它, 但是取值的时候注意游标的位置(MergedResult
关闭,Result
也关闭;MergedResult
取完了值,Result
的值也被取完)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher where tid=1;")
)
session_res2 = session.execute(
text("select tid, name from teacher where tid=2;")
)
session_res = session_res2.merge(session_res1)
# 注意 先session_res2再session_res1
# (2, '英语老师')
print(session_res.fetchone())
# [(1, '语文老师')]
print(session_res1.all())
# session_res已经取过一次
# 所以返回: []
print(session_res2.all())
partitions(size=None)
迭代生成size
大小的行的子列表,size
为None
时调用Result.yield_per()
, 否则调用Result.fetchmany(size)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res1 = session.execute(
text("select tid, name from teacher;")
)
# 每次迭代 取 1行 数据
for i in session_res1.partitions():
print(i)
"""
[(1, '语文老师')]
[(2, '英语老师')]
[(3, '数学老师')]
"""
session_res2 = session.execute(
text("select tid, name from teacher;")
)
# 每次迭代 取 2行 数据
for i in session_res2.partitions(2):
print(i)
"""
[(1, '语文老师'), (2, '英语老师')]
[(3, '数学老师')]
"""
# 已经迭代完了, 就没有值了
print(list(session_res2.partitions())) # []
yield_per(num)
迭代num
行数据, 返回的是Result
对象
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
print(session_res.yield_per(1).all())
close
关闭此Result
, 再操作的话会抛出异常:sqlalchemy.exc.ResourceClosedError: This result object is closed.
Row
一般来说, 一行数据是一个 Row
对象
常用的属性或方法:
属性/方法 属性或方法 描述 _asdict
方法 返回字段名与值的字典数据 并添加到 _mapping
属性 _fields
属性 返回字符名的元组 _mapping
属性 返回字段名与值的字典数据
一般使用:
注意: 我们每一次迭代Result对象, 得到的是Row对象
通过属性取值
result = conn.execute(text("select x, y from some_table"))
for row in result:
y = row.y
# illustrate use with Python f-strings
print(f"Row: {row.x} {row.y}")
在ORM中,
select(Article)
的属性是Article
,select(Article.title)
的属性是title
通过元组解包
result = conn.execute(text("select x, y from some_table"))
for x, y in result:
# ...
通过索引取值
result = conn.execute(text("select x, y from some_table"))
for row in result:
x = row[0]
MetaData
是包含 Table
和 Engine
的对象, 也就是说它主要是用来管理 Table
(表)的
下面列出 MetaData
对象的一些方法
方法 参数 描述 clear
无 清除此元数据中的所有表对象 create_all(bind=None, tables=None, checkfirst=True)
bind
: 数据库Engine tables
Table
对象列表 checkfirst
: 是否 仅不存在表时 创建 在数据库中创建 元数据中的所有表 drop_all(bind=None, tables=None, checkfirst=True)
bind
: 数据库Engine tables
Table
对象列表 checkfirst
: 是否 仅存在表时 删除 在数据库中删除 元数据中存储的所有表 remove(table)
table
: 表对象 从此元数据中删除给定的表对象 tables
tables
是属性, 无参数 返回 Table
的字段对象
内置函数
常用的SQL函数:
函数名 对应的SQL函数 描述 max
MAX 返回一组值中的最大值 min
MIN 返回一组值中的最小值 count
COUNT 返回匹配指定条件的行数, 没有参数时为: COUNT(*)
sum
SUM 计算一组值的总和 rank
RAND 产生 0 至 1 之间的随机数 concat
CONCAT 多个字符串连接为一个字符串 char_length
CHAR_LENGTH 计算字符串字符数 coalesce
COALESCE 接受一系列的表达式或列, 返回第一个非空的值 session_user
SESSION_USER 返回当前连接的当前用户名和主机名, 形如: root@localhost
user
USER 返回连接的当前用户名和主机名, 形如: root@localhost
current_user
CURRENT_USER 返回用户名和主机名, 形如: root@localhost
current_date
CURRENT_DATE 函数返回当前日期, 格式: YYYY-MM-DD
current_time
CURRENT_TIME 返回当前时间, 格式: HH-MM-SS
current_timestamp
CURRENT_TIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
localtime
LOCALTIME 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
localtimestamp
LOCALTIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
now
NOW 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
sysdate
SYSDATE 返回当前日期和时间, 格式: YYYY-MM-DD HH:MM:SS
array_agg
ARRAY_AGG PostgreSql可用, 把表达式变成一个数组 dense_rank
DENSE_RANK 用于排名, 见:
MySQL DENSE_RANK() 函数 percent_rank
PERCENT_RANK 计算分区或结果集中行的百分位数排名 Function
描述一个SQL函数, 见:
Function GenericFunction
见:
GenericFunction grouping_sets
GROUPING SETS 定义分组集, 见:
SQL Grouping Sets运算符 rollup
ROLLUP 生成小计和总计, 见:
MySQL ROLLUP cube
见:
cube cume_dist
全部函数见: SQL and Generic Functions
使用例子:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, func
导入公共基类
Base = declarative_base()
数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True, autoincrement=True)
name = Column("name", String(10), nullable=False, comment="教师名")
with Session() as session:
session_res = session.execute(
select(func.count()).select_from(Teacher)
)
# (3,)
print(session_res.one())
Column定义
一个 Column
即表的一列数据, 和我们用SQL语句定义一列数据一样, 参数主要包括: 字段名、字段类型、约束条件, 比如:
from sqlalchemy import Column, String
Column("name", String(30), unique=True, nullable=False, comment='姓名')
字段类型, 一般你可以用直接指定数据库的字段类型, 也可以让SQLAlchemy的DDL自动选择字段类型
直接使用数据库的字段类型
字段类型 描述 ARRAY(item_type, ...)
数组类型, 目前只支持 PostgreSQL
, 因此建议用: sqlalchemy.dialects.postgresql.ARRAY
BIGINT
SQL BIGINT类型 BINARY(length)
SQL BINARY类型 BLOB
SQL BLOB类型 BOOLEAN
SQL布尔类型 CHAR
SQLCHAR类型 CLOB
SQL CLOB型 DATE
SQL DATE期类型 DATETIME
SQL DATETIME类型 DECIMAL
SQL DECIMAL类型 FLOAT
SQL FLOAT类型 INT
sqlalchemy.sql.sqltypes.INTEGER
的别名 INTEGER
SQL INT或INTEGER类型 JSON
SQL JSON类型 NCHAR
SQL NChar类型 NUMERIC
SQL NUMERIC类型 NVARCHAR
SQL NVARCHAR类型 REAL
SQL REAL类型 SMALLINT
SQL SMALLINT类型 TEXT
SQL TEXT类型 TIME
SQL TIME类型 TIMESTAMP
SQL TIMESTAMP类型 VARBINARY
SQLVARBINARY类型 VARCHAR
SQL VARCHAR类型
关于SQL的数据类型, 见: SQL 数据类型
自动转化的字段类型
字段类型 描述 通常对应的字段类型 Boolean
布尔数据类型 Boolean或SMALLINT String
所有字符串和字符类型的基 VARCHAR Text
大小可变的字符串类型 CLOB或TEXT LargeBinary
大的二进制字节数据类型 BLOB或BYTEA Unicode
长度可变的Unicode字符串类型 UnicodeText
无限长的Unicode字符串类型 SmallInteger
较小的一种 int 整数 SMALLINT Integer
int类型 BigInteger
BIGINT数据类型 BIGINT Numeric
用于固定精度数字的类型 NUMERIC或DECIMAL Float
浮点类型 FLOAT 或 REAL . Date
datetime.date
类型 Time
datetime.time
类型 DateTime
datetime.datetime
类型 Interval
datetime.timedelta
类型 Enum
枚举类型 ENUM或VARCHAR PickleType
保存使用pickle序列化的python对象 二进制类型 SchemaType
将类型标记为可能需要架构级DDL才能使用 MatchType
引用match运算符的返回类型 MySQL的是浮点型
约束条件
约束条件和其他参数 描述 autoincrement
是否自增 default
设置默认参数, 可以是可调用对象 index
是否创建索引 info
SchemaItem.info
的属性 nullable
是否非空, False
not null
primary_key
是否为主键 unique
是否唯一 comment
注释字段, 会写入SQL 中的COMMENT onupdate
调用更新数据时传入的值, 如: updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
server_default
为SQLAlchemy的DDL设置默认值, 可以是 str
unicode
text()
, 如: sqlalchemy.func.now()
注 :
sqlalchemy.func
, 用于生成SQL函数表达式, 详情见: 内置函数
Column
的例子:
from sqlalchemy import DateTime, func
class Data(Base):
__tablename__ = 'data'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')
Original: https://www.cnblogs.com/lczmx/p/15781883.html
Author: 403·Forbidden
Title: SQLAlchemy完全入门
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/581343/
转载文章受原作者版权保护。转载请注明原作者出处!