顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x04-时间窗口实战

DMIT VPS

时间窗口实战

本篇是《大数据算法与UDF系列》的第4篇,深入讲解时间窗口的三大类型(滚动、滑动、会话),以及Flink流处理中的迟到数据处理和Watermark机制。


1. 什么是时间窗口?

1.1 业务场景

在实时数据分析中,我们经常需要:

  • 每分钟的网站访问量
  • 每小时的销售额
  • 每5分钟的用户活跃度
  • 会话内的用户行为

这些需求都依赖于时间窗口来实现。

1.2 窗口类型概览

┌─────────────────────────────────────────────────────────────────┐
│                      时间窗口三大类型                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   📊 滚动窗口 (Tumbling Window)                                 │
│   ─────────────────────────────────────                        │
│   [00:00-00:05)[00:05-00:10)[00:10-00:15)                    │
│   固定大小,不重叠,完整切分                                     │
│   适用:固定周期的统计                                          │
│                                                                 │
│   📊 滑动窗口 (Sliding Window)                                 │
│   ─────────────────────────────────────                        │
│   [00:00-00:10)[00:02-00:12)[00:04-00:14)...                 │
│   固定大小,可重叠,移动步长可配置                               │
│   适用:移动平均、趋势分析                                      │
│                                                                 │
│   📊 会话窗口 (Session Window)                                 │
│   ─────────────────────────────────────                        │
│   ════┐    ═══════════┐      ════┐                          │
│        │    │            │      │                            │
│     会话1   会话2        会话3                                 │
│   动态大小,根据间隙生成                                         │
│   适用:用户行为分析                                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. 滚动窗口(Tumbling Window)

2.1 原理图解

┌─────────────────────────────────────────────────────────────────�│
│                     滚动窗口原理                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   数据流: |---t1---|---t2---|---t3---|---t4---|---t5---|      │
│                                                                 │
│   窗口大小: 2分钟                                              │
│                                                                 │
│   Window 1: [00:00 - 00:02)  包含 t1                         │
│   Window 2: [00:02 - 00:04)  包含 t2                         │
│   Window 3: [00:04 - 00:06)  包含 t3                         │
│   ...                                                          │
│                                                                 │
│   特点:                                                        │
│   ✓ 不重叠                                                     │
│   ✓ 固定大小                                                   │
│   ✓ 数据不重复计算                                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 PySpark Batch实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# 模拟数据
data = [
    ("2024-01-01 10:00:00", "user1", "view"),
    ("2024-01-01 10:01:00", "user1", "click"),
    ("2024-01-01 10:02:00", "user2", "view"),
    ("2024-01-01 10:03:00", "user2", "buy"),
    ("2024-01-01 10:04:00", "user3", "view"),
    ("2024-01-01 10:05:00", "user3", "click"),
]

df = spark.createDataFrame(data, ["timestamp", "user_id", "action"])

# 转换为时间戳
df = df.withColumn("ts", F.to_timestamp("timestamp"))

# 1. 按1分钟滚动窗口分组
result = df.groupBy(
    F.window("ts", "1 minute")  # 1分钟滚动窗口
).agg(
    F.count("*").alias("cnt"),
    F.countDistinct("user_id").alias("uv")
)

result.show(truncate=False)

输出

+------------------------------------------+----+---+
|window                                    |cnt |uv |
+------------------------------------------+----+---+
|[2024-01-01 10:00:00, 2024-01-01 10:01:00]|2   |1  |
|[2024-01-01 10:02:00, 2024-01-01 10:03:00]|2   |2  |
|[2024-01-01 10:04:00, 2024-01-01 10:05:00]|2   |1  |
+------------------------------------------+----+---+

2.3 Flink SQL实现

-- 滚动窗口(每5分钟)
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
    COUNT(*) as cnt,
    COUNT(DISTINCT user_id) as uv
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)

2.4 Scala Spark实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

object TumblingWindow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TumblingWindow")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val data = Seq(
      ("2024-01-01 10:00:00", "user1", "view"),
      ("2024-01-01 10:01:00", "user1", "click"),
      ("2024-01-01 10:02:00", "user2", "view"),
      ("2024-01-01 10:03:00", "user2", "buy"),
      ("2024-01-01 10:04:00", "user3", "view"),
      ("2024-01-01 10:05:00", "user3", "click")
    )

    val df = data.toDF("timestamp", "user_id", "action")
      .withColumn("ts", to_timestamp($"timestamp"))

    // 1分钟滚动窗口
    val result = df.groupBy(
      window($"ts", "1 minute")
    ).agg(
      count("*").as("cnt"),
      countDistinct("user_id").as("uv")
    )

    result.show(false)

    spark.stop()
  }
}

