顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x02-Join算法详解与优化

DMIT VPS

Join算法详解与优化

本篇是《大数据算法与UDF系列》的第2篇,带你深入理解大数据场景下的Join算法原理,掌握各种Join类型的适用场景,以及生产环境中的优化技巧。


1. Join类型与原理

1.1 六种Join类型

┌─────────────────────────────────────────────────────────────────┐
│                         六种Join类型                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   表A: {a1, a2, a3}        表B: {a2, a3, a4}                  │
│                                                                 │
│   INNER JOIN:         {a2, a3}       交集                        │
│   LEFT JOIN:          {a1, a2, a3}   A全部 + B匹配              │
│   RIGHT JOIN:         {a2, a3, a4}   B全部 + A匹配              │
│   FULL OUTER JOIN:    {a1, a2, a3, a4}  并集                    │
│   LEFT SEMI JOIN:     {a1}           A中存在于B的               │
│   LEFT ANTI JOIN:     {a1}           A中不存在于B的             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 业务场景对应

Join类型业务场景示例
INNER匹配双方订单关联用户信息
LEFT主表全保留全部用户 + 订单信息
RIGHT副表全保留全部订单 + 用户信息
FULL全部保留全量数据分析
LEFT SEMI过滤有购买记录的用户
LEFT ANTI排除无购买记录的用户

2. Join算法核心原理

2.1 Join执行流程图

┌─────────────────────────────────────────────────────────────────┐
│                      Spark SQL Join 执行流程                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────┐    ┌─────────┐                                   │
│   │  表A    │    │  表B    │   ← 输入两个表                      │
│   │(左侧)   │    │(右侧)   │                                   │
│   └────┬────┘    └────┬────┘                                   │
│        │              │                                         │
│        ▼              ▼                                         │
│   ┌─────────┐    ┌─────────┐                                   │
│   │  扫描   │    │  扫描   │   ← 读取数据                        │
│   │  Stage1 │    │  Stage2 │   ← 产生Shuffle                     │
│   └────┬────┘    └────┬────┘                                   │
│        │              │                                         │
│        ▼              ▼                                         │
│   ┌─────────────────────────────────────┐                       │
│   │         Shuffle & Sort             │   ← 数据重分布           │
│   │   (Hash Partition / Range)         │                       │
│   └──────────────────┬──────────────────┘                       │
│                      ▼                                          │
│   ┌─────────────────────────────────────┐                       │
│   │          Join 算子执行              │   ← 遍历匹配             │
│   │   (Hash Join / Sort Merge Join)    │                       │
│   └──────────────────┬──────────────────┘                       │
│                      ▼                                          │
│   ┌─────────────────────────────────────┐                       │
│   │           输出结果                  │                       │
│   └─────────────────────────────────────┘                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 核心问题:如何高效匹配?

Join的本质是找匹配。关键问题是:如何快速找到匹配的行?

两种思路:

  1. 广播小表(Broadcast):把整个小表复制到每个节点
  2. 分区 + 排序(Partition + Sort):让相同Key进入同一分区

3. Broadcast Join(广播Join)

3.1 原理图解

┌─────────────────────────────────────────────────────────────────┐
│                    Broadcast Join 原理                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   大表: 1TB                              小表: 10MB              │
│   ┌────────────────────┐                ┌──────────┐            │
│   │ partition1: 100GB │                │broadcast │            │
│   │ partition2: 100GB │ ───────┐       │  copy    │            │
│   │ partition3: 100GB │ ───────┼──────▶│  ──▶    │            │
│   │ partition4: 100GB │ ───────┐       │  ──▶    │            │
│   │ ...                │       │       │  ──▶    │            │
│   └────────────────────┘       │       └──────────┘            │
│                                │                                │
│   大表不动                     每个executor都有一份小表          │
│                                ↓                                │
│                     ┌─────────────────────┐                    │
│                     │  本地Hash Join      │                    │
│                     │  partition1 + broadcast│                 │
│                     └─────────────────────┘                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 适用场景

