文章目录
- 使用PyMySQL连接数据库并执行SQL语句
  - 安装PyMySQL模块
  - 连接数据库并执行语句
  - 插入多条数据
  - 查询数据
  - 删除数据
  - 完整项目
- 在Flask项目中连接数据库
  - flask-sqlalchemy
使用PyMySQL连接数据库并执行SQL语句
安装PyMySQL模块
下载该模块:
pip3 install PyMySQL
连接数据库并执行语句
先准备数据库连接的配置文件(这些配置也可以和连接代码写在同一个文件里)。
dbconfig.py
import pymysql
# Keyword arguments for pymysql.connect(); the example scripts unpack this
# mapping with **dbconfig. DictCursor is set as the connection's cursor class.
# NOTE(review): the credentials are hard-coded here; consider loading them
# from the environment or an untracked config file.
dbconfig = dict(
    host="localhost",
    port=3306,
    user="root",
    password="yu123",
    db="cy",
    charset='utf8',
    cursorclass=pymysql.cursors.DictCursor,
)
con_mysql.py
import pymysql
from dbconfig import dbconfig
from pymysql.cursors import Cursor, SSCursor, DictCursor
# Insert a single hard-coded row into table ``cy``.
#
# The previous version also created a stand-alone ``Cursor(connection)`` that
# was never used or closed; only the cursor managed by the ``with`` block is
# needed.
connection = pymysql.connect(**dbconfig)
try:
    # The with-block closes the cursor on exit, even if execute() raises.
    with DictCursor(connection) as cursor:
        sql = 'insert into cy(id,name) values(1,2)'
        cursor.execute(sql)
    # Commit explicitly so the insert is persisted.
    connection.commit()
finally:
    # Always release the connection, whether or not the insert succeeded.
    connection.close()
插入多条数据
使用 executemany(template, args) 一次插入多条数据。
参数 template 是带占位符的 SQL 语句模板。例如:
insert into table(id,name) values(%s,%s)
参数 args 是一个列表,其中每个元素都是一个元组,对应一行要插入的数据。例如:
[(1,'小明'),(2,'小红'),(3,'琦琦'),(4,'韩梅梅')]
完整代码
import pymysql
from dbconfig import dbconfig
from pymysql.cursors import Cursor, SSCursor, DictCursor
# Insert several rows into table ``cy`` with a single executemany() call.
#
# The previous version also created a stand-alone ``Cursor(connection)`` that
# was never used or closed; only the cursor managed by the ``with`` block is
# needed.
connection = pymysql.connect(**dbconfig)
try:
    # The with-block closes the cursor on exit, even if executemany() raises.
    with DictCursor(connection) as cursor:
        # %s placeholders are filled in by the driver, once per tuple in ``li``.
        sql = 'insert into cy(id,name) values(%s,%s)'
        li = [(1, '小明'), (2, '小红'), (3, '琦琦'), (4, '韩梅梅')]
        cursor.executemany(sql, li)
    # Commit explicitly so the inserted rows are persisted.
    connection.commit()
finally:
    # Always release the connection, whether or not the insert succeeded.
    connection.close()
插入成功
查询数据
获取查询结果
# Each fetchone() call returns the next row of the executed query, so this
# prints the first two rows one after the other.
for _ in range(2):
    result = cursor.fetchone()
    print(result)
直接执行查询语句 select * from cy,然后使用 fetchall() 函数获取全部结果即可。
mysql.py
import pymysql
from dbconfig import dbconfig
from pymysql.cursors import Cursor, SSCursor, DictCursor
# Read every row of table ``cy`` and print the result.
#
# The previous version also created a stand-alone ``Cursor(connection)`` that
# was never used or closed, and called commit() after the SELECT even though
# a read-only query leaves nothing to commit.
connection = pymysql.connect(**dbconfig)
try:
    # The with-block closes the cursor on exit, even if the query raises.
    with DictCursor(connection) as cursor:
        sql = 'select * from cy'
        cursor.execute(sql)
        result = cursor.fetchall()
        print(result)
finally:
    # Always release the connection.
    connection.close()
删除数据
大致原理差不多,修改一下sql语句即可
import pymysql
def Delete_From():
    """Delete the row with ``ID = 3`` from the ``student`` table.

    Connects to the ``test`` database on localhost, runs the DELETE and
    commits it. If anything fails, the error is printed and the transaction
    is rolled back. The cursor and the connection are always closed.
    """
    # Use keyword arguments: PyMySQL 1.0 made connect() keyword-only, so the
    # old positional form (host, user, password, db) raises TypeError there.
    db = pymysql.connect(host="localhost", user="root",
                         password="123456", database="test")
    try:
        cursor = db.cursor()
        try:
            # Pass the value separately so the driver binds and escapes it,
            # instead of %-formatting it into the SQL text.
            sql = """DELETE FROM student WHERE ID = %s"""
            cursor.execute(sql, (3,))
            db.commit()
            print("删除数据成功")
        except Exception as e:
            print("删除数据失败:case%s" % e)
            db.rollback()
        finally:
            cursor.close()
    finally:
        # Runs even if creating the cursor fails, so the connection never leaks.
        db.close()
