顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x06-布隆过滤器与去重

DMIT VPS

布隆过滤器与去重

本篇是《大数据算法与UDF系列》的第6篇,讲解布隆过滤器(Bloom Filter)的原理、实现,以及在大数据去重场景中的应用。


1. 什么是布隆过滤器?

1.1 概念介绍

布隆过滤器是一种空间效率极高的概率数据结构,用于判断一个元素是否可能存在于集合中。

┌─────────────────────────────────────────────────────────────────┐
│                      布隆过滤器特点                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ✅ 空间效率高:相比HashSet节省10-100倍空间                     │
│   ✅ 查询速度快:O(k),k为哈希函数个数                           │
│   ✅ 支持批量添加和查询                                         │
│                                                                 │
│   ⚠️ 可能存在误判(False Positive):                           │
│      - 说"存在"可能不存在(概率可控制)                          │
│      - 说"不存在"一定不存在                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 业务场景

场景使用方式作用
缓存穿透防护先查BloomFilter,不存在则直接返回保护后端数据库
订单去重BloomFilter判断订单是否已处理避免重复处理
用户画像判断用户是否在某个标签集合中快速筛选
URL去重判断URL是否已爬取避免重复爬取

2. 原理详解

2.1 核心思想

布隆过滤器使用多个哈希函数,将一个元素映射到位数组的多个位置

原始元素: "user_123"
    ↓
    ↓ 哈希函数1: hash1("user_123") = 5   →  位数组[5] = 1
    ↓ 哈希函数2: hash2("user_123") = 12  → 位数组[12] = 1
    ↓ 哈希函数3: hash3("user_123") = 23  → 位数组[23] = 1
    ↓
    ↓
位数组: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1 ...]
                          ↑              ↑
                         位置5          位置23

2.2 添加元素

┌─────────────────────────────────────────────────────────────────┐
│                      添加元素流程                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   步骤1: 计算k个哈希值,得到k个位置                              │
│                                                                 │
│       元素 "user_100"                                          │
│            ↓                                                    │
│       hash1 → 位置 5                                           │
│       hash2 → 位置 12                                          │
│       hash3 → 位置 23                                          │
│                                                                 │
│   步骤2: 将这k个位置设为1                                       │
│                                                                 │
│   Before: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...]    │
│   After:  [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1]  │
│                                                    ↑         ↑  │
│                                                   12          23 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.3 查询元素

┌─────────────────────────────────────────────────────────────────┐
│                      查询元素流程                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   查询 "user_100" 是否存在?                                    │
│                                                                 │
│   步骤1: 计算k个哈希值,得到k个位置                              │
│       hash1 → 位置 5                                           │
│       hash2 → 位置 12                                          │
│       hash3 → 位置 23                                          │
│                                                                 │
│   步骤2: 检查这k个位置是否都为1                                  │
│                                                                 │
│   位数组: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, ... 1]  │
│                    ↓      ↓        ↓                           │
│                  位置5  位置12   位置23                          │
│                   1      1        1   ← 全部为1                 │
│                   ↓                                            │
│           结果: 可能存在 ✓                                      │
│                                                                 │
│   ─────────────────────────────────────────────────────        │
│                                                                 │
│   查询 "user_200" 是否存在?                                    │
│                                                                 │
│   步骤1: 计算k个哈希值                                          │
│       hash1 → 位置 7                                           │
│       hash2 → 位置 15                                          │
│       hash3 → 位置 23                                          │
│                                                                 │
│   步骤2: 检查这k个位置                                          │
│                                                                 │
│   位数组: [0, 0, 0, 0, 0, 1, 0, 1, ...]                      │
│                              ↓                                  │
│                           位置7 = 0                             │
│                           ↓                                     │
│                   结果: 一定不存在 ✗                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.4 误判原理

┌─────────────────────────────────────────────────────────────────┐
│                       误判原因                                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   当两个不同元素A和B的哈希位置恰好重叠时:                        │
│                                                                 │
│   添加元素A:                                                    │
│   ┌─────────────────────────────────┐                          │
│   │ 位置:  [5] [12] [23] = 1        │  ← A的三个位置            │
│   └─────────────────────────────────┘                          │
│                                                                 │
│   查询元素B(不存在):                                          │
│   ┌─────────────────────────────────┐                          │
│   │ 位置:  [5] [12] [45] = 1       │  ← B的前两个位置与A重叠  │
│   │              ↑  误判!           │                          │
│   └─────────────────────────────────┘                          │
│                                                                 │
│   结果: B被判断为"可能存在"(实际不存在)                        │
│                                                                 │
│   解决方案: 控制误判率                                          │
│   - 增大位数组大小                                             │
│   - 增加哈希函数数量                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3. 布隆过滤器实现