3. 滑动窗口(Sliding Window)

3.1 原理图解

┌─────────────────────────────────────────────────────────────────┐
│                     滑动窗口原理                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   数据流: |--t1--|--t2--|--t3--|--t4--|--t5--|--t6--|         │
│                                                                 │
│   窗口大小: 3分钟                                               │
│   滑动步长: 2分钟                                               │
│                                                                 │
│   Window 1: [00:00 - 00:03)  包含 t1,t2                       │
│        ↓ 滑动2分钟                                              │
│   Window 2: [00:02 - 00:05)  包含 t2,t3,t4                   │
│        ↓ 滑动2分钟                                              │
│   Window 3: [00:04 - 00:07)  包含 t3,t4,t5                   │
│        ↓ 滑动2分钟                                              │
│   Window 4: [00:06 - 00:09)  包含 t4,t5,t6                   │
│                                                                 │
│   特点:                                                        │
│   ✓ 可重叠                                                     │
│   ✓ 移动步长可配置                                              │
│   ✓ 数据可能重复计算                                            │
│   ✓ 更平滑的统计曲线                                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 PySpark实现

# 滑动窗口:窗口大小3分钟,滑动步长1分钟
result = df.groupBy(
    F.window("ts", "3 minutes", "1 minute")  # (windowLength, slideLength)
).agg(
    F.count("*").alias("cnt"),
    F.avg("amount").alias("avg_amount")
)

result.show(truncate=False)

3.3 滚动 vs 滑动 对比

特性滚动窗口滑动窗口
重叠不重叠可重叠
大小固定固定
步长= 窗口大小< 窗口大小
适用独立周期统计平滑趋势分析
计算量较少较多

3.4 业务场景选择

┌─────────────────────────────────────────────────────────────────┐
│                     场景选择指南                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   滚动窗口适用:                                                 │
│   ├─ 每日销售额统计                                            │
│   ├─ 每小时网站PV                                              │
│   └─ 整点报数                                                  │
│                                                                 │
│   滑动窗口适用:                                                 │
│   ├─ 最近5分钟的平均响应时间                                    │
│   ├─ 移动平均线(MA5、MA10、MA20)                              │
│   ├─ 实时趋势监控                                              │
│   └─ 异常检测(与历史对比)                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

4. 会话窗口(Session Window)

4.1 原理图解

┌─────────────────────────────────────────────────────────────────┐
│                     会话窗口原理                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   用户行为序列:                                                 │
│   user1: [view@10:00][click@10:01][buy@10:02]----[view@10:30] │
│                                                                 │
│   会话间隔(gap): 10分钟                                         │
│                                                                 │
│   会话1: [10:00 - 10:12)                                       │
│          view → click → buy (间隔<10分钟)                      │
│                                                                 │
│   ────────── 超过10分钟间隔 ──────────                          │
│                                                                 │
│   会话2: [10:30 - 10:40)                                       │
│          view (单独会话)                                        │
│                                                                 │
│   特点:                                                        │
│   ✓ 动态大小                                                   │
│   ✓ 根据用户行为自动生成                                        │
│   ✓ 适合用户行为分析                                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

4.2 PySpark实现

# 会话窗口:间隔5分钟
result = df.groupBy(
    "user_id",
    F.session_window("ts", "5 minutes")  # 会话间隔5分钟
).agg(
    F.count("*").alias("action_cnt"),
    F.min("timestamp").alias("session_start"),
    F.max("timestamp").alias("session_end")
)

result.show(truncate=False)

4.3 Flink SQL实现

-- 会话窗口
SELECT
    SESSION_START(event_time, INTERVAL '10' MINUTE) as session_start,
    SESSION_END(event_time, INTERVAL '10' MINUTE) as session_end,
    user_id,
    COUNT(*) as action_cnt
FROM source_table
GROUP BY SESSION(event_time, INTERVAL '10' MINUTE), user_id

5. 迟到数据处理与Watermark

5.1 乱序问题

