共计 3083 个字符,预计需要花费 8 分钟才能阅读完成。
在构建大规模异步爬虫的过程中,任务管理是整个系统的关键组成部分。如果任务仅保存在内存中,一旦程序中断(异常退出、服务器重启、网络波动等),未完成的任务就会全部丢失,这对于多站点、多层级抓取尤其致命。因此,本节我们将为框架加入“任务持久化”与“失败恢复机制”,使爬虫具备断点续爬的能力。
一、为什么必须做任务持久化?
当爬虫规模扩大后,任务列表的数量常常达到数万甚至数百万。此时,单纯依赖内存队列存在明显问题:
- 程序崩溃时全部任务丢失
- 无法进行跨进程或跨机器协作
- 多步爬虫中间状态无法记录(如下一页、详情页未爬)
- 无法统计任务进度、剩余任务量
任务持久化可将任务信息写入数据库,使爬虫在任意节点中断后仍可恢复运行。
二、持久化存储的选型
常用持久化方案包括:
| 存储 | 优点 | 缺点 |
|---|---|---|
| SQLite | 简单、无需外部依赖、适合单机 | 并发性能有限 |
| MySQL/PostgreSQL | 可靠、高并发、可扩展 | 维护成本高 |
| Redis | 原生支持队列结构,适合高并发 | 数据持久性取决于配置 |
| MongoDB | 文档结构友好,灵活度高 | 多进程写入需谨慎 |
对于教学与中小项目,推荐使用 SQLite 或 Redis。对于生产爬虫,可考虑 MySQL + Redis 组合。
本节以 SQLite 示例。
三、任务数据结构设计
一个任务至少需要记录以下信息:
- id:任务唯一标识
- url:目标链接
- status:pending/running/failed/done
- retries:已重试次数
- create_time/update_time:用于统计与管理
任务表示例:
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
status TEXT DEFAULT 'pending',
retries INTEGER DEFAULT 0,
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
四、将任务入库(生产者)
爬虫启动时可以提前写入任务,也可以在运行中动态生成:
import sqlite3
def add_task(url):
conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()
cursor.execute("INSERT INTO tasks (url, status) VALUES (?, 'pending')",
(url,)
)
conn.commit()
conn.close()
可以批量初始化:
for url in start_urls:
add_task(url)
五、从数据库读取任务(消费者)
异步爬虫需要不断从持久化存储中提取任务:
def fetch_pending_task(limit=10):
conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()
cursor.execute(
"SELECT id, url FROM tasks WHERE status='pending' LIMIT ?",
(limit,)
)
tasks = cursor.fetchall()
conn.close()
return tasks
通常会在提取任务后立即将状态标记为 running:
def mark_running(task_id):
conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()
cursor.execute(
"UPDATE tasks SET status='running', update_time=CURRENT_TIMESTAMP WHERE id=?",
(task_id,)
)
conn.commit()
conn.close()
六、在异步框架中执行任务
将数据库任务接入我们已有的异步调度器:
async def worker(session):
while True:
tasks = fetch_pending_task(limit=1)
if not tasks:
await asyncio.sleep(1)
continue
task_id, url = tasks[0]
mark_running(task_id)
try:
html = await fetch(session, url)
mark_done(task_id)
except Exception:
mark_failed(task_id)
其中:
def mark_done(task_id):
conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()
cursor.execute(
"UPDATE tasks SET status='done', update_time=CURRENT_TIMESTAMP WHERE id=?",
(task_id,)
)
conn.commit()
conn.close()
七、失败恢复机制
定义失败逻辑:
- 请求异常 → 重试次数 +1
- 达到最大重试次数 → 标记为 failed
- 未达到重试上限 → 放回 pending 队列
MAX_RETRY = 3
def mark_failed(task_id):
conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()
cursor.execute(
"UPDATE tasks SET retries = retries + 1 WHERE id=?",
(task_id,)
)
conn.commit()
cursor.execute(
"SELECT retries FROM tasks WHERE id=?",
(task_id,)
)
retry_times = cursor.fetchone()[0]
if retry_times >= MAX_RETRY:
cursor.execute(
"UPDATE tasks SET status='failed', update_time=CURRENT_TIMESTAMP WHERE id=?",
(task_id,)
)
else:
cursor.execute(
"UPDATE tasks SET status='pending', update_time=CURRENT_TIMESTAMP WHERE id=?",
(task_id,)
)
conn.commit()
conn.close()
这样任务可以自动参与“失败 → 重试 → 最终判定”的生命周期。
八、断点续爬的完整流程
当程序重启时,只需重新启动 worker,系统将自动:
- 读取 pending 任务
- 继续处理 running 任务(可选择重新标为 pending)
- 对 failed 的任务统计或人工检查
一个典型的恢复流程是:
UPDATE tasks SET status='pending' WHERE status='running';
这样系统即可从中断的位置继续执行。
九、任务队列优化与高级特性
在系统规模扩大后,可以引入更多能力:
1. 优先级任务队列
增加 priority 字段,使用 ORDER BY 优先选取优先级高的任务。
2. 多进程或分布式支持
使用 Redis 的 list、sorted set 或 stream 实现跨机器任务队列。
3. 任务去重机制
为 url 加唯一索引,避免重复任务写入。
4. 动态任务生成机制
在解析页面时自动添加新任务到数据库,形成闭环。
5. 任务监控面板
统计:
- 已完成任务数
- 剩余任务数
- 失败任务数
- 重试次数峰值
- 平均响应时间
可使用 Streamlit 或简单 Web 服务展示。
十、小结
本节实现了异步爬虫框架中关键的“任务持久化”与“失败恢复机制”,让爬虫具备断点续爬、自动重试、异常自愈等能力,为后续构建可扩展的分布式调度体系奠定基础。