布隆过滤器与去重
本篇是《大数据算法与UDF系列》的第6篇,讲解布隆过滤器(Bloom Filter)的原理、实现,以及在大数据去重场景中的应用。
1. 什么是布隆过滤器?
1.1 概念介绍
布隆过滤器是一种空间效率极高的概率数据结构,用于判断一个元素是否可能存在于集合中。
┌─────────────────────────────────────────────────────────────────┐
│ 布隆过滤器特点 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ✅ 空间效率高:相比HashSet节省10-100倍空间 │
│ ✅ 查询速度快:O(k),k为哈希函数个数 │
│ ✅ 支持批量添加和查询 │
│ │
│ ⚠️ 可能存在误判(False Positive): │
│ - 说"存在"可能不存在(概率可控制) │
│ - 说"不存在"一定不存在 │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 业务场景
| 场景 | 使用方式 | 作用 |
|---|---|---|
| 缓存穿透防护 | 先查BloomFilter,不存在则直接返回 | 保护后端数据库 |
| 订单去重 | BloomFilter判断订单是否已处理 | 避免重复处理 |
| 用户画像 | 判断用户是否在某个标签集合中 | 快速筛选 |
| URL去重 | 判断URL是否已爬取 | 避免重复爬取 |
2. 原理详解
2.1 核心思想
布隆过滤器使用多个哈希函数,将一个元素映射到位数组的多个位置:
原始元素: "user_123"
↓
↓ 哈希函数1: hash1("user_123") = 5 → 位数组[5] = 1
↓ 哈希函数2: hash2("user_123") = 12 → 位数组[12] = 1
↓ 哈希函数3: hash3("user_123") = 23 → 位数组[23] = 1
↓
↓
位数组: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1 ...]
↑ ↑
位置5 位置232.2 添加元素
┌─────────────────────────────────────────────────────────────────┐
│ 添加元素流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 步骤1: 计算k个哈希值,得到k个位置 │
│ │
│ 元素 "user_100" │
│ ↓ │
│ hash1 → 位置 5 │
│ hash2 → 位置 12 │
│ hash3 → 位置 23 │
│ │
│ 步骤2: 将这k个位置设为1 │
│ │
│ Before: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] │
│ After: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1] │
│ ↑ ↑ │
│ 12 23 │
│ │
└─────────────────────────────────────────────────────────────────┘2.3 查询元素
┌─────────────────────────────────────────────────────────────────┐
│ 查询元素流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 查询 "user_100" 是否存在? │
│ │
│ 步骤1: 计算k个哈希值,得到k个位置 │
│ hash1 → 位置 5 │
│ hash2 → 位置 12 │
│ hash3 → 位置 23 │
│ │
│ 步骤2: 检查这k个位置是否都为1 │
│ │
│ 位数组: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1] │
│ ↓ ↓ ↓ │
│ 位置5 位置12 位置23 │
│ 1 1 1 ← 全部为1 │
│ ↓ │
│ 结果: 可能存在 ✓ │
│ │
│ ───────────────────────────────────────────────────── │
│ │
│ 查询 "user_200" 是否存在? │
│ │
│ 步骤1: 计算k个哈希值 │
│ hash1 → 位置 7 │
│ hash2 → 位置 15 │
│ hash3 → 位置 23 │
│ │
│ 步骤2: 检查这k个位置 │
│ │
│ 位数组: [0, 0, 0, 0, 0, 1, 0, 1, ...] │
│ ↓ │
│ 位置7 = 0 │
│ ↓ │
│ 结果: 一定不存在 ✗ │
│ │
└─────────────────────────────────────────────────────────────────┘2.4 误判原理
┌─────────────────────────────────────────────────────────────────┐
│ 误判原因 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 当两个不同元素A和B的哈希位置恰好重叠时: │
│ │
│ 添加元素A: │
│ ┌─────────────────────────────────┐ │
│ │ 位置: [5] [12] [23] = 1 │ ← A的三个位置 │
│ └─────────────────────────────────┘ │
│ │
│ 查询元素B(不存在): │
│ ┌─────────────────────────────────┐ │
│ │ 位置: [5] [12] [45] = 1 │ ← B的前两个位置与A重叠 │
│ │ ↑ 误判! │ │
│ └─────────────────────────────────┘ │
│ │
│ 结果: B被判断为"可能存在"(实际不存在) │
│ │
│ 解决方案: 控制误判率 │
│ - 增大位数组大小 │
│ - 增加哈希函数数量 │
│ │
└─────────────────────────────────────────────────────────────────┘3. 布隆过滤器实现
3.1 Python实现
import hashlib
import math
import bitarray
class BloomFilter:
"""布隆过滤器实现"""
def __init__(self, expected_items=100000, false_positive_rate=0.01):
"""
初始化布隆过滤器
参数:
expected_items: 预期存储的元素数量
false_positive_rate: 期望的误判率
"""
self.expected_items = expected_items
self.false_positive_rate = false_positive_rate
# 计算位数组大小
# m = -n * ln(p) / (ln(2)^2)
self.size = int(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))
# 计算哈希函数数量
# k = (m/n) * ln(2)
self.hash_count = int((self.size / expected_items) * math.log(2))
# 初始化位数组
self.bit_array = bitarray.bitarray(self.size)
self.bit_array.setall(0)
print(f"初始化: 预期元素={expected_items}, 误判率={false_positive_rate*100}%")
print(f"位数组大小: {self.size/8/1024:.2f} KB")
print(f"哈希函数数量: {self.hash_count}")
def _get_positions(self, item):
"""计算元素对应的k个位置"""
positions = []
for i in range(self.hash_count):
# 使用不同的盐值
hash_input = f"{item}_{i}".encode()
hash_val = int(hashlib.md5(hash_input).hexdigest(), 16)
position = hash_val % self.size
positions.append(position)
return positions
def add(self, item):
"""添加一个元素"""
for pos in self._get_positions(item):
self.bit_array[pos] = 1
def might_contain(self, item):
"""检查元素是否可能存在"""
for pos in self._get_positions(item):
if self.bit_array[pos] == 0:
return False
return True
def get_bit_count(self):
"""获取已使用的位数"""
return self.bit_array.count(1)
# 使用示例
bf = BloomFilter(expected_items=10000, false_positive_rate=0.01)
# 添加1000个元素
for i in range(1000):
bf.add(f"user_{i}")
# 查询
print(f"存在的元素查询: {bf.might_contain('user_500')}") # True
print(f"不存在的元素查询: {bf.might_contain('user_9999')}") # 应该False,但可能误判
# 统计误判率
false_positive = 0
for i in range(1000, 2000):
if bf.might_contain(f"user_{i}"):
false_positive += 1
print(f"误判数: {false_positive}/1000 = {false_positive/10}%")3.2 PySpark集成
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
spark = SparkSession.builder.getOrCreate()
# 创建布隆过滤器
# Spark 3.0+ 内置了 bloom_filter agg
# 创建测试数据
data = [(f"user_{i}",) for i in range(10000)]
df = spark.createDataFrame(data, ["user_id"])
# 创建布隆过滤器(使用内置函数)
# 注意:Spark的bloom_filter需要明确指定expectedItems
# 方法1:使用Python UDF
from pyspark.sql.functions import udf
bloom_udf = udf(
lambda items: BloomFilter(len(items), 0.01).add_all(items),
BooleanType()
)
# 方法2:使用Spark内置的bloom_approx_distinct
# (实际上内部使用HyperLogLog)
result = df.agg(
F.approx_count_distinct("user_id").alias("approx_count")
)3.3 Scala实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter
object BloomFilterDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("BloomFilterDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 创建测试数据
val data = (1 to 10000).map(i => (s"user_$i", 1)).toDF("user_id", "value")
// 创建布隆过滤器
val bloomFilter = BloomFilter.create(expectedItems = 10000, fpp = 0.01)
// 添加元素
data.collect().foreach(row => {
bloomFilter.put(row.getString(0).getBytes)
})
// 查询
println(s"user_500 exists: ${bloomFilter.mightContain("user_500".getBytes)}")
println(s"user_9999 exists: ${bloomFilter.mightContain("user_9999".getBytes)}")
// 测试误判率
var falsePositive = 0
for (i <- 10000 to 11000) {
if (bloomFilter.mightContain(s"user_$i".getBytes)) {
falsePositive += 1
}
}
println(s"False positive: $falsePositive / 1000")
spark.stop()
}
}4. 实战:订单去重
4.1 业务场景
处理订单数据时,需要:
- 判断订单是否已处理过
- 快速过滤重复订单
4.2 完整代码
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import hashlib
spark = SparkSession.builder.getOrCreate()
# 模拟订单数据(包含重复订单)
orders = [
("order_001", "user_1", 100, "2024-01-01 10:00:00"),
("order_002", "user_2", 200, "2024-01-01 10:01:00"),
("order_001", "user_1", 100, "2024-01-01 10:02:00"), # 重复
("order_003", "user_3", 150, "2024-01-01 10:03:00"),
("order_002", "user_2", 200, "2024-01-01 10:04:00"), # 重复
("order_004", "user_4", 300, "2024-01-01 10:05:00"),
("order_005", "user_5", 180, "2024-01-01 10:06:00"),
]
df = spark.createDataFrame(orders, ["order_id", "user_id", "amount", "create_time"])
print("原始订单数据:")
df.show()
# =============================================
# 方法1:使用DataFrame去重
# =============================================
print("=" * 50)
print("方法1: DataFrame去重(精确)")
print("=" * 50)
deduped_df = df.dropDuplicates(["order_id"])
deduped_df.show()
print(f"去重后数量: {deduped_df.count()}")
# =============================================
# 方法2:使用布隆过滤器去重(大数据量)
# =============================================
print("=" * 50)
print("方法2: 布隆过滤器去重(近似)")
print("=" * 50)
# 收集所有order_id到driver端(仅适用于小数据量)
order_ids = df.select("order_id").distinct().collect()
order_id_set = set([row.order_id for row in order_ids])
# 创建布隆过滤器
import math
class SimpleBloomFilter:
def __init__(self, size=100000):
self.size = size
self.bit_array = [0] * size
def _hash(self, item, seed):
h = hashlib.md5(f"{item}_{seed}".encode()).hexdigest()
return int(h, 16) % self.size
def add(self, item):
for i in range(3):
pos = self._hash(item, i)
self.bit_array[pos] = 1
def might_contain(self, item):
for i in range(3):
pos = self._hash(item, i)
if self.bit_array[pos] == 0:
return False
return True
bf = SimpleBloomFilter()
# 过滤重复订单
def filter_duplicates(order_id):
if bf.might_contain(order_id):
return False
bf.add(order_id)
return True
# 使用UDF过滤
filter_udf = F.udf(lambda x: filter_duplicates(x))
result = df.filter(filter_udf(F.col("order_id")))
result.show()
print(f"布隆过滤后数量: {result.count()}")
# =============================================
# 方法3:高性能去重(使用SQL)
# =============================================
print("=" * 50)
print("方法3: SQL去重 + 统计")
print("=" * 50)
# 注册临时表
df.createOrReplaceTempView("orders")
# SQL去重
spark.sql("""
SELECT order_id, user_id, amount, MIN(create_time) as create_time
FROM orders
GROUP BY order_id, user_id, amount
""").show()
spark.stop()5. 布隆过滤器 vs 其他去重
5.1 对比表
| 方法 | 空间 | 精度 | 速度 | 适用场景 |
|---|---|---|---|---|
| DISTINCT | O(N) | 100% | 慢 | 小数据量 |
| HashSet | O(N) | 100% | 快 | 小数据量 |
| Bitmap | N/8 | 100% | 快 | ID连续 |
| HyperLogLog | ~12KB | ~2% | 最快 | 只需基数 |
| 布隆过滤器 | 可配置 | 可配置 | 快 | 判断存在性 |
5.2 选择指南
┌─────────────────────────────────────────────────────────────────┐
│ 选择建议 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 需要精确去重? │
│ ├─ 数据量 < 100万 → HashSet / DISTINCT │
│ └─ 数据量 > 100万 → 先BloomFilter + 后精确 │
│ │
│ 需要统计基数(UV)? │
│ ├─ 数据量小 → COUNT(DISTINCT) │
│ └─ 数据量大 → HyperLogLog │
│ │
│ 需要判断存在性? │
│ └─ 布隆过滤器 │
│ │
│ 数据ID连续(如手机号、ID)? │
│ └─ Bitmap │
│ │
└─────────────────────────────────────────────────────────────────┘6. 布隆过滤器在Join中的应用
6.1 优化原理
┌─────────────────────────────────────────────────────────────────┐
│ 布隆过滤器优化Join │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 场景: 大表 JOIN 小表 │
│ │
│ 优化前: │
│ ┌─────────┐ ┌─────────┐ │
│ │ 大表(1亿)│ JOIN │ 小表(10万)│ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ └──────────────┘ │
│ Shuffle: 1亿条数据网络传输 │
│ │
│ 优化后: │
│ ┌─────────┐ ┌─────────┐ │
│ │ 大表(1亿)│ ──▶│布隆过滤 │ ← 提前过滤 │
│ └────┬────┘ └─────────┘ │
│ │ │
│ Shuffle: 只传输匹配的数据 │
│ │
│ 效果: 减少90%以上的Shuffle数据量 │
│ │
└─────────────────────────────────────────────────────────────────┘6.2 PySpark实现
# 布隆过滤器优化Join
# 步骤1: 创建小表的布隆过滤器
small_df = spark.read.table("dim_product")
bloom_filter = BloomFilter.create(expectedItems=small_df.count(), fpp=0.01)
# 添加所有product_id
for row in small_df.collect():
bloom_filter.put(str(row.product_id).encode())
# 步骤2: 大表过滤
large_df = spark.read.table("fact_orders")
# 将布隆过滤器广播到每个分区
broad_cast_bf = spark.sparkContext.broadcast(bloom_filter)
def filter_func(product_id):
return broad_cast_bf.value.mightContain(str(product_id).encode())
filter_udf = F.udf(filter_func, BooleanType())
# 先过滤,再Join
filtered_large = large_df.filter(filter_udf(F.col("product_id")))
result = filtered_large.join(small_df, "product_id")7. 布隆过滤器参数设计
7.1 公式
n = 预期插入元素数量
m = 位数组大小
k = 哈希函数数量
p = 误判率
基本公式:
1. m = -n * ln(p) / (ln(2)^2)
2. k = (m/n) * ln(2)
3. p = (1 - e^(-kn/m))^k7.2 参数速查表
| n(预期数量) | p=1% | p=1% | p=5% | p=5% |
|---|---|---|---|---|
| m | k | m | k | |
| 1,000 | 9.6KB | 7 | 1.3KB | 4 |
| 10,000 | 96KB | 10 | 13KB | 5 |
| 100,000 | 960KB | 13 | 130KB | 6 |
| 1,000,000 | 9.6MB | 16 | 1.3MB | 7 |
| 10,000,000 | 96MB | 19 | 13MB | 8 |
7.3 配置建议
# 场景1: 缓存穿透防护
# 预期100万数据,误判率1%
bloom = BloomFilter(expected_items=1000000, false_positive_rate=0.01)
# 场景2: 订单去重
# 预期10万订单,误判率5%(可接受)
bloom = BloomFilter(expected_items=100000, false_positive_rate=0.05)
# 场景3: 精确场景(不推荐使用BloomFilter)
# 请使用HashSet或数据库DISTINCT8. 常见问题
8.1 删除元素
布隆过滤器不支持删除,因为可能影响其他元素:
问题:
┌────────────────────────────────────┐
│ 元素A: 位置[1,2,3]=1 │
│ 元素B: 位置[3,4,5]=1 │
│ 删除A: 位置[1,2,3]=0 │
│ 查询B: 位置[3,4,5]=[1,1,1]? NO! │
│ 结果: B被误判为不存在 │
└────────────────────────────────────┘
解决方案: 使用 Counting Bloom Filter8.2 空间不够
# 当实际数据超过预期时,误判率会上升
# 解决方案:
# 1. 预留空间: 预期数量 × 2
bloom = BloomFilter(expected_items=actual_count * 2)
# 2. 定期重建
if current_count > initial_capacity * 0.7:
bloom = BloomFilter(current_count * 2)9. 练习题
练习1
实现一个 Counting Bloom Filter,支持删除操作。
练习2
设计一个布隆过滤器,用于判断一个URL是否已经被爬取过。
练习3
对比布隆过滤器和HashSet在100万数据下的内存占用。
练习4
使用布隆过滤器优化大表JOIN小表的场景。
順子の杂货铺




