Flask 超时控制是指在 Flask Web 应用中设置请求的超时时间,以防止长时间未响应的请求占用过多资源,在 Flask 中,可以通过使用 after_request 装饰器和 g 对象来实现超时控制。

创新互联公司是专业的三江侗网站建设公司,三江侗接单;提供成都网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行三江侗网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
1、使用 after_request 装饰器
after_request 装饰器可以在每次请求处理完成后执行一些操作,通过在这个装饰器中设置一个定时器,可以实现对请求的超时控制。
示例代码:
from flask import Flask, g, request, jsonify
import threading
app = Flask(__name__)
定义一个全局变量,用于存储每个请求的开始时间
request_start_time = {}
def check_timeout():
while True:
for key in list(request_start_time.keys()):
if time.time() request_start_time[key] > 5: # 设置超时时间为 5 秒
del request_start_time[key]
g.response = jsonify({"error": "请求超时"})
break
time.sleep(0.1)
@app.route('/')
def index():
return "Hello, World!"
@app.after_request
def after_request(response):
request_id = request.headers.get('XRequestID')
if request_id:
request_start_time[request_id] = time.time()
threading.Thread(target=check_timeout).start()
return response
if __name__ == '__main__':
app.run()
2、使用 g 对象
g 对象是一个线程局部数据结构,用于存储每个请求的上下文信息,通过在 g 对象中设置一个定时器,可以实现对请求的超时控制。
示例代码:
from flask import Flask, g, request, jsonify, current_app as app
import threading
import time
app = Flask(__name__)
app.config['TIMEOUT'] = 5 # 设置超时时间为 5 秒
def check_timeout():
while True:
for key in list(g.requests.keys()):
if time.time() g.requests[key]['start'] > app.config['TIMEOUT']: # 获取超时时间配置
del g.requests[key]
g.current_response = jsonify({"error": "请求超时"})
break
time.sleep(0.1)
app.logger.debug("Checking timeouts")
@app.route('/')
def index():
g.requests[request.headers.get('XRequestID')] = {'start': time.time(), 'response': "Hello, World!"} # 将请求信息存储到 g 对象中
return g.requests[request.headers.get('XRequestID')]['response']
# return "Hello, World!" # 如果注释掉这行代码,将不会触发超时控制,因为请求没有存储到 g 对象中
threading.Thread(target=check_timeout).start() # 启动一个线程来检查超时情况,避免阻塞主线程
# app.run() # 如果注释掉这行代码,将不会触发超时控制,因为主线程没有运行事件循环,无法触发 after_request 装饰器中的定时器函数