Python 处理大型 CSV 文件:内存优化与分块读取实战

77次阅读
没有评论

共计 5422 个字符,预计需要花费 14 分钟才能阅读完成。

刚接手一个数据分析项目时,发现同事还在用 pd.read_csv() 一次性读取几个 G 的 CSV 文件,结果内存爆了,电脑直接卡死。其实针对大型 CSV 文件,咱们有很多巧妙的方法可以避免内存溢出,今天就来分享几个我在实际项目中亲测有效的技巧,让你处理大文件也能游刃有余。

步骤一:预估内存消耗与选择合适的数据类型

处理大型 CSV 文件时,最常见的问题就是内存占用过高。Pandas 默认的数据类型(如 int64float64object)往往会比实际所需占用更多内存。所以,第一步是“知己知彼”,了解数据类型并进行优化。

实操:

  1. 读取少量数据进行预估:
    我们不可能把整个大文件读进来分析,所以先用 nrows 参数读取少量数据作为样本。

    import pandas as pd
    import numpy as np
    import sys
    import os
    
    # --- 模拟一个大型 CSV 文件 (用于演示,实际操作请替换为你的大文件路径) ---
    file_path = 'large_demo_data.csv'
    num_rows = 5000000 # 500 万行
    num_cols = 10
    data = {}
    for i in range(num_cols):
        if i % 3 == 0: # 整数列,范围较小
            data[f'id_{i}'] = np.random.randint(0, 10000, num_rows)
        elif i % 3 == 1: # 字符串列,低基数
            data[f'category_{i}'] = np.random.choice(['A', 'B', 'C', 'D', 'E'], num_rows)
        else: # 浮点数列
            data[f'value_{i}'] = np.random.rand(num_rows) * 1000
    
    df_sample_gen = pd.DataFrame(data)
    df_sample_gen.to_csv(file_path, index=False)
    print(f"已生成模拟大型 CSV 文件: {file_path}, 大小: {os.path.getsize(file_path) / (1024**2):.2f} MB")
    # ----------------------------------------------------------------------
    
    print("n--- 步骤一:预估内存消耗与选择合适的数据类型 ---")
    
    try:
        sample_df = pd.read_csv(file_path, nrows=100000) # 第一次处理一个 5GB 的 CSV 时,没注意数据类型,直接 `read_csv` 就 OOM 了,后来才发现原来 int64 和 object 类型占了好多内存。print("n 采样数据初步信息 ( 未优化前):")
        sample_df.info(memory_usage='deep') # deep=True 会更准确计算字符串内存
    
        # 2. 查看数据类型分布及优化建议
        print("n 建议优化前的数据类型:")
        print(sample_df.dtypes)
    
        # 手动构建 dtypes 映射
        optimized_dtypes = {}
        for col in sample_df.columns:
            if 'int' in str(sample_df[col].dtype):
                min_val, max_val = sample_df[col].min(), sample_df[col].max()
                # 根据值的范围选择最小的整数类型
                if min_val >= np.iinfo(np.int8).min and max_val <= np.iinfo(np.int8).max:
                    optimized_dtypes[col] = 'int8'
                elif min_val >= np.iinfo(np.int16).min and max_val <= np.iinfo(np.int16).max:
                    optimized_dtypes[col] = 'int16'
                elif min_val >= np.iinfo(np.int32).min and max_val <= np.iinfo(np.int32).max:
                    optimized_dtypes[col] = 'int32'
                else:
                    optimized_dtypes[col] = 'int64' # 实在不行就 int64
            elif 'float' in str(sample_df[col].dtype):
                optimized_dtypes[col] = 'float32' # 大多数场景 float32 足够精度
            elif 'object' in str(sample_df[col].dtype):
                # 对于低基数字符串(唯一值数量远小于总行数),转为 category 省内存
                if sample_df[col].nunique() < len(sample_df) / 2: # 比如唯一值少于一半行数
                    optimized_dtypes[col] = 'category'
                else:
                    optimized_dtypes[col] = 'object' # 如果基数高,可能转 category 效果不明显甚至更慢
    
        print("n 根据采样数据分析得到的优化后数据类型:")
        print(pd.Series(optimized_dtypes))
    
        # 3. 尝试用优化后的 dtypes 读取少量数据并比较内存
        # 提醒下,类型推断不是万能的,尤其是遇到混合类型列时,Pandas 可能会默认更宽泛的类型,手动指定是更保险的做法。sample_df_optimized = pd.read_csv(file_path, nrows=100000, dtype=optimized_dtypes)
        print("n 优化后采样数据内存信息:")
        sample_df_optimized.info(memory_usage='deep')
    
        initial_memory = sample_df.memory_usage(deep=True).sum()
        optimized_memory = sample_df_optimized.memory_usage(deep=True).sum()
        print(f"n 内存优化效果: 从 {initial_memory / (1024**2):.2f} MB 减少到 {optimized_memory / (1024**2):.2f} MB")
    
    except Exception as e:
        print(f"处理步骤一发生错误: {e}")

    小提醒: 类型推断不是万能的,尤其是遇到混合类型列或特殊编码时,Pandas 可能会默认更宽泛的类型,手动指定是更保险的做法。对于非常大的文件,即使优化了数据类型,一次性读取也可能超出内存限制。

步骤二:分块读取大型 CSV

