FastAPI 实现定时任务
发布时间:2024-01-21
浏览量: 2184
文章分类: python
FastAPI 实现定时任务
关于定时任务,我基本都是使用apscheduler,详情可以翻我之前的文章,我有介绍这个框架如何在Django或者python中单独使用的教程。
这次在FastAPI中也是直接整合吧,非常方便。
首先安装这些我就都不讲了,网上很多都是很老的方法,我目前理解的使用事件监听方法add_event_handler来实现的。
直接上代码吧
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import datetime
import uvicorn
from typing import Callable
app = FastAPI()
# 初始化调度器
scheduler = AsyncIOScheduler()
def startup(app: FastAPI) -> Callable:
async def app_start() -> None:
# 定时任务
scheduler.add_job(
lambda: print("定时任务执行时间:", datetime.datetime.now()), "interval", seconds=10
)
scheduler.start()
return app_start
def stopping(app: FastAPI) -> Callable:
async def app_stop() -> None:
# 不停止调度器,直到所有任务完成
await scheduler.shutdown()
return app_stop
# 添加事件监听
app.add_event_handler("startup", startup(app))
app.add_event_handler("shutdown", stopping(app))
@app.get("/")
async def read_root():
return {"message": "Hello, World!"}
# 运行应用
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)