机器学习算法集成
本篇是《大数据算法与UDF系列》的第10篇(最终篇),讲解Spark MLlib中的常用机器学习算法,包括K-Means聚类和线性回归,以及如何在实际业务中应用。
1. MLlib简介
1.1 什么是Spark MLlib?
┌─────────────────────────────────────────────────────────────────┐
│ Spark MLlib 简介 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ MLlib是Spark的机器学习库,提供: │
│ │
│ 📊 分类算法: │
│ ├─ Logistic Regression(逻辑回归) │
│ ├─ Decision Tree(决策树) │
│ ├─ Random Forest(随机森林) │
│ ├─ Gradient-Boosted Tree │
│ ├─ Naive Bayes │
│ └─ Multi-layer Perceptron │
│ │
│ 📈 回归算法: │
│ ├─ Linear Regression(线性回归) │
│ ├─ Generalized Linear Regression │
│ ├─ Decision Tree Regression │
│ └─ Random Forest Regression │
│ │
│ 🎯 聚类算法: │
│ ├─ K-Means │
│ ├─ Bisecting K-Means │
│ ├─ Gaussian Mixture │
│ └─ LDA (主题模型) │
│ │
│ 🔧 工具: │
│ ├─ Feature Engineering │
│ ├─ Model Evaluation │
│ └─ Pipelines │
│ │
└─────────────────────────────────────────────────────────────────┘2. K-Means聚类
2.1 算法原理
┌─────────────────────────────────────────────────────────────────┐
│ K-Means 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 目标: 将数据分成K个簇,使簇内数据相似度最大化 │
│ │
│ 步骤: │
│ │
│ 1. 随机选择K个中心点 │
│ │
│ ● ● ● ← 初始中心 │
│ │
│ 2. 计算每个点到中心的距离,分配到最近的簇 │
│ │
│ ●─────●─────● │
│ ↓ ↓ ↓ │
│ ▼ ▼ ▼ │
│ 簇1 簇2 簇3 │
│ │
│ 3. 重新计算每个簇的中心点 │
│ │
│ 重新计算中心位置 │
│ │
│ 4. 重复步骤2-3,直到中心不再变化 │
│ │
│ ───────────────────────────────────── │
│ │
│ 距离度量: 欧氏距离 │
│ d(x,y) = √(Σ(xi-yi)²) │
│ │
└─────────────────────────────────────────────────────────────────┘2.2 Python实现
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("KMeansDemo") \
.getOrCreate()
# =============================================
# 创建示例数据:用户消费行为
# =============================================
# 特征:月消费次数、月消费金额、平均客单价
data = [
(1, 10, 500, 50), # 用户1: 10次, 500元, 平均50元
(2, 15, 800, 53),
(3, 8, 400, 50),
(4, 20, 2000, 100), # 高价值用户
(5, 25, 2500, 100),
(6, 18, 1800, 100),
(7, 3, 100, 33), # 低价值用户
(8, 5, 150, 30),
(9, 4, 120, 30),
(10, 50, 3000, 60), # 薅羊毛用户
(11, 55, 3200, 58),
(12, 45, 2800, 62),
]
df = spark.createDataFrame(data, ["user_id", "month_orders", "month_amount", "avg_price"])
# =============================================
# 特征工程:组装特征向量
# =============================================
assembler = VectorAssembler(
inputCols=["month_orders", "month_amount", "avg_price"],
outputCol="features"
)
# =============================================
# K-Means模型
# =============================================
kmeans = KMeans(k=3, seed=42, maxIter=10)
# =============================================
# Pipeline
# =============================================
pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(df)
# =============================================
# 预测
# =============================================
result = model.transform(df)
print("=" * 60)
print("K-Means聚类结果")
print("=" * 60)
result.select("user_id", "month_orders", "month_amount", "avg_price", "prediction") \
.orderBy("prediction", "user_id").show()
# =============================================
# 查看聚类中心
# =============================================
centers = model.stages[-1].clusterCenters()
print("\n聚类中心:")
for i, center in enumerate(centers):
print(f" 簇{i}: 订单数={center[0]:.1f}, 金额={center[1]:.1f}, 平均价={center[2]:.1f}")
spark.stop()2.3 Scala实现
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vector
object KMeansDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("KMeansDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 创建数据
val data = Seq(
(1, 10, 500, 50),
(2, 15, 800, 53),
(3, 8, 400, 50),
(4, 20, 2000, 100),
(5, 25, 2500, 100),
(6, 18, 1800, 100),
(7, 3, 100, 33),
(8, 5, 150, 30),
(9, 4, 120, 30),
(10, 50, 3000, 60),
(11, 55, 3200, 58),
(12, 45, 2800, 62)
).toDF("user_id", "month_orders", "month_amount", "avg_price")
// 特征工程
val assembler = new VectorAssembler()
.setInputCols(Array("month_orders", "month_amount", "avg_price"))
.setOutputCol("features")
// K-Means
val kmeans = new KMeans()
.setK(3)
.setSeed(42)
.setMaxIter(10)
// Pipeline
val pipeline = new Pipeline()
.setStages(Array(assembler, kmeans))
val model = pipeline.fit(data)
// 预测
val result = model.transform(data)
result.select("user_id", "prediction").show(false)
// 聚类中心
val centers = model.stages(1).asInstanceOf[KMeansModel].clusterCenters
println("\n聚类中心:")
centers.zipWithIndex.foreach { case (c, i) =>
println(s"簇$i: $c")
}
spark.stop()
}
}3. 线性回归
3.1 算法原理
┌─────────────────────────────────────────────────────────────────┐
│ 线性回归 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 目标: 找到一条直线,使所有点到直线的距离平方和最小 │
│ │
│ 公式: y = wx + b │
│ │
│ 其中: │
│ - w: 权重(斜率) │
│ - b: 偏置(截距) │
│ │
│ ───────────────────────────────────── │
│ │
│ 示例: 预测房价 │
│ │
│ 房价 │
│ ↑ │
│ 500│ ● ●│
│ │ ● ● │
│ 400│ ● ● │
│ │ ● ● │
│ 300│ ● ● │
│ │ ● ● │
│ 200│ ● ● │
│ │ ● ● │
│ 100│ ● ● │
│ └─────┼──────────────→ 面积(平方米) │
│ 50 100 150 200 │
│ │
│ 回归线: 房价 = 2.5 × 面积 + 50 │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 Python实现
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("LinearRegressionDemo") \
.getOrCreate()
# =============================================
# 创建示例数据:房价预测
# =============================================
# 特征:面积、房间数、地段评分
# 目标:预测房价
data = [
(100, 2, 8, 150),
(120, 2, 7, 180),
(150, 3, 8, 250),
(180, 3, 9, 300),
(200, 4, 8, 350),
(80, 1, 6, 100),
(90, 2, 7, 130),
(110, 2, 7, 160),
(130, 2, 8, 190),
(160, 3, 8, 270),
(190, 4, 9, 380),
(210, 4, 9, 420),
(220, 4, 8, 400),
(250, 5, 9, 500),
(60, 1, 5, 70),
]
df = spark.createDataFrame(data, ["area", "rooms", "location_score", "price"])
# =============================================
# 特征工程
# =============================================
assembler = VectorAssembler(
inputCols=["area", "rooms", "location_score"],
outputCol="features"
)
# =============================================
# 线性回归模型
# =============================================
lr = LinearRegression(
featuresCol="features",
labelCol="price",
maxIter=100,
regParam=0.3, # 正则化参数
elasticNetParam=0.5 # L1/L2混合
)
# =============================================
# Pipeline
# =============================================
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(df)
# =============================================
# 模型评估
# =============================================
print("=" * 60)
print("模型参数")
print("=" * 60)
lr_model = model.stages[-1]
print(f"权重 (系数): {lr_model.coefficients}")
print(f"截距: {lr_model.intercept}")
print(f"R²: {lr_model.summary.r2:.4f}")
print(f"RMSE: {lr_model.summary.rootMeanSquaredError:.2f}")
# =============================================
# 预测
# =============================================
print("\n" + "=" * 60)
print("预测结果")
print("=" * 60)
predictions = model.transform(df)
predictions.select("area", "rooms", "location_score", "price", "prediction").show()
# =============================================
# 预测新数据
# =============================================
print("\n" + "=" * 60)
print("预测新数据")
print("=" * 60)
new_data = [
(140, 3, 8),
(170, 3, 9),
(70, 1, 6),
]
new_df = spark.createDataFrame(new_data, ["area", "rooms", "location_score"])
new_predictions = model.transform(new_df)
new_predictions.select("area", "rooms", "location_score", "prediction").show()
spark.stop()4. 实战:用户分群
4.1 业务场景
根据用户的消费行为,将用户分成不同群体:
- 高价值用户
- 潜力用户
- 普通用户
- 流失风险用户
4.2 完整示例
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("UserSegmentation") \
.getOrCreate()
# =============================================
# 1. 准备用户数据
# =============================================
# 特征说明:
# - total_orders: 总订单数
# - total_amount: 总消费金额
# - avg_amount: 平均订单金额
# - days_since_last: 最近一次购买距今天数
# - return_rate: 退货率
users = [
# 高价值用户
(1, 100, 50000, 500, 7, 0.02),
(2, 80, 40000, 500, 10, 0.03),
(3, 120, 60000, 500, 5, 0.01),
# 潜力用户
(4, 20, 5000, 250, 15, 0.05),
(5, 25, 6000, 240, 20, 0.04),
(6, 18, 4500, 250, 18, 0.03),
# 普通用户
(7, 5, 500, 100, 60, 0.1),
(8, 8, 800, 100, 45, 0.08),
(9, 6, 600, 100, 50, 0.12),
# 流失风险用户
(10, 3, 200, 67, 180, 0.15),
(11, 2, 150, 75, 200, 0.2),
(12, 1, 80, 80, 250, 0.25),
]
df = spark.createDataFrame(users, [
"user_id", "total_orders", "total_amount", "avg_amount", "days_since_last", "return_rate"
])
# =============================================
# 2. 特征工程
# =============================================
# 标准化特征
features = ["total_orders", "total_amount", "avg_amount", "days_since_last", "return_rate"]
# 向量化
assembler = VectorAssembler(inputCols=features, outputCol="raw_features")
# 标准化
scaler = StandardScaler(
inputCol="raw_features",
outputCol="features",
withStd=True,
withMean=True
)
# =============================================
# 3. K-Means聚类
# =============================================
kmeans = KMeans(k=4, seed=42, maxIter=20)
# Pipeline
pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(df)
# =============================================
# 4. 分析聚类结果
# =============================================
result = model.transform(df)
print("=" * 60)
print("用户分群结果")
print("=" * 60)
# 聚类中心
centers = model.stages[-1].clusterCenters()
print("\n聚类中心(标准化后):")
for i, c in enumerate(centers):
print(f" 簇{i}: {c}")
# 各簇统计
cluster_stats = result.groupBy("prediction").agg(
F.count("user_id").alias("user_count"),
F.avg("total_orders").alias("avg_orders"),
F.avg("total_amount").alias("avg_amount"),
F.avg("days_since_last").alias("avg_days"),
F.avg("return_rate").alias("avg_return_rate")
)
print("\n各簇用户特征:")
cluster_stats.show()
# =============================================
# 5. 用户分群标签
# =============================================
# 根据特征判断簇类型
def get_segment_label(cluster_id, df):
"""根据聚类中心判断用户类型"""
# 简单规则:根据平均消费金额判断
avg_amount = df.filter(F.col("prediction") == cluster_id) \
.agg(F.avg("total_amount")).first()[0]
return avg_amount
# 添加用户标签
result_with_label = result.withColumn(
"segment",
F.when(F.col("prediction") == 0, "高价值用户")
.when(F.col("prediction") == 1, "潜力用户")
.when(F.col("prediction") == 2, "普通用户")
.otherwise("流失风险用户")
)
print("\n用户分群明细:")
result_with_label.select(
"user_id", "total_orders", "total_amount", "days_since_last", "prediction", "segment"
).orderBy("prediction", "total_amount", ascending=False).show()
spark.stop()5. 模型评估与选择
5.1 常用指标
┌─────────────────────────────────────────────────────────────────┐
│ 模型评估指标 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 回归指标: │
│ ├─ R² (决定系数): 1=完美, 0=基线 │
│ ├─ RMSE (均方根误差): 越小越好 │
│ ├─ MAE (平均绝对误差): 对异常值更鲁棒 │
│ └─ MSE (均方误差): 敏感于异常值 │
│ │
│ 聚类指标: │
│ ├─ 轮廓系数: -1~1, 越大越好 │
│ ├─ 簇内平方和 (SSE): 越小越好 │
│ └─ 簇间平方和: 越大越好 │
│ │
│ 分类指标: │
│ ├─ 准确率 (Accuracy) │
│ ├─ 精确率 (Precision) │
│ ├─ 召回率 (Recall) │
│ └─ F1 Score │
│ │
└─────────────────────────────────────────────────────────────────┘5.2 评估示例
from pyspark.ml.evaluation import RegressionEvaluator, ClusteringEvaluator
# 回归评估
evaluator = RegressionEvaluator(
labelCol="price",
predictionCol="prediction",
metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
# 聚类评估
evaluator = ClusteringEvaluator(
predictionCol="prediction",
featuresCol="features",
metricName="silhouette" # 轮廓系数
)
silhouette = evaluator.evaluate(result)
print(f"RMSE: {rmse:.2f}")
print(f"轮廓系数: {silhouette:.4f}")6. 机器学习最佳实践
6.1 特征工程
# 1. 缺失值处理
df.fillna({"age": 0})
df.na.drop()
# 2. 特征缩放
scaler = StandardScaler(withMean=True, withStd=True)
# 3. 特征选择
# 使用ChiSqSelector等
# 4. 特征转换
# 独热编码、标准化、归一化6.2 模型选择
┌─────────────────────────────────────────────────────────────────┐
│ 模型选择指南 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 回归问题: │
│ ├─ 数据量小,特征少 → 线性回归 │
│ ├─ 需要可解释性 → 线性回归/决策树 │
│ ├─ 数据复杂 → 随机森林/GBDT │
│ └─ 大规模数据 → Spark MLlib │
│ │
│ 聚类问题: │
│ ├─ 簇形状规则 → K-Means │
│ ├─ 簇形状不规则 → GMM │
│ └─ 需要层次结构 → 层次聚类 │
│ │
│ 分类问题: │
│ ├─ 二分类 → 逻辑回归 │
│ ├─ 多分类 → 随机森林 │
│ └─ 大规模数据 → Spark MLlib │
│ │
└─────────────────────────────────────────────────────────────────┘7. 练习题
练习1
使用K-Means对电商用户进行RFM分群。
练习2
构建一个线性回归模型预测用户生命周期价值。
练习3
比较不同K值对K-Means效果的影响。
8. 总结
本系列回顾
┌─────────────────────────────────────────────────────────────────┐
│ 系列博客总结 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 📊 基础技能: │
│ ├─ 0x01 窗口函数与累计计算 │
│ ├─ 0x02 Join算法详解与优化 │
│ ├─ 0x03 聚合与TopN问题 │
│ └─ 0x04 时间窗口实战 │
│ │
│ 🚀 高级算法: │
│ ├─ 0x05 近似算法HyperLogLog │
│ ├─ 0x06 布隆过滤器与去重 │
│ └─ 0x07 字符串模糊匹配 │
│ │
│ 🔒 数据安全: │
│ └─ 0x08 数据加密与编码 │
│ │
│ 📈 图与智能: │
│ ├─ 0x09 图算法PageRank │
│ └─ 0x10 机器学习算法集成 │
│ │
│ 感谢学习! │
│ │
└─────────────────────────────────────────────────────────────────┘恭喜完成本系列所有内容!
順子の杂货铺






