共计 2210 个字符,预计需要花费 6 分钟才能阅读完成。
在前面的异步内容中,我们已经可以高效地执行大量异步任务。本篇将进一步提升系统可靠性:
学习 如何给异步任务设置超时 ,以及 如何在失败时自动重试。
这是构建高稳定性系统的重要能力,尤其适用于:
- 网络请求失败
- 文件读写不稳定
- API 超时
- 必须确保任务最终完成的业务场景
一、为什么需要超时与重试?
真实环境中任务经常会出现:
- 等待过久(网络堵塞、服务器卡住)
- 请求失败(超时、断线、服务器错误)
- 随机异常(短暂的服务不可用)
如果没有重试机制,整个系统可能会因为一次偶发错误而失败。
二、使用 asyncio.wait_for 设置超时
import asyncio
async def slow_task():
await asyncio.sleep(3)
return "OK"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=1)
print(" 结果:", result)
except asyncio.TimeoutError:
print(" 任务超时!")
asyncio.run(main())
说明:
wait_for限制任务最长运行时间- 超时会抛出
TimeoutError
三、实现一个自动重试的函数(重点)
async def retry(coro_func, retries=3, delay=1, timeout=None):
for attempt in range(1, retries + 1):
try:
if timeout:
return await asyncio.wait_for(coro_func(), timeout=timeout)
else:
return await coro_func()
except Exception as e:
print(f" 第 {attempt} 次失败:{e}")
if attempt < retries:
await asyncio.sleep(delay)
else:
raise
功能说明:
retries:最大重试次数delay:失败后等待时间timeout:超时时间- 多次失败后抛出最终异常
四、配合异步任务使用重试机制
示例:一个随机失败的任务
import random
async def unstable_task():
await asyncio.sleep(0.5)
if random.random() < 0.5:
raise ValueError(" 随机失败 ")
return " 成功!"
结合重试:
async def main():
result = await retry(unstable_task, retries=5, delay=0.5)
print(" 最终结果:", result)
asyncio.run(main())
效果:
- 会不断重试直到成功
- 如果全部失败,会抛出最终异常
五、在并发任务中使用重试(非常常用)
async def worker(i):
try:
result = await retry(unstable_task, retries=3, timeout=1)
print(f" 任务 {i} 成功:{result}")
except Exception:
print(f" 任务 {i} 最终失败!")
批量执行:
async def main():
tasks = [asyncio.create_task(worker(i)) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
六、让重试更智能:指数退避(高级策略)
指数退避可避免高频重试压垮服务器。
公式:
delay = base_delay * (2 ** attempt)
实现:
async def retry_backoff(coro_func, retries=3, base_delay=0.5):
for attempt in range(1, retries + 1):
try:
return await coro_func()
except Exception as e:
wait = base_delay * (2 ** (attempt - 1))
print(f" 失败:{e},等待 {wait:.2f}s 后重试 ")
await asyncio.sleep(wait)
应用场景:
- API 限流
- 大规模爬虫
- 服务端负载压力大
七、综合案例:带超时 + 重试的高并发请求系统
async def robust_worker(i):
async def wrapper():
await asyncio.sleep(0.2)
if random.random() < 0.3:
raise RuntimeError(" 任务失败 ")
return f" 任务 {i} 完成 "
try:
result = await retry(wrapper, retries=5, timeout=1)
print(result)
except Exception:
print(f" 任务 {i} 失败超过重试次数!")
批量执行:
async def main():
tasks = [asyncio.create_task(robust_worker(i)) for i in range(20)]
await asyncio.gather(*tasks)
asyncio.run(main())
八、小结
本篇学习了:
- 如何给异步任务设置超时(wait_for)
- 如何实现可配置的重试机制
- 如何结合重试提高任务成功率
- 如何对大量任务进行批量重试
- 如何使用指数退避提升系统稳定性
超时控制与重试机制是 生产级异步系统 的关键特性。
正文完