共计 1580 个字符,预计需要花费 4 分钟才能阅读完成。
在前面的异步内容中,我们学会了任务调度、并发执行与后台任务。
本篇将深入异步系统中非常关键的一部分: 任务之间如何协作 。
最常见的模式是 生产者 - 消费者模型(Producer-Consumer),在异步环境中我们可以使用 asyncio.Queue 来优雅地实现。
一、为什么需要生产者与消费者?
在复杂系统中,往往存在以下需求:
- 一个任务负责“生成数据”
- 另一个任务负责“处理数据”
- 数据生成速度和处理速度不同
- 不希望两者彼此阻塞
这就是生产者 - 消费者模式的价值所在。
二、asyncio.Queue 简介
asyncio.Queue 提供一个线程安全、协程安全的队列,支持:
put():异步放入任务get():异步取出任务qsize():队列大小task_done():处理完成标记join():等待队列所有任务处理完毕
它是构建异步流水线任务的基础。
三、创建队列
import asyncio
queue = asyncio.Queue()
简单、直接。
四、生产者:不断向队列写入数据
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(0.2) # 模拟生成数据耗时
await queue.put(f" 任务 -{i}")
print(f" 生产:任务 -{i}")
# 标记结束(放入 None)await queue.put(None)
要点:
- 生产数据
- 放入队列
- 用
None通知消费者停止
五、消费者:不断处理队列中的任务
async def consumer(queue):
while True:
task = await queue.get()
if task is None:
queue.task_done()
break # 收到终止信号,退出消费者
await asyncio.sleep(0.5) # 模拟处理
print(f" 消费:{task}")
queue.task_done()
要点:
- 不断从队列取数据
- 收到
None后退出 task_done()表示处理完一个任务
六、构建生产者 - 消费者系统
async def main():
queue = asyncio.Queue()
prod = asyncio.create_task(producer(queue, 10))
cons = asyncio.create_task(consumer(queue))
await asyncio.gather(prod)
await queue.join() # 等待所有任务处理完毕
await cons # 等消费者退出
asyncio.run(main())
流程说明:
- 启动生产者
- 启动消费者
- 等待所有队列任务被处理
- 等消费者真正退出
七、同时运行多个消费者(并发处理任务)
async def main():
queue = asyncio.Queue()
prod = asyncio.create_task(producer(queue, 20))
consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]
await prod
await queue.join()
for c in consumers:
await c
效果:
- 3 个消费者并发处理
- 队列生产速度与消费速度完全解耦
- 实际系统常用的高性能模式
八、实际应用场景(非常常见)
生产者 - 消费者模型常见于:
- 异步爬虫 (任务解析与下载分离)
- 数据处理流水线 (读取 → 清洗 → 分析)
- 日志采集系统
- 异步任务分发器
- 服务内部事件系统
- 异步爬虫框架(aiohttp + Queue)
Python 许多成熟框架(如 Scrapy 的 async 分支)内部都采用了类似思想。
九、小结
本篇你掌握了:
- 异步队列(asyncio.Queue)
- 生产者 - 消费者模型
- 用队列实现任务解耦与协作
- 多消费者并发处理任务
- 如何优雅地通知消费者退出
这是异步系统中的必备知识,为构建大型异步项目(如爬虫、消息系统、任务流水线)打下坚实基础。
正文完