常用的一些操作,自己总结的,便于查阅
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
@app.route("/list/<int:boardid>")
@app.route("/receive_post", methods=["POST"])
post_param = int(request.form.get("post_param","0"))
# get参数用request.args
str = str
len = len
int = int
@app.route("/")
def index():
# ... some logic code
targs = globals()
targs.update(locals())
return render_template("template.html", **targs)
from flask import Flask, render_template, Blueprint, request, redirect
app = Flask(__name__)
for path in ['images', 'pic', 'css']:
blueprint = Blueprint(path, __name__, static_url_path='/'+path, static_folder=path)
app.register_blueprint(blueprint)
@app.before_request
def before_request():
ua = request.user_agent.string.lower()
for mobileua in "android|fennec|iemobile|iphone|opera mini|opera mobi|mobile".split("|"):
if mobileua in ua:
g.isphone = True
break
else:
g.isphone = False
def limit_param(param_name, default_value, minvalue, maxvalue):
"""
example: p = limit_param("p", 1, 1, 5)
"""
if maxvalue<minvalue:
maxvalue = minvalue
try:
data = int(request.args.get(param_name, default_value))
except:
data = default_value
if data<minvalue:
data = minvalue
elif data > maxvalue:
data = maxvalue
return data
用法: @require_login()
注意使用时添加到@app.route
行的后面
import functools
from flask import session, abort, redirect
def require_login(code=200, text="login first", jumptologin=False):
def real_decorator(func):
@functools.wraps(func)
def wrapper(*args,**kwargs):
if "username" not in session:
if jumptologin:
return redirect("/signin?error=needlogin&next="+signit(request.path))
elif code==200:
return text
else:
abort(code)
else:
return func(*args, **kwargs)
return wrapper
return real_decorator
from flask import Flask, render_template, Blueprint, request, redirect, Markup, g, session, abort, Response, make_response, send_file, jsonify
from werkzeug.utils import secure_filename
import time
import datetime
import random
import pickle
import requests
import os
import sys
import traceback
import mimetypes
import string
import re
import hashlib
import json
request:
curl -XGET http://127.0.0.1:5000/alert/dingding/test?x=y
then:
request.method: GET
request.url: http://127.0.0.1:5000/alert/dingding/test?x=y
request.base_url: http://127.0.0.1:5000/alert/dingding/test
request.url_charset: utf-8
request.url_root: http://127.0.0.1:5000/
str(request.url_rule): /alert/dingding/test
request.host_url: http://127.0.0.1:5000/
request.host: 127.0.0.1:5000
request.script_root:
request.path: /alert/dingding/test
request.full_path: /alert/dingding/test?x=y
request.args: ImmutableMultiDict([('x', 'y')])
request.args.get('x'): y
request.get_data() POST内容 bytes类型
request.endpoint 处理这个请求的函数名称
找到对uwsgi应用做profiling的dozer库
使用方法:
- 先安装python3对应的uwsgi:
apt install uwsgi-plugin-python3
- 写一个python脚本包装app,如
profiler_app.py
:
#!/usr/bin/python3
from app import app
from dozer import Profiler
appx = Profiler(app, profile_path="/tmp/profiles")
if __name__ == "__main__":
import os
os.system("uwsgi -w profiler_app:appx --http :80")
- 别忘记
mkdir /tmp/profiles
然后就可以启动了python3 profiler_app.py
- 使用http://127.0.0.1/_profiler/ 查看结果,可以点开每个请求看各个函数耗时详情
需求:特定页面需要加载一些耗时的资源,如果在应用启动的时候做加载,此时新来的请求就必须等待这个加载才能完成;而实际上这个init并非所有请求都必须的,想做一个lazyinit: 在不影响正常请求的前提下尽快完成init函数
我的做法:设计一个/lazyinit
路由函数做初始化工作,在重新部署/重启flask服务的时候同时启动一个简单的python脚本反复请求这个url直到所有的进程都已经触发
这样利用uwsgi自身就有的多进程负载均衡,每次最多只会有一个进程做初始化工作,其他进程可以正常处理请求;坏处就是在日志里面产生一些垃圾吧,影响不大
问题来了 uwsgi怎么知道当前是哪个进程呢 我发现threading提供的进程名称是字符串b'uWSGIWorker2Core2'
,其中Worker
后面的数字就是进程ID 不同进程ID的全局变量是不同的
代码:
flask中的/lazyinit
实现,返回处理当前请求的worker id:
import threading
def get_workerid():
# return uwsgi worker id: int
threadname = threading.current_thread().name
id_str = threadname.lower().split("worker")[1].split("core")[0]
return int(id_str)
HAS_INITED = False
@app.route("/lazyinit")
def lazyinit():
workerid = get_workerid()
if not HAS_INITED: # skip init if has already initialized
sleep(1) # do real init code...
HAS_INITED = True
return str(workerid)
这是反复请求的代码,重复请求最多100次,直到所有4个进程都已经触发,其中uwsgi的workerid是从1开始计数的
MAX_TRIES = 100
PROCESS_COUNT = 4
import requests
i = 0
status = [False]*PROCESS_COUNT
for i in range(MAX_TRIES):
id = requests.get("http://127.0.0.1/lazyinit?id="+str(i)).text
id = int(id) - 1
status[id] = True
if all(status):
break
就是这个问题: https://www.reddit.com/r/flask/comments/634i5u/make_flask_return_header_response_with_http11/
人家认为Flask不支持,其实flask使用的是werkzeug.serving
,最底层还是BaseHTTPRequestHandler,而这个是支持HTTP/1.1的,只是默认HTTP/1.0而已
实际发送请求HTTP/1.1 200 OK
是这个类的send_response
函数,用到protocol_version
这个属性,而这个属性是类的属性(不是在__init__
函数赋值的),所以我们可以直接修改 之后创建的对象就会自动拥有新的值
在调用之前添加以下几行即可
try:
from http.server import BaseHTTPRequestHandler
except: #PY2
from BaseHTTPServer import BaseHTTPRequestHandler
BaseHTTPRequestHandler.protocol_version = "HTTP/1.1"
一种直接的做法:注意顺序 局部变量优先于全局变量
targs = globals()
targs.update(locals())
render_template("x.html", **targs)
然而这样需要每个视图函数都写这三行,不够优雅
不如试试:获取调用者的局部变量 https://stackoverflow.com/questions/6618795/get-locals-from-calling-namespace-in-python
import inspect
def myrender_template(filename):
backframe = inspect.currentframe().f_back
targs = {}
targs.update(backframe.f_globals)
targs.update(backframe.f_locals)
return render_template(filename, **targs)
考虑我们需要向前端提供消息队列的消费者,比如收到广播后发给浏览器通知用户。当然我们可以用websocket,但这种场景(只有服务器给浏览器发)下只需要长连接的EventSource就行了。
基础篇: https://stackoverflow.com/questions/12232304/how-to-implement-server-push-in-flask-framework
def queue_consumer():
conn = 创建连接() #连接到消息队列,创建 channel
for data in conn.读取数据():
yield b"data: "+data+b"\n\n"
关闭连接() # 怎么执行到?
@app.route("/stream")
def stream():
return Response(queue_consumer(), mimetype="text/event-stream")
这个的问题在于关闭连接不会执行到,在消息队列服务器上观察到channel一直没有释放,这肯定不行,我们需要在浏览器断开连接的时候自动释放conn等资源。
读了 werkzeug 的源代码发现 Response 有 call_on_close 函数,在连接关闭的时候我们把生成器close即可触发yield的异常:
def queue_consumer():
conn = 创建连接() #连接到消息队列,创建 channel
try:
for data in conn.读取数据():
yield b"data: "+data+b"\n\n" #结束的时候会触发GeneratorExit异常
except:
pass
关闭连接()
@app.route("/stream")
def stream():
consumer = queue_consumer()
res = Response(consumer, mimetype="text/event-stream")
def onclose():
consumer.close()
res.call_on_close(onclose)
return res
这样还不够,发现无法使用g,以及Nginx默认缓存响应导致延迟,需要继续配置:
def queue_consumer():
conn = 创建连接() #连接到消息队列,创建 channel
try:
for data in conn.读取数据():
yield b"data: "+data+b"\n\n" #结束的时候会触发GeneratorExit异常
except:
pass
关闭连接()
@app.route("/stream")
def stream():
consumer = queue_consumer()
res = Response(stream_with_context(consumer), mimetype="text/event-stream")
def onclose():
consumer.close()
res.call_on_close(onclose)
res.headers["X-Accel-Buffering"] = "no"
res.headers["Cache-Control"] = "no-cache"
return res
这些Nginx配置你也可能需要加上:尤其是还有下一层反代的时候
uwsgi_pass_header "X-Accel-Buffering";
uwsgi_read_timeout 120s;
uwsgi_send_timeout 120s;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_pass_header "X-Accel-Buffering";
当我们的网站能被跨域访问的时候,要注意cookie的设置,加上SameSite=None; Secure
参考:
- https://stackoverflow.com/questions/56828663/how-to-explicitly-set-samesite-none-on-a-flask-response
- pallets/werkzeug#1549
- https://stackoverflow.com/questions/62992831/python-session-samesite-none-not-being-set
resp.set_cookie('cross-site-cookie', 'bar', samesite='None', secure=True)
resp.headers.add('Set-Cookie','cross-site-cookie=bar; SameSite=None; Secure')
Flask的session cookie也要跨域的话:
from flask import session
from flask.sessions import SecureCookieSessionInterface
session_cookie = SecureCookieSessionInterface().get_signing_serializer(app)
@app.after_request
def cookies(response):
same_cookie = session_cookie.dumps(dict(session))
response.headers.add("Set-Cookie", f"session={same_cookie}; Secure; HttpOnly; SameSite=None; Path=/;")
return response