python 异步编程中使用 yield 场景分析
python 异步编程中使用 yield 场景分析
我使用 python也差不多有十几年了,这几年虽然我也用别的编程语言,但是基本 python 一直是我的主力语言,从 2.x 一直到 3.1x版本,从开始简明手册一本电子书到现在会强迫自己去了解各种新特性,看看都有什么新玩意。但是说到多线程 、多进程 、 携程 这些偏向于cpu密集类问题,我算是看着python 一步一步走过来的。
不过在经历了多个不同的项目,涉及web开发(Django) 、 api开发(FastAPI) 、 网络请求库( requests、aiohttp ) 、 数据分析(numpy、pandas) 等等这些项目后,我现在基本稳定在全部都统一使用 uv + 异步(asyncio)的方式来做所有的项目开发, 同时在涉及规范上面,我也非常推崇这种模式,暂时我并没有遇到什么性能瓶颈。
这里我先说一下我对异步的理解,异步 其实就是一种非阻塞的编程方式,简单来说就是当你在执行一个任务的时候,如果这个任务需要等待一段时间(比如网络请求、文件读写等),那么在等待的这段时间里,你可以去执行其他的任务,而不是一直等待这个任务完成。这样就可以提高程序的效率和响应速度。
现在我们有一个场景,比如我传入一个 开始时间 、结束时间, 我需要生成这个时间段内每分钟的时间区间。 例如 start_date = '2025-10-01 00:00:00' , end_date = '2025-10-02 00:00:00', 那么我需要生成的时间区间就是:
[
{'start_date':'2025-10-01 00:00:00', 'end_date':'2025-10-01 00:01:00'},
{'start_date':'2025-10-01 00:01:00', 'end_date':'2025-10-01 00:02:00'},
...
]
这个场景比较好理解,虽然生成过程中并不涉及到所谓的 网络请求、文件读写等,但是理解起来是比较好理解的。
所以我们单纯只是以理解异步的方式来实现这个功能,实际单纯说这个功能的话,个人觉得同步和异步没有区别。
传统的模式
如果按照我们传统方式来实现,大概是这样的思路
+ 创建一个列表,用于存储生成的时间区间
+ 使用 datetime模块,生成不同的时间区间
+ 写入到最开始的列表中
+ 返回列表
下面是代码实现样例,这个方法其实到也没问题,但是如果我把粒度控制在 1 秒,一次生成 10 年的话,这个对内存压力就很大,因为存储的数据量很大。
import datetime
def hourly_intervals(start_str: str, end_str: str):
"""
返回每小时区间
"""
start = datetime.datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S")
end = datetime.datetime.strptime(end_str, "%Y-%m-%d %H:%M:%S")
current = start
datetime_list = []
while current < end:
next_hour = current + datetime.timedelta(hours=1)
if next_hour > end:
next_hour = end
# 把生成的时间写入列表
datetime_list.append(
{
"start_time": current.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": next_hour.strftime("%Y-%m-%d %H:%M:%S"),
}
)
current = next_hour
return datetime_list
使用 yield 的方式
如果我们使用 yield 的方式来实现这个功能,代码会变得更加简洁,同时也能节省内存,因为 yield 会生成一个生成器对象,而不是一次性把所有的时间区间都存储在内存中。
import datetime
async def generate_hourly_intervals(start_str: str, end_str: str):
"""
返回每小时区间
"""
start = datetime.datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S")
end = datetime.datetime.strptime(end_str, "%Y-%m-%d %H:%M:%S")
current = start
while current < end:
next_hour = current + datetime.timedelta(hours=1)
if next_hour > end:
next_hour = end
yield {
"start_time": current.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": next_hour.strftime("%Y-%m-%d %H:%M:%S"),
}
current = next_hour
可以对比看到,这里少了上面代码的dartetime_list 列表,同时也少了 append 这个操作,代码变得更加简洁。因为他是生成一次,返回结果,然后等到下一次调用的时候,再继续生成下一个结果,这样就节省大量内存。
同时在业务代码中调用也可以直接使用async for的方式来遍历调用,从阅读理解上来看也会更直观。
async for interval in generate_hourly_intervals(start_time, end_time):
logger.debug(interval)
上面的这个例子还是说明生成器异步的使用方式,当然如果你是同步的方式,也是可以使用 yield 的方式来实现的,只不过在调用的时候不能使用 async for,而是使用普通的 for 来遍历。