框架

Flask 是一个以微核心设计为基础的轻量级 Web 应用框架,它提供了 Web 开发的基础功能,如路由、请求处理和响应构建。这种设计哲学使得 Flask 既简单直观,易于新手上手,又足够灵活,能够满足经验丰富的开发者的需求。开发者可以根据自己的项目需求,选择性地添加各种 Flask 扩展,实现如数据库集成、表单验证等额外功能。

传参

GET

通常用于请求服务器发送资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
# get query参数
@app.route('/get')
def testGet():
data = request.args.get('data')
print('获取的data值:', data)
return "获取的data值:" + data


# get path参数
@app.route("/get/<int:id>")
def testGetPath(id):
print(type(id))
return f"返回的数据 {id}"

路径参数

string:接受任何不包含斜杠的文本

int:接受正整数

float:接受正浮点数

path:接受包含斜杠的文本

  • 也可自定义转换器来实现复杂限制条件

POST

通常用于向服务器提交要被处理的数据。

1
2
3
4
5
6
7
8
9
10
11
12
# post form-data参数
@app.route("/post/form", methods=["POST"])
def testPostForm():
username = request.form.get("username")
password = request.form.get("password")
print(username, password)
data = {
"username": username,
"password": password
}
return data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# post json参数
# 学生信息类
class Student():
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age

def __repr__(self):
return f"Student[id={self.id},name={self.name},age={self.age}]"

def to_dict(self):
return {
'id': self.id,
'name': self.name,
'age': self.age
}

@app.route("/post/json", methods=["POST"])
def testPostJson():
id = request.json.get("id")
name = request.json.get("name")
age = request.json.get("age")
data = Student(id, name, age)
print(data)
# 返回参数需要格式化
return jsonify(data.to_dict())

PUT

通常用于更新服务器上的现有资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# put path参数
@app.route("/api/put/<int:id>", methods=["PUT"])
def testPut(id):
print(type(id))
return f"上传参数 {id}"


# put query参数
@app.route("/api/put", methods=["PUT"])
def testPuta():
data = request.args.get('data')
print(type(data))
return f"上传参数 {data}"


# put json参数
@app.route("/api/put/json", methods=["PUT"])
def testPutJson():
name = request.json.get("name")
return name

DELETE

通常用于删除服务器上的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
# delete query参数
@app.route('/api/delete', methods=["DELETE"]) # 方式1
def testDelete():
data = request.args.get('name')
print(data)
return data + "删除成功"

# delete path参数
@app.route("/api/delete/<int:ID>", methods=["DELETE"]) # 方式2
def testDeletePath(ID):
print(type(ID))
return f"测试值为 {ID}"

request对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
request.method  返回请求方法

request.args 是获取url中的请求参数数据(key-value),主要是GET请求,请求参数在url中.也即请求链接中?后⾯的所有参数

request.args.get('a') 获取url中参数a的值,数据来源是url地址

request.form 是获取form表单中的数据(key-value),原理跟request.args差不多,只是request.args数据来源是url,request.form的数据来源是表单

request.form.get('username')获取表单中key为username的值,也即获取在html页面中定义的元素的name值对应的输入值

request.cookies 获取cookies信息

request.headres 获取请求头信息

request.data 如果处理不了的数据就变成字符串儿存在data里面

request.files 获取上传或下载的文件信息

request.path 获取请求文件路径:/myapplication/page.html

request.base_url 获取域名与请求文件路径:http://www.baidu.com/myapplication/page.html

request.url 获取全部url:http://www.baidu.com/myapplication/page.html?id=1&edit=edit

request.url_root 获取域名:http://www.baidu.com/

重定向

  • 重定向到网页
1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask, request, redirect, jsonify

app = Flask(__name__)

@app.route('/home')
def testGet():
# 重定向到百度首页
return redirect('https://www.baidu.com')


if __name__ == '__main__':
app.run()
  • 重定向到内部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from flask import Flask, redirect, url_for

app = Flask(__name__)


@app.route('/home')
def testGet():
# 内部testOut函数
return redirect(url_for('testOut'))


@app.route('/loginOut')
def testOut():
return 'out'


if __name__ == '__main__':
app.run()

异常处理

abord函数和错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import Flask, request, redirect, jsonify

app = Flask(__name__)

@app.route('/home')
def testGet():
# 返回错误类型404
abord(404)
return None

# 定义错误处理方法
@app.errorhandler(404)
def handle_404_error():
return redirect('https://www.baidu.com')

if __name__ == '__main__':
app.run()

SQLAlchemy