3.1 Python实现

import hashlib
import math
import bitarray

class BloomFilter:
    """布隆过滤器实现"""

    def __init__(self, expected_items=100000, false_positive_rate=0.01):
        """
        初始化布隆过滤器

        参数:
            expected_items: 预期存储的元素数量
            false_positive_rate: 期望的误判率
        """
        self.expected_items = expected_items
        self.false_positive_rate = false_positive_rate

        # 计算位数组大小
        # m = -n * ln(p) / (ln(2)^2)
        self.size = int(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))

        # 计算哈希函数数量
        # k = (m/n) * ln(2)
        self.hash_count = int((self.size / expected_items) * math.log(2))

        # 初始化位数组
        self.bit_array = bitarray.bitarray(self.size)
        self.bit_array.setall(0)

        print(f"初始化: 预期元素={expected_items}, 误判率={false_positive_rate*100}%")
        print(f"位数组大小: {self.size/8/1024:.2f} KB")
        print(f"哈希函数数量: {self.hash_count}")

    def _get_positions(self, item):
        """计算元素对应的k个位置"""
        positions = []
        for i in range(self.hash_count):
            # 使用不同的盐值
            hash_input = f"{item}_{i}".encode()
            hash_val = int(hashlib.md5(hash_input).hexdigest(), 16)
            position = hash_val % self.size
            positions.append(position)
        return positions

    def add(self, item):
        """添加一个元素"""
        for pos in self._get_positions(item):
            self.bit_array[pos] = 1

    def might_contain(self, item):
        """检查元素是否可能存在"""
        for pos in self._get_positions(item):
            if self.bit_array[pos] == 0:
                return False
        return True

    def get_bit_count(self):
        """获取已使用的位数"""
        return self.bit_array.count(1)

# 使用示例
bf = BloomFilter(expected_items=10000, false_positive_rate=0.01)

# 添加1000个元素
for i in range(1000):
    bf.add(f"user_{i}")

# 查询
print(f"存在的元素查询: {bf.might_contain('user_500')}")  # True
print(f"不存在的元素查询: {bf.might_contain('user_9999')}")  # 应该False,但可能误判

# 统计误判率
false_positive = 0
for i in range(1000, 2000):
    if bf.might_contain(f"user_{i}"):
        false_positive += 1

print(f"误判数: {false_positive}/1000 = {false_positive/10}%")

3.2 PySpark集成

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()

# 创建布隆过滤器
# Spark 3.0+ 内置了 bloom_filter agg

# 创建测试数据
data = [(f"user_{i}",) for i in range(10000)]
df = spark.createDataFrame(data, ["user_id"])

# 创建布隆过滤器(使用内置函数)
# 注意:Spark的bloom_filter需要明确指定expectedItems

# 方法1:使用Python UDF
from pyspark.sql.functions import udf

bloom_udf = udf(
    lambda items: BloomFilter(len(items), 0.01).add_all(items),
    BooleanType()
)

# 方法2:使用Spark内置的bloom_approx_distinct
# (实际上内部使用HyperLogLog)
result = df.agg(
    F.approx_count_distinct("user_id").alias("approx_count")
)

3.3 Scala实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter

object BloomFilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BloomFilterDemo")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 创建测试数据
    val data = (1 to 10000).map(i => (s"user_$i", 1)).toDF("user_id", "value")

    // 创建布隆过滤器
    val bloomFilter = BloomFilter.create(expectedItems = 10000, fpp = 0.01)

    // 添加元素
    data.collect().foreach(row => {
      bloomFilter.put(row.getString(0).getBytes)
    })

    // 查询
    println(s"user_500 exists: ${bloomFilter.mightContain("user_500".getBytes)}")
    println(s"user_9999 exists: ${bloomFilter.mightContain("user_9999".getBytes)}")

    // 测试误判率
    var falsePositive = 0
    for (i <- 10000 to 11000) {
      if (bloomFilter.mightContain(s"user_$i".getBytes)) {
        falsePositive += 1
      }
    }
    println(s"False positive: $falsePositive / 1000")

    spark.stop()
  }
}

