顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x10-机器学习算法集成

DMIT VPS

机器学习算法集成

本篇是《大数据算法与UDF系列》的第10篇(最终篇),讲解Spark MLlib中的常用机器学习算法,包括K-Means聚类和线性回归,以及如何在实际业务中应用。


1. MLlib简介

1.1 什么是Spark MLlib?

┌─────────────────────────────────────────────────────────────────┐
│                      Spark MLlib 简介                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   MLlib是Spark的机器学习库,提供:                                │
│                                                                 │
│   📊 分类算法:                                                  │
│   ├─ Logistic Regression(逻辑回归)                            │
│   ├─ Decision Tree(决策树)                                   │
│   ├─ Random Forest(随机森林)                                  │
│   ├─ Gradient-Boosted Tree                                     │
│   ├─ Naive Bayes                                                │
│   └─ Multi-layer Perceptron                                    │
│                                                                 │
│   📈 回归算法:                                                  │
│   ├─ Linear Regression(线性回归)                              │
│   ├─ Generalized Linear Regression                             │
│   ├─ Decision Tree Regression                                  │
│   └─ Random Forest Regression                                  │
│                                                                 │
│   🎯 聚类算法:                                                  │
│   ├─ K-Means                                                   │
│   ├─ Bisecting K-Means                                         │
│   ├─ Gaussian Mixture                                          │
│   └─ LDA (主题模型)                                            │
│                                                                 │
│   🔧 工具:                                                      │
│   ├─ Feature Engineering                                       │
│   ├─ Model Evaluation                                          │
│   └─ Pipelines                                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. K-Means聚类

2.1 算法原理

┌─────────────────────────────────────────────────────────────────┐
│                      K-Means 原理                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   目标: 将数据分成K个簇,使簇内数据相似度最大化                   │
│                                                                 │
│   步骤:                                                        │
│                                                                 │
│   1. 随机选择K个中心点                                         │
│                                                                 │
│      ●  ●  ●  ← 初始中心                                        │
│                                                                 │
│   2. 计算每个点到中心的距离,分配到最近的簇                       │
│                                                                 │
│      ●─────●─────●                                             │
│      ↓     ↓     ↓                                              │
│      ▼     ▼     ▼                                              │
│    簇1   簇2   簇3                                               │
│                                                                 │
│   3. 重新计算每个簇的中心点                                      │
│                                                                 │
│      重新计算中心位置                                            │
│                                                                 │
│   4. 重复步骤2-3,直到中心不再变化                               │
│                                                                 │
│   ─────────────────────────────────────                        │
│                                                                 │
│   距离度量: 欧氏距离                                            │
│   d(x,y) = √(Σ(xi-yi)²)                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 Python实现

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("KMeansDemo") \
    .getOrCreate()

# =============================================
# 创建示例数据:用户消费行为
# =============================================
# 特征:月消费次数、月消费金额、平均客单价
data = [
    (1, 10, 500, 50),   # 用户1: 10次, 500元, 平均50元
    (2, 15, 800, 53),
    (3, 8, 400, 50),
    (4, 20, 2000, 100),  # 高价值用户
    (5, 25, 2500, 100),
    (6, 18, 1800, 100),
    (7, 3, 100, 33),    # 低价值用户
    (8, 5, 150, 30),
    (9, 4, 120, 30),
    (10, 50, 3000, 60), # 薅羊毛用户
    (11, 55, 3200, 58),
    (12, 45, 2800, 62),
]

df = spark.createDataFrame(data, ["user_id", "month_orders", "month_amount", "avg_price"])

# =============================================
# 特征工程:组装特征向量
# =============================================
assembler = VectorAssembler(
    inputCols=["month_orders", "month_amount", "avg_price"],
    outputCol="features"
)

# =============================================
# K-Means模型
# =============================================
kmeans = KMeans(k=3, seed=42, maxIter=10)

# =============================================
# Pipeline
# =============================================
pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(df)

# =============================================
# 预测
# =============================================
result = model.transform(df)

print("=" * 60)
print("K-Means聚类结果")
print("=" * 60)

result.select("user_id", "month_orders", "month_amount", "avg_price", "prediction") \
    .orderBy("prediction", "user_id").show()

