顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x07-字符串模糊匹配

DMIT VPS

字符串模糊匹配

本篇是《大数据算法与UDF系列》的第7篇,讲解常见的字符串相似度算法,以及在大数据场景下如何实现高效的模糊匹配。


1. 业务场景

1.1 为什么需要模糊匹配?

┌─────────────────────────────────────────────────────────────────┐
│                        业务场景举例                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   📝 名单匹配:                                                  │
│   ├─ A公司名单: "阿里巴巴(中国)有限公司"                        │
│   ├─ B公司名单: "阿里巴巴(中国)有限公司"  ← 标点差异           │
│   ├─ B公司名单: "阿里云计算有限公司"      ← 简称               │
│   └─ 期望: 识别为同一家公司                                      │
│                                                                 │
│   🔍 搜索纠错:                                                  │
│   ├─ 用户输入: "iphon"                                         │
│   └─ 期望: 纠正为 "iPhone"                                      │
│                                                                 │
│   📋 数据清洗:                                                  │
│   ├─ 数据源1: "北京市朝阳区建国路"                              │
│   └─ 数据源2: "北京 朝阳区 建国路"                              │
│                                                                 │
│   🆔 身份匹配:                                                  │
│   ├─ 证件号1: "110101199001011234"                              │
│   └─ 证件号2: "110101199001011235"  ← 一位之差                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. 相似度算法

2.1 概览

┌─────────────────────────────────────────────────────────────────┐
│                      相似度算法分类                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   基于编辑距离:                                                 │
│   ├─ Levenshtein(编辑距离)                                    │
│   ├─ Damerau-Levenshtein(允许换位)                           │
│   └─ Jaro-Winkler(考虑前缀匹配)                               │
│                                                                 │
│   基于集合:                                                     │
│   ├─ Jaccard(交集/并集)                                       │
│   ├─ Sorensen-Dice                                              │
│   └─ Overlap Coefficient                                       │
│                                                                 │
│   基于字符:                                                     │
│   ├─ Cosine Similarity(余弦相似度)                            │
│   ├─ Longest Common Subsequence                                │
│   └─ n-gram                                                    │
│                                                                 │
│   常用场景:                                                     │
│   ├─ 短文本 → Jaro-Winkler                                     │
│   ├─ 长文本 → Cosine / Jaccard                                 │
│   └─ 允许换位 → Damerau-Levenshtein                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 Levenshtein距离

编辑距离:将一个字符串变成另一个字符串所需的最少操作次数

操作类型: 删除、插入、替换

"kitten" → "sitting"

k i t t e n
↓          ↓
s i t t e n   (k→s, 替换)
s i t t i n   (e→i, 替换)
s i t t i n g  (添加g)

编辑距离 = 3

┌────────────────────────────────────┐
│  Levenshtein距离可视化              │
├────────────────────────────────────┤
│                                    │
│  字符串A:  "Hello"                  │
│  字符串B:  "Hallo"                  │
│                                    │
│  H e l l o                         │
│  | | | | |                        │
│  H a l l o                         │
│    ↓                               │
│  位置2: e→a (替换)                  │
│                                    │
│  距离 = 1                          │
│                                    │
└────────────────────────────────────┘

Python实现

def levenshtein_distance(s1, s2):
    """计算编辑距离"""
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)

    if len(s2) == 0:
        return len(s1)

    previous_row = range(len(s2) + 1)

    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            # insertions, deletions, substitutions
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return previous_row[-1]

def levenshtein_similarity(s1, s2, max_distance=3):
    """计算相似度 (0-1)"""
    distance = levenshtein_distance(s1, s2)
    max_len = max(len(s1), len(s2))
    if max_len == 0:
        return 1.0
    return max(0, 1 - distance / max_len)

# 测试
print(levenshtein_distance("kitten", "sitting"))  # 3
print(levenshtein_similarity("Hello", "Hallo"))   # 0.8

Scala实现

object Levenshtein {
  def distance(s1: String, s2: String): Int = {
    val len1 = s1.length
    val len2 = s2.length

    val dp = Array.ofDim[Int](len1 + 1, len2 + 1)

    for (i <- 0 to len1) dp(i)(0) = i
    for (j <- 0 to len2) dp(0)(j) = j

    for (i <- 1 to len1; j <- 1 to len2) {
      val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
      dp(i)(j) = Seq(
        dp(i - 1)(j) + 1,      // deletion
        dp(i)(j - 1) + 1,      // insertion
        dp(i - 1)(j - 1) + cost // substitution
      ).min
    }

    dp(len1)(len2)
  }

