Python基础入门 Day125 构建异步爬虫框架:任务调度、限速、重试与存储全流程

2次阅读
没有评论

共计 2345 个字符,预计需要花费 6 分钟才能阅读完成。

前几篇我们学习了异步任务、超时控制、重试机制与队列。本篇将把这些能力整合起来,实现一个“小型异步爬虫框架雏形”。

你将学到:

  • 如何调度大量 URL
  • 如何使用队列实现爬虫流水线
  • 如何设置限速(限制 QPS)
  • 如何为请求设置超时与重试
  • 如何异步保存数据
  • 如何构建完整爬虫架构

这是一个非常实用的专题,也是实际爬虫框架的核心机制。


一、项目功能描述

构建一个异步爬虫系统,具备以下能力:

  1. 任务队列(URL)
  2. 异步请求网页内容
  3. 重试机制(失败任务不丢失)
  4. 限速功能(控制每秒请求数)
  5. 多消费者协同工作
  6. 异步保存数据到文件

这是一个真正能投入使用的爬虫基础框架!


二、项目库依赖

pip install aiohttp aiofiles

三、URL 生产者(放入待爬取任务)

import asyncio

async def producer(queue, urls):
    for url in urls:
        await queue.put(url)
        print(f"[ 生产] 添加任务:{url}")
    await queue.put(None)  # 结束标记

四、带重试与超时的请求函数(核心)

import aiohttp
import asyncio

async def fetch(session, url, timeout=5, retries=3):
    for attempt in range(1, retries + 1):
        try:
            async with session.get(url, timeout=timeout) as resp:
                return await resp.text()
        except Exception as e:
            print(f"[ 失败] {url} 第 {attempt} 次失败:{e}")
            if attempt == retries:
                return None
            await asyncio.sleep(0.5 * attempt)  # 简单指数退避

五、限速控制(速率限制器)

限制 QPS(每秒几次请求)的方法:

rate_sem = asyncio.Semaphore(5)  # 每秒最多 5 个请求

async def rate_limit():
    async with rate_sem:
        await asyncio.sleep(0.2)  # 控制速率(1 秒大约 5 次)

六、消费者:爬取网页并写入文件

import aiofiles
import re

async def consumer(queue, session):
    while True:
        url = await queue.get()

        if url is None:
            queue.task_done()
            # 继续传播结束标记给其他消费者
            await queue.put(None)
            break

        await rate_limit()  # 限速
        html = await fetch(session, url)

        if html:
            # 提取标题
            title_match = re.search(r"<title>(.*?)</title>", html, re.S)
            title = title_match.group(1).strip() if title_match else " 未找到标题 "

            # 保存结果
            async with aiofiles.open("result.txt", "a", encoding="utf-8") as f:
                await f.write(f"{url} - {title}\n")

            print(f"[ 成功] {url} - 标题:{title}")

        else:
            print(f"[ 失败] {url} 最终未成功 ")

        queue.task_done()

七、主程序(整合爬虫系统)

async def main():
    urls = [
        "https://www.python.org/",
        "https://www.github.com/",
        "https://www.wikipedia.org/",
        "https://www.baidu.com/",
        "https://www.bing.com/",
    ]

    queue = asyncio.Queue()

    async with aiohttp.ClientSession() as session:
        prod = asyncio.create_task(producer(queue, urls))

        consumers = [asyncio.create_task(consumer(queue, session))
            for _ in range(3)  # 三个消费者并发爬取
        ]

        await prod
        await queue.join()

        for c in consumers:
            await c

asyncio.run(main())

八、运行结果示例

[生产] 添加任务:https://www.python.org/
[生产] 添加任务:https://www.github.com/
[成功] https://www.python.org/ - 标题:Welcome to Python.org
[成功] https://www.github.com/ - 标题:GitHub: Let’s build from here
...

生成的 result.txt 内容:

https://www.python.org/ - Welcome to Python.org
https://www.github.com/ - GitHub: Let’s build from here
...

九、框架扩展方向

按照目前结构,你可以轻松扩展:

  • 增加解析器(提取更多字段)
  • 爬取图片并异步保存
  • 加入 URL 去重机制
  • 加入日志系统(异步写日志)
  • 用 async Redis 存储 URL
  • 构建分布式爬虫队列
  • 实现页面深度爬取(爬完自动继续添加新链接)

甚至,你可以在此基础上做一个 迷你版 aiohttp 爬虫框架


十、小结

本篇我们构建了一个可工作的异步爬虫框架,学习了:

  • 生产者消费者模型
  • URL 调度
  • 限速控制
  • 超时与重试
  • 异步保存文件
  • 使用多个消费者并发爬取

这是一个非常关键的异步编程综合案例。

正文完
 0
评论(没有评论)