顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x01-窗口函数与累计计算

DMIT VPS

窗口函数与累计计算

本篇是《大数据算法与UDF系列》的第1篇,将带领大家从零认识窗口函数,掌握累计计算、排名分析等高级技能。


1. 什么是窗口函数?

1.1 通俗解释

想象一下,你站在一扇窗户前,窗户大小可以调整:

┌─────────────────────────────────────────┐
│  原始数据: 1月 2月 3月 4月 5月 6月       │
│                                         │
│  窗口=当月:    [1月] [2月] [3月] [4月]   │  ← 只看当前月
│  窗口=累计:    [1月] [1-2月] [1-3月]...  │  ← 从开始累计到当前月
│  窗口=滑动:       [1-2月] [2-3月] [3-4月] │  ← 移动的2个月窗口
└─────────────────────────────────────────┘

窗口函数就是这样一个"动态窗户",它让每一行数据都能"看到"周围的数据,进行计算。

1.2 正式定义

窗口函数(Window Function)是一种在当前行的前后上下文中执行计算的函数,它不会改变原始数据的行数,每一行都独立计算。

1.3 对比理解

传统聚合窗口函数
SELECT SUM(amount) FROM salesSELECT amount, SUM(amount) OVER() FROM sales
结果:1行(总数)结果:多行(每行都有累计值)
数据被"压缩"了数据保持原样

2. 窗口函数基础

2.1 核心语法

函数名() OVER (
    PARTITION BY 分组列     -- 可选:按某列分组,相当于 GROUP BY
    ORDER BY 排序列         -- 可选:定义窗口内数据的顺序
    窗口框架                -- 可选:定义窗口大小
)

2.2 窗口函数分类

┌────────────────────────────────────────────────────────┐
│                    窗口函数分类                         │
├────────────────────────────────────────────────────────┤
│                                                        │
│  1️⃣ 聚合窗口函数                                       │
│     SUM / AVG / COUNT / MIN / MAX                     │
│                                                        │
│  2️⃣ 排名窗口函数                                       │
│     ROW_NUMBER / RANK / DENSE_RANK                    │
│                                                        │
│  3️⃣ 分析窗口函数                                       │
│     LAG / LEAD / FIRST_VALUE / LAST_VALUE             │
│                                                        │
│  4️⃣ 分位数函数                                         │
│     PERCENT_RANK / CUME_DIST / NTILE                  │
│                                                        │
└────────────────────────────────────────────────────────┘

3. 累计求和与移动平均

3.1 业务场景

假设我们有如下销售数据:

datecategorysales
2024-01-01A100
2024-01-02A150
2024-01-03A200
2024-01-04A180
2024-01-01B80
2024-01-02B120
2024-01-03B90
2024-01-04B160

目标:计算每个类别每天的累计销售额

3.2 Python (PySpark) 实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. 创建SparkSession
spark = SparkSession.builder \
    .appName("WindowFunctionDemo") \
    .master("local[*]") \
    .getOrCreate()

# 2. 创建示例数据
data = [
    ("2024-01-01", "A", 100),
    ("2024-01-02", "A", 150),
    ("2024-01-03", "A", 200),
    ("2024-01-04", "A", 180),
    ("2024-01-01", "B", 80),
    ("2024-01-02", "B", 120),
    ("2024-01-03", "B", 90),
    ("2024-01-04", "B", 160),
]

df = spark.createDataFrame(data, ["date", "category", "sales"])

# 3. 定义窗口规范
# 按 category 分组,按 date 排序,从开始到当前行累计
window_spec = Window.partitionBy("category") \
                    .orderBy("date") \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 4. 添加累计销售额列
result = df.withColumn(
    "cumsum_sales",
    F.sum("sales").over(window_spec)
)

# 5. 显示结果
result.orderBy("category", "date").show()

输出结果