  def similarity(s1: String, s2: String): Double = {
    val maxLen = math.max(s1.length, s2.length)
    if (maxLen == 0) 1.0
    else 1.0 - distance(s1, s2).toDouble / maxLen
  }
}

2.3 Jaro-Winkler距离

特点:对前缀匹配有额外加分

┌─────────────────────────────────────────────────────────────────┐
│                    Jaro-Winkler 特点                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   "MARTHA" vs "MARHTA"                                         │
│                                                                 │
│   Levenshtein: 2 (替换2次)                                     │
│   Jaro-Winkler: 0.94 (前缀匹配加分)                            │
│                                                                 │
│   ─────────────────────────────────────                        │
│                                                                 │
│   "ABC" vs "AB"                                                │
│                                                                 │
│   Levenshtein: 1                                               │
│   Jaro-Winkler: 0.93 (前缀匹配加分)                            │
│                                                                 │
│   适用场景: 短字符串、人名、公司名                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
def jaro_winkler_similarity(s1, s2, p=0.1):
    """
    Jaro-Winkler相似度
    p: 前缀缩放因子,通常为0.1
    """
    def jaro_similarity(s1, s2):
        if s1 == s2:
            return 1.0

        len1, len2 = len(s1), len(s2)
        match_distance = max(len1, len2) // 2 - 1

        s1_matches = [False] * len1
        s2_matches = [False] * len2

        matches = 0
        transpositions = 0

        for i in range(len1):
            start = max(0, i - match_distance)
            end = min(i + match_distance + 1, len2)

            for j in range(start, end):
                if s2_matches[j] or s1[i] != s2[j]:
                    continue
                s1_matches[i] = True
                s2_matches[j] = True
                matches += 1
                break

        if matches == 0:
            return 0.0

        k = 0
        for i in range(len1):
            if not s1_matches[i]:
                continue
            while not s2_matches[k]:
                k += 1
            if s1[i] != s2[k]:
                transpositions += 1
            k += 1

        return (matches / len1 + matches / len2 +
                (matches - transpositions / 2) / matches) / 3

    # 计算Jaro相似度
    jaro = jaro_similarity(s1, s2)

    # 计算共同前缀长度(最多4)
    prefix_len = 0
    for i in range(min(len(s1), len(s2), 4)):
        if s1[i] == s2[i]:
            prefix_len += 1
        else:
            break

    # Jaro-Winkler
    return jaro + prefix_len * p * (1 - jaro)

# 测试
print(jaro_winkler_similarity("MARTHA", "MARHTA"))  # ~0.94
print(jaro_winkler_similarity("ABC", "AB"))         # ~0.93

2.4 Jaccard相似度

基于集合的相似度

┌─────────────────────────────────────────────────────────────────┐
│                      Jaccard 原理                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   字符串: "hello"                                               │
│   字符集合: {h, e, l, o}                                        │
│                                                                 │
│   字符串: "hallo"                                               │
│   字符集合: {h, a, l, o}                                        │
│                                                                 │
│   Jaccard = |A ∩ B| / |A ∪ B|                                 │
│          = {h, l, o} / {h, e, l, o, a}                        │
│          = 3/5 = 0.6                                           │
│                                                                 │
│   优点: 计算简单,速度快                                         │
│   缺点: 不考虑字符顺序                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
def jaccard_similarity(s1, s2):
    """Jaccard相似度"""
    set1 = set(s1.lower())
    set2 = set(s2.lower())

    intersection = len(set1 & set2)
    union = len(set1 | set2)

    return intersection / union if union > 0 else 0

def jaccard_ngram(s1, s2, n=2):
    """N-gram Jaccard相似度"""
    def get_ngrams(s, n):
        return set(s[i:i+n] for i in range(len(s) - n + 1))

    ngrams1 = get_ngrams(s1.lower(), n)
    ngrams2 = get_ngrams(s2.lower(), n)

    intersection = len(ngrams1 & ngrams2)
    union = len(ngrams1 | ngrams2)

    return intersection / union if union > 0 else 0

# 测试
print(jaccard_similarity("hello", "hallo"))        # 0.6
print(jaccard_ngram("hello", "hallo", 2))          # 0.4
print(jaccard_ngram("hello", "hello", 2))         # 1.0

3. PySpark实现模糊匹配

3.1 基础UDF实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

# 模拟数据
customers = [
    (1, "阿里巴巴(中国)有限公司"),
    (2, "阿里巴巴(中国)有限公司"),
    (3, "阿里云计算有限公司"),
    (4, "腾讯科技(深圳)有限公司"),
    (5, "腾讯科技有限公司"),
    (6, "字节跳动有限公司"),
]