4. 实战:订单去重

4.1 业务场景

处理订单数据时,需要:

  1. 判断订单是否已处理过
  2. 快速过滤重复订单

4.2 完整代码

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import hashlib

spark = SparkSession.builder.getOrCreate()

# 模拟订单数据(包含重复订单)
orders = [
    ("order_001", "user_1", 100, "2024-01-01 10:00:00"),
    ("order_002", "user_2", 200, "2024-01-01 10:01:00"),
    ("order_001", "user_1", 100, "2024-01-01 10:02:00"),  # 重复
    ("order_003", "user_3", 150, "2024-01-01 10:03:00"),
    ("order_002", "user_2", 200, "2024-01-01 10:04:00"),  # 重复
    ("order_004", "user_4", 300, "2024-01-01 10:05:00"),
    ("order_005", "user_5", 180, "2024-01-01 10:06:00"),
]

df = spark.createDataFrame(orders, ["order_id", "user_id", "amount", "create_time"])

print("原始订单数据:")
df.show()

# =============================================
# 方法1:使用DataFrame去重
# =============================================
print("=" * 50)
print("方法1: DataFrame去重(精确)")
print("=" * 50)

deduped_df = df.dropDuplicates(["order_id"])
deduped_df.show()
print(f"去重后数量: {deduped_df.count()}")

# =============================================
# 方法2:使用布隆过滤器去重(大数据量)
# =============================================
print("=" * 50)
print("方法2: 布隆过滤器去重(近似)")
print("=" * 50)

# 收集所有order_id到driver端(仅适用于小数据量)
order_ids = df.select("order_id").distinct().collect()
order_id_set = set([row.order_id for row in order_ids])

# 创建布隆过滤器
import math

class SimpleBloomFilter:
    def __init__(self, size=100000):
        self.size = size
        self.bit_array = [0] * size

    def _hash(self, item, seed):
        h = hashlib.md5(f"{item}_{seed}".encode()).hexdigest()
        return int(h, 16) % self.size

    def add(self, item):
        for i in range(3):
            pos = self._hash(item, i)
            self.bit_array[pos] = 1

    def might_contain(self, item):
        for i in range(3):
            pos = self._hash(item, i)
            if self.bit_array[pos] == 0:
                return False
        return True

bf = SimpleBloomFilter()

# 过滤重复订单
def filter_duplicates(order_id):
    if bf.might_contain(order_id):
        return False
    bf.add(order_id)
    return True

# 使用UDF过滤
filter_udf = F.udf(lambda x: filter_duplicates(x))

result = df.filter(filter_udf(F.col("order_id")))
result.show()
print(f"布隆过滤后数量: {result.count()}")

# =============================================
# 方法3:高性能去重(使用SQL)
# =============================================
print("=" * 50)
print("方法3: SQL去重 + 统计")
print("=" * 50)

# 注册临时表
df.createOrReplaceTempView("orders")

# SQL去重
spark.sql("""
    SELECT order_id, user_id, amount, MIN(create_time) as create_time
    FROM orders
    GROUP BY order_id, user_id, amount
""").show()

spark.stop()

5. 布隆过滤器 vs 其他去重

5.1 对比表

方法空间精度速度适用场景
DISTINCTO(N)100%小数据量
HashSetO(N)100%小数据量
BitmapN/8100%ID连续
HyperLogLog~12KB~2%最快只需基数
布隆过滤器可配置可配置判断存在性

5.2 选择指南

┌─────────────────────────────────────────────────────────────────┐
│                        选择建议                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   需要精确去重?                                                 │
│   ├─ 数据量 < 100万 → HashSet / DISTINCT                       │
│   └─ 数据量 > 100万 → 先BloomFilter + 后精确                    │
│                                                                 │
│   需要统计基数(UV)?                                           │
│   ├─ 数据量小 → COUNT(DISTINCT)                                │
│   └─ 数据量大 → HyperLogLog                                    │
│                                                                 │
│   需要判断存在性?                                               │
│   └─ 布隆过滤器                                                 │
│                                                                 │
│   数据ID连续(如手机号、ID)?                                   │
│   └─ Bitmap                                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

