共计 2345 个字符,预计需要花费 6 分钟才能阅读完成。
前几篇我们学习了异步任务、超时控制、重试机制与队列。本篇将把这些能力整合起来,实现一个“小型异步爬虫框架雏形”。
你将学到:
- 如何调度大量 URL
- 如何使用队列实现爬虫流水线
- 如何设置限速(限制 QPS)
- 如何为请求设置超时与重试
- 如何异步保存数据
- 如何构建完整爬虫架构
这是一个非常实用的专题,也是实际爬虫框架的核心机制。
一、项目功能描述
构建一个异步爬虫系统,具备以下能力:
- 任务队列(URL)
- 异步请求网页内容
- 重试机制(失败任务不丢失)
- 限速功能(控制每秒请求数)
- 多消费者协同工作
- 异步保存数据到文件
这是一个真正能投入使用的爬虫基础框架!
二、项目库依赖
pip install aiohttp aiofiles
三、URL 生产者(放入待爬取任务)
import asyncio
async def producer(queue, urls):
for url in urls:
await queue.put(url)
print(f"[ 生产] 添加任务:{url}")
await queue.put(None) # 结束标记
四、带重试与超时的请求函数(核心)
import aiohttp
import asyncio
async def fetch(session, url, timeout=5, retries=3):
for attempt in range(1, retries + 1):
try:
async with session.get(url, timeout=timeout) as resp:
return await resp.text()
except Exception as e:
print(f"[ 失败] {url} 第 {attempt} 次失败:{e}")
if attempt == retries:
return None
await asyncio.sleep(0.5 * attempt) # 简单指数退避
五、限速控制(速率限制器)
限制 QPS(每秒几次请求)的方法:
rate_sem = asyncio.Semaphore(5) # 每秒最多 5 个请求
async def rate_limit():
async with rate_sem:
await asyncio.sleep(0.2) # 控制速率(1 秒大约 5 次)
六、消费者:爬取网页并写入文件
import aiofiles
import re
async def consumer(queue, session):
while True:
url = await queue.get()
if url is None:
queue.task_done()
# 继续传播结束标记给其他消费者
await queue.put(None)
break
await rate_limit() # 限速
html = await fetch(session, url)
if html:
# 提取标题
title_match = re.search(r"<title>(.*?)</title>", html, re.S)
title = title_match.group(1).strip() if title_match else " 未找到标题 "
# 保存结果
async with aiofiles.open("result.txt", "a", encoding="utf-8") as f:
await f.write(f"{url} - {title}\n")
print(f"[ 成功] {url} - 标题:{title}")
else:
print(f"[ 失败] {url} 最终未成功 ")
queue.task_done()
七、主程序(整合爬虫系统)
async def main():
urls = [
"https://www.python.org/",
"https://www.github.com/",
"https://www.wikipedia.org/",
"https://www.baidu.com/",
"https://www.bing.com/",
]
queue = asyncio.Queue()
async with aiohttp.ClientSession() as session:
prod = asyncio.create_task(producer(queue, urls))
consumers = [asyncio.create_task(consumer(queue, session))
for _ in range(3) # 三个消费者并发爬取
]
await prod
await queue.join()
for c in consumers:
await c
asyncio.run(main())
八、运行结果示例
[生产] 添加任务:https://www.python.org/
[生产] 添加任务:https://www.github.com/
[成功] https://www.python.org/ - 标题:Welcome to Python.org
[成功] https://www.github.com/ - 标题:GitHub: Let’s build from here
...
生成的 result.txt 内容:
https://www.python.org/ - Welcome to Python.org
https://www.github.com/ - GitHub: Let’s build from here
...
九、框架扩展方向
按照目前结构,你可以轻松扩展:
- 增加解析器(提取更多字段)
- 爬取图片并异步保存
- 加入 URL 去重机制
- 加入日志系统(异步写日志)
- 用 async Redis 存储 URL
- 构建分布式爬虫队列
- 实现页面深度爬取(爬完自动继续添加新链接)
甚至,你可以在此基础上做一个 迷你版 aiohttp 爬虫框架 。
十、小结
本篇我们构建了一个可工作的异步爬虫框架,学习了:
- 生产者消费者模型
- URL 调度
- 限速控制
- 超时与重试
- 异步保存文件
- 使用多个消费者并发爬取
这是一个非常关键的异步编程综合案例。
正文完