前提

以下演示采用postgresql,需要先安装依赖库

1
pip install sqlalchemy psycopg2

连接数据库

1
2
3
4
5
6
7
8
9
10
11
Base = declarative_base()

# 创建数据库连接
"""
postgresql://[用户名]:[用户密码]@[主机号]/[数据库名]
"""
url = 'postgresql://cqhy:cqhy123!@120.46.37.218/HY_reserve'
engine = sqlalchemy.create_engine(url)

Session = orm.sessionmaker(bind=engine)
session = Session()

模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 定义数据模型
class TestData(Base):
__tablename__ = "a_photo"

id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
uuid = sqlalchemy.Column(sqlalchemy.String(64), unique=True)
add_time = sqlalchemy.Column(sqlalchemy.DateTime())
data = sqlalchemy.Column(sqlalchemy.Text(), default='{}')

def __init__(self, name, uuid, data='{}', add_time=None):
self.name = name
self.uuid = uuid
self.data = data
self.add_time = add_time if None != add_time else datetime.datetime.now()

Column常用参数

序号 参数名称 说明
1 name 字段的名称,默认为类属性的名称。指定ORM模型的中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。
2 type_ 字段的数据类型,如 Integer、String、Date 等。
3 primary_key 指定某个字段是否为主键,默认为 False。
4 unique 指定某个字段的值是否唯一,默认是False。
5 nullable 指定某个字段是否为空。默认值是True,可以为空。
6 default 默认值,当插入数据时没有提供该字段的值时使用。
7 index 是否创建索引,默认为 False。
8 autoincrement 是否为自增字段,仅适用于整数类型,默认为False。
9 onupdate 更新的时候执行的函数。在数据更新的时候会调用这个参数指定的值或者函数。在第一次插入这条数据的时候,不会用onupdate的值,只会使用default的值。常用于是字段(每次更新数据的时候都要更新该字段值)。

常用数据类型

序号 数据类型 说明
1 Integer 整形
2 Float 浮点类型
3 Boolean 传递True/False
4 DECIMAL 定点类型,具有小数点而且数值确定的数值
5 enum 枚举类型
6 DateTime 日期时间类型
7 Date 传递datetime.date()进去
8 Time 传递datatime.time()
9 String 字符类型,使用时需要指定长度,区别于Text类型
10 Text 文本类型
11 LONGTEXT 长文本类型

基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# 建表
def create_table():
with engine.connect() as connection:
# 创建表格(如果不存在)
Base.metadata.create_all(engine)


# 插入一行数据
def add_one():
# 创建会话
uuid_str = str(uuid.uuid4())
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

new_user = TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time)

# 添加到session:
session.add(new_user)

# 提交即保存到数据库:
session.commit()

# 关闭session:
session.close()


# 插入多行数据
def add_multi():
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

data_list = []
for i in range(10):
uuid_str = str(uuid.uuid4())
data_list.append(TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time))

# 添加到session:
session.add_all(data_list)

# 提交即保存到数据库:
session.commit()

# 关闭session:
session.close()


# 查询操作
def select_data():
# 创建Session
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(TestData).filter(TestData.id == '1' and TestData.photo_name == 'qwe').one()
print('name:', user.name)
print('data:', user.data)
session.close() # 关闭Session


# 更新操作
def update_data():
users = session.query(TestData).filter_by(name="zhangsan").first() # 查询条件
users.data = '{"name": "lisi"}' # 更新操作
session.add(users) # 添加到会话
session.commit() # 提交即保存到数据库
session.close() # 关闭会话


# 删除操作
def delete_one():
delete_data = session.query(TestData).filter(TestData.id == "1").first()
if delete_data:
session.delete(delete_data)
session.commit()
session.close() # 关闭会话


# 删除表
def drop_table():
session.execute(text('drop table tb_test_1'))
session.commit()
session.close()

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import time
import uuid
import datetime
import sqlalchemy
import sqlalchemy.orm as orm
from sqlalchemy.orm import declarative_base
from sqlalchemy import text

Base = declarative_base()

# 创建数据库连接
url = 'postgresql://[用户名]:[用户密码]@[主机号]/[数据库名]'
engine = sqlalchemy.create_engine(url)

Session = orm.sessionmaker(bind=engine)
session = Session()


# 定义数据模型
class TestData(Base):
__tablename__ = "a_photo"

id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
uuid = sqlalchemy.Column(sqlalchemy.String(64), unique=True)
add_time = sqlalchemy.Column(sqlalchemy.DateTime())
data = sqlalchemy.Column(sqlalchemy.Text(), default='{}')