┌─────────────────────────────────────────────────────────────────┐
│                        乱序数据示例                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   理想顺序:  t1@10:00 → t2@10:01 → t3@10:02 → t4@10:03        │
│                                                                 │
│   实际顺序:  t1@10:00 → t3@10:02 → t2@10:01 → t4@10:03        │
│                                        ↑                       │
│                                    乱序到达                      │
│                                                                 │
│   到达时间:  t1@10:00:05 → t3@10:00:06 → t2@10:00:07         │
│                                                                 │
│   问题:                                                        │
│   - t2应该在窗口[10:00-10:02)中计算                            │
│   - 但当t3到达时,窗口[10:00-10:02)可能已经触发输出             │
│   - 导致t2被丢弃或计算错误                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 Watermark(水印)机制

核心思想:等待一定时间后再触发窗口计算

┌─────────────────────────────────────────────────────────────────┐
│                      Watermark 原理                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   配置: Watermark = 事件时间 - 最大允许延迟                       │
│                                                                 │
│   例如: max_lateness = 5秒                                       │
│         Watermark = 当前事件时间 - 5秒                          │
│                                                                 │
│   ─────────────────────────────────────────────                │
│                                                                 │
│   数据流:     [10:00][10:01][10:02][10:03][10:04]              │
│                ↓                                                │
│   Watermark:    9:55  9:56  9:57  9:58  9:59                   │
│                ↓                                                │
│   触发条件: Watermark >= 窗口结束时间                            │
│                                                                 │
│   Window [10:00-10:02):                                         │
│   - 当Watermark >= 10:02时触发                                  │
│   - 即数据时间到达10:02+5秒 = 10:02:05时触发                    │
│                                                                 │
│   迟到数据处理:                                                 │
│   - Watermark之前的数据:正常计算                               │
│   - Watermark之后的数据:放入侧输出流或丢弃                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.3 Flink实现Watermark

// Java/Scala Flink
DataStream stream = env.addSource(new EventSource());

// 定义Watermark策略
DataStream watermarkedStream = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );

// 使用事件时间窗口
watermarkedStream
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .sum("amount");

5.4 PySpark Streaming实现

from pyspark.sql import functions as F

# 定义Watermark
watermark_df = df.withWatermark("timestamp", "5 minutes")

# 使用Watermark进行窗口计算
result = watermark_df \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "category"
    ) \
    .agg(F.count("*").alias("cnt"))

# 迟到数据会进入late data处理
result.isLate("timestamp")  # 标记迟到数据

6. 窗口实战:实时销售统计

6.1 需求

实时统计:

  1. 每分钟销售额(滚动窗口)
  2. 最近5分钟销售趋势(滑动窗口)
  3. 用户会话行为(会话窗口)

6.2 完整代码

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("WindowDemo") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# 模拟实时订单数据
orders = [
    ("2024-01-01 10:00:01", "user1", "Electronics", 1000),
    ("2024-01-01 10:00:30", "user1", "Clothing", 200),
    ("2024-01-01 10:01:15", "user2", "Electronics", 1500),
    ("2024-01-01 10:02:00", "user3", "Food", 50),
    ("2024-01-01 10:02:30", "user1", "Electronics", 800),  # 同一用户
    ("2024-01-01 10:10:00", "user3", "Food", 60),  # 超过5分钟间隔
]

df = spark.createDataFrame(orders, ["timestamp", "user_id", "category", "amount"])
df = df.withColumn("ts", F.to_timestamp("timestamp"))

print("=" * 60)
print("1. 滚动窗口:每分钟销售额")
print("=" * 60)

tumbling = df.groupBy(
    F.window("ts", "1 minute", "1 minute")
).agg(
    F.sum("amount").alias("total_amount"),
    F.count("*").alias("order_cnt")
).select(
    F.col("window.start").alias("window_start"),
    F.col("window.end").alias("window_end"),
    "total_amount",
    "order_cnt"
)

tumbling.orderBy("window_start").show(truncate=False)

print("=" * 60)
print("2. 滑动窗口:最近3分钟的平均订单金额")
print("=" * 60)

sliding = df.groupBy(
    F.window("ts", "3 minutes", "1 minute")
).agg(
    F.avg("amount").alias("avg_amount"),
    F.count("*").alias("order_cnt")
).select(
    F.col("window.start").alias("window_start"),
    F.col("window.end").alias("window_end"),
    "avg_amount",
    "order_cnt"
)

sliding.orderBy("window_start").show(truncate=False)

