Join算法详解与优化
本篇是《大数据算法与UDF系列》的第2篇,带你深入理解大数据场景下的Join算法原理,掌握各种Join类型的适用场景,以及生产环境中的优化技巧。
1. Join类型与原理
1.1 六种Join类型
┌─────────────────────────────────────────────────────────────────┐
│ 六种Join类型 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 表A: {a1, a2, a3} 表B: {a2, a3, a4} │
│ │
│ INNER JOIN: {a2, a3} 交集 │
│ LEFT JOIN: {a1, a2, a3} A全部 + B匹配 │
│ RIGHT JOIN: {a2, a3, a4} B全部 + A匹配 │
│ FULL OUTER JOIN: {a1, a2, a3, a4} 并集 │
│ LEFT SEMI JOIN: {a1} A中存在于B的 │
│ LEFT ANTI JOIN: {a1} A中不存在于B的 │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 业务场景对应
| Join类型 | 业务场景 | 示例 |
|---|---|---|
| INNER | 匹配双方 | 订单关联用户信息 |
| LEFT | 主表全保留 | 全部用户 + 订单信息 |
| RIGHT | 副表全保留 | 全部订单 + 用户信息 |
| FULL | 全部保留 | 全量数据分析 |
| LEFT SEMI | 过滤 | 有购买记录的用户 |
| LEFT ANTI | 排除 | 无购买记录的用户 |
2. Join算法核心原理
2.1 Join执行流程图
┌─────────────────────────────────────────────────────────────────┐
│ Spark SQL Join 执行流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ 表A │ │ 表B │ ← 输入两个表 │
│ │(左侧) │ │(右侧) │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │ 扫描 │ │ 扫描 │ ← 读取数据 │
│ │ Stage1 │ │ Stage2 │ ← 产生Shuffle │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Shuffle & Sort │ ← 数据重分布 │
│ │ (Hash Partition / Range) │ │
│ └──────────────────┬──────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Join 算子执行 │ ← 遍历匹配 │
│ │ (Hash Join / Sort Merge Join) │ │
│ └──────────────────┬──────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 输出结果 │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘2.2 核心问题:如何高效匹配?
Join的本质是找匹配。关键问题是:如何快速找到匹配的行?
两种思路:
- 广播小表(Broadcast):把整个小表复制到每个节点
- 分区 + 排序(Partition + Sort):让相同Key进入同一分区
3. Broadcast Join(广播Join)
3.1 原理图解
┌─────────────────────────────────────────────────────────────────┐
│ Broadcast Join 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 大表: 1TB 小表: 10MB │
│ ┌────────────────────┐ ┌──────────┐ │
│ │ partition1: 100GB │ │broadcast │ │
│ │ partition2: 100GB │ ───────┐ │ copy │ │
│ │ partition3: 100GB │ ───────┼──────▶│ ──▶ │ │
│ │ partition4: 100GB │ ───────┐ │ ──▶ │ │
│ │ ... │ │ │ ──▶ │ │
│ └────────────────────┘ │ └──────────┘ │
│ │ │
│ 大表不动 每个executor都有一份小表 │
│ ↓ │
│ ┌─────────────────────┐ │
│ │ 本地Hash Join │ │
│ │ partition1 + broadcast│ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 适用场景
| 条件 | 说明 |
|---|---|
| 小表大小 | < 10MB(默认阈值,可配置) |
| 内存充足 | 广播表能放入Executor内存 |
| Join类型 | INNER, LEFT, RIGHT |
| Key分布 | 任意,不需要考虑数据倾斜 |
3.3 Python (PySpark) 实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# 示例表
users = spark.createDataFrame([
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
], ["user_id", "name"])
orders = spark.createDataFrame([
(1, 1, 100),
(2, 1, 200),
(3, 2, 150),
(4, 3, 300),
(5, 4, 50), # user_id=4 不存在
], ["order_id", "user_id", "amount"])
# 方式1:自动选择Broadcast(默认行为)
# 当小表足够小时,Spark会自动使用broadcast
result = orders.join(users, "user_id")
result.explain()
# 方式2:强制使用Broadcast Hint
result = orders.join(F.broadcast(users), "user_id")
# 方式3:配置自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") # 10MB
result.show()3.4 Scala (Spark) 实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
object BroadcastJoinDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("BroadcastJoinDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 示例表
val users = Seq((1, "Alice"), (2, "Bob"), (3, "Charlie"))
.toDF("user_id", "name")
val orders = Seq(
(1, 1, 100),
(2, 1, 200),
(3, 2, 150),
(4, 3, 300),
(5, 4, 50)
).toDF("order_id", "user_id", "amount")
// 方式1:自动Broadcast
val result1 = orders.join(users, "user_id")
// 方式2:强制Broadcast
val result2 = orders.join(broadcast(users), "user_id")
// 方式3:配置阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")
result2.explain(true)
result2.show()
spark.stop()
}
}4. Shuffle Hash Join(随机分区哈希Join)
4.1 原理图解
┌─────────────────────────────────────────────────────────────────┐
│ Shuffle Hash Join 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Step 1: Shuffle (按Join Key分区) │
│ ───────────────────────────────────────────────────────── │
│ │
│ 表A 表B │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Key=1 [a1] │──Shuffle─▶│ partition1 │ │
│ │ Key=2 [a2] │ │ Key=1 [b1] │ │
│ │ Key=3 [a3] │ │ Key=2 [b2] │ │
│ │ Key=1 [a4] │──Shuffle─▶│ partition2 │ │
│ │ Key=2 [a5] │ │ Key=1 [b3] │ │
│ │ Key=3 [a6] │ │ Key=3 [b4] │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ Step 2: Build & Probe (构建哈希表 + 探测) │
│ ───────────────────────────────────────────────────────── │
│ │
│ partition1: │
│ ┌─────────────┐ │
│ │ Build: │ ← 构建Hash表 │
│ │ Key=1 → [b1]│ │
│ └─────────────┘ │
│ │ │
│ ┌────┴────┐ │
│ │ Probe: │ ← 遍历A表查找 │
│ │ a1 Key=1│ ──匹配──▶ [b1] │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘4.2 适用场景
| 条件 | 说明 |
|---|---|
| 大表 + 大表 | 两表都较大,无法广播 |
| Key分布均匀 | 无严重数据倾斜 |
| 内存 | 有足够内存构建Hash表 |
4.3 启用Shuffle Hash Join
# 启用 Shuffle Hash Join(Spark 3.0+ 默认关闭)
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "true")
# 设置Hash表构建的内存阈值
spark.conf.set("spark.sql.shuffle.partitions", "200") # 分区数5. Sort Merge Join(排序合并Join)
5.1 原理图解
┌─────────────────────────────────────────────────────────────────┐
│ Sort Merge Join 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Step 1: Shuffle (按Key分区) │
│ ───────────────────────────────────────────────────────── │
│ 表A: ──Shuffle──▶ partition1 (Key 1-100) │
│ ──Shuffle──▶ partition2 (Key 101-200) │
│ ──Shuffle──▶ partition3 (Key 201-300) │
│ │
│ Step 2: Sort (分区内部排序) │
│ ───────────────────────────────────────────────────────── │
│ partition1-A: [Key=1, 5, 12, 50, 80...] ← 按Key升序 │
│ partition1-B: [Key=1, 12, 30, 50, 90...] ← 按Key升序 │
│ │
│ Step 3: Merge (双指针扫描合并) │
│ ───────────────────────────────────────────────────────── │
│ │
│ A: [1, 5, 12, 50, 80...] │
│ B: [1, 12, 30, 50, 90...] │
│ ↓ 双指针 │
│ 匹配: (1,1), (12,12), (50,50) │
│ │
└─────────────────────────────────────────────────────────────────┘5.2 适用场景
| 条件 | 说明 |
|---|---|
| 大表 + 大表 | 两表都很大 |
| 有序数据 | 或者可接受排序成本 |
| Key可比较 | 支持排序操作 |
5.3 启用Sort Merge Join
# Sort Merge Join 默认开启
spark.conf.set("spark.sql.join.sortMergeJoinEnabled", "true")
# 排序相关配置
spark.conf.set("spark.sql.shuffle.partitions", "200")6. Join类型选择策略
6.1 自动选择流程
┌─────────────────────────────────────────────────────────────────┐
│ Spark Join 选择策略 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 输入: 表A, 表B, Join Key │
│ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ A或B < 阈值? │ ← 默认10MB │
│ └────────┬────────┘ │
│ Yes │ No │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │Broadcast Join │ │ Key分布倾斜? │ │
│ └─────────────────┘ └────────┬────────┘ │
│ Yes │ No │
│ ▼ ▼ │
│ ┌─────────────────┐ │
│ │ 广播大表 │ ← 缓解倾斜 │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Shuffle Hash │ ← 内存足够 │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Sort Merge Join │ ← 默认选择 │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘6.2 手动指定Join策略
from pyspark.sql import functions as F
# 强制Broadcast
df1.join(F.broadcast(df2), "key")
# 强制Shuffle Hash(通过配置)
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") # 禁用自动广播
df1.join(df2, "key")
# 强制Sort Merge
spark.conf.set("spark.sql.join.sortMergeJoinEnabled", "true")
spark.conf.set("spark.sql.join.shuffleHashJoinEnabled", "false")
df1.join(df2, "key")7. 常见问题与解决方案
7.1 数据倾斜(Skew Join)
问题:某个Key数据量特别大,导致个别任务特别慢
┌─────────────────────────────────────────────────────────────────┐
│ 数据倾斜示例 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 正常分布: 倾斜分布: │
│ ┌────────┐ ┌────────┐ │
│ │Key=A │ │Key=999 │ ← 99%的数据 │
│ │Key=B │ │Key=1 │ │
│ │Key=C │ │Key=2 │ │
│ │Key=D │ │Key=3 │ │
│ └────────┘ └────────┘ │
│ │
│ Task: 4个平衡 Task: 1个超级慢,其他空闲 │
│ │
└─────────────────────────────────────────────────────────────────┘解决方案1:广播倾斜Key
# 找出倾斜的Key
skew_keys = df1.groupBy("key").count() \
.filter(F.col("count") > threshold) \
.select("key")
# 广播倾斜Key进行单独处理
skew_part = df1.join(F.broadcast(skew_keys), "key")
normal_part = df1.join(df2, "key", "left") \
.join(skew_part, "key", "left_anti")
result = skew_part.union(normal_part)解决方案2:加盐(Salting)
import random
# 给倾斜Key添加随机后缀,分散到多个分区
df1_salted = df1.withColumn("key_salt",
F.when(F.col("key") == "倾斜Key",
F.concat(F.col("key"), F.lit("_"), F.lit(random.randint(0, 9))))
.otherwise(F.col("key"))
)
# Join后去掉盐
result = df1_salted.join(df2, df1_salted.key_salt == df2.key) \
.drop(df1_salted.key)7.2 笛卡尔积
问题:忘记写Join条件,导致笛卡尔积
# ❌ 错误:笛卡尔积
result = df1.join(df2) # 没有on条件
# ✅ 正确:明确Join条件
result = df1.join(df2, "key") # 等值Join
result = df1.join(df2, df1.key == df2.key) # 多条件
result = df1.join(df2, ["key1", "key2"]) # 多列7.3 小表重复计算
问题:同一个大表Join多个小表,小表被重复广播
# ❌ 低效:每次Join都广播dim1, dim2
result = fact.join(dim1, "key").join(dim2, "key")
# ✅ 优化:先合并小表,再广播
dim_merged = dim1.join(dim2, "key")
result = fact.join(F.broadcast(dim_merged), "key")8. 性能优化最佳实践
8.1Join顺序优化
# ❌ 错误:大表先Join
result = big_table.join(table1, "key").join(table2, "key")
# ✅ 正确:小表先Join
result = table1.join(table2, "key").join(big_table, "key")8.2 提前过滤数据
# ❌ 低效:先Join再过滤
result = fact.join(dim, "key").filter(F.col("status") == "active")
# ✅ 优化:先过滤再Join
dim_filtered = dim.filter(F.col("status") == "active")
result = fact.join(dim_filtered, "key")8.3 选择合适的数据类型
# ❌ 问题:类型不匹配导致性能问题
df1.select(F.col("id").cast("string")).join(
df2.select(F.col("id").cast("int")), "id"
)
# ✅ 优化:统一类型
df1 = df1.withColumn("id", F.col("id").cast("bigint"))
df2 = df2.withColumn("id", F.col("id").cast("bigint"))
result = df1.join(df2, "id")8.4 分区裁剪
# ✅ 优化:利用分区裁剪
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
# 只读取需要的分区
df = spark.read.table("partitioned_table")
df.filter(F.col("date") == "2024-01-01").join(dim, "key")9. 完整示例:电商多表Join
9.1 业务场景
订单分析:关联 订单表 + 用户表 + 商品表 + 地区表
9.2 数据模型
orders (大表, 1TB)
├── order_id
├── user_id
├── product_id
├── region_id
├── amount
└── create_time
users (小表, 10MB)
├── user_id
├── name
├── age
└── level
products (小表, 20MB)
├── product_id
├── category
├── price
└── brand
regions (小表, 1MB)
├── region_id
├── region_name
└── country9.3 完整代码
Python版本
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("EcommerceJoin") \
.config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
.getOrCreate()
# 模拟数据
orders = spark.createDataFrame([
(1, 101, 1001, 1, 200, "2024-01-01"),
(2, 101, 1002, 2, 150, "2024-01-01"),
(3, 102, 1001, 1, 300, "2024-01-02"),
(4, 103, 1003, 3, 500, "2024-01-02"),
(5, 104, 1002, 2, 180, "2024-01-03"),
], ["order_id", "user_id", "product_id", "region_id", "amount", "create_date"])
users = spark.createDataFrame([
(101, "Alice", 25, "VIP"),
(102, "Bob", 30, "Normal"),
(103, "Charlie", 28, "VIP"),
(104, "David", 35, "Normal"),
(105, "Eve", 22, "Normal"),
], ["user_id", "name", "age", "level"])
products = spark.createDataFrame([
(1001, "Electronics", 2000, "BrandA"),
(1002, "Clothing", 500, "BrandB"),
(1003, "Food", 100, "BrandC"),
(1004, "Electronics", 3000, "BrandA"),
], ["product_id", "category", "price", "brand"])
regions = spark.createDataFrame([
(1, "Beijing", "China"),
(2, "Shanghai", "China"),
(3, "Guangzhou", "China"),
(4, "Shenzhen", "China"),
], ["region_id", "region_name", "country"])
# =============================================
# 优化策略1:先广播小表,合并后再与大表Join
# =============================================
# 广播所有小表
users_bc = F.broadcast(users)
products_bc = F.broadcast(products)
regions_bc = F.broadcast(regions)
# 按顺序Join
result = orders \
.join(users_bc, "user_id") \
.join(products_bc, "product_id") \
.join(regions_bc, "region_id")
# 添加分析指标
result = result.withColumn("profit", F.col("amount") * 0.1)
result.show()
# 查看执行计划
result.explain()Scala版本
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EcommerceJoin {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("EcommerceJoin")
.config("spark.sql.autoBroadcastJoinThreshold", "10485760")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 模拟数据
val orders = Seq(
(1, 101, 1001, 1, 200, "2024-01-01"),
(2, 101, 1002, 2, 150, "2024-01-01"),
(3, 102, 1001, 1, 300, "2024-01-02"),
(4, 103, 1003, 3, 500, "2024-01-02"),
(5, 104, 1002, 2, 180, "2024-01-03")
).toDF("order_id", "user_id", "product_id", "region_id", "amount", "create_date")
val users = Seq(
(101, "Alice", 25, "VIP"),
(102, "Bob", 30, "Normal"),
(103, "Charlie", 28, "VIP"),
(104, "David", 35, "Normal"),
(105, "Eve", 22, "Normal")
).toDF("user_id", "name", "age", "level")
val products = Seq(
(1001, "Electronics", 2000, "BrandA"),
(1002, "Clothing", 500, "BrandB"),
(1003, "Food", 100, "BrandC"),
(1004, "Electronics", 3000, "BrandA")
).toDF("product_id", "category", "price", "brand")
val regions = Seq(
(1, "Beijing", "China"),
(2, "Shanghai", "China"),
(3, "Guangzhou", "China"),
(4, "Shenzhen", "China")
).toDF("region_id", "region_name", "country")
// 广播小表
val usersBc = broadcast(users)
val productsBc = broadcast(products)
val regionsBc = broadcast(regions)
// Join
val result = orders
.join(usersBc, "user_id")
.join(productsBc, "product_id")
.join(regionsBc, "region_id")
.withColumn("profit", col("amount") * 0.1)
result.show()
result.explain(true)
spark.stop()
}
}10. Join类型实战指南
10.1 快速选择表
| 场景 | 推荐Join |
|---|---|
| 大表 + 小表 | Broadcast Join |
| 大表 + 大表,Key均匀 | Sort Merge Join |
| 大表 + 大表,Key倾斜 | 广播倾斜Key + Sort Merge |
| 只需要左表匹配 | LEFT SEMI JOIN |
| 排除左表匹配 | LEFT ANTI JOIN |
10.2 调试命令
# 查看Join策略
result.explain(True)
# 查看数据分布
df.groupBy("key").count().orderBy(F.desc("count")).show(10)
# 查看分区大小
spark.sparkContext.statusTracker().getJobGroupInfo(jobId)11. 下篇预告
第3篇:聚合与TopN问题
- 基础聚合函数详解
- 精确TopN实现
- 近似TopN优化
- 分组TopN问题
- 预聚合策略
12. 练习题
练习1
分析orders和users表,找出没有下过订单的用户(使用LEFT ANTI JOIN)。
练习2
有用户表(10万条)和行为表(1亿条),如何高效Join?
练习3
如何处理Join中的数据倾斜问题?
順子の杂货铺