def __init__(self, name, uuid, data='{}', add_time=None):
self.name = name
self.uuid = uuid
self.data = data
self.add_time = add_time if None != add_time else datetime.datetime.now()


def create_table():
with engine.connect() as connection:
# 创建表格(如果不存在)
Base.metadata.create_all(engine)

# 插入一行数据
def add_one():
# 创建会话
uuid_str = str(uuid.uuid4())
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

new_user = TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time)

# 添加到session:
session.add(new_user)

# 提交即保存到数据库:
session.commit()

# 关闭session:
session.close()


# 插入多行数据
def add_multi():
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

data_list = []
for i in range(10):
uuid_str = str(uuid.uuid4())
data_list.append(TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time))

# 添加到session:
session.add_all(data_list)

# 提交即保存到数据库:
session.commit()

# 关闭session:
session.close()


# 查询操作
def select_data():
# 创建Session
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(TestData).filter(TestData.id == '1' and TestData.photo_name == 'qwe').one()
print('name:', user.name)
print('data:', user.data)
session.close() # 关闭Session


# 更新操作
def update_data():
users = session.query(TestData).filter_by(name="zhangsan").first() # 查询条件
users.data = '{"name": "lisi"}' # 更新操作
session.add(users) # 添加到会话
session.commit() # 提交即保存到数据库
session.close() # 关闭会话


# 删除操作
def delete_one():
delete_data = session.query(TestData).filter(TestData.id == "1").first()
if delete_data:
session.delete(delete_data)
session.commit()
session.close() # 关闭会话


# 删除表
def drop_table():
session.execute(text('drop table tb_test_1'))
session.commit()
session.close()


def run():
create_table()
# add_one()
add_multi()
# select_data()
# update_data()
# delete_one()
# drop_table()
# select_data1()


if __name__ == '__main__':
run()

其余解决方案

python对postgres数据库增删查改操作(采用直接执行拼接的sql,生产有风险,存在sql注入,需注意使用场景)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import psycopg2

# 连接数据库
db = psycopg2.connect(
host="127.0.0.1",
port='5432',
user="postgres",
password="ccucc",
database= 'BHQ'
)

# 创建游标
cursor = db.cursor()

def select_all():
# 直接获取全部数据
sql = 'SELECT * FROM "t_animal_info" ORDER BY "id";'
cursor.execute(sql)
print(cursor.fetchall()) #打印结果
def select_all_one():
# 一次读取一条结果,循环获取所有记录
cursor.execute('SELECT * FROM "t_animal_info";')
while True:
singleData = cursor.fetchone()
if singleData is None:
break
print(singleData)
def select_num():
# 获取前N条记录
cursor.execute('SELECT * FROM "t_animal_info";')
print(cursor.fetchmany(2))
def add_single():
# 插入单条数据
sql = 'INSERT INTO t_animal_info ("filename") VALUES (\'%s\')' % ("kangkang")
cursor.execute(sql)
print(f"成功插入{cursor.rowcount}条数据")
db.commit()
def add_mutil():
# 批量插入数据
sql = 'INSERT INTO "t_animal_info" ("filename") VALUES (%s)'
# TypeError: not all arguments converted during string formatting(需要添加,)
cursor.executemany(sql, [('Tim',), ('Jane',)])
print(f"成功插入{cursor.rowcount}条数据")
db.commit()
def update_single():
# 执行单条数据修改
sql='UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s'
cursor.execute(sql,('ccucc','Tom'))
db.commit()
def update_mutil():
# 执行批量数据修改(!!!!注意变量位置,同sql语句占位对应!!!!)
sql = 'UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s'
cursor.executemany(sql, [('Tim', 'a'), ('Jane', 's')])
db.commit()
def delete_single(del_id):
# 执行单条数据删除
cursor.execute('DELETE FROM "t_animal_info" WHERE "id" = (\'%s\')' % del_id)
db.commit()
def delete_mutil():
# 执行批量数据删除
sql = 'DELETE FROM "t_animal_info" WHERE "id" = %s;'
cursor.executemany(sql, [('4'), ('5'),('6'), ('7'), ('9')])
db.commit()

# update_single()
# add_mutil()

# add_single()
# delete_mutil()
# delete_mutil()
select_all()


cursor.close() #关闭游标
db.close() #关闭链接

打包部署

windows

1
pip install waitress
1
2
3
4
5
from app import app

if __name__ == "__main__":
from waitress import serve
serve(app, host='0.0.0.0', port=8080, threads=4)