def main():
    """Script entry point: run the delete example once."""
    Delete_From()


# Only run the example when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
完整项目
├── config.ini
├── conn.py
└── auth.py
config.ini 文件:数据库配置文件。
[DATABASE]
DB_HOST = 127.0.0.1
DB_USER = root
DB_PASSWD = yu123
DB_DATABASE = yu
DB_PORT = 3306
conn.py:连接数据库的文件。
import configparser
from pathlib import Path

import pymysql
from pymysql.cursors import DictCursor
# Load the database settings.
#
# ConfigParser.read() silently skips files that do not exist, so a bare
# 'config.ini' only works when the process is started from the directory that
# holds it. Read the copy that sits next to this module first, then the one in
# the current working directory: later files override earlier ones, so a
# config.ini in the working directory still takes precedence as before.
cfg = configparser.ConfigParser()
cfg.read([Path(__file__).with_name('config.ini'), 'config.ini'])
def dbconn():
    """Open a new MySQL connection from the ``[DATABASE]`` section of ``cfg``.

    Returns:
        A ``(conn, cursor)`` tuple: the new PyMySQL connection and a
        ``DictCursor`` created on it. Nothing is closed here; that is left to
        the caller.
    """
    section = "DATABASE"
    host = cfg.get(section, "DB_HOST")
    user = cfg.get(section, "DB_USER")
    port = cfg.get(section, "DB_PORT")
    passwd = cfg.get(section, "DB_PASSWD")
    database = cfg.get(section, "DB_DATABASE")

    conn = pymysql.connect(
        host=host,
        port=int(port),  # config values are read as strings
        user=user,
        password=passwd,
        db=database,
        charset='utf8',
    )
    return conn, DictCursor(conn)
auth.py:Flask 项目中的权限控制文件,负责执行具体的 SQL 语句。
import functools
from flask import (
Blueprint, flash, g, redirect, render_template, request, session, url_for
)
from werkzeug.security import check_password_hash, generate_password_hash
from app.conn import dbconn
# Blueprint named 'auth' that groups the views below; its routes are
# registered under the /auth URL prefix.
bp = Blueprint('auth', __name__, url_prefix='/auth')
@bp.route('/index', methods=('GET',))
def index():
    """Greet the logged-in user, or redirect anonymous visitors to the login page."""
    if g.user is not None:
        return "Hello " + g.user['username']
    return redirect(url_for('auth.login'))