# =============================================
# 查看聚类中心
# =============================================
centers = model.stages[-1].clusterCenters()
print("\n聚类中心:")
for i, center in enumerate(centers):
    print(f"  簇{i}: 订单数={center[0]:.1f}, 金额={center[1]:.1f}, 平均价={center[2]:.1f}")

spark.stop()

2.3 Scala实现

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vector

object KMeansDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KMeansDemo")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 创建数据
    val data = Seq(
      (1, 10, 500, 50),
      (2, 15, 800, 53),
      (3, 8, 400, 50),
      (4, 20, 2000, 100),
      (5, 25, 2500, 100),
      (6, 18, 1800, 100),
      (7, 3, 100, 33),
      (8, 5, 150, 30),
      (9, 4, 120, 30),
      (10, 50, 3000, 60),
      (11, 55, 3200, 58),
      (12, 45, 2800, 62)
    ).toDF("user_id", "month_orders", "month_amount", "avg_price")

    // 特征工程
    val assembler = new VectorAssembler()
      .setInputCols(Array("month_orders", "month_amount", "avg_price"))
      .setOutputCol("features")

    // K-Means
    val kmeans = new KMeans()
      .setK(3)
      .setSeed(42)
      .setMaxIter(10)

    // Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(assembler, kmeans))

    val model = pipeline.fit(data)

    // 预测
    val result = model.transform(data)

    result.select("user_id", "prediction").show(false)

    // 聚类中心
    val centers = model.stages(1).asInstanceOf[KMeansModel].clusterCenters
    println("\n聚类中心:")
    centers.zipWithIndex.foreach { case (c, i) =>
      println(s"簇$i: $c")
    }

    spark.stop()
  }
}

3. 线性回归

3.1 算法原理

┌─────────────────────────────────────────────────────────────────┐
│                      线性回归 原理                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   目标: 找到一条直线,使所有点到直线的距离平方和最小               │
│                                                                 │
│   公式: y = wx + b                                            │
│                                                                 │
│   其中:                                                        │
│   - w: 权重(斜率)                                            │
│   - b: 偏置(截距)                                            │
│                                                                 │
│   ─────────────────────────────────────                        │
│                                                                 │
│   示例: 预测房价                                                │
│                                                                 │
│      房价                                                        │
│      ↑                                                          │
│   500│                                                    ●  ●│
│      │                                              ●    ●    │
│   400│                                        ●  ●             │
│      │                                    ●  ●                 │
│   300│                              ●  ●                       │
│      │                        ●  ●                             │
│   200│                  ●  ●                                   │
│      │            ●  ●                                         │
│   100│      ●  ●                                               │
│      └─────┼──────────────→ 面积(平方米)                       │
│           50    100   150   200                                │
│                                                                 │
│      回归线: 房价 = 2.5 × 面积 + 50                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 Python实现

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LinearRegressionDemo") \
    .getOrCreate()

# =============================================
# 创建示例数据:房价预测
# =============================================
# 特征:面积、房间数、地段评分
# 目标:预测房价
data = [
    (100, 2, 8, 150),
    (120, 2, 7, 180),
    (150, 3, 8, 250),
    (180, 3, 9, 300),
    (200, 4, 8, 350),
    (80, 1, 6, 100),
    (90, 2, 7, 130),
    (110, 2, 7, 160),
    (130, 2, 8, 190),
    (160, 3, 8, 270),
    (190, 4, 9, 380),
    (210, 4, 9, 420),
    (220, 4, 8, 400),
    (250, 5, 9, 500),
    (60, 1, 5, 70),
]

df = spark.createDataFrame(data, ["area", "rooms", "location_score", "price"])

# =============================================
# 特征工程
# =============================================
assembler = VectorAssembler(
    inputCols=["area", "rooms", "location_score"],
    outputCol="features"
)

# =============================================
# 线性回归模型
# =============================================
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    maxIter=100,
    regParam=0.3,  # 正则化参数
    elasticNetParam=0.5  # L1/L2混合
)

# =============================================
# Pipeline
# =============================================
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(df)

# =============================================
# 模型评估
# =============================================
print("=" * 60)
print("模型参数")
print("=" * 60)

