时间窗口实战
本篇是《大数据算法与UDF系列》的第4篇,深入讲解时间窗口的三大类型(滚动、滑动、会话),以及Flink流处理中的迟到数据处理和Watermark机制。
1. 什么是时间窗口?
1.1 业务场景
在实时数据分析中,我们经常需要:
- 每分钟的网站访问量
- 每小时的销售额
- 每5分钟的用户活跃度
- 会话内的用户行为
这些需求都依赖于时间窗口来实现。
1.2 窗口类型概览
┌─────────────────────────────────────────────────────────────────┐
│ 时间窗口三大类型 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 📊 滚动窗口 (Tumbling Window) │
│ ───────────────────────────────────── │
│ [00:00-00:05)[00:05-00:10)[00:10-00:15) │
│ 固定大小,不重叠,完整切分 │
│ 适用:固定周期的统计 │
│ │
│ 📊 滑动窗口 (Sliding Window) │
│ ───────────────────────────────────── │
│ [00:00-00:10)[00:02-00:12)[00:04-00:14)... │
│ 固定大小,可重叠,移动步长可配置 │
│ 适用:移动平均、趋势分析 │
│ │
│ 📊 会话窗口 (Session Window) │
│ ───────────────────────────────────── │
│ ════┐ ═══════════┐ ════┐ │
│ │ │ │ │ │
│ 会话1 会话2 会话3 │
│ 动态大小,根据间隙生成 │
│ 适用:用户行为分析 │
│ │
└─────────────────────────────────────────────────────────────────┘2. 滚动窗口(Tumbling Window)
2.1 原理图解
┌─────────────────────────────────────────────────────────────────�│
│ 滚动窗口原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 数据流: |---t1---|---t2---|---t3---|---t4---|---t5---| │
│ │
│ 窗口大小: 2分钟 │
│ │
│ Window 1: [00:00 - 00:02) 包含 t1 │
│ Window 2: [00:02 - 00:04) 包含 t2 │
│ Window 3: [00:04 - 00:06) 包含 t3 │
│ ... │
│ │
│ 特点: │
│ ✓ 不重叠 │
│ ✓ 固定大小 │
│ ✓ 数据不重复计算 │
│ │
└─────────────────────────────────────────────────────────────────┘2.2 PySpark Batch实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
# 模拟数据
data = [
("2024-01-01 10:00:00", "user1", "view"),
("2024-01-01 10:01:00", "user1", "click"),
("2024-01-01 10:02:00", "user2", "view"),
("2024-01-01 10:03:00", "user2", "buy"),
("2024-01-01 10:04:00", "user3", "view"),
("2024-01-01 10:05:00", "user3", "click"),
]
df = spark.createDataFrame(data, ["timestamp", "user_id", "action"])
# 转换为时间戳
df = df.withColumn("ts", F.to_timestamp("timestamp"))
# 1. 按1分钟滚动窗口分组
result = df.groupBy(
F.window("ts", "1 minute") # 1分钟滚动窗口
).agg(
F.count("*").alias("cnt"),
F.countDistinct("user_id").alias("uv")
)
result.show(truncate=False)输出:
+------------------------------------------+----+---+
|window |cnt |uv |
+------------------------------------------+----+---+
|[2024-01-01 10:00:00, 2024-01-01 10:01:00]|2 |1 |
|[2024-01-01 10:02:00, 2024-01-01 10:03:00]|2 |2 |
|[2024-01-01 10:04:00, 2024-01-01 10:05:00]|2 |1 |
+------------------------------------------+----+---+2.3 Flink SQL实现
-- 滚动窗口(每5分钟)
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
COUNT(*) as cnt,
COUNT(DISTINCT user_id) as uv
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)2.4 Scala Spark实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
object TumblingWindow {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("TumblingWindow")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val data = Seq(
("2024-01-01 10:00:00", "user1", "view"),
("2024-01-01 10:01:00", "user1", "click"),
("2024-01-01 10:02:00", "user2", "view"),
("2024-01-01 10:03:00", "user2", "buy"),
("2024-01-01 10:04:00", "user3", "view"),
("2024-01-01 10:05:00", "user3", "click")
)
val df = data.toDF("timestamp", "user_id", "action")
.withColumn("ts", to_timestamp($"timestamp"))
// 1分钟滚动窗口
val result = df.groupBy(
window($"ts", "1 minute")
).agg(
count("*").as("cnt"),
countDistinct("user_id").as("uv")
)
result.show(false)
spark.stop()
}
}3. 滑动窗口(Sliding Window)
3.1 原理图解
┌─────────────────────────────────────────────────────────────────┐
│ 滑动窗口原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 数据流: |--t1--|--t2--|--t3--|--t4--|--t5--|--t6--| │
│ │
│ 窗口大小: 3分钟 │
│ 滑动步长: 2分钟 │
│ │
│ Window 1: [00:00 - 00:03) 包含 t1,t2 │
│ ↓ 滑动2分钟 │
│ Window 2: [00:02 - 00:05) 包含 t2,t3,t4 │
│ ↓ 滑动2分钟 │
│ Window 3: [00:04 - 00:07) 包含 t3,t4,t5 │
│ ↓ 滑动2分钟 │
│ Window 4: [00:06 - 00:09) 包含 t4,t5,t6 │
│ │
│ 特点: │
│ ✓ 可重叠 │
│ ✓ 移动步长可配置 │
│ ✓ 数据可能重复计算 │
│ ✓ 更平滑的统计曲线 │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 PySpark实现
# 滑动窗口:窗口大小3分钟,滑动步长1分钟
result = df.groupBy(
F.window("ts", "3 minutes", "1 minute") # (windowLength, slideLength)
).agg(
F.count("*").alias("cnt"),
F.avg("amount").alias("avg_amount")
)
result.show(truncate=False)3.3 滚动 vs 滑动 对比
| 特性 | 滚动窗口 | 滑动窗口 |
|---|---|---|
| 重叠 | 不重叠 | 可重叠 |
| 大小 | 固定 | 固定 |
| 步长 | = 窗口大小 | < 窗口大小 |
| 适用 | 独立周期统计 | 平滑趋势分析 |
| 计算量 | 较少 | 较多 |
3.4 业务场景选择
┌─────────────────────────────────────────────────────────────────┐
│ 场景选择指南 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 滚动窗口适用: │
│ ├─ 每日销售额统计 │
│ ├─ 每小时网站PV │
│ └─ 整点报数 │
│ │
│ 滑动窗口适用: │
│ ├─ 最近5分钟的平均响应时间 │
│ ├─ 移动平均线(MA5、MA10、MA20) │
│ ├─ 实时趋势监控 │
│ └─ 异常检测(与历史对比) │
│ │
└─────────────────────────────────────────────────────────────────┘4. 会话窗口(Session Window)
4.1 原理图解
┌─────────────────────────────────────────────────────────────────┐
│ 会话窗口原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户行为序列: │
│ user1: [view@10:00][click@10:01][buy@10:02]----[view@10:30] │
│ │
│ 会话间隔(gap): 10分钟 │
│ │
│ 会话1: [10:00 - 10:12) │
│ view → click → buy (间隔<10分钟) │
│ │
│ ────────── 超过10分钟间隔 ────────── │
│ │
│ 会话2: [10:30 - 10:40) │
│ view (单独会话) │
│ │
│ 特点: │
│ ✓ 动态大小 │
│ ✓ 根据用户行为自动生成 │
│ ✓ 适合用户行为分析 │
│ │
└─────────────────────────────────────────────────────────────────┘4.2 PySpark实现
# 会话窗口:间隔5分钟
result = df.groupBy(
"user_id",
F.session_window("ts", "5 minutes") # 会话间隔5分钟
).agg(
F.count("*").alias("action_cnt"),
F.min("timestamp").alias("session_start"),
F.max("timestamp").alias("session_end")
)
result.show(truncate=False)4.3 Flink SQL实现
-- 会话窗口
SELECT
SESSION_START(event_time, INTERVAL '10' MINUTE) as session_start,
SESSION_END(event_time, INTERVAL '10' MINUTE) as session_end,
user_id,
COUNT(*) as action_cnt
FROM source_table
GROUP BY SESSION(event_time, INTERVAL '10' MINUTE), user_id5. 迟到数据处理与Watermark
5.1 乱序问题
┌─────────────────────────────────────────────────────────────────┐
│ 乱序数据示例 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 理想顺序: t1@10:00 → t2@10:01 → t3@10:02 → t4@10:03 │
│ │
│ 实际顺序: t1@10:00 → t3@10:02 → t2@10:01 → t4@10:03 │
│ ↑ │
│ 乱序到达 │
│ │
│ 到达时间: t1@10:00:05 → t3@10:00:06 → t2@10:00:07 │
│ │
│ 问题: │
│ - t2应该在窗口[10:00-10:02)中计算 │
│ - 但当t3到达时,窗口[10:00-10:02)可能已经触发输出 │
│ - 导致t2被丢弃或计算错误 │
│ │
└─────────────────────────────────────────────────────────────────┘5.2 Watermark(水印)机制
核心思想:等待一定时间后再触发窗口计算
┌─────────────────────────────────────────────────────────────────┐
│ Watermark 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 配置: Watermark = 事件时间 - 最大允许延迟 │
│ │
│ 例如: max_lateness = 5秒 │
│ Watermark = 当前事件时间 - 5秒 │
│ │
│ ───────────────────────────────────────────── │
│ │
│ 数据流: [10:00][10:01][10:02][10:03][10:04] │
│ ↓ │
│ Watermark: 9:55 9:56 9:57 9:58 9:59 │
│ ↓ │
│ 触发条件: Watermark >= 窗口结束时间 │
│ │
│ Window [10:00-10:02): │
│ - 当Watermark >= 10:02时触发 │
│ - 即数据时间到达10:02+5秒 = 10:02:05时触发 │
│ │
│ 迟到数据处理: │
│ - Watermark之前的数据:正常计算 │
│ - Watermark之后的数据:放入侧输出流或丢弃 │
│ │
└─────────────────────────────────────────────────────────────────┘5.3 Flink实现Watermark
// Java/Scala Flink
DataStream stream = env.addSource(new EventSource());
// 定义Watermark策略
DataStream watermarkedStream = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
// 使用事件时间窗口
watermarkedStream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.sum("amount"); 5.4 PySpark Streaming实现
from pyspark.sql import functions as F
# 定义Watermark
watermark_df = df.withWatermark("timestamp", "5 minutes")
# 使用Watermark进行窗口计算
result = watermark_df \
.groupBy(
F.window("timestamp", "5 minutes"),
"category"
) \
.agg(F.count("*").alias("cnt"))
# 迟到数据会进入late data处理
result.isLate("timestamp") # 标记迟到数据6. 窗口实战:实时销售统计
6.1 需求
实时统计:
- 每分钟销售额(滚动窗口)
- 最近5分钟销售趋势(滑动窗口)
- 用户会话行为(会话窗口)
6.2 完整代码
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("WindowDemo") \
.config("spark.sql.shuffle.partitions", "2") \
.getOrCreate()
# 模拟实时订单数据
orders = [
("2024-01-01 10:00:01", "user1", "Electronics", 1000),
("2024-01-01 10:00:30", "user1", "Clothing", 200),
("2024-01-01 10:01:15", "user2", "Electronics", 1500),
("2024-01-01 10:02:00", "user3", "Food", 50),
("2024-01-01 10:02:30", "user1", "Electronics", 800), # 同一用户
("2024-01-01 10:10:00", "user3", "Food", 60), # 超过5分钟间隔
]
df = spark.createDataFrame(orders, ["timestamp", "user_id", "category", "amount"])
df = df.withColumn("ts", F.to_timestamp("timestamp"))
print("=" * 60)
print("1. 滚动窗口:每分钟销售额")
print("=" * 60)
tumbling = df.groupBy(
F.window("ts", "1 minute", "1 minute")
).agg(
F.sum("amount").alias("total_amount"),
F.count("*").alias("order_cnt")
).select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"total_amount",
"order_cnt"
)
tumbling.orderBy("window_start").show(truncate=False)
print("=" * 60)
print("2. 滑动窗口:最近3分钟的平均订单金额")
print("=" * 60)
sliding = df.groupBy(
F.window("ts", "3 minutes", "1 minute")
).agg(
F.avg("amount").alias("avg_amount"),
F.count("*").alias("order_cnt")
).select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"avg_amount",
"order_cnt"
)
sliding.orderBy("window_start").show(truncate=False)
print("=" * 60)
print("3. 会话窗口:用户会话行为(间隔5分钟)")
print("=" * 60)
session = df.groupBy(
"user_id",
F.session_window("ts", "5 minutes")
).agg(
F.sum("amount").alias("session_total"),
F.collect_list("category").alias("categories"),
F.count("*").alias("action_cnt")
).select(
"user_id",
F.col("window.start").alias("session_start"),
F.col("window.end").alias("session_end"),
"session_total",
"categories"
)
session.show(truncate=False)6.3 输出结果
============================================================
1. 滚动窗口:每分钟销售额
============================================================
+---------------------+---------------------+------------+---------+
|window_start |window_end |total_amount|order_cnt|
+---------------------+---------------------+------------+---------+
|2024-01-01 10:00:00 |2024-01-01 10:01:00 |1200 |2 |
|2024-01-01 10:01:00 |2024-01-01 10:02:00 |1500 |1 |
|2024-01-01 10:02:00 |2024-01-01 10:03:00 |850 |2 |
|2024-01-01 10:10:00 |2024-01-01 10:11:00 |60 |1 |
+---------------------+---------------------+------------+---------+
============================================================
2. 滑动窗口:最近3分钟的平均订单金额
============================================================
+---------------------+---------------------+------------------+---------+
|window_start |window_end |avg_amount |order_cnt|
+---------------------+---------------------+------------------+---------+
|2024-01-01 10:00:00 |2024-01-01 10:03:00 |683.33 |3 |
|2024-01-01 10:01:00 |2024-01-01 10:04:00 |783.33 |3 |
|2024-01-01 10:02:00 |2024-01-01 10:05:00 |783.33 |3 |
|2024-01-01 10:03:00 |2024-01-01 10:06:00 |425.0 |2 |
+---------------------+---------------------+------------------+---------+
============================================================
3. 会话窗口:用户会话行为(间隔5分钟)
============================================================
+-----+---------------------+---------------------+--------------+----------+
|user_id|session_start |session_end |session_total |categories|
+-----+---------------------+---------------------+--------------+----------+
|user1 |2024-01-01 10:00:01 |2024-01-01 10:02:30 |2000 |[Electronics, Clothing, Electronics]|
|user2 |2024-01-01 10:01:15 |2024-01-01 10:02:15 |1500 |[Electronics]|
|user3 |2024-01-01 10:02:00 |2024-01-01 10:03:00 |50 |[Food] |
|user3 |2024-01-01 10:10:00 |2024-01-01 10:11:00 |60 |[Food] |
+-----+---------------------+---------------------+--------------+----------7. 窗口函数组合使用
7.1 窗口 + 窗口函数
# 滚动窗口内计算排名
result = df.groupBy(
F.window("ts", "5 minute"),
"category"
).agg(
F.sum("amount").alias("total")
).withColumn(
"rank",
F.row_number().over(
Window.orderBy(F.desc("total"))
)
)7.2 多窗口同时计算
# 同时计算1分钟和5分钟窗口
from pyspark.sql import DataFrame
df_1min = df.groupBy(
F.window("ts", "1 minute"),
"category"
).agg(F.sum("amount").alias("amount_1min"))
df_5min = df.groupBy(
F.window("ts", "5 minute"),
"category"
).agg(F.sum("amount").alias("amount_5min"))
# 关联
result = df_1min.join(df_5min, ["window", "category"])8. 常见问题与优化
8.1 窗口太大,内存溢出
# ❌ 问题:窗口太大
df.groupBy(
F.window("ts", "1 hour")
).agg(F.count("*"))
# ✅ 优化:先预聚合,减少窗口内数据量
# 或者增加分区数
df.repartition(1000).groupBy(
F.window("ts", "1 hour")
).agg(F.count("*"))8.2 窗口太多,计算慢
# ❌ 问题:小窗口导致太多分区
df.groupBy(
F.window("ts", "1 second") # 每秒一个窗口
).agg(F.count("*"))
# ✅ 优化:使用更大的窗口,或先过滤
df.filter(F.col("amount") > 100).groupBy(
F.window("ts", "1 minute")
).agg(F.count("*"))8.3 迟到数据处理策略
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 丢弃 | 直接丢弃迟到数据 | 实时性要求高,允许误差 |
| 侧输出流 | 收集迟到数据单独处理 | 需要100%准确性 |
| 二次计算 | 定时全量重跑 | 离线+实时结合 |
| 修正更新 | 发送修正数据 | 强一致性要求 |
# 侧输出流示例
watermarked_stream = df.assignTimestampsAndWatermarks(...)
# 主输出流
main_output = watermarked_stream.keyBy(...).window(...).sum(...)
# 迟到数据
late_output = watermarked_stream.getSideOutput(LateDataOutputTag())9. Flink时间窗口进阶
9.1 窗口起始时间对齐
-- 默认:从00:00开始每5分钟一个窗口
TUMBLE(event_time, INTERVAL '5' MINUTE)
-- 从1分钟开始
TUMBLE(event_time, INTERVAL '5' MINUTE, INTERVAL '1' MINUTE)
-- 结果: [00:01-00:06), [00:06-00:11), ...9.2 窗口延迟触发
// 窗口延迟触发:窗口结束后再等待1分钟
.window(TumblingEventTimeWindows.of(Time.minutes(5))
.withAllowedLateness(Time.minutes(1)))9.3 窗口增量计算
// 增量窗口函数
window.reduce(
(value1, value2) -> value1 + value2, // 增量聚合
(key, window, aggregateResult) -> { // 输出
return new Result(key, window.getStart(), aggregateResult);
}
);10. 完整示例:实时销售大屏
10.1 需求
实时销售大屏需要:
- 实时GMV(当日累计)
- 实时订单数(当日累计)
- 每分钟销售额(最近1小时)
- 热卖商品Top10(最近30分钟)
- 各品类销售占比(当天)
10.2 完整代码
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
# 模拟实时订单流
orders = [
("2024-01-01 09:30:00", "user1", "Electronics", 1000),
("2024-01-01 09:31:00", "user2", "Clothing", 300),
("2024-01-01 09:32:00", "user1", "Electronics", 1500),
("2024-01-01 09:33:00", "user3", "Food", 80),
("2024-01-01 09:34:00", "user2", "Clothing", 500),
("2024-01-01 09:35:00", "user1", "Electronics", 800),
("2024-01-01 10:00:00", "user4", "Electronics", 2000),
("2024-01-01 10:01:00", "user5", "Food", 150),
]
df = spark.createDataFrame(orders, ["timestamp", "user_id", "category", "amount"])
df = df.withColumn("ts", F.to_timestamp("timestamp"))
# 设置Watermark(允许延迟30秒)
df_with_wm = df.withWatermark("ts", "30 seconds")
# 1. 当日累计GMV和订单数
daily_summary = df.agg(
F.sum("amount").alias("total_gmv"),
F.count("*").alias("total_orders")
)
print("=" * 50)
print("当日累计GMV和订单数")
print("=" * 50)
daily_summary.show()
# 2. 每分钟销售额(最近1小时滚动窗口)
minute_sales = df.groupBy(
F.window("ts", "1 hour", "1 minute")
).agg(
F.sum("amount").alias("sales")
).select(
F.col("window.start").alias("time"),
"sales"
).orderBy("time")
print("=" * 50)
print("每分钟销售额(最近1小时)")
print("=" * 50)
minute_sales.show(truncate=False)
# 3. 热卖商品Top10(最近30分钟)
hot_products = df.filter(
F.col("ts") >= F.current_timestamp() - F.expr("INTERVAL 30 MINUTES")
).groupBy("category").agg(
F.sum("amount").alias("sales")
).orderBy(F.desc("sales")).limit(10)
print("=" * 50)
print("热卖品类Top10(最近30分钟)")
print("=" * 50)
hot_products.show()
# 4. 品类销售占比
category_ratio = df.groupBy("category").agg(
F.sum("amount").alias("sales")
).withColumn(
"ratio",
F.col("sales") / F.sum("sales").over() * 100
).orderBy(F.desc("sales"))
print("=" * 50)
print("品类销售占比")
print("=" * 50)
category_ratio.show()11. 练习题
练习1
计算每5分钟的用户活跃数(UV),使用滚动窗口。
练习2
实现一个"最近30分钟平均响应时间"的滑动窗口统计。
练习3
分析用户会话:统计每个用户的会话数、平均会话时长、会话内平均点击数。
练习4
设置Watermark为10秒,模拟迟到数据,观察输出结果。
順子の杂货铺