+----------+--------+-----+------------+
|      date|category|sales| cumsum_sales|
+----------+--------+-----+------------+
|2024-01-01|       A|  100|         100|
|2024-01-02|       A|  150|         250|
|2024-01-03|       A|  200|         450|
|2024-01-04|       A|  180|         630|
|2024-01-01|       B|   80|          80|
|2024-01-02|       B|  120|         200|
|2024-01-03|       B|   90|         290|
|2024-01-04|       B|  160|         450|
+----------+--------+-----+------------+

3.3 Scala (Spark) 实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object WindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    // 1. 创建SparkSession
    val spark = SparkSession.builder()
      .appName("WindowFunctionDemo")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 2. 创建示例数据
    val data = Seq(
      ("2024-01-01", "A", 100),
      ("2024-01-02", "A", 150),
      ("2024-01-03", "A", 200),
      ("2024-01-04", "A", 180),
      ("2024-01-01", "B", 80),
      ("2024-01-02", "B", 120),
      ("2024-01-03", "B", 90),
      ("2024-01-04", "B", 160)
    )

    val df = data.toDF("date", "category", "sales")

    // 3. 定义窗口规范
    val windowSpec = Window.partitionBy("category")
      .orderBy("date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    // 4. 添加累计销售额列
    val result = df.withColumn(
      "cumsum_sales",
      sum("sales").over(windowSpec)
    )

    // 5. 显示结果
    result.orderBy("category", "date").show()

    spark.stop()
  }
}

3.4 窗口框架详解

┌────────────────────────────────────────────────────────────────┐
│                    窗口框架类型对比                             │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  假设数据: day1=100, day2=150, day3=200, day4=180             │
│                                                                │
│  ────────────────────────────────────────────────────────────  │
│                                                                │
│  rowsBetween(UNBOUNDED_PRECEDING, CURRENT_ROW)                │
│  ┌───┬───┬───┬───┐                                            │
│  │100│150│200│180│  ← 累计到当前行: [100, 250, 450, 630]       │
│  └───┴───┴───┴───┘                                            │
│                                                                │
│  rowsBetween(CURRENT_ROW, UNBOUNDED_FOLLOWING)                │
│  ┌───┬───┬───┬───┐                                            │
│  │100│150│200│180│  ← 从当前行到最后: [630, 530, 380, 180]     │
│  └───┴───┴───┴───┘                                            │
│                                                                │
│  rowsBetween(-1, 1)  ── 前后1行                               │
│  ┌───┬───┬───┬───┐                                            │
│  │100│150│200│180│  ← 滑动窗口: [未定义, 450, 530, 380]        │
│  └───┴───┴───┴───┘                                            │
│                                                                │
│  rowsBetween(0, 0)   ── 只有当前行                             │
│  ┌───┬───┬───┬───┐                                            │
│  │100│150│200│180│  ← 就是原始值: [100, 150, 200, 180]         │
│  └───┴───┴───┴───┘                                            │
│                                                                │
└────────────────────────────────────────────────────────────────┘

3.5 移动平均(Moving Average)

移动平均是数据分析中常用的平滑技术:

# Python: 计算最近3天的移动平均
window_3days = Window.partitionBy("category") \
                     .orderBy("date") \
                     .rowsBetween(-2, 0)  # 当前行及前2行

result = df.withColumn(
    "moving_avg_3d",
    F.avg("sales").over(window_3days)
)
// Scala: 计算最近3天的移动平均
val window_3days = Window.partitionBy("category")
  .orderBy("date")
  .rowsBetween(-2, 0)

val result = df.withColumn(
  "moving_avg_3d",
  avg("sales").over(window_3days)
)

4. 排名与分位数

4.1 三种排名函数对比

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 创建示例数据(有并列排名)
data = [
    ("Alice", "Math", 95),
    ("Bob", "Math", 90),
    ("Charlie", "Math", 90),
    ("David", "Math", 85),
]
df = spark.createDataFrame(data, ["name", "subject", "score"])

