Web应用框架(Flask) | 字数总计: 3.1k | 阅读时长: 14分钟 | 阅读量:
框架 Flask 是一个以微核心设计为基础的轻量级 Web 应用框架,它提供了 Web 开发的基础功能,如路由、请求处理和响应构建。这种设计哲学使得 Flask 既简单直观,易于新手上手,又足够灵活,能够满足经验丰富的开发者的需求。开发者可以根据自己的项目需求,选择性地添加各种 Flask 扩展,实现如数据库集成、表单验证等额外功能。
传参 GET 通常用于请求服务器发送资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 @app.route('/get' ) def testGet (): data = request.args.get('data' ) print ('获取的data值:' , data) return "获取的data值:" + data @app.route("/get/<int:id>" ) def testGetPath (id ): print (type (id )) return f"返回的数据 {id } "
路径参数
string:接受任何不包含斜杠的文本
int:接受正整数
float:接受正浮点数
path:接受包含斜杠的文本
POST 通常用于向服务器提交要被处理的数据。
1 2 3 4 5 6 7 8 9 10 11 12 @app.route("/post/form" , methods=["POST" ] ) def testPostForm (): username = request.form.get("username" ) password = request.form.get("password" ) print (username, password) data = { "username" : username, "password" : password } return data
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class Student (): def __init__ (self, id , name, age ): self.id = id self.name = name self.age = age def __repr__ (self ): return f"Student[id={self.id } ,name={self.name} ,age={self.age} ]" def to_dict (self ): return { 'id' : self.id , 'name' : self.name, 'age' : self.age } @app.route("/post/json" , methods=["POST" ] ) def testPostJson (): id = request.json.get("id" ) name = request.json.get("name" ) age = request.json.get("age" ) data = Student(id , name, age) print (data) return jsonify(data.to_dict())
PUT 通常用于更新服务器上的现有资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @app.route("/api/put/<int:id>" , methods=["PUT" ] ) def testPut (id ): print (type (id )) return f"上传参数 {id } " @app.route("/api/put" , methods=["PUT" ] ) def testPuta (): data = request.args.get('data' ) print (type (data)) return f"上传参数 {data} " @app.route("/api/put/json" , methods=["PUT" ] ) def testPutJson (): name = request.json.get("name" ) return name
DELETE 通常用于删除服务器上的资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 @app.route('/api/delete' , methods=["DELETE" ] ) def testDelete (): data = request.args.get('name' ) print (data) return data + "删除成功" @app.route("/api/delete/<int:ID>" , methods=["DELETE" ] ) def testDeletePath (ID ): print (type (ID)) return f"测试值为 {ID} "
request对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 request.method 返回请求方法 request.args 是获取url中的请求参数数据(key-value),主要是GET 请求,请求参数在url中.也即请求链接中?后⾯的所有参数 request.args .get ('a' ) 获取url中参数a的值,数据来源是url地址 request.form 是获取form表单中的数据(key-value),原理跟request.args 差不多,只是request.args 数据来源是url,request.form 的数据来源是表单 request.form .get ('username' )获取表单中key为username的值,也即获取在html页面中定义的元素的name值对应的输入值 request.cookies 获取cookies信息 request.headres 获取请求头信息 request.data 如果处理不了的数据就变成字符串儿存在data里面 request.files 获取上传或下载的文件信息 request.path 获取请求文件路径:/myapplication/page.html request.base_url 获取域名与请求文件路径:http : request.url 获取全部url:http : request.url_root 获取域名:http :
重定向
1 2 3 4 5 6 7 8 9 10 11 12 from flask import Flask, request, redirect, jsonifyapp = Flask(__name__) @app.route('/home' ) def testGet (): return redirect('https://www.baidu.com' ) if __name__ == '__main__' : app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from flask import Flask, redirect, url_forapp = Flask(__name__) @app.route('/home' ) def testGet (): return redirect(url_for('testOut' )) @app.route('/loginOut' ) def testOut (): return 'out' if __name__ == '__main__' : app.run()
异常处理 abord
函数和错误处理1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from flask import Flask, request, redirect, jsonifyapp = Flask(__name__) @app.route('/home' ) def testGet (): abord(404 ) return None @app.errorhandler(404 ) def handle_404_error (): return redirect('https://www.baidu.com' ) if __name__ == '__main__' : app.run()
SQLAlchemy
前提
以下演示采用postgresql
,需要先安装依赖库
1 pip install sqlalchemy psycopg2
连接数据库 1 2 3 4 5 6 7 8 9 10 11 Base = declarative_base() """ postgresql://[用户名]:[用户密码]@[主机号]/[数据库名] """ url = 'postgresql://cqhy:cqhy123!@120.46.37.218/HY_reserve' engine = sqlalchemy.create_engine(url) Session = orm.sessionmaker(bind=engine) session = Session()
模型定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class TestData (Base ): __tablename__ = "a_photo" id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True , autoincrement=True ) name = sqlalchemy.Column(sqlalchemy.String(256 ), nullable=True ) uuid = sqlalchemy.Column(sqlalchemy.String(64 ), unique=True ) add_time = sqlalchemy.Column(sqlalchemy.DateTime()) data = sqlalchemy.Column(sqlalchemy.Text(), default='{}' ) def __init__ (self, name, uuid, data='{}' , add_time=None ): self.name = name self.uuid = uuid self.data = data self.add_time = add_time if None != add_time else datetime.datetime.now()
Column常用参数
序号
参数名称
说明
1
name
字段的名称,默认为类属性的名称。指定ORM模型的中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。
2
type_
字段的数据类型,如 Integer、String、Date 等。
3
primary_key
指定某个字段是否为主键,默认为 False。
4
unique
指定某个字段的值是否唯一,默认是False。
5
nullable
指定某个字段是否为空。默认值是True,可以为空。
6
default
默认值,当插入数据时没有提供该字段的值时使用。
7
index
是否创建索引,默认为 False。
8
autoincrement
是否为自增字段,仅适用于整数类型,默认为False。
9
onupdate
更新的时候执行的函数。在数据更新的时候会调用这个参数指定的值或者函数。在第一次插入这条数据的时候,不会用onupdate的值,只会使用default的值。常用于是字段(每次更新数据的时候都要更新该字段值)。
常用数据类型
序号
数据类型
说明
1
Integer
整形
2
Float
浮点类型
3
Boolean
传递True/False
4
DECIMAL
定点类型,具有小数点而且数值确定的数值
5
enum
枚举类型
6
DateTime
日期时间类型
7
Date
传递datetime.date()进去
8
Time
传递datatime.time()
9
String
字符类型,使用时需要指定长度,区别于Text类型
10
Text
文本类型
11
LONGTEXT
长文本类型
基本操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 def create_table (): with engine.connect() as connection: Base.metadata.create_all(engine) def add_one (): uuid_str = str (uuid.uuid4()) local_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) new_user = TestData(name='zhangsan' , uuid=uuid_str, data='{"name": "zhangsan"}' , add_time=local_time) session.add(new_user) session.commit() session.close() def add_multi (): local_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) data_list = [] for i in range (10 ): uuid_str = str (uuid.uuid4()) data_list.append(TestData(name='zhangsan' , uuid=uuid_str, data='{"name": "zhangsan"}' , add_time=local_time)) session.add_all(data_list) session.commit() session.close() def select_data (): user = session.query(TestData).filter (TestData.id == '1' and TestData.photo_name == 'qwe' ).one() print ('name:' , user.name) print ('data:' , user.data) session.close() def update_data (): users = session.query(TestData).filter_by(name="zhangsan" ).first() users.data = '{"name": "lisi"}' session.add(users) session.commit() session.close() def delete_one (): delete_data = session.query(TestData).filter (TestData.id == "1" ).first() if delete_data: session.delete(delete_data) session.commit() session.close() def drop_table (): session.execute(text('drop table tb_test_1' )) session.commit() session.close()
完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 import timeimport uuidimport datetimeimport sqlalchemyimport sqlalchemy.orm as ormfrom sqlalchemy.orm import declarative_basefrom sqlalchemy import textBase = declarative_base() url = 'postgresql://[用户名]:[用户密码]@[主机号]/[数据库名]' engine = sqlalchemy.create_engine(url) Session = orm.sessionmaker(bind=engine) session = Session() class TestData (Base ): __tablename__ = "a_photo" id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True , autoincrement=True ) name = sqlalchemy.Column(sqlalchemy.String(256 ), nullable=True ) uuid = sqlalchemy.Column(sqlalchemy.String(64 ), unique=True ) add_time = sqlalchemy.Column(sqlalchemy.DateTime()) data = sqlalchemy.Column(sqlalchemy.Text(), default='{}' ) def __init__ (self, name, uuid, data='{}' , add_time=None ): self.name = name self.uuid = uuid self.data = data self.add_time = add_time if None != add_time else datetime.datetime.now() def create_table (): with engine.connect() as connection: Base.metadata.create_all(engine) def add_one (): uuid_str = str (uuid.uuid4()) local_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) new_user = TestData(name='zhangsan' , uuid=uuid_str, data='{"name": "zhangsan"}' , add_time=local_time) session.add(new_user) session.commit() session.close() def add_multi (): local_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) data_list = [] for i in range (10 ): uuid_str = str (uuid.uuid4()) data_list.append(TestData(name='zhangsan' , uuid=uuid_str, data='{"name": "zhangsan"}' , add_time=local_time)) session.add_all(data_list) session.commit() session.close() def select_data (): user = session.query(TestData).filter (TestData.id == '1' and TestData.photo_name == 'qwe' ).one() print ('name:' , user.name) print ('data:' , user.data) session.close() def update_data (): users = session.query(TestData).filter_by(name="zhangsan" ).first() users.data = '{"name": "lisi"}' session.add(users) session.commit() session.close() def delete_one (): delete_data = session.query(TestData).filter (TestData.id == "1" ).first() if delete_data: session.delete(delete_data) session.commit() session.close() def drop_table (): session.execute(text('drop table tb_test_1' )) session.commit() session.close() def run (): create_table() add_multi() if __name__ == '__main__' : run()
其余解决方案 python对postgres
数据库增删查改操作(采用直接执行拼接的sql,生产有风险,存在sql注入,需注意使用场景)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import psycopg2db = psycopg2.connect( host="127.0.0.1" , port='5432' , user="postgres" , password="ccucc" , database= 'BHQ' ) cursor = db.cursor() def select_all (): sql = 'SELECT * FROM "t_animal_info" ORDER BY "id";' cursor.execute(sql) print (cursor.fetchall()) def select_all_one (): cursor.execute('SELECT * FROM "t_animal_info";' ) while True : singleData = cursor.fetchone() if singleData is None : break print (singleData) def select_num (): cursor.execute('SELECT * FROM "t_animal_info";' ) print (cursor.fetchmany(2 )) def add_single (): sql = 'INSERT INTO t_animal_info ("filename") VALUES (\'%s\')' % ("kangkang" ) cursor.execute(sql) print (f"成功插入{cursor.rowcount} 条数据" ) db.commit() def add_mutil (): sql = 'INSERT INTO "t_animal_info" ("filename") VALUES (%s)' cursor.executemany(sql, [('Tim' ,), ('Jane' ,)]) print (f"成功插入{cursor.rowcount} 条数据" ) db.commit() def update_single (): sql='UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s' cursor.execute(sql,('ccucc' ,'Tom' )) db.commit() def update_mutil (): sql = 'UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s' cursor.executemany(sql, [('Tim' , 'a' ), ('Jane' , 's' )]) db.commit() def delete_single (del_id ): cursor.execute('DELETE FROM "t_animal_info" WHERE "id" = (\'%s\')' % del_id) db.commit() def delete_mutil (): sql = 'DELETE FROM "t_animal_info" WHERE "id" = %s;' cursor.executemany(sql, [('4' ), ('5' ),('6' ), ('7' ), ('9' )]) db.commit() select_all() cursor.close() db.close()
打包部署 windows
1 2 3 4 5 from app import appif __name__ == "__main__" : from waitress import serve serve(app, host='0.0.0.0' , port=8080 , threads=4 )