Python基础入门 Day122 多任务协作:asyncio 的队列(Queue)实现生产者与消费者模型

8次阅读
没有评论

共计 1580 个字符,预计需要花费 4 分钟才能阅读完成。

在前面的异步内容中,我们学会了任务调度、并发执行与后台任务。
本篇将深入异步系统中非常关键的一部分: 任务之间如何协作

最常见的模式是 生产者 - 消费者模型(Producer-Consumer),在异步环境中我们可以使用 asyncio.Queue 来优雅地实现。


一、为什么需要生产者与消费者?

在复杂系统中,往往存在以下需求:

  • 一个任务负责“生成数据”
  • 另一个任务负责“处理数据”
  • 数据生成速度和处理速度不同
  • 不希望两者彼此阻塞

这就是生产者 - 消费者模式的价值所在。


二、asyncio.Queue 简介

asyncio.Queue 提供一个线程安全、协程安全的队列,支持:

  • put():异步放入任务
  • get():异步取出任务
  • qsize():队列大小
  • task_done():处理完成标记
  • join():等待队列所有任务处理完毕

它是构建异步流水线任务的基础。


三、创建队列

import asyncio

queue = asyncio.Queue()

简单、直接。


四、生产者:不断向队列写入数据

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.2)  # 模拟生成数据耗时
        await queue.put(f" 任务 -{i}")
        print(f" 生产:任务 -{i}")

    # 标记结束(放入 None)await queue.put(None)

要点:

  • 生产数据
  • 放入队列
  • None 通知消费者停止

五、消费者:不断处理队列中的任务

async def consumer(queue):
    while True:
        task = await queue.get()

        if task is None:
            queue.task_done()
            break  # 收到终止信号,退出消费者

        await asyncio.sleep(0.5)  # 模拟处理
        print(f" 消费:{task}")

        queue.task_done()

要点:

  • 不断从队列取数据
  • 收到 None 后退出
  • task_done() 表示处理完一个任务

六、构建生产者 - 消费者系统

async def main():
    queue = asyncio.Queue()

    prod = asyncio.create_task(producer(queue, 10))
    cons = asyncio.create_task(consumer(queue))

    await asyncio.gather(prod)
    await queue.join()  # 等待所有任务处理完毕
    await cons  # 等消费者退出

asyncio.run(main())

流程说明:

  1. 启动生产者
  2. 启动消费者
  3. 等待所有队列任务被处理
  4. 等消费者真正退出

七、同时运行多个消费者(并发处理任务)

async def main():
    queue = asyncio.Queue()

    prod = asyncio.create_task(producer(queue, 20))

    consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]

    await prod
    await queue.join()

    for c in consumers:
        await c

效果:

  • 3 个消费者并发处理
  • 队列生产速度与消费速度完全解耦
  • 实际系统常用的高性能模式

八、实际应用场景(非常常见)

生产者 - 消费者模型常见于:

  • 异步爬虫 (任务解析与下载分离)
  • 数据处理流水线 (读取 → 清洗 → 分析)
  • 日志采集系统
  • 异步任务分发器
  • 服务内部事件系统
  • 异步爬虫框架(aiohttp + Queue)

Python 许多成熟框架(如 Scrapy 的 async 分支)内部都采用了类似思想。


九、小结

本篇你掌握了:

  • 异步队列(asyncio.Queue)
  • 生产者 - 消费者模型
  • 用队列实现任务解耦与协作
  • 多消费者并发处理任务
  • 如何优雅地通知消费者退出

这是异步系统中的必备知识,为构建大型异步项目(如爬虫、消息系统、任务流水线)打下坚实基础。

正文完
 0
评论(没有评论)