窗口函数与累计计算
本篇是《大数据算法与UDF系列》的第1篇,将带领大家从零认识窗口函数,掌握累计计算、排名分析等高级技能。
1. 什么是窗口函数?
1.1 通俗解释
想象一下,你站在一扇窗户前,窗户大小可以调整:
┌─────────────────────────────────────────┐
│ 原始数据: 1月 2月 3月 4月 5月 6月 │
│ │
│ 窗口=当月: [1月] [2月] [3月] [4月] │ ← 只看当前月
│ 窗口=累计: [1月] [1-2月] [1-3月]... │ ← 从开始累计到当前月
│ 窗口=滑动: [1-2月] [2-3月] [3-4月] │ ← 移动的2个月窗口
└─────────────────────────────────────────┘窗口函数就是这样一个"动态窗户",它让每一行数据都能"看到"周围的数据,进行计算。
1.2 正式定义
窗口函数(Window Function)是一种在当前行的前后上下文中执行计算的函数,它不会改变原始数据的行数,每一行都独立计算。
1.3 对比理解
| 传统聚合 | 窗口函数 |
|---|---|
SELECT SUM(amount) FROM sales | SELECT amount, SUM(amount) OVER() FROM sales |
| 结果:1行(总数) | 结果:多行(每行都有累计值) |
| 数据被"压缩"了 | 数据保持原样 |
2. 窗口函数基础
2.1 核心语法
函数名() OVER (
PARTITION BY 分组列 -- 可选:按某列分组,相当于 GROUP BY
ORDER BY 排序列 -- 可选:定义窗口内数据的顺序
窗口框架 -- 可选:定义窗口大小
)2.2 窗口函数分类
┌────────────────────────────────────────────────────────┐
│ 窗口函数分类 │
├────────────────────────────────────────────────────────┤
│ │
│ 1️⃣ 聚合窗口函数 │
│ SUM / AVG / COUNT / MIN / MAX │
│ │
│ 2️⃣ 排名窗口函数 │
│ ROW_NUMBER / RANK / DENSE_RANK │
│ │
│ 3️⃣ 分析窗口函数 │
│ LAG / LEAD / FIRST_VALUE / LAST_VALUE │
│ │
│ 4️⃣ 分位数函数 │
│ PERCENT_RANK / CUME_DIST / NTILE │
│ │
└────────────────────────────────────────────────────────┘3. 累计求和与移动平均
3.1 业务场景
假设我们有如下销售数据:
| date | category | sales |
|---|---|---|
| 2024-01-01 | A | 100 |
| 2024-01-02 | A | 150 |
| 2024-01-03 | A | 200 |
| 2024-01-04 | A | 180 |
| 2024-01-01 | B | 80 |
| 2024-01-02 | B | 120 |
| 2024-01-03 | B | 90 |
| 2024-01-04 | B | 160 |
目标:计算每个类别每天的累计销售额
3.2 Python (PySpark) 实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 1. 创建SparkSession
spark = SparkSession.builder \
.appName("WindowFunctionDemo") \
.master("local[*]") \
.getOrCreate()
# 2. 创建示例数据
data = [
("2024-01-01", "A", 100),
("2024-01-02", "A", 150),
("2024-01-03", "A", 200),
("2024-01-04", "A", 180),
("2024-01-01", "B", 80),
("2024-01-02", "B", 120),
("2024-01-03", "B", 90),
("2024-01-04", "B", 160),
]
df = spark.createDataFrame(data, ["date", "category", "sales"])
# 3. 定义窗口规范
# 按 category 分组,按 date 排序,从开始到当前行累计
window_spec = Window.partitionBy("category") \
.orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 4. 添加累计销售额列
result = df.withColumn(
"cumsum_sales",
F.sum("sales").over(window_spec)
)
# 5. 显示结果
result.orderBy("category", "date").show()输出结果:
+----------+--------+-----+------------+
| date|category|sales| cumsum_sales|
+----------+--------+-----+------------+
|2024-01-01| A| 100| 100|
|2024-01-02| A| 150| 250|
|2024-01-03| A| 200| 450|
|2024-01-04| A| 180| 630|
|2024-01-01| B| 80| 80|
|2024-01-02| B| 120| 200|
|2024-01-03| B| 90| 290|
|2024-01-04| B| 160| 450|
+----------+--------+-----+------------+3.3 Scala (Spark) 实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
object WindowFunctionDemo {
def main(args: Array[String]): Unit = {
// 1. 创建SparkSession
val spark = SparkSession.builder()
.appName("WindowFunctionDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 2. 创建示例数据
val data = Seq(
("2024-01-01", "A", 100),
("2024-01-02", "A", 150),
("2024-01-03", "A", 200),
("2024-01-04", "A", 180),
("2024-01-01", "B", 80),
("2024-01-02", "B", 120),
("2024-01-03", "B", 90),
("2024-01-04", "B", 160)
)
val df = data.toDF("date", "category", "sales")
// 3. 定义窗口规范
val windowSpec = Window.partitionBy("category")
.orderBy("date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// 4. 添加累计销售额列
val result = df.withColumn(
"cumsum_sales",
sum("sales").over(windowSpec)
)
// 5. 显示结果
result.orderBy("category", "date").show()
spark.stop()
}
}3.4 窗口框架详解
┌────────────────────────────────────────────────────────────────┐
│ 窗口框架类型对比 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 假设数据: day1=100, day2=150, day3=200, day4=180 │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ rowsBetween(UNBOUNDED_PRECEDING, CURRENT_ROW) │
│ ┌───┬───┬───┬───┐ │
│ │100│150│200│180│ ← 累计到当前行: [100, 250, 450, 630] │
│ └───┴───┴───┴───┘ │
│ │
│ rowsBetween(CURRENT_ROW, UNBOUNDED_FOLLOWING) │
│ ┌───┬───┬───┬───┐ │
│ │100│150│200│180│ ← 从当前行到最后: [630, 530, 380, 180] │
│ └───┴───┴───┴───┘ │
│ │
│ rowsBetween(-1, 1) ── 前后1行 │
│ ┌───┬───┬───┬───┐ │
│ │100│150│200│180│ ← 滑动窗口: [未定义, 450, 530, 380] │
│ └───┴───┴───┴───┘ │
│ │
│ rowsBetween(0, 0) ── 只有当前行 │
│ ┌───┬───┬───┬───┐ │
│ │100│150│200│180│ ← 就是原始值: [100, 150, 200, 180] │
│ └───┴───┴───┴───┘ │
│ │
└────────────────────────────────────────────────────────────────┘3.5 移动平均(Moving Average)
移动平均是数据分析中常用的平滑技术:
# Python: 计算最近3天的移动平均
window_3days = Window.partitionBy("category") \
.orderBy("date") \
.rowsBetween(-2, 0) # 当前行及前2行
result = df.withColumn(
"moving_avg_3d",
F.avg("sales").over(window_3days)
)// Scala: 计算最近3天的移动平均
val window_3days = Window.partitionBy("category")
.orderBy("date")
.rowsBetween(-2, 0)
val result = df.withColumn(
"moving_avg_3d",
avg("sales").over(window_3days)
)4. 排名与分位数
4.1 三种排名函数对比
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 创建示例数据(有并列排名)
data = [
("Alice", "Math", 95),
("Bob", "Math", 90),
("Charlie", "Math", 90),
("David", "Math", 85),
]
df = spark.createDataFrame(data, ["name", "subject", "score"])
# 定义窗口
window_spec = Window.partitionBy("subject").orderBy(F.desc("score"))
# 三种排名方式
result = df.withColumn("row_number", F.row_number().over(window_spec)) \
.withColumn("rank", F.rank().over(window_spec)) \
.withColumn("dense_rank", F.dense_rank().over(window_spec))
result.show()输出对比:
+--------+-------+-----+----------+-----+-----------+
| name|subject|score|row_number|rank|dense_rank|
+--------+-------+-----+----------+-----+-----------+
| Alice| Math| 95| 1| 1| 1|
| Bob| Math| 90| 2| 2| 2|
| Charlie| Math| 90| 3| 2| 2|
| David| Math| 85| 4| 4| 3|
+--------+-------+-----+----------+-----+-----------+对比说明:
| 函数 | 特点 | 适用场景 |
|---|---|---|
ROW_NUMBER | 1,2,3,4 唯一递增 | 精确排名、取TopN |
RANK | 1,2,2,4 跳跃排名 | 有并列、显示差距 |
DENSE_RANK | 1,2,2,3 连续排名 | 连续排名如"第N名" |
4.2 分组TopN问题
业务场景:找出每个类别销售额Top3的商品
# Python: 每个类别销量Top3
window_rank = Window.partitionBy("category").orderBy(F.desc("sales"))
top3 = df.withColumn("rank", F.row_number().over(window_rank)) \
.filter(F.col("rank") <= 3) \
.orderBy("category", "rank")// Scala: 每个类别销量Top3
val windowRank = Window.partitionBy("category").orderBy(desc("sales"))
val top3 = df.withColumn("rank", row_number().over(windowRank))
.filter(col("rank") <= 3)
.orderBy("category", "rank")4.3 分位数分析
# 计算分位数 - 每行数据在分组中的百分比位置
window_spec = Window.partitionBy("subject").orderBy(F.desc("score"))
# PERCENT_RANK: 0到1的百分比排名
df.withColumn("percent_rank", F.percent_rank().over(window_spec))
# CUME_DIST: 累计分布(小于等于当前值的比例)
df.withColumn("cume_dist", F.cume_dist().over(window_spec))
# NTILE(n): 将数据分成n组
df.withColumn("quartile", F.ntile(4).over(window_spec))5. 滑动窗口实战
5.1 滚动窗口 vs 滑动窗口
┌─────────────────────────────────────────────────────────────┐
│ 滚动窗口 (Tumbling) │
│ 固定大小,不重叠 │
│ ───┬────┬────┬────┬────┬────┬──── │
│ │窗口1│窗口2│窗口3│窗口4│ │
│ ───┴────┴────┴────┴────┴────┴──── │
│ │
│ 滑动窗口 (Sliding) │
│ 固定大小,可重叠 │
│ ───┬────┬────┬────┬────┬────┬──── │
│ │╔══╗╔══╗╔══╗╔══╗ │
│ ║窗口1║窗口2║窗口3║窗口4 │
│ ╚══╝╚══╝╚══╝╚══╝ │
│ ───┴────┴────┴────┴────┴────┴──── │
│ │
│ 会话窗口 (Session) │
│ 根据活动间隙动态生成 │
│ ════┐ ═══════┐ │
│ │ │ │ │
│ 会话1 │ 会话2 │ │
└─────────────────────────────────────────────────────────────┘5.2 PySpark滑动窗口示例
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 模拟用户行为日志
log_data = [
("2024-01-01 10:00", "user1", "view"),
("2024-01-01 10:01", "user1", "click"),
("2024-01-01 10:05", "user1", "view"),
("2024-01-01 10:06", "user1", "buy"),
("2024-01-01 10:20", "user1", "view"),
]
df = spark.createDataFrame(log_data, ["timestamp", "user_id", "action"])
# 转换时间戳
df = df.withColumn("ts", F.to_timestamp("timestamp"))
# 计算5分钟滑动窗口内的行为次数
window_sliding = Window.partitionBy("user_id") \
.orderBy("ts") \
.rangeBetween(-5 * 60, 0) # 5分钟 = 300秒
result = df.withColumn("action_count_5min", F.count("*").over(window_sliding))
result.show()5.3 时间戳窗口(Range Windows)
# 使用时间戳定义窗口范围
window_range = Window.partitionBy("user_id") \
.orderBy("ts") \
.rangeBetween(Window.currentRow, 3 * 60 * 60) # 3小时内
# 计算3小时内的累计消费金额
result = df.withColumn(
"amount_3h",
F.sum("amount").over(window_range)
)6. 分析窗口函数
6.1 LAG / LEAD - 访问上下行
# LAG: 获取前N行数据
# LEAD: 获取后N行数据
window_spec = Window.partitionBy("category").orderBy("date")
# 获取昨天的销售额
df.withColumn("yesterday_sales", F.lag("sales", 1).over(window_spec))
# 获取明天的销售额(预测用)
df.withColumn("tomorrow_sales", F.lead("sales", 1).over(window_spec))
# 计算环比增长
df.withColumn("yesterday_sales", F.lag("sales", 1).over(window_spec)) \
.withColumn("growth", F.col("sales") - F.col("yesterday_sales"))可视化:
原始数据: day1 day2 day3 day4
sales: 100 150 200 180
LAG(sales,1): 100 150 200 ← 前一行
LEAD(sales,1): 150 200 180 null ← 后一行6.2 FIRST_VALUE / LAST_VALUE
# 获取窗口内第一个值
df.withColumn("first_sales", F.first("sales").over(window_spec))
# 获取窗口内最后一个值
df.withColumn("last_sales", F.last("sales").over(window_spec))
# ⚠️ 注意:默认窗口是 first 到 current row
# 如果需要从第一行到最後一行,使用:
window_all = Window.partitionBy("category").orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("total_sales", F.sum("sales").over(window_all))7. 常见注意事项
7.1 Window聚合 vs GroupBy
# ❌ 错误:Window函数不能与聚合函数一起使用(除非嵌套)
df.groupBy("category").agg(F.sum("sales"), F.row_number().over(Window.orderBy("date")))
# ✅ 正确:先聚合,再窗口
df.groupBy("category", "date").agg(F.sum("sales").alias("daily_sales")) \
.withColumn("cumsum", F.sum("daily_sales").over(Window.partitionBy("category")))7.2 Null值处理
# LAG/LEAD 默认返回null,可使用 coalesce 填充
df.withColumn("prev_sales",
F.coalesce(F.lag("sales", 1).over(window_spec), F.lit(0)))7.3 性能优化
# ❌ 避免:在超大数据集上使用无分区、无排序的窗口
df.withColumn("rank", F.row_number().over(Window.orderBy("id")))
# ✅ 优化:先分区、再排序
window_spec = Window.partitionBy("category").orderBy("date")
df.withColumn("rank", F.row_number().over(window_spec))
# ✅ 更好的优化:使用 Bloom Filter 加速7.4 数据倾斜处理
# 当某分区数据量巨大时,使用 salt(加盐)技术
import random
# 给大分区添加随机后缀,分散处理
df.withColumn("category_salt",
F.concat("category", F.lit("_"), F.lit(random.randint(0, 9))))8. 深度思考
8.1 窗口函数 vs UDTF
| 特性 | 窗口函数 | UDTF(用户定义表函数) |
|---|---|---|
| 返回行数 | 不变 | 可能增加 |
| 语法 | 内置 | 自定义 |
| 性能 | 优化过 | 可能较慢 |
| 适用场景 | 排名、累计 | 复杂逻辑 |
8.2 Streaming场景
在Flink中,窗口函数更加重要:
-- Flink SQL 滚动窗口
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
COUNT(*) as cnt
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
-- Flink SQL 滑动窗口
SELECT
HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_start,
COUNT(*) as cnt
FROM source_table
GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)8.3 窗口水印(Watermark)
在处理乱序数据时:
数据流: [t=1][t=3][t=2][t=5][t=4][t=8]
↓ watermark=3s
丢弃: [t=1][t=3] ← t<=3 被丢弃
处理: [t=2][t=5][t=4][t=8]9. 完整示例:电商数据分析
9.1 需求
分析每个用户、每天的:
- 当日销售额
- 累计销售额
- 排名
- 环比增长
- 最近3天移动平均
9.2 完整代码
Python版本
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
# 模拟电商订单数据
orders = [
("2024-01-01", "user1", "productA", 100),
("2024-01-01", "user1", "productB", 50),
("2024-01-01", "user2", "productA", 200),
("2024-01-02", "user1", "productA", 150),
("2024-01-02", "user2", "productC", 80),
("2024-01-03", "user1", "productB", 120),
("2024-01-03", "user2", "productA", 90),
("2024-01-04", "user1", "productC", 200),
("2024-01-04", "user2", "productB", 150),
]
df = spark.createDataFrame(orders, ["date", "user_id", "product", "amount"])
# 按用户、日期聚合
daily = df.groupBy("user_id", "date").agg(
F.sum("amount").alias("daily_amount")
).orderBy("user_id", "date")
# 定义窗口
window_cumsum = Window.partitionBy("user_id").orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_3d = Window.partitionBy("user_id").orderBy("date") \
.rowsBetween(-2, 0)
window_rank = Window.partitionBy("user_id").orderBy(F.desc("daily_amount"))
# 添加所有分析指标
result = daily \
.withColumn("cumsum_amount", F.sum("daily_amount").over(window_cumsum)) \
.withColumn("prev_amount", F.lag("daily_amount", 1).over(window_cumsum)) \
.withColumn("growth", F.col("daily_amount") - F.col("prev_amount")) \
.withColumn("growth_pct",
F.when(F.col("prev_amount").isNotNull(),
F.round((F.col("daily_amount") - F.col("prev_amount")) / F.col("prev_amount") * 100, 2))
.otherwise(F.lit(0))) \
.withColumn("ma_3d", F.avg("daily_amount").over(window_3d)) \
.withColumn("rank", F.row_number().over(window_rank))
result.show()输出:
+-------+----------+------------+-----------+------+--------+------+----+
|user_id| date|daily_amount|cumsum_amount|prev_amount|growth|growth_pct| ma_3d|rank|
+-------+----------+------------+-----------+------+--------+------+----+
| user1|2024-01-01| 150| 150| null| null| null|150.0| 3|
| user1|2024-01-02| 150| 300| 150| 0| 0.0|150.0| 3|
| user1|2024-01-03| 120| 420| 150| -30| -20.0|140.0| 4|
| user1|2024-01-04| 200| 620| 120| 80| 66.67|156.67| 1|
| user2|2024-01-01| 200| 200| null| null| null|200.0| 2|
| user2|2024-01-02| 80| 280| 200| -120| -60.0|140.0| 4|
| user2|2024-01-03| 90| 370| 80| 10| 12.5|123.33| 3|
| user2|2024-01-04| 150| 520| 90| 60| 66.67|106.67| 1|
+-------+----------+------------+-----------+------+--------+------+----+10. 练习题
练习1:累计占比
计算每个用户累计销售额占总销售额的百分比。
练习2:分组TopN
找出每个类别销量Top3的商品,且包含并列。
练习3:连续登录
找出连续登录3天以上的用户。
练习4:移动最大
计算最近3天的最大单日销售额。
順子の杂货铺



