Python `collections` 模块:告别繁琐字典操作,让数据统计更高效

109次阅读
没有评论

共计 8120 个字符,预计需要花费 21 分钟才能阅读完成。

前阵子帮新同事 review 项目代码时,发现他在处理一组用户行为日志时,为了统计每个用户访问了哪些页面,或者统计每个页面的访问次数,写了一堆 if key in dict: ... else: ... 的判断逻辑。这让我想起我刚入行时也这么干过,但后来深入学习 Python 标准库后,才发现 collections 模块里有几个宝藏工具,能让这些数据聚合和统计操作省下大把时间,代码也更清晰、更优雅。

今天,咱们就来聊聊 collections 模块里,我个人在实际项目中用得最多的两个工具:defaultdictCounter。它们能帮你告别那些冗余的字典初始化和条件判断,让你的数据处理代码焕然一新。


实操一:defaultdict——再见,冗余的字典初始化!

你是不是也经常遇到这样的场景:需要根据某个键来分组数据,然后将对应的值添加到列表中,或者对某个键的值进行累加?最常见的做法,可能会是这样:

user_logs = [{'user_id': 'user_A', 'action': 'view_homepage'},
    {'user_id': 'user_B', 'action': 'add_to_cart'},
    {'user_id': 'user_A', 'action': 'view_product_X'},
    {'user_id': 'user_C', 'action': 'checkout'},
    {'user_id': 'user_B', 'action': 'view_homepage'},
]

user_actions = {}
for log in user_logs:
    user_id = log['user_id']
    action = log['action']
    if user_id not in user_actions:
        user_actions[user_id] = []
    user_actions[user_id].append(action)

print(user_actions)
# 输出: {'user_A': ['view_homepage', 'view_product_X'], 'user_B': ['add_to_cart', 'view_homepage'], 'user_C': ['checkout']}

这段代码工作得很好,但那个 if user_id not in user_actions: ... 的判断,是不是有点碍眼?尤其当这类操作频繁出现时,代码就会显得臃肿。

第一步:理解 defaultdict 的核心魔法

defaultdictdict 的一个子类,它的核心特性是在你尝试访问一个不存在的键时,不会抛出 KeyError,而是会自动为这个键生成一个默认值。这个默认值是通过你初始化 defaultdict 时传入的“工厂函数”来创建的。

最常用的工厂函数就是 Python 内置的 listintset 等类型。

  • defaultdict(list): 如果访问的键不存在,会自动创建一个空列表 [] 作为该键的值。
  • defaultdict(int): 如果访问的键不存在,会自动创建一个整数 0 作为该键的值。
  • defaultdict(set): 如果访问的键不存在,会自动创建一个空集合 {} 作为该键的值。

明白了这个原理,咱们就可以用 defaultdict(list) 来优化上面的用户行为统计代码:

from collections import defaultdict

user_logs = [{'user_id': 'user_A', 'action': 'view_homepage'},
    {'user_id': 'user_B', 'action': 'add_to_cart'},
    {'user_id': 'user_A', 'action': 'view_product_X'},
    {'user_id': 'user_C', 'action': 'checkout'},
    {'user_id': 'user_B', 'action': 'view_homepage'},
]

# 初始化一个 defaultdict,指定当键不存在时,默认值为一个空列表
user_actions = defaultdict(list)
for log in user_logs:
    user_id = log['user_id']
    action = log['action']
    # 访问 user_actions[user_id] 时,如果 user_id 不存在,defaultdict 会自动创建 []
    # 然后再将 action 追加到这个列表中
    user_actions[user_id].append(action)

print(user_actions)
# 输出: defaultdict(<class 'list'>, {'user_A': ['view_homepage', 'view_product_X'], 'user_B': ['add_to_cart', 'view_homepage'], 'user_C': ['checkout']})