lr_model = model.stages[-1]
print(f"权重 (系数): {lr_model.coefficients}")
print(f"截距: {lr_model.intercept}")
print(f"R²: {lr_model.summary.r2:.4f}")
print(f"RMSE: {lr_model.summary.rootMeanSquaredError:.2f}")

# =============================================
# 预测
# =============================================
print("\n" + "=" * 60)
print("预测结果")
print("=" * 60)

predictions = model.transform(df)
predictions.select("area", "rooms", "location_score", "price", "prediction").show()

# =============================================
# 预测新数据
# =============================================
print("\n" + "=" * 60)
print("预测新数据")
print("=" * 60)

new_data = [
    (140, 3, 8),
    (170, 3, 9),
    (70, 1, 6),
]
new_df = spark.createDataFrame(new_data, ["area", "rooms", "location_score"])
new_predictions = model.transform(new_df)
new_predictions.select("area", "rooms", "location_score", "prediction").show()

spark.stop()

4. 实战:用户分群

4.1 业务场景

根据用户的消费行为,将用户分成不同群体:

  • 高价值用户
  • 潜力用户
  • 普通用户
  • 流失风险用户

4.2 完整示例

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("UserSegmentation") \
    .getOrCreate()

# =============================================
# 1. 准备用户数据
# =============================================
# 特征说明:
# - total_orders: 总订单数
# - total_amount: 总消费金额
# - avg_amount: 平均订单金额
# - days_since_last: 最近一次购买距今天数
# - return_rate: 退货率

users = [
    # 高价值用户
    (1, 100, 50000, 500, 7, 0.02),
    (2, 80, 40000, 500, 10, 0.03),
    (3, 120, 60000, 500, 5, 0.01),
    # 潜力用户
    (4, 20, 5000, 250, 15, 0.05),
    (5, 25, 6000, 240, 20, 0.04),
    (6, 18, 4500, 250, 18, 0.03),
    # 普通用户
    (7, 5, 500, 100, 60, 0.1),
    (8, 8, 800, 100, 45, 0.08),
    (9, 6, 600, 100, 50, 0.12),
    # 流失风险用户
    (10, 3, 200, 67, 180, 0.15),
    (11, 2, 150, 75, 200, 0.2),
    (12, 1, 80, 80, 250, 0.25),
]

df = spark.createDataFrame(users, [
    "user_id", "total_orders", "total_amount", "avg_amount", "days_since_last", "return_rate"
])

# =============================================
# 2. 特征工程
# =============================================
# 标准化特征
features = ["total_orders", "total_amount", "avg_amount", "days_since_last", "return_rate"]

# 向量化
assembler = VectorAssembler(inputCols=features, outputCol="raw_features")

# 标准化
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)

# =============================================
# 3. K-Means聚类
# =============================================
kmeans = KMeans(k=4, seed=42, maxIter=20)

# Pipeline
pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(df)

# =============================================
# 4. 分析聚类结果
# =============================================
result = model.transform(df)

print("=" * 60)
print("用户分群结果")
print("=" * 60)

# 聚类中心
centers = model.stages[-1].clusterCenters()
print("\n聚类中心(标准化后):")
for i, c in enumerate(centers):
    print(f"  簇{i}: {c}")

# 各簇统计
cluster_stats = result.groupBy("prediction").agg(
    F.count("user_id").alias("user_count"),
    F.avg("total_orders").alias("avg_orders"),
    F.avg("total_amount").alias("avg_amount"),
    F.avg("days_since_last").alias("avg_days"),
    F.avg("return_rate").alias("avg_return_rate")
)

print("\n各簇用户特征:")
cluster_stats.show()

# =============================================
# 5. 用户分群标签
# =============================================
# 根据特征判断簇类型
def get_segment_label(cluster_id, df):
    """根据聚类中心判断用户类型"""
    # 简单规则:根据平均消费金额判断
    avg_amount = df.filter(F.col("prediction") == cluster_id) \
        .agg(F.avg("total_amount")).first()[0]
    return avg_amount

# 添加用户标签
result_with_label = result.withColumn(
    "segment",
    F.when(F.col("prediction") == 0, "高价值用户")
    .when(F.col("prediction") == 1, "潜力用户")
    .when(F.col("prediction") == 2, "普通用户")
    .otherwise("流失风险用户")
)