条件说明
小表大小< 10MB(默认阈值,可配置)
内存充足广播表能放入Executor内存
Join类型INNER, LEFT, RIGHT
Key分布任意,不需要考虑数据倾斜

3.3 Python (PySpark) 实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# 示例表
users = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie"),
], ["user_id", "name"])

orders = spark.createDataFrame([
    (1, 1, 100),
    (2, 1, 200),
    (3, 2, 150),
    (4, 3, 300),
    (5, 4, 50),  # user_id=4 不存在
], ["order_id", "user_id", "amount"])

# 方式1:自动选择Broadcast(默认行为)
# 当小表足够小时,Spark会自动使用broadcast
result = orders.join(users, "user_id")
result.explain()

# 方式2:强制使用Broadcast Hint
result = orders.join(F.broadcast(users), "user_id")

# 方式3:配置自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10MB

result.show()

3.4 Scala (Spark) 实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BroadcastJoinDemo")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 示例表
    val users = Seq((1, "Alice"), (2, "Bob"), (3, "Charlie"))
      .toDF("user_id", "name")

    val orders = Seq(
      (1, 1, 100),
      (2, 1, 200),
      (3, 2, 150),
      (4, 3, 300),
      (5, 4, 50)
    ).toDF("order_id", "user_id", "amount")

    // 方式1:自动Broadcast
    val result1 = orders.join(users, "user_id")

    // 方式2:强制Broadcast
    val result2 = orders.join(broadcast(users), "user_id")

    // 方式3:配置阈值
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

    result2.explain(true)
    result2.show()

    spark.stop()
  }
}

4. Shuffle Hash Join(随机分区哈希Join)

4.1 原理图解

┌─────────────────────────────────────────────────────────────────┐
│                    Shuffle Hash Join 原理                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Step 1: Shuffle (按Join Key分区)                              │
│   ─────────────────────────────────────────────────────────     │
│                                                                 │
│   表A                        表B                                │
│   ┌──────────────┐          ┌──────────────┐                  │
│   │ Key=1  [a1]  │──Shuffle─▶│ partition1  │                  │
│   │ Key=2  [a2]  │          │ Key=1 [b1]   │                  │
│   │ Key=3  [a3]  │          │ Key=2 [b2]   │                  │
│   │ Key=1  [a4]  │──Shuffle─▶│ partition2   │                  │
│   │ Key=2  [a5]  │          │ Key=1 [b3]   │                  │
│   │ Key=3  [a6]  │          │ Key=3 [b4]   │                  │
│   └──────────────┘          └──────────────┘                  │
│                                                                 │
│   Step 2: Build & Probe (构建哈希表 + 探测)                      │
│   ─────────────────────────────────────────────────────────     │
│                                                                 │
│   partition1:                                                   │
│   ┌─────────────┐                                               │
│   │ Build:      │  ← 构建Hash表                                   │
│   │ Key=1 → [b1]│                                               │
│   └─────────────┘                                               │
│        │                                                        │
│   ┌────┴────┐                                                    │
│   │ Probe:  │  ← 遍历A表查找                                     │
│   │ a1 Key=1│ ──匹配──▶ [b1]                                    │
│   └─────────┘                                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

4.2 适用场景

条件说明
大表 + 大表两表都较大,无法广播
Key分布均匀无严重数据倾斜
内存有足够内存构建Hash表

4.3 启用Shuffle Hash Join

# 启用 Shuffle Hash Join(Spark 3.0+ 默认关闭)
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "true")

# 设置Hash表构建的内存阈值
spark.conf.set("spark.sql.shuffle.partitions", "200")  # 分区数

5. Sort Merge Join(排序合并Join)

5.1 原理图解

