Python基础入门 Day127 构建异步爬虫框架:任务持久化与失败恢复机制

5次阅读
没有评论

共计 3083 个字符,预计需要花费 8 分钟才能阅读完成。

在构建大规模异步爬虫的过程中,任务管理是整个系统的关键组成部分。如果任务仅保存在内存中,一旦程序中断(异常退出、服务器重启、网络波动等),未完成的任务就会全部丢失,这对于多站点、多层级抓取尤其致命。因此,本节我们将为框架加入“任务持久化”与“失败恢复机制”,使爬虫具备断点续爬的能力。

一、为什么必须做任务持久化?

当爬虫规模扩大后,任务列表的数量常常达到数万甚至数百万。此时,单纯依赖内存队列存在明显问题:

  1. 程序崩溃时全部任务丢失
  2. 无法进行跨进程或跨机器协作
  3. 多步爬虫中间状态无法记录(如下一页、详情页未爬)
  4. 无法统计任务进度、剩余任务量

任务持久化可将任务信息写入数据库,使爬虫在任意节点中断后仍可恢复运行。

二、持久化存储的选型

常用持久化方案包括:

存储 优点 缺点
SQLite 简单、无需外部依赖、适合单机 并发性能有限
MySQL/PostgreSQL 可靠、高并发、可扩展 维护成本高
Redis 原生支持队列结构,适合高并发 数据持久性取决于配置
MongoDB 文档结构友好,灵活度高 多进程写入需谨慎

对于教学与中小项目,推荐使用 SQLite 或 Redis。对于生产爬虫,可考虑 MySQL + Redis 组合。

本节以 SQLite 示例。

三、任务数据结构设计

一个任务至少需要记录以下信息:

  • id:任务唯一标识
  • url:目标链接
  • status:pending/running/failed/done
  • retries:已重试次数
  • create_time/update_time:用于统计与管理

任务表示例:

CREATE TABLE IF NOT EXISTS tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    retries INTEGER DEFAULT 0,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

四、将任务入库(生产者)

爬虫启动时可以提前写入任务,也可以在运行中动态生成:

import sqlite3

def add_task(url):
    conn = sqlite3.connect("tasks.db")
    cursor = conn.cursor()
    cursor.execute("INSERT INTO tasks (url, status) VALUES (?, 'pending')",
        (url,)
    )
    conn.commit()
    conn.close()

可以批量初始化:

for url in start_urls:
    add_task(url)

五、从数据库读取任务(消费者)

异步爬虫需要不断从持久化存储中提取任务:

def fetch_pending_task(limit=10):
    conn = sqlite3.connect("tasks.db")
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, url FROM tasks WHERE status='pending' LIMIT ?",
        (limit,)
    )
    tasks = cursor.fetchall()
    conn.close()
    return tasks

通常会在提取任务后立即将状态标记为 running:

def mark_running(task_id):
    conn = sqlite3.connect("tasks.db")
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE tasks SET status='running', update_time=CURRENT_TIMESTAMP WHERE id=?",
        (task_id,)
    )
    conn.commit()
    conn.close()

六、在异步框架中执行任务

将数据库任务接入我们已有的异步调度器:

async def worker(session):
    while True:
        tasks = fetch_pending_task(limit=1)
        if not tasks:
            await asyncio.sleep(1)
            continue

        task_id, url = tasks[0]
        mark_running(task_id)

        try:
            html = await fetch(session, url)
            mark_done(task_id)
        except Exception:
            mark_failed(task_id)

其中:

def mark_done(task_id):
    conn = sqlite3.connect("tasks.db")
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE tasks SET status='done', update_time=CURRENT_TIMESTAMP WHERE id=?",
        (task_id,)
    )
    conn.commit()
    conn.close()

七、失败恢复机制

定义失败逻辑:

  1. 请求异常 → 重试次数 +1
  2. 达到最大重试次数 → 标记为 failed
  3. 未达到重试上限 → 放回 pending 队列
MAX_RETRY = 3

def mark_failed(task_id):
    conn = sqlite3.connect("tasks.db")
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE tasks SET retries = retries + 1 WHERE id=?",
        (task_id,)
    )
    conn.commit()

    cursor.execute(
        "SELECT retries FROM tasks WHERE id=?",
        (task_id,)
    )
    retry_times = cursor.fetchone()[0]

    if retry_times >= MAX_RETRY:
        cursor.execute(
            "UPDATE tasks SET status='failed', update_time=CURRENT_TIMESTAMP WHERE id=?",
            (task_id,)
        )
    else:
        cursor.execute(
            "UPDATE tasks SET status='pending', update_time=CURRENT_TIMESTAMP WHERE id=?",
            (task_id,)
        )
    conn.commit()
    conn.close()

这样任务可以自动参与“失败 → 重试 → 最终判定”的生命周期。

八、断点续爬的完整流程

当程序重启时,只需重新启动 worker,系统将自动:

  1. 读取 pending 任务
  2. 继续处理 running 任务(可选择重新标为 pending)
  3. 对 failed 的任务统计或人工检查

一个典型的恢复流程是:

UPDATE tasks SET status='pending' WHERE status='running';

这样系统即可从中断的位置继续执行。

九、任务队列优化与高级特性

在系统规模扩大后,可以引入更多能力:

1. 优先级任务队列

增加 priority 字段,使用 ORDER BY 优先选取优先级高的任务。

2. 多进程或分布式支持

使用 Redis 的 list、sorted set 或 stream 实现跨机器任务队列。

3. 任务去重机制

为 url 加唯一索引,避免重复任务写入。

4. 动态任务生成机制

在解析页面时自动添加新任务到数据库,形成闭环。

5. 任务监控面板

统计:

  • 已完成任务数
  • 剩余任务数
  • 失败任务数
  • 重试次数峰值
  • 平均响应时间

可使用 Streamlit 或简单 Web 服务展示。

十、小结

本节实现了异步爬虫框架中关键的“任务持久化”与“失败恢复机制”,让爬虫具备断点续爬、自动重试、异常自愈等能力,为后续构建可扩展的分布式调度体系奠定基础。

正文完
 0
评论(没有评论)