print("\n用户分群明细:")
result_with_label.select(
    "user_id", "total_orders", "total_amount", "days_since_last", "prediction", "segment"
).orderBy("prediction", "total_amount", ascending=False).show()

spark.stop()

5. 模型评估与选择

5.1 常用指标

┌─────────────────────────────────────────────────────────────────┐
│                      模型评估指标                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   回归指标:                                                     │
│   ├─ R² (决定系数): 1=完美, 0=基线                              │
│   ├─ RMSE (均方根误差): 越小越好                                │
│   ├─ MAE (平均绝对误差): 对异常值更鲁棒                          │
│   └─ MSE (均方误差): 敏感于异常值                               │
│                                                                 │
│   聚类指标:                                                     │
│   ├─ 轮廓系数: -1~1, 越大越好                                  │
│   ├─ 簇内平方和 (SSE): 越小越好                                 │
│   └─ 簇间平方和: 越大越好                                       │
│                                                                 │
│   分类指标:                                                     │
│   ├─ 准确率 (Accuracy)                                        │
│   ├─ 精确率 (Precision)                                        │
│   ├─ 召回率 (Recall)                                           │
│   └─ F1 Score                                                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 评估示例

from pyspark.ml.evaluation import RegressionEvaluator, ClusteringEvaluator

# 回归评估
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

# 聚类评估
evaluator = ClusteringEvaluator(
    predictionCol="prediction",
    featuresCol="features",
    metricName="silhouette"  # 轮廓系数
)
silhouette = evaluator.evaluate(result)

print(f"RMSE: {rmse:.2f}")
print(f"轮廓系数: {silhouette:.4f}")

6. 机器学习最佳实践

6.1 特征工程

# 1. 缺失值处理
df.fillna({"age": 0})
df.na.drop()

# 2. 特征缩放
scaler = StandardScaler(withMean=True, withStd=True)

# 3. 特征选择
# 使用ChiSqSelector等

# 4. 特征转换
# 独热编码、标准化、归一化

6.2 模型选择

┌─────────────────────────────────────────────────────────────────┐
│                      模型选择指南                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   回归问题:                                                     │
│   ├─ 数据量小,特征少 → 线性回归                                 │
│   ├─ 需要可解释性 → 线性回归/决策树                              │
│   ├─ 数据复杂 → 随机森林/GBDT                                   │
│   └─ 大规模数据 → Spark MLlib                                   │
│                                                                 │
│   聚类问题:                                                     │
│   ├─ 簇形状规则 → K-Means                                       │
│   ├─ 簇形状不规则 → GMM                                         │
│   └─ 需要层次结构 → 层次聚类                                     │
│                                                                 │
│   分类问题:                                                     │
│   ├─ 二分类 → 逻辑回归                                          │
│   ├─ 多分类 → 随机森林                                           │
│   └─ 大规模数据 → Spark MLlib                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

7. 练习题

练习1

使用K-Means对电商用户进行RFM分群。

练习2

构建一个线性回归模型预测用户生命周期价值。

练习3

比较不同K值对K-Means效果的影响。


8. 总结

本系列回顾

┌─────────────────────────────────────────────────────────────────┐
│                    系列博客总结                                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   📊 基础技能:                                                  │
│   ├─ 0x01 窗口函数与累计计算                                   │
│   ├─ 0x02 Join算法详解与优化                                    │
│   ├─ 0x03 聚合与TopN问题                                       │
│   └─ 0x04 时间窗口实战                                         │
│                                                                 │
│   🚀 高级算法:                                                  │
│   ├─ 0x05 近似算法HyperLogLog                                 │
│   ├─ 0x06 布隆过滤器与去重                                      │
│   └─ 0x07 字符串模糊匹配                                        │
│                                                                 │
│   🔒 数据安全:                                                  │
│   └─ 0x08 数据加密与编码                                       │
│                                                                 │
│   📈 图与智能:                                                  │
│   ├─ 0x09 图算法PageRank                                       │
│   └─ 0x10 机器学习算法集成                                      │
│                                                                 │
│   感谢学习!                                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

恭喜完成本系列所有内容!

赞(0)
未经允许不得转载:順子の杂货铺 » 0x10-机器学习算法集成
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们