companies = [
    ("A01", "阿里巴巴(中国)有限公司"),
    ("A02", "腾讯科技(深圳)有限公司"),
    ("A03", "百度在线网络技术有限公司"),
]

customers_df = spark.createDataFrame(customers, ["id", "name"])
companies_df = spark.createDataFrame(companies, ["code", "name"])

# 注册UDF
from difflib import SequenceMatcher

@F.udf(DoubleType())
def similarity_udf(s1, s2):
    if s1 is None or s2 is None:
        return 0.0
    return SequenceMatcher(None, s1, s2).ratio()

@F.udf(DoubleType())
def levenshtein_udf(s1, s2):
    if s1 is None or s2 is None:
        return 0.0
    from Levenshtein import ratio  # 需要pip install Levenshtein
    return ratio(s1, s2)

# 笛卡尔积 + 相似度计算
result = customers_df.crossJoin(companies_df) \
    .withColumn("similarity", similarity_udf(F.col("name"), F.col("name"))) \
    .filter(F.col("similarity") > 0.7) \
    .orderBy(F.desc("similarity"))

result.show(truncate=False)

3.2 性能优化:广播小表

# 当公司列表较小时,广播公司列表

company_list = [(row.code, row.name) for row in companies_df.collect()]

@F.udf(DoubleType())
def find_best_match(name):
    if name is None:
        return (None, 0.0)
    best_match = None
    best_score = 0.0
    for code, company_name in company_list:
        score = SequenceMatcher(None, name, company_name).ratio()
        if score > best_score:
            best_score = score
            best_match = code
    return (best_match, best_score)

# 使用
result = customers_df.withColumn(
    "best_match",
    find_best_match(F.col("name"))
)

3.3 完整示例:名单匹配

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# =============================================
# 模拟两套名单
# =============================================

# 名单A: 历史客户
list_a = [
    ("A001", "北京华夏科技有限公司"),
    ("A002", "上海浦东发展银行"),
    ("A003", "深圳市腾讯计算机系统有限公司"),
    ("A004", "阿里巴巴(中国)有限公司"),
    ("A005", "中国移动通信集团有限公司"),
]

# 名单B: 新增客户
list_b = [
    ("B001", "北京华夏科技有限公司"),
    ("B002", "上海浦东发展银行股份有限公司"),
    ("B003", "腾讯科技(深圳)有限公司"),
    ("B004", "阿里巴巴(中国)有限"),
    ("B005", "中国移动通信集团"),
    ("B006", "京东科技集团"),
]

df_a = spark.createDataFrame(list_a, ["id_a", "name_a"])
df_b = spark.createDataFrame(list_b, ["id_b", "name_b"])

# =============================================
# 方法1: 精确匹配
# =============================================
print("=" * 60)
print("1. 精确匹配")
print("=" * 60)

exact_match = df_a.join(df_b, df_a.name_a == df_b.name_b)
exact_match.show(truncate=False)

# =============================================
# 方法2: 模糊匹配(笛卡尔积)
# =============================================
print("=" * 60)
print("2. 模糊匹配(相似度>0.7)")
print("=" * 60)

from difflib import SequenceMatcher

@F.udf(DoubleType())
def similarity(s1, s2):
    if s1 is None or s2 is None:
        return 0.0
    return SequenceMatcher(None, s1, s2).ratio()

# 笛卡尔积
cross_df = df_a.crossJoin(df_b)

# 计算相似度并过滤
matched = cross_df \
    .withColumn("similarity", similarity(F.col("name_a"), F.col("name_b"))) \
    .filter(F.col("similarity") >= 0.7) \
    .orderBy(F.col("name_a"), F.desc("similarity"))

# 取最佳匹配
from pyspark.sql.window import Window

window = Window.partitionBy("id_a").orderBy(F.desc("similarity"))
best_match = matched.withColumn(
    "rank", F.row_number().over(window)
).filter(F.col("rank") == 1)

best_match.select("id_a", "name_a", "id_b", "name_b", "similarity").show(truncate=False)

# =============================================
# 方法3: 只匹配最佳
# =============================================
print("=" * 60)
print("3. 仅保留相似度最高的匹配")
print("=" * 60)

# 标记最佳匹配
final_result = best_match.select(
    F.col("id_a"),
    F.col("name_a"),
    F.col("id_b").alias("matched_id_b"),
    F.col("name_b").alias("matched_name"),
    F.col("similarity")
)

final_result.show(truncate=False)

spark.stop()

