共计 1615 个字符,预计需要花费 5 分钟才能阅读完成。
在前面几篇内容里,我们学习了协程、async/await、异步 I/O、create_task 等核心机制。
从这一篇开始,我们将把这些知识 组合到一起 ,构建一个“小型异步任务系统”,让你真正理解异步编程在实际项目中的应用方式。
一、项目目标
设计一个简单的异步任务系统,具备以下能力:
- 批量创建任务
- 限制最大并发数
- 自动处理异常
- 任务执行完成后打印结果
- 支持后台任务
最终你将看到一个能同时调度多个异步任务的“小框架”。
二、项目结构说明
我们将创建一个异步任务系统,包括:
- 一个任务处理函数(模拟耗时任务)
- 一个异步调度器
- 并发数量控制(Semaphore)
- 通过 create_task 批量创建任务
- gather 汇总结果
三、任务处理函数(模拟 I/O)
import asyncio
import random
async def do_task(task_id):
# 模拟随机耗时
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
# 模拟随机失败
if random.random() < 0.2:
raise ValueError(f" 任务 {task_id} 执行失败 ")
return f" 任务 {task_id} 完成,用时:{delay:.2f}s"
说明:
- 随机耗时模拟 I/O
- 随机失败用于测试异常处理
四、调度器:带并发限制
sem = asyncio.Semaphore(5) # 最大并发 5
async def worker(task_id):
async with sem:
try:
result = await do_task(task_id)
print(result)
return result
except Exception as e:
print(f" 任务 {task_id} 失败:{e}")
return None
作用:
Semaphore(5)表示每次最多有 5 个任务并发执行- 确保失败任务不会中断整个系统
五、批量创建任务(任务系统核心)
async def run_tasks(num):
tasks = []
for i in range(num):
task = asyncio.create_task(worker(i))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
说明:
create_task负责启动任务gather负责等待任务全部完成- 返回结果列表
六、后台日志任务(Fire-and-forget)
async def background_logger():
while True:
await asyncio.sleep(3)
print("[ 后台] 系统心跳正常 ")
后台任务特点:
- 持续运行
- 不阻塞主任务
- 常用于心跳检测、缓存刷新、状态同步等场景
七、整合所有组件(主程序)
async def main():
# 启动后台任务
asyncio.create_task(background_logger())
print(" 开始执行任务系统...")
results = await run_tasks(20)
print("\n 所有任务执行完毕!")
print(f" 成功任务数:{sum(1 for r in results if r)}")
print(f" 失败任务数:{sum(1 for r in results if not r)}")
asyncio.run(main())
八、执行后的示例输出
开始执行任务系统...
任务 0 完成,用时:1.23s
任务 3 完成,用时:0.78s
任务 4 失败:任务 4 执行失败
[后台] 系统心跳正常
任务 8 完成,用时:1.01s
...
所有任务执行完毕!成功任务数:16
失败任务数:4
九、总结
本篇我们整合了前面所有内容,构建了一个异步任务系统,实现了:
- create_task 并发调度
- gather 汇总结果
- 异常可控
- 后台运行任务
- 并发数量限制
- 真实项目常见结构
这类结构在真实生产环境非常常见,例如:
- 异步爬虫系统
- 批量文件处理
- 批量 API 调用
- 后端服务中的任务调度器
正文完