字符串模糊匹配
本篇是《大数据算法与UDF系列》的第7篇,讲解常见的字符串相似度算法,以及在大数据场景下如何实现高效的模糊匹配。
1. 业务场景
1.1 为什么需要模糊匹配?
┌─────────────────────────────────────────────────────────────────┐
│ 业务场景举例 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 📝 名单匹配: │
│ ├─ A公司名单: "阿里巴巴(中国)有限公司" │
│ ├─ B公司名单: "阿里巴巴(中国)有限公司" ← 标点差异 │
│ ├─ B公司名单: "阿里云计算有限公司" ← 简称 │
│ └─ 期望: 识别为同一家公司 │
│ │
│ 🔍 搜索纠错: │
│ ├─ 用户输入: "iphon" │
│ └─ 期望: 纠正为 "iPhone" │
│ │
│ 📋 数据清洗: │
│ ├─ 数据源1: "北京市朝阳区建国路" │
│ └─ 数据源2: "北京 朝阳区 建国路" │
│ │
│ 🆔 身份匹配: │
│ ├─ 证件号1: "110101199001011234" │
│ └─ 证件号2: "110101199001011235" ← 一位之差 │
│ │
└─────────────────────────────────────────────────────────────────┘2. 相似度算法
2.1 概览
┌─────────────────────────────────────────────────────────────────┐
│ 相似度算法分类 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 基于编辑距离: │
│ ├─ Levenshtein(编辑距离) │
│ ├─ Damerau-Levenshtein(允许换位) │
│ └─ Jaro-Winkler(考虑前缀匹配) │
│ │
│ 基于集合: │
│ ├─ Jaccard(交集/并集) │
│ ├─ Sorensen-Dice │
│ └─ Overlap Coefficient │
│ │
│ 基于字符: │
│ ├─ Cosine Similarity(余弦相似度) │
│ ├─ Longest Common Subsequence │
│ └─ n-gram │
│ │
│ 常用场景: │
│ ├─ 短文本 → Jaro-Winkler │
│ ├─ 长文本 → Cosine / Jaccard │
│ └─ 允许换位 → Damerau-Levenshtein │
│ │
└─────────────────────────────────────────────────────────────────┘2.2 Levenshtein距离
编辑距离:将一个字符串变成另一个字符串所需的最少操作次数
操作类型: 删除、插入、替换
"kitten" → "sitting"
k i t t e n
↓ ↓
s i t t e n (k→s, 替换)
s i t t i n (e→i, 替换)
s i t t i n g (添加g)
编辑距离 = 3
┌────────────────────────────────────┐
│ Levenshtein距离可视化 │
├────────────────────────────────────┤
│ │
│ 字符串A: "Hello" │
│ 字符串B: "Hallo" │
│ │
│ H e l l o │
│ | | | | | │
│ H a l l o │
│ ↓ │
│ 位置2: e→a (替换) │
│ │
│ 距离 = 1 │
│ │
└────────────────────────────────────┘Python实现
def levenshtein_distance(s1, s2):
"""计算编辑距离"""
if len(s1) < len(s2):
return levenshtein_distance(s2, s1)
if len(s2) == 0:
return len(s1)
previous_row = range(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
# insertions, deletions, substitutions
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
def levenshtein_similarity(s1, s2, max_distance=3):
"""计算相似度 (0-1)"""
distance = levenshtein_distance(s1, s2)
max_len = max(len(s1), len(s2))
if max_len == 0:
return 1.0
return max(0, 1 - distance / max_len)
# 测试
print(levenshtein_distance("kitten", "sitting")) # 3
print(levenshtein_similarity("Hello", "Hallo")) # 0.8Scala实现
object Levenshtein {
def distance(s1: String, s2: String): Int = {
val len1 = s1.length
val len2 = s2.length
val dp = Array.ofDim[Int](len1 + 1, len2 + 1)
for (i <- 0 to len1) dp(i)(0) = i
for (j <- 0 to len2) dp(0)(j) = j
for (i <- 1 to len1; j <- 1 to len2) {
val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
dp(i)(j) = Seq(
dp(i - 1)(j) + 1, // deletion
dp(i)(j - 1) + 1, // insertion
dp(i - 1)(j - 1) + cost // substitution
).min
}
dp(len1)(len2)
}
def similarity(s1: String, s2: String): Double = {
val maxLen = math.max(s1.length, s2.length)
if (maxLen == 0) 1.0
else 1.0 - distance(s1, s2).toDouble / maxLen
}
}2.3 Jaro-Winkler距离
特点:对前缀匹配有额外加分
┌─────────────────────────────────────────────────────────────────┐
│ Jaro-Winkler 特点 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ "MARTHA" vs "MARHTA" │
│ │
│ Levenshtein: 2 (替换2次) │
│ Jaro-Winkler: 0.94 (前缀匹配加分) │
│ │
│ ───────────────────────────────────── │
│ │
│ "ABC" vs "AB" │
│ │
│ Levenshtein: 1 │
│ Jaro-Winkler: 0.93 (前缀匹配加分) │
│ │
│ 适用场景: 短字符串、人名、公司名 │
│ │
└─────────────────────────────────────────────────────────────────┘def jaro_winkler_similarity(s1, s2, p=0.1):
"""
Jaro-Winkler相似度
p: 前缀缩放因子,通常为0.1
"""
def jaro_similarity(s1, s2):
if s1 == s2:
return 1.0
len1, len2 = len(s1), len(s2)
match_distance = max(len1, len2) // 2 - 1
s1_matches = [False] * len1
s2_matches = [False] * len2
matches = 0
transpositions = 0
for i in range(len1):
start = max(0, i - match_distance)
end = min(i + match_distance + 1, len2)
for j in range(start, end):
if s2_matches[j] or s1[i] != s2[j]:
continue
s1_matches[i] = True
s2_matches[j] = True
matches += 1
break
if matches == 0:
return 0.0
k = 0
for i in range(len1):
if not s1_matches[i]:
continue
while not s2_matches[k]:
k += 1
if s1[i] != s2[k]:
transpositions += 1
k += 1
return (matches / len1 + matches / len2 +
(matches - transpositions / 2) / matches) / 3
# 计算Jaro相似度
jaro = jaro_similarity(s1, s2)
# 计算共同前缀长度(最多4)
prefix_len = 0
for i in range(min(len(s1), len(s2), 4)):
if s1[i] == s2[i]:
prefix_len += 1
else:
break
# Jaro-Winkler
return jaro + prefix_len * p * (1 - jaro)
# 测试
print(jaro_winkler_similarity("MARTHA", "MARHTA")) # ~0.94
print(jaro_winkler_similarity("ABC", "AB")) # ~0.932.4 Jaccard相似度
基于集合的相似度
┌─────────────────────────────────────────────────────────────────┐
│ Jaccard 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 字符串: "hello" │
│ 字符集合: {h, e, l, o} │
│ │
│ 字符串: "hallo" │
│ 字符集合: {h, a, l, o} │
│ │
│ Jaccard = |A ∩ B| / |A ∪ B| │
│ = {h, l, o} / {h, e, l, o, a} │
│ = 3/5 = 0.6 │
│ │
│ 优点: 计算简单,速度快 │
│ 缺点: 不考虑字符顺序 │
│ │
└─────────────────────────────────────────────────────────────────┘def jaccard_similarity(s1, s2):
"""Jaccard相似度"""
set1 = set(s1.lower())
set2 = set(s2.lower())
intersection = len(set1 & set2)
union = len(set1 | set2)
return intersection / union if union > 0 else 0
def jaccard_ngram(s1, s2, n=2):
"""N-gram Jaccard相似度"""
def get_ngrams(s, n):
return set(s[i:i+n] for i in range(len(s) - n + 1))
ngrams1 = get_ngrams(s1.lower(), n)
ngrams2 = get_ngrams(s2.lower(), n)
intersection = len(ngrams1 & ngrams2)
union = len(ngrams1 | ngrams2)
return intersection / union if union > 0 else 0
# 测试
print(jaccard_similarity("hello", "hallo")) # 0.6
print(jaccard_ngram("hello", "hallo", 2)) # 0.4
print(jaccard_ngram("hello", "hello", 2)) # 1.03. PySpark实现模糊匹配
3.1 基础UDF实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
spark = SparkSession.builder.getOrCreate()
# 模拟数据
customers = [
(1, "阿里巴巴(中国)有限公司"),
(2, "阿里巴巴(中国)有限公司"),
(3, "阿里云计算有限公司"),
(4, "腾讯科技(深圳)有限公司"),
(5, "腾讯科技有限公司"),
(6, "字节跳动有限公司"),
]
companies = [
("A01", "阿里巴巴(中国)有限公司"),
("A02", "腾讯科技(深圳)有限公司"),
("A03", "百度在线网络技术有限公司"),
]
customers_df = spark.createDataFrame(customers, ["id", "name"])
companies_df = spark.createDataFrame(companies, ["code", "name"])
# 注册UDF
from difflib import SequenceMatcher
@F.udf(DoubleType())
def similarity_udf(s1, s2):
if s1 is None or s2 is None:
return 0.0
return SequenceMatcher(None, s1, s2).ratio()
@F.udf(DoubleType())
def levenshtein_udf(s1, s2):
if s1 is None or s2 is None:
return 0.0
from Levenshtein import ratio # 需要pip install Levenshtein
return ratio(s1, s2)
# 笛卡尔积 + 相似度计算
result = customers_df.crossJoin(companies_df) \
.withColumn("similarity", similarity_udf(F.col("name"), F.col("name"))) \
.filter(F.col("similarity") > 0.7) \
.orderBy(F.desc("similarity"))
result.show(truncate=False)3.2 性能优化:广播小表
# 当公司列表较小时,广播公司列表
company_list = [(row.code, row.name) for row in companies_df.collect()]
@F.udf(DoubleType())
def find_best_match(name):
if name is None:
return (None, 0.0)
best_match = None
best_score = 0.0
for code, company_name in company_list:
score = SequenceMatcher(None, name, company_name).ratio()
if score > best_score:
best_score = score
best_match = code
return (best_match, best_score)
# 使用
result = customers_df.withColumn(
"best_match",
find_best_match(F.col("name"))
)3.3 完整示例:名单匹配
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
spark = SparkSession.builder.getOrCreate()
# =============================================
# 模拟两套名单
# =============================================
# 名单A: 历史客户
list_a = [
("A001", "北京华夏科技有限公司"),
("A002", "上海浦东发展银行"),
("A003", "深圳市腾讯计算机系统有限公司"),
("A004", "阿里巴巴(中国)有限公司"),
("A005", "中国移动通信集团有限公司"),
]
# 名单B: 新增客户
list_b = [
("B001", "北京华夏科技有限公司"),
("B002", "上海浦东发展银行股份有限公司"),
("B003", "腾讯科技(深圳)有限公司"),
("B004", "阿里巴巴(中国)有限"),
("B005", "中国移动通信集团"),
("B006", "京东科技集团"),
]
df_a = spark.createDataFrame(list_a, ["id_a", "name_a"])
df_b = spark.createDataFrame(list_b, ["id_b", "name_b"])
# =============================================
# 方法1: 精确匹配
# =============================================
print("=" * 60)
print("1. 精确匹配")
print("=" * 60)
exact_match = df_a.join(df_b, df_a.name_a == df_b.name_b)
exact_match.show(truncate=False)
# =============================================
# 方法2: 模糊匹配(笛卡尔积)
# =============================================
print("=" * 60)
print("2. 模糊匹配(相似度>0.7)")
print("=" * 60)
from difflib import SequenceMatcher
@F.udf(DoubleType())
def similarity(s1, s2):
if s1 is None or s2 is None:
return 0.0
return SequenceMatcher(None, s1, s2).ratio()
# 笛卡尔积
cross_df = df_a.crossJoin(df_b)
# 计算相似度并过滤
matched = cross_df \
.withColumn("similarity", similarity(F.col("name_a"), F.col("name_b"))) \
.filter(F.col("similarity") >= 0.7) \
.orderBy(F.col("name_a"), F.desc("similarity"))
# 取最佳匹配
from pyspark.sql.window import Window
window = Window.partitionBy("id_a").orderBy(F.desc("similarity"))
best_match = matched.withColumn(
"rank", F.row_number().over(window)
).filter(F.col("rank") == 1)
best_match.select("id_a", "name_a", "id_b", "name_b", "similarity").show(truncate=False)
# =============================================
# 方法3: 只匹配最佳
# =============================================
print("=" * 60)
print("3. 仅保留相似度最高的匹配")
print("=" * 60)
# 标记最佳匹配
final_result = best_match.select(
F.col("id_a"),
F.col("name_a"),
F.col("id_b").alias("matched_id_b"),
F.col("name_b").alias("matched_name"),
F.col("similarity")
)
final_result.show(truncate=False)
spark.stop()4. Scala实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
object StringMatching {
// Levenshtein距离实现
def levenshtein(s1: String, s2: String): Int = {
val len1 = s1.length
val len2 = s2.length
val dp = Array.ofDim[Int](len1 + 1, len2 + 1)
for (i <- 0 to len1) dp(i)(0) = i
for (j <- 0 to len2) dp(0)(j) = j
for (i <- 1 to len1; j <- 1 to len2) {
val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
dp(i)(j) = Seq(
dp(i - 1)(j) + 1,
dp(i)(j - 1) + 1,
dp(i - 1)(j - 1) + cost
).min
}
dp(len1)(len2)
}
// 相似度
def similarity(s1: String, s2: String): Double = {
val maxLen = math.max(s1.length, s2.length)
if (maxLen == 0) 1.0
else 1.0 - levenshtein(s1, s2).toDouble / maxLen
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("StringMatching")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 创建UDF
val similarityUDF = udf((s1: String, s2: String) => {
similarity(s1, s2)
})
// 测试数据
val df1 = Seq(
(1, "阿里巴巴(中国)有限公司"),
(2, "腾讯科技(深圳)有限公司")
).toDF("id", "name")
val df2 = Seq(
("A", "阿里巴巴(中国)有限公司"),
("B", "腾讯科技有限公司"),
("C", "百度在线网络技术")
).toDF("code", "name")
// 笛卡尔积 + 相似度计算
val result = df1.crossJoin(df2)
.withColumn("similarity", similarityUDF($"name", $"name"))
.filter($"similarity" >= 0.7)
.orderBy($"name", $"similarity".desc)
result.show(false)
spark.stop()
}
}5. 实战:数据清洗
5.1 场景:地址清洗
# 地址标准化
from pyspark.sql import functions as F
addresses = [
("北京市朝阳区建国路88号",),
("北京 朝阳区 建国路 88号",),
("北京市-朝阳区-建国路88号",),
("上海浦东新区世纪大道100号",),
("上海 浦东新区 世纪大道100号",),
]
df = spark.createDataFrame(addresses, ["address"])
# 标准化函数
@F.udf()
def normalize_address(addr):
if addr is None:
return None
# 去除空格、标点,统一格式
import re
addr = re.sub(r'[\s\-,\.]+', '', addr) # 去除分隔符
addr = addr.replace('市', '').replace('区', '') # 简化
return addr
df.withColumn("normalized", normalize_address("address")).show(truncate=False)5.2 场景:手机号脱敏
# 手机号模糊匹配(中间4位用*替代)
@F.udf()
def mask_phone(phone):
if phone is None or len(phone) != 11:
return phone
return phone[:3] + "****" + phone[7:]
# 使用
df.withColumn("masked_phone", mask_phone(F.col("phone"))).show()6. 性能优化技巧
6.1 避免笛卡尔积
# ❌ 低效:笛卡尔积
result = df1.crossJoin(df2)
result = result.withColumn("sim", similarity_udf(...))
# ✅ 优化1:先过滤候选
result = df1.join(df2,
(F.length(F.col("name1")) - F.length(F.col("name2"))).abs() <= 3
)
# ✅ 优化2:使用分区
result = df1.repartition(100).join(df2, "key")6.2 索引优化
# 添加长度过滤
df1.join(df2,
(F.abs(F.length(F.col("name1")) - F.length(F.col("name2"))) <= threshold) &
(F.substring(F.col("name1"), 1, 2) == F.substring(F.col("name2"), 1, 2))
)6.3 分块处理
# 按首字母分块,减少比较次数
df1.withColumn("prefix", F.substring("name", 1, 1)) \
.join(df2.withColumn("prefix", F.substring("name", 1, 1)), "prefix")7. 常见问题
7.1 中文分词
# 中文模糊匹配需要先分词
import jieba
@F.udf()
def tokenize_chinese(text):
if text is None:
return None
return " ".join(jieba.cut(text))
# 分词后再计算Jaccard
@F.udf(DoubleType())
def jaccard_chinese(s1, s2):
if s1 is None or s2 is None:
return 0.0
set1 = set(s1.split())
set2 = set(s2.split())
return len(set1 & set2) / len(set1 | set2)7.2 相似度阈值选择
| 场景 | 阈值 | 说明 |
|---|---|---|
| 严格要求 | 0.9 | 几乎相同 |
| 标准匹配 | 0.8 | 允许少量差异 |
| 宽松匹配 | 0.7 | 允许较大差异 |
| 候选筛选 | 0.6 | 初筛,后处理 |
8. 练习题
练习1
实现一个基于N-gram的模糊匹配函数。
练习2
对公司名称进行匹配,找出相似度>0.8的对。
练习3
使用布隆过滤器优化大规模字符串匹配的性能。
练习4
实现中文分词后的Jaccard相似度计算。
順子の杂货铺