4. Scala实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object StringMatching {
  // Levenshtein距离实现
  def levenshtein(s1: String, s2: String): Int = {
    val len1 = s1.length
    val len2 = s2.length
    val dp = Array.ofDim[Int](len1 + 1, len2 + 1)

    for (i <- 0 to len1) dp(i)(0) = i
    for (j <- 0 to len2) dp(0)(j) = j

    for (i <- 1 to len1; j <- 1 to len2) {
      val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
      dp(i)(j) = Seq(
        dp(i - 1)(j) + 1,
        dp(i)(j - 1) + 1,
        dp(i - 1)(j - 1) + cost
      ).min
    }
    dp(len1)(len2)
  }

  // 相似度
  def similarity(s1: String, s2: String): Double = {
    val maxLen = math.max(s1.length, s2.length)
    if (maxLen == 0) 1.0
    else 1.0 - levenshtein(s1, s2).toDouble / maxLen
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StringMatching")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 创建UDF
    val similarityUDF = udf((s1: String, s2: String) => {
      similarity(s1, s2)
    })

    // 测试数据
    val df1 = Seq(
      (1, "阿里巴巴(中国)有限公司"),
      (2, "腾讯科技(深圳)有限公司")
    ).toDF("id", "name")

    val df2 = Seq(
      ("A", "阿里巴巴(中国)有限公司"),
      ("B", "腾讯科技有限公司"),
      ("C", "百度在线网络技术")
    ).toDF("code", "name")

    // 笛卡尔积 + 相似度计算
    val result = df1.crossJoin(df2)
      .withColumn("similarity", similarityUDF($"name", $"name"))
      .filter($"similarity" >= 0.7)
      .orderBy($"name", $"similarity".desc)

    result.show(false)

    spark.stop()
  }
}

5. 实战:数据清洗

5.1 场景:地址清洗

# 地址标准化
from pyspark.sql import functions as F

addresses = [
    ("北京市朝阳区建国路88号",),
    ("北京 朝阳区 建国路 88号",),
    ("北京市-朝阳区-建国路88号",),
    ("上海浦东新区世纪大道100号",),
    ("上海 浦东新区 世纪大道100号",),
]

df = spark.createDataFrame(addresses, ["address"])

# 标准化函数
@F.udf()
def normalize_address(addr):
    if addr is None:
        return None
    # 去除空格、标点,统一格式
    import re
    addr = re.sub(r'[\s\-,\.]+', '', addr)  # 去除分隔符
    addr = addr.replace('市', '').replace('区', '')  # 简化
    return addr

df.withColumn("normalized", normalize_address("address")).show(truncate=False)

5.2 场景:手机号脱敏

# 手机号模糊匹配(中间4位用*替代)
@F.udf()
def mask_phone(phone):
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + "****" + phone[7:]

# 使用
df.withColumn("masked_phone", mask_phone(F.col("phone"))).show()

6. 性能优化技巧

6.1 避免笛卡尔积

# ❌ 低效:笛卡尔积
result = df1.crossJoin(df2)
result = result.withColumn("sim", similarity_udf(...))

# ✅ 优化1:先过滤候选
result = df1.join(df2,
    (F.length(F.col("name1")) - F.length(F.col("name2"))).abs() <= 3
)

# ✅ 优化2:使用分区
result = df1.repartition(100).join(df2, "key")

6.2 索引优化

# 添加长度过滤
df1.join(df2,
    (F.abs(F.length(F.col("name1")) - F.length(F.col("name2"))) <= threshold) &
    (F.substring(F.col("name1"), 1, 2) == F.substring(F.col("name2"), 1, 2))
)

6.3 分块处理

# 按首字母分块,减少比较次数
df1.withColumn("prefix", F.substring("name", 1, 1)) \
   .join(df2.withColumn("prefix", F.substring("name", 1, 1)), "prefix")

7. 常见问题

7.1 中文分词

# 中文模糊匹配需要先分词
import jieba

@F.udf()
def tokenize_chinese(text):
    if text is None:
        return None
    return " ".join(jieba.cut(text))

# 分词后再计算Jaccard
@F.udf(DoubleType())
def jaccard_chinese(s1, s2):
    if s1 is None or s2 is None:
        return 0.0
    set1 = set(s1.split())
    set2 = set(s2.split())
    return len(set1 & set2) / len(set1 | set2)

7.2 相似度阈值选择

场景阈值说明
严格要求0.9几乎相同
标准匹配0.8允许少量差异
宽松匹配0.7允许较大差异
候选筛选0.6初筛,后处理

8. 练习题

练习1

实现一个基于N-gram的模糊匹配函数。

练习2

对公司名称进行匹配,找出相似度>0.8的对。

练习3

使用布隆过滤器优化大规模字符串匹配的性能。

练习4

实现中文分词后的Jaccard相似度计算。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x07-字符串模糊匹配
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们