# 定义窗口
window_spec = Window.partitionBy("subject").orderBy(F.desc("score"))

# 三种排名方式
result = df.withColumn("row_number", F.row_number().over(window_spec)) \
           .withColumn("rank", F.rank().over(window_spec)) \
           .withColumn("dense_rank", F.dense_rank().over(window_spec))

result.show()

输出对比

+--------+-------+-----+----------+-----+-----------+
|    name|subject|score|row_number|rank|dense_rank|
+--------+-------+-----+----------+-----+-----------+
|   Alice|   Math|   95|         1|    1|          1|
|     Bob|   Math|   90|         2|    2|          2|
| Charlie|   Math|   90|         3|    2|          2|
|   David|   Math|   85|         4|    4|          3|
+--------+-------+-----+----------+-----+-----------+

对比说明

函数特点适用场景
ROW_NUMBER1,2,3,4 唯一递增精确排名、取TopN
RANK1,2,2,4 跳跃排名有并列、显示差距
DENSE_RANK1,2,2,3 连续排名连续排名如"第N名"

4.2 分组TopN问题

业务场景:找出每个类别销售额Top3的商品

# Python: 每个类别销量Top3
window_rank = Window.partitionBy("category").orderBy(F.desc("sales"))

top3 = df.withColumn("rank", F.row_number().over(window_rank)) \
         .filter(F.col("rank") <= 3) \
         .orderBy("category", "rank")
// Scala: 每个类别销量Top3
val windowRank = Window.partitionBy("category").orderBy(desc("sales"))

val top3 = df.withColumn("rank", row_number().over(windowRank))
  .filter(col("rank") <= 3)
  .orderBy("category", "rank")

4.3 分位数分析

# 计算分位数 - 每行数据在分组中的百分比位置
window_spec = Window.partitionBy("subject").orderBy(F.desc("score"))

# PERCENT_RANK: 0到1的百分比排名
df.withColumn("percent_rank", F.percent_rank().over(window_spec))

# CUME_DIST: 累计分布(小于等于当前值的比例)
df.withColumn("cume_dist", F.cume_dist().over(window_spec))

# NTILE(n): 将数据分成n组
df.withColumn("quartile", F.ntile(4).over(window_spec))

5. 滑动窗口实战

5.1 滚动窗口 vs 滑动窗口

┌─────────────────────────────────────────────────────────────┐
│                    滚动窗口 (Tumbling)                       │
│  固定大小,不重叠                                            │
│  ───┬────┬────┬────┬────┬────┬────                         │
│     │窗口1│窗口2│窗口3│窗口4│                              │
│  ───┴────┴────┴────┴────┴────┴────                         │
│                                                             │
│                    滑动窗口 (Sliding)                        │
│  固定大小,可重叠                                            │
│  ───┬────┬────┬────┬────┬────┬────                         │
│     │╔══╗╔══╗╔══╗╔══╗                           │
│     ║窗口1║窗口2║窗口3║窗口4                          │
│     ╚══╝╚══╝╚══╝╚══╝                           │
│  ───┴────┴────┴────┴────┴────┴────                         │
│                                                             │
│                    会话窗口 (Session)                        │
│  根据活动间隙动态生成                                        │
│  ════┐    ═══════┐                                         │
│      │    │       │                                          │
│   会话1 │ 会话2  │                                           │
└─────────────────────────────────────────────────────────────┘

5.2 PySpark滑动窗口示例

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 模拟用户行为日志
log_data = [
    ("2024-01-01 10:00", "user1", "view"),
    ("2024-01-01 10:01", "user1", "click"),
    ("2024-01-01 10:05", "user1", "view"),
    ("2024-01-01 10:06", "user1", "buy"),
    ("2024-01-01 10:20", "user1", "view"),
]
df = spark.createDataFrame(log_data, ["timestamp", "user_id", "action"])

