共计 1323 个字符,预计需要花费 4 分钟才能阅读完成。
在前一篇中,我们通过优先级调度解决了“先执行谁”的问题。本篇将继续补齐调度体系中的另一个关键维度——时间 。在真实系统中,任务并不总是“立即执行”,而是经常需要 延迟、定时或周期性执行。
一、为什么异步系统必须支持时间调度
常见工程场景包括:
- 失败任务延迟重试
- 定时数据同步
- 过期状态清理
- 周期性健康检查
如果没有统一的时间调度机制,这些需求往往会被零散、不可控地实现,最终演变为系统复杂度失控。
二、最简单的延迟执行方式
在 asyncio 中,最直观的延迟执行方式是 asyncio.sleep。
async def delayed_task(delay, coro):
await asyncio.sleep(delay)
return await coro
这种方式适合:
- 数量少
- 延迟时间短
- 不需要持久化
但它并不适合大规模任务调度。
三、基于时间戳的延迟队列模型
在工程实践中,更通用的做法是 延迟队列。
任务不再“立刻可执行”,而是携带一个 run_at 时间戳。
import time
class Task:
def __init__(self, task_id, payload, run_at):
self.task_id = task_id
self.payload = payload
self.run_at = run_at
调度器根据时间判断是否可以执行。
四、使用 PriorityQueue 实现延迟调度
可以将 run_at 作为优先级,实现最小延迟调度:
queue = asyncio.PriorityQueue()
await queue.put((task.run_at, task))
调度循环示例:
async def scheduler(queue):
while True:
run_at, task = await queue.get()
now = time.time()
if run_at > now:
await asyncio.sleep(run_at - now)
await dispatch(task)
这种方式可以统一处理:
- 延迟任务
- 定时任务
- 重试任务
五、周期性任务的抽象模型
周期任务可以视为“执行完成后重新入队的延迟任务”。
async def periodic_task(interval, task):
while True:
await execute(task)
await asyncio.sleep(interval)
在工程系统中,周期任务通常也会被建模为 Task,并统一纳入调度体系,避免“特殊代码路径”。
六、时间调度中的稳定性问题
时间调度系统必须关注以下问题:
- 时间漂移
- 大量任务同时到期
- 系统重启后的任务恢复
工程上常见策略包括:
- 批量唤醒控制
- 到期任务限流
- 调度状态持久化
这些设计决定了系统在长期运行中的可靠性。
七、延迟执行与前文能力的协同
你会发现,延迟调度并不是孤立能力,而是与前文紧密结合:
- 重试 → 延迟执行
- 熔断恢复 → 定时探测
- 优先级 → 时间 + 权重综合调度
这正是系统工程的特点:
能力之间彼此叠加,而不是相互替代。
八、从“立即执行”到“时间驱动系统”
当系统开始主动管理“时间”,它已经不再是简单的任务执行器,而是演进为:
- 具备节奏控制能力
- 能主动安排未来行为
- 支持长期自治运行的系统
下一篇,我们将继续完成调度体系的最后一块拼图:
异步系统中的任务恢复与容错——系统重启后如何继续工作。