┌─────────────────────────────────────────────────────────────────┐
│                    Sort Merge Join 原理                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Step 1: Shuffle (按Key分区)                                   │
│   ─────────────────────────────────────────────────────────     │
│   表A:  ──Shuffle──▶ partition1 (Key 1-100)                     │
│        ──Shuffle──▶ partition2 (Key 101-200)                     │
│        ──Shuffle──▶ partition3 (Key 201-300)                     │
│                                                                 │
│   Step 2: Sort (分区内部排序)                                    │
│   ─────────────────────────────────────────────────────────     │
│   partition1-A: [Key=1, 5, 12, 50, 80...]  ← 按Key升序          │
│   partition1-B: [Key=1, 12, 30, 50, 90...]  ← 按Key升序          │
│                                                                 │
│   Step 3: Merge (双指针扫描合并)                                 │
│   ─────────────────────────────────────────────────────────     │
│                                                                 │
│   A: [1, 5, 12, 50, 80...]                                      │
│   B: [1, 12, 30, 50, 90...]                                      │
│   ↓ 双指针                                                      │
│   匹配: (1,1), (12,12), (50,50)                                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 适用场景

条件说明
大表 + 大表两表都很大
有序数据或者可接受排序成本
Key可比较支持排序操作

5.3 启用Sort Merge Join

# Sort Merge Join 默认开启
spark.conf.set("spark.sql.join.sortMergeJoinEnabled", "true")

# 排序相关配置
spark.conf.set("spark.sql.shuffle.partitions", "200")

6. Join类型选择策略

6.1 自动选择流程

┌─────────────────────────────────────────────────────────────────┐
│                    Spark Join 选择策略                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   输入: 表A, 表B, Join Key                                       │
│                                                                 │
│         ▼                                                       │
│   ┌─────────────────┐                                           │
│   │ A或B < 阈值?    │  ← 默认10MB                                │
│   └────────┬────────┘                                           │
│       Yes  │  No                                                │
│       ▼    ▼                                                   │
│   ┌─────────────────┐    ┌─────────────────┐                 │
│   │Broadcast Join   │    │ Key分布倾斜?     │                 │
│   └─────────────────┘    └────────┬────────┘                 │
│                              Yes   │  No                        │
│                              ▼    ▼                            │
│                        ┌─────────────────┐                     │
│                        │ 广播大表         │  ← 缓解倾斜            │
│                        └─────────────────┘                     │
│                              │                                  │
│                              ▼                                  │
│                        ┌─────────────────┐                     │
│                        │ Shuffle Hash    │  ← 内存足够           │
│                        └────────┬────────┘                     │
│                             │                                   │
│                             ▼                                   │
│                        ┌─────────────────┐                     │
│                        │ Sort Merge Join │  ← 默认选择           │
│                        └─────────────────┘                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

6.2 手动指定Join策略

from pyspark.sql import functions as F

# 强制Broadcast
df1.join(F.broadcast(df2), "key")

# 强制Shuffle Hash(通过配置)
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # 禁用自动广播
df1.join(df2, "key")

# 强制Sort Merge
spark.conf.set("spark.sql.join.sortMergeJoinEnabled", "true")
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "false")
df1.join(df2, "key")

7. 常见问题与解决方案

7.1 数据倾斜(Skew Join)

问题:某个Key数据量特别大,导致个别任务特别慢

┌─────────────────────────────────────────────────────────────────┐
│                         数据倾斜示例                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   正常分布:          倾斜分布:                                   │
│   ┌────────┐        ┌────────┐                                  │
│   │Key=A   │        │Key=999 │ ← 99%的数据                       │
│   │Key=B   │        │Key=1   │                                  │
│   │Key=C   │        │Key=2   │                                  │
│   │Key=D   │        │Key=3   │                                  │
│   └────────┘        └────────┘                                  │
│                                                                 │
│   Task: 4个平衡    Task: 1个超级慢,其他空闲                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

解决方案1:广播倾斜Key

# 找出倾斜的Key
skew_keys = df1.groupBy("key").count() \
    .filter(F.col("count") > threshold) \
    .select("key")

# 广播倾斜Key进行单独处理
skew_part = df1.join(F.broadcast(skew_keys), "key")
normal_part = df1.join(df2, "key", "left") \
    .join(skew_part, "key", "left_anti")