# 转换时间戳
df = df.withColumn("ts", F.to_timestamp("timestamp"))

# 计算5分钟滑动窗口内的行为次数
window_sliding = Window.partitionBy("user_id") \
                       .orderBy("ts") \
                       .rangeBetween(-5 * 60, 0)  # 5分钟 = 300秒

result = df.withColumn("action_count_5min", F.count("*").over(window_sliding))
result.show()

5.3 时间戳窗口(Range Windows)

# 使用时间戳定义窗口范围
window_range = Window.partitionBy("user_id") \
                     .orderBy("ts") \
                     .rangeBetween(Window.currentRow, 3 * 60 * 60)  # 3小时内

# 计算3小时内的累计消费金额
result = df.withColumn(
    "amount_3h",
    F.sum("amount").over(window_range)
)

6. 分析窗口函数

6.1 LAG / LEAD - 访问上下行

# LAG: 获取前N行数据
# LEAD: 获取后N行数据

window_spec = Window.partitionBy("category").orderBy("date")

# 获取昨天的销售额
df.withColumn("yesterday_sales", F.lag("sales", 1).over(window_spec))

# 获取明天的销售额(预测用)
df.withColumn("tomorrow_sales", F.lead("sales", 1).over(window_spec))

# 计算环比增长
df.withColumn("yesterday_sales", F.lag("sales", 1).over(window_spec)) \
  .withColumn("growth", F.col("sales") - F.col("yesterday_sales"))

可视化

原始数据:    day1   day2   day3   day4
sales:      100    150    200    180

LAG(sales,1):       100    150    200   ← 前一行
LEAD(sales,1): 150   200   180   null   ← 后一行

6.2 FIRST_VALUE / LAST_VALUE

# 获取窗口内第一个值
df.withColumn("first_sales", F.first("sales").over(window_spec))

# 获取窗口内最后一个值
df.withColumn("last_sales", F.last("sales").over(window_spec))

# ⚠️ 注意:默认窗口是 first 到 current row
# 如果需要从第一行到最後一行,使用:
window_all = Window.partitionBy("category").orderBy("date") \
                    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("total_sales", F.sum("sales").over(window_all))

7. 常见注意事项

7.1 Window聚合 vs GroupBy

# ❌ 错误:Window函数不能与聚合函数一起使用(除非嵌套)
df.groupBy("category").agg(F.sum("sales"), F.row_number().over(Window.orderBy("date")))

# ✅ 正确:先聚合,再窗口
df.groupBy("category", "date").agg(F.sum("sales").alias("daily_sales")) \
  .withColumn("cumsum", F.sum("daily_sales").over(Window.partitionBy("category")))

7.2 Null值处理

# LAG/LEAD 默认返回null,可使用 coalesce 填充
df.withColumn("prev_sales",
    F.coalesce(F.lag("sales", 1).over(window_spec), F.lit(0)))

7.3 性能优化

# ❌ 避免:在超大数据集上使用无分区、无排序的窗口
df.withColumn("rank", F.row_number().over(Window.orderBy("id")))

# ✅ 优化:先分区、再排序
window_spec = Window.partitionBy("category").orderBy("date")
df.withColumn("rank", F.row_number().over(window_spec))

# ✅ 更好的优化:使用 Bloom Filter 加速

7.4 数据倾斜处理

# 当某分区数据量巨大时,使用 salt(加盐)技术
import random

# 给大分区添加随机后缀,分散处理
df.withColumn("category_salt",
    F.concat("category", F.lit("_"), F.lit(random.randint(0, 9))))

8. 深度思考

8.1 窗口函数 vs UDTF

特性窗口函数UDTF(用户定义表函数)
返回行数不变可能增加
语法内置自定义
性能优化过可能较慢
适用场景排名、累计复杂逻辑

8.2 Streaming场景

在Flink中,窗口函数更加重要:

-- Flink SQL 滚动窗口
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as cnt
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)