当文件实在太大,即使优化了数据类型也无法一次性加载到内存时,分块读取(chunking)就是你的救星。Pandas 允许你指定 chunksize 参数,将大文件分割成多个小块(DataFrame),然后逐块处理。

实操:

print("n--- 步骤二:分块读取大型 CSV ---")

total_rows_processed = 0
# 这里我们假设要计算 'id_0' 列的总和,并统计 'category_1' 中 'A' 出现的次数
total_id_0_sum = 0
category_A_count = 0
chunk_size = 100000 # 每次读取 10 万行

print(f"n 开始分块读取,每次处理 {chunk_size} 行...")
try:
    # 之前有次需要统计一个超大日志文件的词频,如果不是用 chunksize,根本没法在普通电脑上跑起来,分块处理简直是神器。# 使用上一步分析出来的 optimized_dtypes 可以让每个 chunk 也更省内存
    for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size, dtype=optimized_dtypes)):
        # 对每个 chunk 进行处理,这里我们简单计算 'id_0' 列的和及 'category_1' 中 'A' 的数量
        total_rows_processed += len(chunk)
        if 'id_0' in chunk.columns:
            total_id_0_sum += chunk['id_0'].sum()
        if 'category_1' in chunk.columns:
            category_A_count += (chunk['category_1'] == 'A').sum()

        print(f"已处理 {i+1} 个 chunk,当前总行数: {total_rows_processed}")

    print(f"n 分块处理完成!总行数: {total_rows_processed},'id_0'列总和: {total_id_0_sum},'category_1'中'A'的计数: {category_A_count}")

except Exception as e:
    print(f"处理步骤二发生错误: {e}")

小提醒: 大家在处理每个 chunk 时,尽量只保留需要的列,并及时释放不再使用的内存(例如,不要将所有 chunk 都存入一个大列表),防止累积效应导致最终还是内存溢出。如果只需要聚合结果,只保留聚合结果即可。

步骤三:使用原生 csv 模块进行轻量级处理

不是所有的数据处理任务都需要 Pandas 这样的“重型武器”。对于一些简单的任务,比如按行读取、过滤特定字段、计数等,Python 的原生 csv 模块往往是更高效、内存占用更低的解决方案。

实操:

import csv
print("n--- 步骤三:使用原生 `csv` 模块进行轻量级处理 ---")

line_count = 0
target_id_count = 0 # 假设我们要计数 'id_0' 列大于 5000 的行
target_column_index = -1 # 初始化为无效索引

print("n 开始使用原生 csv 模块处理...")
try:
    # 有个场景只需要遍历文件,做一些简单过滤和计数,如果还引入 Pandas,内存占用反而更高,后来发现原生的 `csv` 模块效率更高,省了不少资源。with open(file_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        header = next(reader) # 读取表头

        # 找到 'id_0' 列的索引
        if 'id_0' in header:
            target_column_index = header.index('id_0')
        else:
            print("Warning:'id_0'not found in header, skipping target ID count.")


        for row in reader:
            line_count += 1
            if target_column_index != -1 and len(row) > target_column_index:
                try:
                    # 注意:从 CSV 读取的数据都是字符串,需要手动转换类型
                    if int(row[target_column_index]) > 5000:
                        target_id_count += 1
                except ValueError:
                    # 忽略非数字的转换错误,或者进行更复杂的错误处理
                    pass

    # 这个方法虽然内存占用低,但如果你需要复杂的表格操作(如 join, merge, pivot),Pandas 依然是首选,灵活选择工具很重要。print(f"n 原生 csv 模块处理完成!总行数 ( 不含表头): {line_count},'id_0'列大于 5000 的行数: {target_id_count}")

except Exception as e:
    print(f"处理步骤三发生错误: {e}")
finally:
    # 清理模拟文件
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"n 已删除模拟文件: {file_path}")

小提醒: 原生 csv 模块虽然内存占用低、速度快,但它不提供 Pandas 那样方便的数据结构和丰富的操作函数。所以,如果你需要复杂的表格操作(如数据透视、连接、合并),Pandas 依然是首选;如果只是简单的行处理或统计,csv 模块会让你事半功倍。

常见误区

  1. 不预估内存,直接 read_csv 大文件: 我刚开始学 Pandas 时,总觉得 read_csv 万能,结果一遇到大文件就卡死或者 OOM,调试半天才发现是内存爆了。这是新手最常犯的错误,也是最致命的。
  2. 忽略数据类型优化: 很多人以为 Pandas 会自动优化数据类型,后来才发现 object 类型(用于存储字符串)是内存大户,而 int64float64 也有很大的压缩空间。手动转 category 或更小的 int/float 能省好几倍内存,在处理百万级甚至千万级数据时,这差异非常明显。
  3. 分块处理后未及时清理中间变量: 很多人分块处理后,会把每个 chunk 的结果不加思索地存入一个列表或新的 DataFrame,导致最终所有 chunk 的数据又积累在内存里,失去了分块的意义。正确的做法是,处理完一个 chunk 后,只保留聚合结果或立即写入磁盘,释放该 chunk 占用的内存。

经验总结

处理大型 CSV 文件,核心在于“精打细算”:从源头预估内存、针对性优化数据类型、灵活运用分块读取策略,以及在合适的场景选择合适的工具(Pandas 或原生 csv 模块),才能让你的 Python 代码跑得更稳更快。

你平时在处理大文件时,还遇到过哪些坑?欢迎在评论区分享你的经验!

正文完
 0
评论(没有评论)