@bp.route('/register', methods=('GET', 'POST'))
def register():
    """Show the registration form and create a user on submit.

    GET renders the form. POST validates that a username and a password were
    submitted, stores the user with a hashed password and redirects to the
    login page. Validation and database errors are flashed and the form is
    rendered again.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        error = None

        if not username:
            error = 'Username is required.'
        elif not password:
            error = 'Password is required.'

        if error is None:
            # Connect only once the input is valid. Previously the connection
            # was opened before validation and never closed when it failed.
            conn, cursor = dbconn()
            try:
                sql = "INSERT INTO user (username, password) VALUES (%s, %s)"
                cursor.execute(sql, (username, generate_password_hash(password),))
                conn.commit()
            except Exception as e:
                error = f"出错了!\n错误信息为:{e}"
            else:
                return redirect(url_for("auth.login"))
            finally:
                # Single cleanup path for both success and failure.
                cursor.close()
                conn.close()

        flash(error)

    return render_template('auth/register.html')
@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Show the login form and authenticate the user on submit.

    GET renders the form. POST looks the user up by username and checks the
    submitted password against the stored hash. On success the session is
    reset, the user's id is stored in it, and the user is redirected to the
    index view. On failure the error is flashed and the form is rendered
    again.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        error = None

        conn, cursor = dbconn()
        try:
            sql = "SELECT * FROM user WHERE username = %s"
            cursor.execute(sql, (username,))
            user = cursor.fetchone()
        finally:
            # The row is already fetched; release the connection right away.
            # Previously it was never closed.
            cursor.close()
            conn.close()

        if user is None:
            error = 'Incorrect username.'
        elif not check_password_hash(user['password'], password):
            error = 'Incorrect password.'

        if error is None:
            session.clear()
            session['user_id'] = user['id']
            # Send the user to the greeting page. Redirecting back to
            # 'auth.login' just showed the login form again after a
            # successful login.
            return redirect(url_for('auth.index'))

        flash(error)

    return render_template('auth/login.html')
@bp.before_app_request
def load_logged_in_user():
    """Populate ``g.user`` before each request.

    Sets ``g.user`` to None when the session holds no ``user_id``; otherwise
    loads the matching row from the ``user`` table (None if there is no such
    row).
    """
    user_id = session.get('user_id')
    if user_id is None:
        g.user = None
        return

    conn, cursor = dbconn()
    try:
        sql = "SELECT * FROM user WHERE id = %s"
        cursor.execute(sql, (user_id,))
        g.user = cursor.fetchone()
    finally:
        # This hook runs on every request, so an unclosed connection here
        # leaked one connection per request from a logged-in user.
        cursor.close()
        conn.close()
@bp.route('/logout')
def logout():
    """Clear the session and send the user back to the login page."""
    session.clear()
    login_url = url_for('auth.login')
    return redirect(login_url)
def login_required(view):
    """Decorate *view* so that it is only reachable when ``g.user`` is set.

    The wrapper redirects to the login page when ``g.user`` is None, and
    otherwise calls *view* with the keyword arguments it received.
    """

    @functools.wraps(view)
    def wrapped_view(**kwargs):
        if g.user is not None:
            return view(**kwargs)
        return redirect(url_for('auth.login'))

    return wrapped_view
在Flask项目中连接数据库
需要使用 flask-sqlalchemy 模块和 flask_wtf 模块。
flask-sqlalchemy
安装
pip install flask_sqlalchemy
导入SQLAlchemy并实例化
from flask_sqlalchemy import SQLAlchemy
# Create the SQLAlchemy extension object for the Flask application ``app``
# (``app`` is not defined in this snippet; it must already exist).
db = SQLAlchemy(app)
数据库相关配置
# Individual pieces of the database connection URL.
DIALCT = "mysql"
DRIVER = "pymysql"
USERNAME = "root"
PASSWORD = "admin"
HOST = "127.0.0.1"
PORT = "3306"
DATABASE = "flask_sqlalchmy_demo"

# Assembled as dialect+driver://user:password@host:port/database?charset=utf8
DB_URI = f"{DIALCT}+{DRIVER}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?charset=utf8"

# NOTE(review): recent Flask-SQLAlchemy versions read this setting when the
# extension is initialised, so it presumably has to be set before
# SQLAlchemy(app) is created — confirm against the installed version.
app.config["SQLALCHEMY_DATABASE_URI"] = DB_URI
创建一个Users模型
class Users(db.Model, UserMixin):
    """ORM model mapped to the ``Users`` table."""

    __tablename__ = 'Users'
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Unique strings declared with length 64.
    username = db.Column(db.String(64), unique=True)
    email = db.Column(db.String(64), unique=True)
    # Stored as binary data; presumably a password hash rather than the
    # plain password — TODO confirm against the code that sets it.
    password = db.Column(db.LargeBinary)
映射到数据库中
# Create the tables for all models defined so far. Flask-SQLAlchemy 3.x
# requires an active application context for this call; pushing one is
# harmless on older versions.
with app.app_context():
    db.create_all()
添加数据
# Build a Users row from the submitted form (every form field is passed as a
# keyword argument to the model), then add it to the session and commit.
# NOTE(review): this trusts every client-supplied field name, including
# ``id`` — consider passing only the expected fields explicitly.
submitted_fields = request.form
user = Users(**submitted_fields)
db.session.add(user)
db.session.commit()
查询数据
# Fetch every row of the Users table. The model defined above is named
# ``Users``; ``User`` does not exist and raised NameError.
results = Users.query.all()
print(results)
修改数据
# Rename the first user called "张三" to "李四". The model is ``Users`` and
# its name column is ``username`` (there is no ``User`` class or ``name``
# attribute).
result = Users.query.filter(Users.username == "张三").first()
# first() returns None when nothing matches; only update an existing row.
if result is not None:
    result.username = "李四"
    db.session.commit()
删除名字为张三的数据
# Delete the first user called "张三". The model is ``Users`` and its name
# column is ``username`` (there is no ``User`` class or ``name`` attribute).
result = Users.query.filter(Users.username == "张三").first()
# first() returns None when nothing matches; session.delete(None) would raise.
if result is not None:
    db.session.delete(result)
    db.session.commit()
Original: https://blog.csdn.net/qq_44657899/article/details/123595590
Author: 天问_Herbert555
Title: Python操作Mysql数据库
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/745663/
转载文章受原作者版权保护。转载请注明原作者出处!