result = skew_part.union(normal_part)

解决方案2:加盐(Salting)

import random

# 给倾斜Key添加随机后缀,分散到多个分区
df1_salted = df1.withColumn("key_salt",
    F.when(F.col("key") == "倾斜Key",
        F.concat(F.col("key"), F.lit("_"), F.lit(random.randint(0, 9))))
    .otherwise(F.col("key"))
)

# Join后去掉盐
result = df1_salted.join(df2, df1_salted.key_salt == df2.key) \
    .drop(df1_salted.key)

7.2 笛卡尔积

问题:忘记写Join条件,导致笛卡尔积

# ❌ 错误:笛卡尔积
result = df1.join(df2)  # 没有on条件

# ✅ 正确:明确Join条件
result = df1.join(df2, "key")           # 等值Join
result = df1.join(df2, df1.key == df2.key)  # 多条件
result = df1.join(df2, ["key1", "key2"])    # 多列

7.3 小表重复计算

问题:同一个大表Join多个小表,小表被重复广播

# ❌ 低效:每次Join都广播dim1, dim2
result = fact.join(dim1, "key").join(dim2, "key")

# ✅ 优化:先合并小表,再广播
dim_merged = dim1.join(dim2, "key")
result = fact.join(F.broadcast(dim_merged), "key")

8. 性能优化最佳实践

8.1Join顺序优化

# ❌ 错误:大表先Join
result = big_table.join(table1, "key").join(table2, "key")

# ✅ 正确:小表先Join
result = table1.join(table2, "key").join(big_table, "key")

8.2 提前过滤数据

# ❌ 低效:先Join再过滤
result = fact.join(dim, "key").filter(F.col("status") == "active")

# ✅ 优化:先过滤再Join
dim_filtered = dim.filter(F.col("status") == "active")
result = fact.join(dim_filtered, "key")

8.3 选择合适的数据类型

# ❌ 问题:类型不匹配导致性能问题
df1.select(F.col("id").cast("string")).join(
    df2.select(F.col("id").cast("int")), "id"
)

# ✅ 优化:统一类型
df1 = df1.withColumn("id", F.col("id").cast("bigint"))
df2 = df2.withColumn("id", F.col("id").cast("bigint"))
result = df1.join(df2, "id")

8.4 分区裁剪

# ✅ 优化:利用分区裁剪
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")

# 只读取需要的分区
df = spark.read.table("partitioned_table")
df.filter(F.col("date") == "2024-01-01").join(dim, "key")

9. 完整示例:电商多表Join

9.1 业务场景

订单分析:关联 订单表 + 用户表 + 商品表 + 地区表

9.2 数据模型

orders (大表, 1TB)
├── order_id
├── user_id
├── product_id
├── region_id
├── amount
└── create_time

users (小表, 10MB)
├── user_id
├── name
├── age
└── level

products (小表, 20MB)
├── product_id
├── category
├── price
└── brand

regions (小表, 1MB)
├── region_id
├── region_name
└── country

9.3 完整代码

Python版本

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("EcommerceJoin") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
    .getOrCreate()

# 模拟数据
orders = spark.createDataFrame([
    (1, 101, 1001, 1, 200, "2024-01-01"),
    (2, 101, 1002, 2, 150, "2024-01-01"),
    (3, 102, 1001, 1, 300, "2024-01-02"),
    (4, 103, 1003, 3, 500, "2024-01-02"),
    (5, 104, 1002, 2, 180, "2024-01-03"),
], ["order_id", "user_id", "product_id", "region_id", "amount", "create_date"])

users = spark.createDataFrame([
    (101, "Alice", 25, "VIP"),
    (102, "Bob", 30, "Normal"),
    (103, "Charlie", 28, "VIP"),
    (104, "David", 35, "Normal"),
    (105, "Eve", 22, "Normal"),
], ["user_id", "name", "age", "level"])