# 提醒下:defaultdict 打印出来会带上它的工厂函数信息,但它本质上仍然像一个普通的字典
# 你可以通过 dict(user_actions) 转换成普通字典
print(dict(user_actions))
# 输出: {'user_A': ['view_homepage', 'view_product_X'], 'user_B': ['add_to_cart', 'view_homepage'], 'user_C': ['checkout']}

是不是感觉清爽多了?少了一行判断,代码的可读性和简洁性都得到了提升。我个人平时用 defaultdict 的一个习惯是,尽量把默认值工厂函数写得简洁明了,比如 listint。如果逻辑非常复杂,需要动态创建对象,可以考虑使用 lambda 表达式作为工厂函数,比如 defaultdict(lambda: MyCustomObject())

第二步:实战 defaultdict:更复杂的数据聚合

defaultdict 不仅可以用于列表聚合,也可以用于计数、集合去重等场景。

场景:统计每个用户的总消费金额。

from collections import defaultdict

transactions = [{'user_id': 'user_A', 'amount': 100},
    {'user_id': 'user_B', 'amount': 50},
    {'user_id': 'user_A', 'amount': 200},
    {'user_id': 'user_C', 'amount': 150},
    {'user_id': 'user_B', 'amount': 75},
]

# 初始化一个 defaultdict,指定当键不存在时,默认值为整数 0
user_total_spending = defaultdict(int)
for t in transactions:
    user_id = t['user_id']
    amount = t['amount']
    # 访问 user_total_spending[user_id] 时,如果 user_id 不存在,defaultdict 会自动创建 0
    # 然后再将 amount 累加到这个值上
    user_total_spending[user_id] += amount
    # 这里用 int 作为工厂函数很方便。我以前偷懒,直接用了 int() 而不是 int,# 结果发现统计的是一个 int 类型的实例而不是 0,差点搞出 bug,所以现在习惯直接传入类型作为工厂函数。print(dict(user_total_spending))
# 输出: {'user_A': 300, 'user_B': 125, 'user_C': 150}

场景:从日志中统计每个 IP 访问过的唯一页面。

from collections import defaultdict

access_logs = [{'ip': '192.168.1.1', 'page': '/index'},
    {'ip': '192.168.1.2', 'page': '/about'},
    {'ip': '192.168.1.1', 'page': '/products'},
    {'ip': '192.168.1.3', 'page': '/index'},
    {'ip': '192.168.1.2', 'page': '/about'}, # 重复访问
    {'ip': '192.168.1.1', 'page': '/index'}, # 重复访问
    {'ip': '192.168.1.4', 'page': '/error'}
]

# 初始化一个 defaultdict,指定当键不存在时,默认值为一个空集合
ip_unique_pages = defaultdict(set)
for log in access_logs:
    ip_address = log['ip']
    page = log['page']
    # 这里加 try-except 是因为之前处理一些脏数据时,遇到过 IP 地址为空或者页面路径缺失的情况,# 直接取值会 KeyError,后来养成习惯要防一手,确保数据质量。try:
        ip_address = log['ip']
        page = log['page']
        ip_unique_pages[ip_address].add(page)
    except KeyError as e:
        print(f"警告:日志中缺少键 {e},跳过此条记录。")


print(dict(ip_unique_pages))
# 输出: {'192.168.1.1': {'/index', '/products'}, '192.168.1.2': {'/about'}, '192.168.1.3': {'/index'}, '192.168.1.4': {'/error'}}

实操二:Counter——让计数和频率统计变得简单直观!

当我们需要统计列表中元素的出现次数,或者字符串中字符的出现频率时,Counter 绝对是你的首选。它比你手动写循环计数要高效和简洁得多。

第一步:Counter 的基础用法

Counter 也是 dict 的一个子类,它专门用于 统计可哈希对象的频率。你可以给它传入一个可迭代对象(如列表、元组、字符串等),它会返回一个字典,其中键是元素,值是该元素出现的次数。

from collections import Counter