-- Flink SQL 滑动窗口
SELECT
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_start,
    COUNT(*) as cnt
FROM source_table
GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)

8.3 窗口水印(Watermark)

在处理乱序数据时:

数据流:  [t=1][t=3][t=2][t=5][t=4][t=8]
                    ↓ watermark=3s
丢弃:   [t=1][t=3]  ← t<=3 被丢弃
处理:      [t=2][t=5][t=4][t=8]

9. 完整示例:电商数据分析

9.1 需求

分析每个用户、每天的:

  1. 当日销售额
  2. 累计销售额
  3. 排名
  4. 环比增长
  5. 最近3天移动平均

9.2 完整代码

Python版本

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# 模拟电商订单数据
orders = [
    ("2024-01-01", "user1", "productA", 100),
    ("2024-01-01", "user1", "productB", 50),
    ("2024-01-01", "user2", "productA", 200),
    ("2024-01-02", "user1", "productA", 150),
    ("2024-01-02", "user2", "productC", 80),
    ("2024-01-03", "user1", "productB", 120),
    ("2024-01-03", "user2", "productA", 90),
    ("2024-01-04", "user1", "productC", 200),
    ("2024-01-04", "user2", "productB", 150),
]

df = spark.createDataFrame(orders, ["date", "user_id", "product", "amount"])

# 按用户、日期聚合
daily = df.groupBy("user_id", "date").agg(
    F.sum("amount").alias("daily_amount")
).orderBy("user_id", "date")

# 定义窗口
window_cumsum = Window.partitionBy("user_id").orderBy("date") \
                      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_3d = Window.partitionBy("user_id").orderBy("date") \
                  .rowsBetween(-2, 0)
window_rank = Window.partitionBy("user_id").orderBy(F.desc("daily_amount"))

# 添加所有分析指标
result = daily \
    .withColumn("cumsum_amount", F.sum("daily_amount").over(window_cumsum)) \
    .withColumn("prev_amount", F.lag("daily_amount", 1).over(window_cumsum)) \
    .withColumn("growth", F.col("daily_amount") - F.col("prev_amount")) \
    .withColumn("growth_pct",
        F.when(F.col("prev_amount").isNotNull(),
            F.round((F.col("daily_amount") - F.col("prev_amount")) / F.col("prev_amount") * 100, 2))
        .otherwise(F.lit(0))) \
    .withColumn("ma_3d", F.avg("daily_amount").over(window_3d)) \
    .withColumn("rank", F.row_number().over(window_rank))

result.show()

输出

+-------+----------+------------+-----------+------+--------+------+----+
|user_id|      date|daily_amount|cumsum_amount|prev_amount|growth|growth_pct| ma_3d|rank|
+-------+----------+------------+-----------+------+--------+------+----+
|  user1|2024-01-01|          150|          150|      null|  null|      null|150.0|   3|
|  user1|2024-01-02|          150|          300|       150|     0|       0.0|150.0|   3|
|  user1|2024-01-03|          120|          420|       150|   -30|     -20.0|140.0|   4|
|  user1|2024-01-04|          200|          620|       120|    80|      66.67|156.67|   1|
|  user2|2024-01-01|          200|          200|      null|  null|      null|200.0|   2|
|  user2|2024-01-02|           80|          280|       200|  -120|     -60.0|140.0|   4|
|  user2|2024-01-03|           90|          370|        80|    10|      12.5|123.33|   3|
|  user2|2024-01-04|          150|          520|        90|    60|      66.67|106.67|   1|
+-------+----------+------------+-----------+------+--------+------+----+

10. 练习题

练习1:累计占比

计算每个用户累计销售额占总销售额的百分比。

练习2:分组TopN

找出每个类别销量Top3的商品,且包含并列。

练习3:连续登录

找出连续登录3天以上的用户。

练习4:移动最大

计算最近3天的最大单日销售额。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x01-窗口函数与累计计算
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们