6. 布隆过滤器在Join中的应用

6.1 优化原理

┌─────────────────────────────────────────────────────────────────┐
│                    布隆过滤器优化Join                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   场景: 大表 JOIN 小表                                          │
│                                                                 │
│   优化前:                                                      │
│   ┌─────────┐    ┌─────────┐                                   │
│   │ 大表(1亿)│ JOIN │ 小表(10万)│                                │
│   └────┬────┘    └────┬────┘                                   │
│        │              │                                          │
│        └──────────────┘                                         │
│        Shuffle: 1亿条数据网络传输                                 │
│                                                                 │
│   优化后:                                                      │
│   ┌─────────┐    ┌─────────┐                                   │
│   │ 大表(1亿)│ ──▶│布隆过滤 │  ← 提前过滤                        │
│   └────┬────┘    └─────────┘                                   │
│        │                                                       │
│   Shuffle: 只传输匹配的数据                                     │
│                                                                 │
│   效果: 减少90%以上的Shuffle数据量                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

6.2 PySpark实现

# 布隆过滤器优化Join

# 步骤1: 创建小表的布隆过滤器
small_df = spark.read.table("dim_product")
bloom_filter = BloomFilter.create(expectedItems=small_df.count(), fpp=0.01)

# 添加所有product_id
for row in small_df.collect():
    bloom_filter.put(str(row.product_id).encode())

# 步骤2: 大表过滤
large_df = spark.read.table("fact_orders")

# 将布隆过滤器广播到每个分区
broad_cast_bf = spark.sparkContext.broadcast(bloom_filter)

def filter_func(product_id):
    return broad_cast_bf.value.mightContain(str(product_id).encode())

filter_udf = F.udf(filter_func, BooleanType())

# 先过滤,再Join
filtered_large = large_df.filter(filter_udf(F.col("product_id")))
result = filtered_large.join(small_df, "product_id")

7. 布隆过滤器参数设计

7.1 公式

n = 预期插入元素数量
m = 位数组大小
k = 哈希函数数量
p = 误判率

基本公式:
1. m = -n * ln(p) / (ln(2)^2)
2. k = (m/n) * ln(2)
3. p = (1 - e^(-kn/m))^k

7.2 参数速查表

n(预期数量)p=1%p=1%p=5%p=5%
mkmk
1,0009.6KB71.3KB4
10,00096KB1013KB5
100,000960KB13130KB6
1,000,0009.6MB161.3MB7
10,000,00096MB1913MB8

7.3 配置建议

# 场景1: 缓存穿透防护
# 预期100万数据,误判率1%
bloom = BloomFilter(expected_items=1000000, false_positive_rate=0.01)

# 场景2: 订单去重
# 预期10万订单,误判率5%(可接受)
bloom = BloomFilter(expected_items=100000, false_positive_rate=0.05)

# 场景3: 精确场景(不推荐使用BloomFilter)
# 请使用HashSet或数据库DISTINCT

8. 常见问题

8.1 删除元素

布隆过滤器不支持删除,因为可能影响其他元素:

问题:
┌────────────────────────────────────┐
│  元素A: 位置[1,2,3]=1              │
│  元素B: 位置[3,4,5]=1              │
│  删除A: 位置[1,2,3]=0              │
│  查询B: 位置[3,4,5]=[1,1,1]? NO!  │
│  结果: B被误判为不存在               │
└────────────────────────────────────┘

解决方案: 使用 Counting Bloom Filter

8.2 空间不够

# 当实际数据超过预期时,误判率会上升
# 解决方案:
# 1. 预留空间: 预期数量 × 2
bloom = BloomFilter(expected_items=actual_count * 2)

# 2. 定期重建
if current_count > initial_capacity * 0.7:
    bloom = BloomFilter(current_count * 2)

9. 练习题

练习1

实现一个 Counting Bloom Filter,支持删除操作。

练习2

设计一个布隆过滤器,用于判断一个URL是否已经被爬取过。

练习3

对比布隆过滤器和HashSet在100万数据下的内存占用。

练习4

使用布隆过滤器优化大表JOIN小表的场景。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x06-布隆过滤器与去重
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们