print("=" * 60)
print("3. 会话窗口:用户会话行为(间隔5分钟)")
print("=" * 60)

session = df.groupBy(
    "user_id",
    F.session_window("ts", "5 minutes")
).agg(
    F.sum("amount").alias("session_total"),
    F.collect_list("category").alias("categories"),
    F.count("*").alias("action_cnt")
).select(
    "user_id",
    F.col("window.start").alias("session_start"),
    F.col("window.end").alias("session_end"),
    "session_total",
    "categories"
)

session.show(truncate=False)

6.3 输出结果

============================================================
1. 滚动窗口:每分钟销售额
============================================================
+---------------------+---------------------+------------+---------+
|window_start        |window_end          |total_amount|order_cnt|
+---------------------+---------------------+------------+---------+
|2024-01-01 10:00:00 |2024-01-01 10:01:00 |1200        |2        |
|2024-01-01 10:01:00 |2024-01-01 10:02:00 |1500        |1        |
|2024-01-01 10:02:00 |2024-01-01 10:03:00 |850         |2        |
|2024-01-01 10:10:00 |2024-01-01 10:11:00 |60          |1        |
+---------------------+---------------------+------------+---------+

============================================================
2. 滑动窗口:最近3分钟的平均订单金额
============================================================
+---------------------+---------------------+------------------+---------+
|window_start        |window_end          |avg_amount        |order_cnt|
+---------------------+---------------------+------------------+---------+
|2024-01-01 10:00:00 |2024-01-01 10:03:00 |683.33            |3        |
|2024-01-01 10:01:00 |2024-01-01 10:04:00 |783.33            |3        |
|2024-01-01 10:02:00 |2024-01-01 10:05:00 |783.33            |3        |
|2024-01-01 10:03:00 |2024-01-01 10:06:00 |425.0             |2        |
+---------------------+---------------------+------------------+---------+

============================================================
3. 会话窗口:用户会话行为(间隔5分钟)
============================================================
+-----+---------------------+---------------------+--------------+----------+
|user_id|session_start        |session_end          |session_total |categories|
+-----+---------------------+---------------------+--------------+----------+
|user1 |2024-01-01 10:00:01 |2024-01-01 10:02:30 |2000          |[Electronics, Clothing, Electronics]|
|user2 |2024-01-01 10:01:15 |2024-01-01 10:02:15 |1500          |[Electronics]|
|user3 |2024-01-01 10:02:00 |2024-01-01 10:03:00 |50            |[Food]     |
|user3 |2024-01-01 10:10:00 |2024-01-01 10:11:00 |60            |[Food]     |
+-----+---------------------+---------------------+--------------+----------

7. 窗口函数组合使用

7.1 窗口 + 窗口函数

# 滚动窗口内计算排名
result = df.groupBy(
    F.window("ts", "5 minute"),
    "category"
).agg(
    F.sum("amount").alias("total")
).withColumn(
    "rank",
    F.row_number().over(
        Window.orderBy(F.desc("total"))
    )
)

7.2 多窗口同时计算

# 同时计算1分钟和5分钟窗口
from pyspark.sql import DataFrame

df_1min = df.groupBy(
    F.window("ts", "1 minute"),
    "category"
).agg(F.sum("amount").alias("amount_1min"))

df_5min = df.groupBy(
    F.window("ts", "5 minute"),
    "category"
).agg(F.sum("amount").alias("amount_5min"))

# 关联
result = df_1min.join(df_5min, ["window", "category"])

8. 常见问题与优化

8.1 窗口太大,内存溢出

# ❌ 问题:窗口太大
df.groupBy(
    F.window("ts", "1 hour")
).agg(F.count("*"))

# ✅ 优化:先预聚合,减少窗口内数据量
# 或者增加分区数
df.repartition(1000).groupBy(
    F.window("ts", "1 hour")
).agg(F.count("*"))

8.2 窗口太多,计算慢

# ❌ 问题:小窗口导致太多分区
df.groupBy(
    F.window("ts", "1 second")  # 每秒一个窗口
).agg(F.count("*"))

# ✅ 优化:使用更大的窗口,或先过滤
df.filter(F.col("amount") > 100).groupBy(
    F.window("ts", "1 minute")
).agg(F.count("*"))

8.3 迟到数据处理策略