# 场景一:统计列表中元素的出现次数
fruits = ['apple', 'orange', 'apple', 'banana', 'orange', 'apple', 'grape']
fruit_counts = Counter(fruits)
print(fruit_counts)
# 输出: Counter({'apple': 3, 'orange': 2, 'banana': 1, 'grape': 1})

# 场景二:统计字符串中字符的出现频率
text = "hello world"
char_counts = Counter(text)
print(char_counts)
# 输出: Counter({'l': 3, 'o': 2, 'h': 1, 'e': 1, '': 1,'w': 1,'r': 1,'d': 1})

# 场景三:直接传入一个字典初始化
initial_counts = Counter({'apple': 5, 'banana': 2})
print(initial_counts)
# 输出: Counter({'apple': 5, 'banana': 2})

# 提醒下:刚开始学 Counter 时,我总想着用它统计字典的键值对,# 比如统计不同用户 ID 对应的消费总额,结果发现它只对可哈希对象有效,# 踩了坑才知道要用对场景,对于更复杂的键值对计数,可能需要结合其他方法。

Counterdict 的子类,所以你可以像操作普通字典一样操作它,比如访问某个元素的计数:fruit_counts['apple'] 会返回 3。如果访问不存在的元素,它会返回 0,而不是 KeyError,这在统计时非常方便。

第二步:Counter 的高级应用与组合拳

Counter 不仅仅是统计,它还提供了一些非常实用的方法,以及支持集合操作。

  1. most_common(n):查找出现频率最高的 N 个元素

    这个方法在数据分析中非常有用,比如找出最热门的商品、访问量最高的页面等。

    from collections import Counter
    
    sales_data = ['product_A', 'product_B', 'product_A', 'product_C', 'product_B', 'product_A', 'product_D']
    product_sales = Counter(sales_data)
    
    # 找出销量前两名的商品
    top_2_products = product_sales.most_common(2)
    print(top_2_products)
    # 输出: [('product_A', 3), ('product_B', 2)]

    我试过好几种方法统计 Top N 元素,Counter.most_common() 在处理几十万甚至百万级数据时效率非常高,而且代码简洁,亲测有效,大家可根据数据量和需求选择。

  2. update():累加计数

    update() 方法可以让你将另一个可迭代对象或另一个 Counter 的计数加到当前的 Counter 中。

    from collections import Counter
    
    c1 = Counter(['apple', 'orange', 'apple'])
    print(f"C1 初始计数: {c1}") # Counter({'apple': 2, 'orange': 1})
    
    # 模拟新增的销售数据
    new_sales = ['banana', 'apple', 'orange', 'grape']
    c1.update(new_sales)
    print(f"C1 更新后计数: {c1}") # Counter({'apple': 3, 'orange': 2, 'banana': 1, 'grape': 1})
    
    # 使用另一个 Counter 更新
    c2 = Counter({'apple': 1, 'banana': 2})
    c1.update(c2)
    print(f"C1 再次更新后计数: {c1}") # Counter({'apple': 4, 'banana': 3, 'orange': 2, 'grape': 1})
  3. 集合操作:+, -, &, |

    Counter 对象之间还可以进行类似集合的加减法、交集、并集操作,这在一些特定场景下非常强大。

    • +: 元素计数相加。
    • -: 元素计数相减(结果小于等于 0 的会被移除)。
    • &: 求交集(取两者中较小的计数)。
    • |: 求并集(取两者中较大的计数)。
    from collections import Counter
    
    # 销售日志 A
    sales_a = Counter(['apple', 'banana', 'apple', 'orange']) # Counter({'apple': 2, 'banana': 1, 'orange': 1})
    # 销售日志 B
    sales_b = Counter(['apple', 'grape', 'banana', 'grape']) # Counter({'apple': 1, 'grape': 2, 'banana': 1})
    
    # 总销售量 (相加)
    total_sales = sales_a + sales_b
    print(f"总销售量: {total_sales}")
    # 输出: Counter({'apple': 3, 'banana': 2, 'orange': 1, 'grape': 2})
    
    # 销售 A 比销售 B 多卖的 (相减,小于等于 0 的会被移除)
    diff_sales = sales_a - sales_b
    print(f"销售 A 比销售 B 多卖的: {diff_sales}")
    # 输出: Counter({'apple': 1, 'orange': 1})
    
    # 两者都有销售的商品 (交集)
    common_sales = sales_a & sales_b
    print(f"两者都有销售的商品: {common_sales}")
    # 输出: Counter({'apple': 1, 'banana': 1})
    
    # 两者所有销售过的商品 (并集)
    all_sold_items = sales_a | sales_b
    print(f"两者所有销售过的商品: {all_sold_items}")
    # 输出: Counter({'apple': 2, 'banana': 1, 'grape': 2, 'orange': 1})

    之前遇到过一个需求,需要找出同时被购买的商品组合,我尝试用 Counter 的加减法,但发现对组合计数效果不好,后来用 itertools.combinations 结合 Counter 才完美解决,大家可以举一反三,根据实际需求灵活组合使用这些工具。