products = spark.createDataFrame([
    (1001, "Electronics", 2000, "BrandA"),
    (1002, "Clothing", 500, "BrandB"),
    (1003, "Food", 100, "BrandC"),
    (1004, "Electronics", 3000, "BrandA"),
], ["product_id", "category", "price", "brand"])

regions = spark.createDataFrame([
    (1, "Beijing", "China"),
    (2, "Shanghai", "China"),
    (3, "Guangzhou", "China"),
    (4, "Shenzhen", "China"),
], ["region_id", "region_name", "country"])

# =============================================
# 优化策略1:先广播小表,合并后再与大表Join
# =============================================

# 广播所有小表
users_bc = F.broadcast(users)
products_bc = F.broadcast(products)
regions_bc = F.broadcast(regions)

# 按顺序Join
result = orders \
    .join(users_bc, "user_id") \
    .join(products_bc, "product_id") \
    .join(regions_bc, "region_id")

# 添加分析指标
result = result.withColumn("profit", F.col("amount") * 0.1)

result.show()

# 查看执行计划
result.explain()

Scala版本

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EcommerceJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EcommerceJoin")
      .config("spark.sql.autoBroadcastJoinThreshold", "10485760")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 模拟数据
    val orders = Seq(
      (1, 101, 1001, 1, 200, "2024-01-01"),
      (2, 101, 1002, 2, 150, "2024-01-01"),
      (3, 102, 1001, 1, 300, "2024-01-02"),
      (4, 103, 1003, 3, 500, "2024-01-02"),
      (5, 104, 1002, 2, 180, "2024-01-03")
    ).toDF("order_id", "user_id", "product_id", "region_id", "amount", "create_date")

    val users = Seq(
      (101, "Alice", 25, "VIP"),
      (102, "Bob", 30, "Normal"),
      (103, "Charlie", 28, "VIP"),
      (104, "David", 35, "Normal"),
      (105, "Eve", 22, "Normal")
    ).toDF("user_id", "name", "age", "level")

    val products = Seq(
      (1001, "Electronics", 2000, "BrandA"),
      (1002, "Clothing", 500, "BrandB"),
      (1003, "Food", 100, "BrandC"),
      (1004, "Electronics", 3000, "BrandA")
    ).toDF("product_id", "category", "price", "brand")

    val regions = Seq(
      (1, "Beijing", "China"),
      (2, "Shanghai", "China"),
      (3, "Guangzhou", "China"),
      (4, "Shenzhen", "China")
    ).toDF("region_id", "region_name", "country")

    // 广播小表
    val usersBc = broadcast(users)
    val productsBc = broadcast(products)
    val regionsBc = broadcast(regions)

    // Join
    val result = orders
      .join(usersBc, "user_id")
      .join(productsBc, "product_id")
      .join(regionsBc, "region_id")
      .withColumn("profit", col("amount") * 0.1)

    result.show()
    result.explain(true)

    spark.stop()
  }
}

10. Join类型实战指南

10.1 快速选择表

场景推荐Join
大表 + 小表Broadcast Join
大表 + 大表,Key均匀Sort Merge Join
大表 + 大表,Key倾斜广播倾斜Key + Sort Merge
只需要左表匹配LEFT SEMI JOIN
排除左表匹配LEFT ANTI JOIN

10.2 调试命令

# 查看Join策略
result.explain(True)

# 查看数据分布
df.groupBy("key").count().orderBy(F.desc("count")).show(10)

# 查看分区大小
spark.sparkContext.statusTracker().getJobGroupInfo(jobId)

11. 下篇预告

第3篇:聚合与TopN问题

  • 基础聚合函数详解
  • 精确TopN实现
  • 近似TopN优化
  • 分组TopN问题
  • 预聚合策略

12. 练习题

练习1

分析orders和users表,找出没有下过订单的用户(使用LEFT ANTI JOIN)。

练习2

有用户表(10万条)和行为表(1亿条),如何高效Join?

练习3

如何处理Join中的数据倾斜问题?


赞(0)
未经允许不得转载:順子の杂货铺 » 0x02-Join算法详解与优化
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们