策略说明适用场景
丢弃直接丢弃迟到数据实时性要求高,允许误差
侧输出流收集迟到数据单独处理需要100%准确性
二次计算定时全量重跑离线+实时结合
修正更新发送修正数据强一致性要求
# 侧输出流示例
watermarked_stream = df.assignTimestampsAndWatermarks(...)

# 主输出流
main_output = watermarked_stream.keyBy(...).window(...).sum(...)

# 迟到数据
late_output = watermarked_stream.getSideOutput(LateDataOutputTag())

9. Flink时间窗口进阶

9.1 窗口起始时间对齐

-- 默认:从00:00开始每5分钟一个窗口
TUMBLE(event_time, INTERVAL '5' MINUTE)

-- 从1分钟开始
TUMBLE(event_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE)

-- 结果: [00:01-00:06), [00:06-00:11), ...

9.2 窗口延迟触发

// 窗口延迟触发:窗口结束后再等待1分钟
.window(TumblingEventTimeWindows.of(Time.minutes(5))
    .withAllowedLateness(Time.minutes(1)))

9.3 窗口增量计算

// 增量窗口函数
window.reduce(
    (value1, value2) -> value1 + value2,  // 增量聚合
    (key, window, aggregateResult) -> {   // 输出
        return new Result(key, window.getStart(), aggregateResult);
    }
);

10. 完整示例:实时销售大屏

10.1 需求

实时销售大屏需要:

  1. 实时GMV(当日累计)
  2. 实时订单数(当日累计)
  3. 每分钟销售额(最近1小时)
  4. 热卖商品Top10(最近30分钟)
  5. 各品类销售占比(当天)

10.2 完整代码

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# 模拟实时订单流
orders = [
    ("2024-01-01 09:30:00", "user1", "Electronics", 1000),
    ("2024-01-01 09:31:00", "user2", "Clothing", 300),
    ("2024-01-01 09:32:00", "user1", "Electronics", 1500),
    ("2024-01-01 09:33:00", "user3", "Food", 80),
    ("2024-01-01 09:34:00", "user2", "Clothing", 500),
    ("2024-01-01 09:35:00", "user1", "Electronics", 800),
    ("2024-01-01 10:00:00", "user4", "Electronics", 2000),
    ("2024-01-01 10:01:00", "user5", "Food", 150),
]

df = spark.createDataFrame(orders, ["timestamp", "user_id", "category", "amount"])
df = df.withColumn("ts", F.to_timestamp("timestamp"))

# 设置Watermark(允许延迟30秒)
df_with_wm = df.withWatermark("ts", "30 seconds")

# 1. 当日累计GMV和订单数
daily_summary = df.agg(
    F.sum("amount").alias("total_gmv"),
    F.count("*").alias("total_orders")
)
print("=" * 50)
print("当日累计GMV和订单数")
print("=" * 50)
daily_summary.show()

# 2. 每分钟销售额(最近1小时滚动窗口)
minute_sales = df.groupBy(
    F.window("ts", "1 hour", "1 minute")
).agg(
    F.sum("amount").alias("sales")
).select(
    F.col("window.start").alias("time"),
    "sales"
).orderBy("time")

print("=" * 50)
print("每分钟销售额(最近1小时)")
print("=" * 50)
minute_sales.show(truncate=False)

# 3. 热卖商品Top10(最近30分钟)
hot_products = df.filter(
    F.col("ts") >= F.current_timestamp() - F.expr("INTERVAL 30 MINUTES")
).groupBy("category").agg(
    F.sum("amount").alias("sales")
).orderBy(F.desc("sales")).limit(10)

print("=" * 50)
print("热卖品类Top10(最近30分钟)")
print("=" * 50)
hot_products.show()

# 4. 品类销售占比
category_ratio = df.groupBy("category").agg(
    F.sum("amount").alias("sales")
).withColumn(
    "ratio",
    F.col("sales") / F.sum("sales").over() * 100
).orderBy(F.desc("sales"))

print("=" * 50)
print("品类销售占比")
print("=" * 50)
category_ratio.show()

11. 练习题

练习1

计算每5分钟的用户活跃数(UV),使用滚动窗口。

练习2

实现一个"最近30分钟平均响应时间"的滑动窗口统计。

练习3

分析用户会话:统计每个用户的会话数、平均会话时长、会话内平均点击数。

练习4

设置Watermark为10秒,模拟迟到数据,观察输出结果。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x04-时间窗口实战
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们