Python基础入门 Day124 异步任务超时控制与重试机制实现方案

3次阅读
没有评论

共计 2210 个字符,预计需要花费 6 分钟才能阅读完成。

在前面的异步内容中,我们已经可以高效地执行大量异步任务。本篇将进一步提升系统可靠性:
学习 如何给异步任务设置超时 ,以及 如何在失败时自动重试

这是构建高稳定性系统的重要能力,尤其适用于:

  • 网络请求失败
  • 文件读写不稳定
  • API 超时
  • 必须确保任务最终完成的业务场景

一、为什么需要超时与重试?

真实环境中任务经常会出现:

  • 等待过久(网络堵塞、服务器卡住)
  • 请求失败(超时、断线、服务器错误)
  • 随机异常(短暂的服务不可用)

如果没有重试机制,整个系统可能会因为一次偶发错误而失败。


二、使用 asyncio.wait_for 设置超时

import asyncio

async def slow_task():
    await asyncio.sleep(3)
    return "OK"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=1)
        print(" 结果:", result)
    except asyncio.TimeoutError:
        print(" 任务超时!")

asyncio.run(main())

说明:

  • wait_for 限制任务最长运行时间
  • 超时会抛出 TimeoutError

三、实现一个自动重试的函数(重点)

async def retry(coro_func, retries=3, delay=1, timeout=None):
    for attempt in range(1, retries + 1):
        try:
            if timeout:
                return await asyncio.wait_for(coro_func(), timeout=timeout)
            else:
                return await coro_func()
        except Exception as e:
            print(f" 第 {attempt} 次失败:{e}")
            if attempt < retries:
                await asyncio.sleep(delay)
            else:
                raise

功能说明:

  • retries:最大重试次数
  • delay:失败后等待时间
  • timeout:超时时间
  • 多次失败后抛出最终异常

四、配合异步任务使用重试机制

示例:一个随机失败的任务

import random

async def unstable_task():
    await asyncio.sleep(0.5)
    if random.random() < 0.5:
        raise ValueError(" 随机失败 ")
    return " 成功!"

结合重试:

async def main():
    result = await retry(unstable_task, retries=5, delay=0.5)
    print(" 最终结果:", result)

asyncio.run(main())

效果:

  • 会不断重试直到成功
  • 如果全部失败,会抛出最终异常

五、在并发任务中使用重试(非常常用)

async def worker(i):
    try:
        result = await retry(unstable_task, retries=3, timeout=1)
        print(f" 任务 {i} 成功:{result}")
    except Exception:
        print(f" 任务 {i} 最终失败!")

批量执行:

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

六、让重试更智能:指数退避(高级策略)

指数退避可避免高频重试压垮服务器。

公式:

delay = base_delay * (2 ** attempt)

实现:

async def retry_backoff(coro_func, retries=3, base_delay=0.5):
    for attempt in range(1, retries + 1):
        try:
            return await coro_func()
        except Exception as e:
            wait = base_delay * (2 ** (attempt - 1))
            print(f" 失败:{e},等待 {wait:.2f}s 后重试 ")
            await asyncio.sleep(wait)

应用场景:

  • API 限流
  • 大规模爬虫
  • 服务端负载压力大

七、综合案例:带超时 + 重试的高并发请求系统

async def robust_worker(i):
    async def wrapper():
        await asyncio.sleep(0.2)
        if random.random() < 0.3:
            raise RuntimeError(" 任务失败 ")
        return f" 任务 {i} 完成 "

    try:
        result = await retry(wrapper, retries=5, timeout=1)
        print(result)
    except Exception:
        print(f" 任务 {i} 失败超过重试次数!")

批量执行:

async def main():
    tasks = [asyncio.create_task(robust_worker(i)) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())

八、小结

本篇学习了:

  • 如何给异步任务设置超时(wait_for)
  • 如何实现可配置的重试机制
  • 如何结合重试提高任务成功率
  • 如何对大量任务进行批量重试
  • 如何使用指数退避提升系统稳定性

超时控制与重试机制是 生产级异步系统 的关键特性。

正文完
 0
评论(没有评论)