常见误区与避坑指南

学习这些高效工具时,一些小细节可能导致意想不到的问题,我把自己踩过的坑整理出来,希望能帮大家避开。

  1. 误区一:混淆 defaultdict 和普通 dict.setdefault()
    dict.setdefault(key, default_value) 也能在键不存在时设置一个默认值。

    my_dict = {}
    my_dict.setdefault('key1', []).append('value1')
    my_dict.setdefault('key1', []).append('value2')
    print(my_dict) # {'key1': ['value1', 'value2']}

    看起来和 defaultdict 效果类似,但两者有本质区别:setdefault 每次调用都会做一次键查找,如果键存在,它会返回键对应的值;如果键不存在,它会先设置默认值,然后再返回。而 defaultdict 在键不存在时才调用工厂函数生成默认值。当你在一个循环中频繁操作字典,尤其是数据量非常大时,defaultdict 会因为其内部实现上的优化而拥有更高的效率和更清晰的代码。我刚开始学的时候,觉得 setdefault 也能实现类似效果,但处理大量数据时,defaultdict 的性能优势就体现出来了,调试时才发现,原来每次 setdefault 都会做一次键查找,而 defaultdict 在键不存在时才调用工厂函数,这在处理海量数据时,性能开销是不可忽视的。

  2. 误区二:错误地清空 Counter 中的某个元素计数
    当你想要将 Counter 中某个元素的计数归零,并使其不再出现在 most_common() 结果中时,直接赋值 c['item'] = 0 可能达不到预期效果。

    from collections import Counter
    c = Counter(['apple', 'banana', 'apple'])
    print(c) # Counter({'apple': 2, 'banana': 1})
    
    c['apple'] = 0 # 尝试将 'apple' 的计数设为 0
    print(c) # Counter({'apple': 0, 'banana': 1})
    print(c.most_common()) # [('banana', 1), ('apple', 0)] - 'apple' 依然存在!

    可以看到,'apple' 仍然会出现在 most_common() 的结果中,只是计数为 0。如果希望彻底移除这个元素,应该使用 del 关键字:

    del c['apple']
    print(c) # Counter({'banana': 1})
    print(c.most_common()) # [('banana', 1)]

    我之前用 Counter 统计用户访问路径,想清空某个路径的计数,直接 c['path'] = 0,结果发现 most_common() 依然会包含它。这个小细节差点让我统计结果出错,导致报表数据不准确。


经验总结与互动

总而言之,collections 模块里的 defaultdictCounter 是 Python 处理数据聚合和统计的利器,它们能帮助我们写出更简洁、更高效、更 Pythonic 的代码,告别那些繁琐的条件判断和冗余的字典初始化。掌握它们,无疑会大幅提升你的 Python 开发效率。

你平时在项目中还用到 collections 模块的哪些宝藏工具?或者在使用 defaultdictCounter 时,踩过哪些有意思的坑?欢迎在评论区分享你的经验和故事!

正文完
 0
评论(没有评论)