图算法PageRank
本篇是《大数据算法与UDF系列》的第9篇,深入讲解PageRank算法的原理,以及如何使用Spark GraphX进行大规模图计算。
1. PageRank简介
1.1 背景故事
PageRank由Google创始人Larry Page和Sergey Brin于1998年提出,最初用于Google搜索引擎的网页排名。
┌─────────────────────────────────────────────────────────────────┐
│ PageRank 核心思想 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 一个网页的重要性,取决于: │
│ ├─ 有多少其他网页链接到它 │
│ ├─ 链接到它的网页本身有多重要 │
│ │
│ ───────────────────────────────────── │
│ │
│ 直观理解: │
│ │
│ A ───▶ B ───▶ C │
│ │ │ │
│ │ ▼ │
│ └──▶ D ◀── E │
│ │
│ PageRank: │
│ - A有D和E两个出链 │
│ - B被A和C链接 │
│ - C被B链接 │
│ - D被A链接 │
│ - E没有出链(可能是新网页) │
│ │
│ 重要性: C > B > D > A > E │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 业务场景
| 场景 | 应用 |
|---|---|
| 搜索引擎 | 网页排名 |
| 社交网络 | 影响力用户识别 |
| 推荐系统 | 关键节点发现 |
| 金融风控 | 可疑交易检测 |
| 生物网络 | 关键蛋白质发现 |
2. PageRank原理
2.1 核心公式
PR(A) = (1-d)/N + d × Σ(PR(B) / Out(B))
其中:
- PR(A): 页面A的PageRank值
- d: 阻尼因子(通常0.85)
- N: 总页面数
- Σ: 对所有链接到A的页面求和
- Out(B): 页面B的出链数量2.2 算法图解
┌─────────────────────────────────────────────────────────────────┐
│ PageRank 计算过程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Step 1: 初始化 │
│ ───────────────────────────────────── │
│ │
│ A ───▶ B ───▶ C │
│ │ │
│ ▼ │
│ D │
│ │
│ 初始PR值: PR(A)=0.25, PR(B)=0.25, PR(C)=0.25, PR(D)=0.25 │
│ │
│ Step 2: 迭代计算 │
│ ───────────────────────────────────── │
│ │
│ 第1次迭代: │
│ - PR(A) = 0.15 + 0.85 × 0 = 0.15 (无人链接) │
│ - PR(B) = 0.15 + 0.85 × 0.25/1 = 0.3625 (A→B, 出链=1) │
│ - PR(C) = 0.15 + 0.85 × 0.25/1 = 0.3625 (B→C) │
│ - PR(D) = 0.15 + 0.85 × 0.25/1 = 0.3625 (A→D) │
│ │
│ 第2次迭代: │
│ - PR(B) = 0.15 + 0.85 × 0.15/1 = 0.2775 │
│ - PR(C) = 0.15 + 0.85 × 0.3625/1 = 0.4581 │
│ - PR(D) = 0.15 + 0.85 × 0.15/1 = 0.2775 │
│ - PR(A) = 0.15 + 0.85 × 0 = 0.15 │
│ │
│ Step 3: 收敛 │
│ ───────────────────────────────────── │
│ │
│ 迭代10-20次后,PR值趋于稳定 │
│ │
└─────────────────────────────────────────────────────────────────┘2.3 阻尼因子
┌─────────────────────────────────────────────────────────────────┐
│ 阻尼因子 d │
├─────────────────────────────────────────────────────────────────┤
│ │
│ d = 0.85 (常用值) │
│ │
│ 含义: │
│ - 用户有85%概率点击页面上的链接 │
│ - 用户有15%概率直接跳转到任意页面(随机浏览) │
│ │
│ 作用: │
│ - 防止排名只出不进(没有入链的页面) │
│ - 保证算法收敛 │
│ │
│ 调整: │
│ - d越大: 越依赖链接关系 │
│ - d越小: 越接近均匀分布 │
│ │
└─────────────────────────────────────────────────────────────────┘3. Spark GraphX实现
3.1 GraphX核心概念
┌─────────────────────────────────────────────────────────────────┐
│ GraphX 核心概念 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Vertex (顶点): │
│ - 代表实体(用户、网页) │
│ - VertexRDD[VD] │
│ │
│ Edge (边): │
│ - 代表关系(关注、链接) │
│ - EdgeRDD[ED] │
│ │
│ 图结构: │
│ │
│ [用户A] ──关注──▶ [用户B] │
│ [用户A] ──关注──▶ [用户C] │
│ [用户B] ──关注──▶ [用户C] │
│ [用户C] ──关注──▶ [用户A] │
│ │
│ VertexRDD: │
│ (A, 用户信息) (B, 用户信息) (C, 用户信息) │
│ │
│ EdgeRDD: │
│ (A→B, 关注) (A→C, 关注) (B→C, 关注) (C→A, 关注) │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 Python (GraphFrames) 实现
from pyspark.sql import SparkSession
from graphframes import GraphFrame
spark = SparkSession.builder \
.appName("PageRankDemo") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
# =============================================
# 创建顶点数据(用户)
# =============================================
vertices = spark.createDataFrame([
("A", "Alice", 25),
("B", "Bob", 30),
("C", "Charlie", 28),
("D", "David", 35),
("E", "Eve", 22),
("F", "Frank", 40),
], ["id", "name", "age"])
# =============================================
# 创建边数据(关注关系)
# =============================================
edges = spark.createDataFrame([
("A", "B", "follow"),
("A", "C", "follow"),
("B", "C", "follow"),
("C", "A", "follow"),
("D", "A", "follow"),
("D", "B", "follow"),
("E", "D", "follow"),
("F", "D", "follow"),
], ["src", "dst", "relationship"])
# =============================================
# 创建图
# =============================================
g = GraphFrame(vertices, edges)
print("=" * 60)
print("图信息")
print("=" * 60)
print(f"顶点数: {g.vertices.count()}")
print(f"边数: {g.edges.count()}")
# =============================================
# 运行PageRank
# =============================================
print("=" * 60)
print("PageRank结果")
print("=" * 60)
# 运行PageRank(收敛版)
results = g.pageRank(resetProbability=0.15, maxIter=20)
# 获取排名结果
pagerank_df = results.vertices.select("id", "name", "pagerank")
pagerank_df = pagerank_df.orderBy(pagerank_df["pagerank"].desc())
pagerank_df.show()
# =============================================
# 带权重的PageRank
# =============================================
print("=" * 60)
print("带权重PageRank")
print("=" * 60)
# 给边添加权重
edges_weighted = spark.createDataFrame([
("A", "B", "follow", 1.0),
("A", "C", "follow", 0.5), # A更关注B
("B", "C", "follow", 1.0),
("C", "A", "follow", 1.0),
("D", "A", "follow", 1.0),
("D", "B", "follow", 1.0),
("E", "D", "follow", 1.0),
("F", "D", "follow", 1.0),
], ["src", "dst", "relationship", "weight"])
g2 = GraphFrame(vertices, edges_weighted)
results2 = g2.pageRank(resetProbability=0.15, maxIter=20)
results2.vertices.select("id", "name", "pagerank") \
.orderBy("pagerank", ascending=False).show()
spark.stop()3.3 Scala (GraphX) 实现
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
object PageRankDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("PageRankDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
// 创建顶点
val vertices: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
(1L, ("Alice", 25)),
(2L, ("Bob", 30)),
(3L, ("Charlie", 28)),
(4L, ("David", 35)),
(5L, ("Eve", 22))
))
// 创建边
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
Edge(1L, 2L, 1.0), // Alice -> Bob
Edge(1L, 3L, 1.0), // Alice -> Charlie
Edge(2L, 3L, 1.0), // Bob -> Charlie
Edge(3L, 1L, 1.0), // Charlie -> Alice
Edge(4L, 1L, 1.0), // David -> Alice
Edge(4L, 2L, 1.0), // David -> Bob
Edge(5L, 4L, 1.0) // Eve -> David
))
// 创建图
val graph = Graph(vertices, edges, ("default", 0))
// 运行PageRank
val pagerank = graph.pageRank(0.0001).vertices
// 合并顶点信息
val ranked = pagerank.join(vertices).map {
case (id, (rank, (name, age))) => (name, rank)
}
// 排序输出
println("PageRank结果:")
ranked.sortBy(_._2, ascending = false).collect().foreach(println)
sc.stop()
}
}4. 实战:社交网络分析
4.1 业务场景
分析社交网络中的:
- 最有影响力的用户(PageRank)
- 最活跃的用户(入度/出度)
- 关键意见领袖识别
4.2 完整示例
from pyspark.sql import SparkSession
from graphframes import GraphFrame
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("SocialNetworkAnalysis") \
.getOrCreate()
# =============================================
# 模拟社交网络数据
# =============================================
# 用户信息
users = spark.createDataFrame([
("u1", "张三", "北京", 1000),
("u2", "李四", "上海", 500),
("u3", "王五", "北京", 800),
("u4", "赵六", "深圳", 300),
("u5", "钱七", "杭州", 200),
("u6", "孙八", "广州", 150),
("u7", "周九", "成都", 100),
("u8", "吴十", "武汉", 80),
], ["user_id", "name", "city", "followers"])
# 关注关系(关注者 → 被关注者)
relationships = spark.createDataFrame([
("u1", "u2", "follow"), # 张三关注李四
("u1", "u3", "follow"), # 张三关注王五
("u1", "u4", "follow"), # 张三关注赵六
("u2", "u3", "follow"),
("u2", "u1", "follow"), # 李四也关注张三(互关)
("u3", "u1", "follow"),
("u3", "u2", "follow"),
("u4", "u1", "follow"),
("u4", "u2", "follow"),
("u4", "u3", "follow"),
("u5", "u1", "follow"),
("u5", "u4", "follow"),
("u6", "u4", "follow"),
("u7", "u6", "follow"),
("u8", "u7", "follow"),
], ["src", "dst", "type"])
# 创建图
g = GraphFrame(users, relationships)
print("=" * 60)
print("1. 基本统计")
print("=" * 60)
print(f"用户总数: {g.vertices.count()}")
print(f"关注关系总数: {g.edges.count()}")
# =============================================
# 2. PageRank - 影响力排名
# =============================================
print("=" * 60)
print("2. PageRank - 影响力排名")
print("=" * 60)
pagerank_results = g.pageRank(resetProbability=0.15, maxIter=20)
# 关联用户信息
influence = pagerank_results.vertices \
.join(users, pagerank_results.vertices.id == users.user_id) \
.select("user_id", "name", "city", "followers", "pagerank") \
.orderBy(F.desc("pagerank"))
influence.show()
# =============================================
# 3. 入度分析 - 被关注最多
# =============================================
print("=" * 60)
print("3. 入度分析 - 被关注最多")
print("=" * 60)
in_degree = g.inDegrees.withColumnRenamed("id", "user_id") \
.join(users, "user_id") \
.select("user_id", "name", "inDegree") \
.orderBy(F.desc("inDegree"))
in_degree.show()
# =============================================
# 4. 出度分析 - 关注最多
# =============================================
print("=" * 60)
print("4. 出度分析 - 关注最多")
print("=" * 60)
out_degree = g.outDegrees.withColumnRenamed("id", "user_id") \
.join(users, "user_id") \
.select("user_id", "name", "outDegree") \
.orderBy(F.desc("outDegree"))
out_degree.show()
# =============================================
# 5. 综合评分
# =============================================
print("=" * 60)
print("5. 综合影响力评分")
print("=" * 60)
# 合并多种指标
combined = pagerank_results.vertices \
.join(g.inDegrees, "id") \
.join(g.outDegrees, "id") \
.join(users, users.user_id == pagerank_results.vertices.id) \
.select(
"user_id",
"name",
"city",
"followers",
F.col("pagerank"),
F.col("inDegree").alias("followed_by"),
F.col("outDegree").alias("following")
)
# 计算综合评分 = 0.4*PageRank + 0.3*粉丝数 + 0.3*互关数
combined = combined.withColumn(
"score",
F.col("pagerank") * 0.4 +
F.col("followed_by") * 0.3 +
F.col("following") * 0.3
).orderBy(F.desc("score"))
combined.show()
spark.stop()5. 图算法扩展
5.1 其他常用图算法
┌─────────────────────────────────────────────────────────────────┐
│ 图算法一览 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PageRank: │
│ ├─ 用途: 节点重要性排名 │
│ └─ 场景: 影响力分析、推荐排序 │
│ │
│ Connected Components: │
│ ├─ 用途: 找出连通分量 │
│ └─ 场景: 社区发现、异常检测 │
│ │
│ Triangle Count: │
│ ├─ 用途: 统计三角形数量 │
│ └─ 场景: 社交密度分析 │
│ │
│ Shortest Path: │
│ ├─ 用途: 最短路径 │
│ └─ 场景: 关系链分析 │
│ │
│ Label Propagation: │
│ ├─ 用途: 社区发现 │
│ └─ 场景: 群体识别 │
│ │
│ SVD++: │
│ ├─ 用途: 矩阵分解 │
│ └─ 场景: 推荐系统 │
│ │
└─────────────────────────────────────────────────────────────────┘5.2 连通分量
# 找出连通分量
cc = g.connectedComponents()
cc.show()
# 社区发现
lp = g.labelPropagation(maxIter=5)
lp.show()6. 性能优化
6.1 优化技巧
# 1. 合理设置分区数
spark.conf.set("spark.sql.shuffle.partitions", "8")
# 2. 过滤无关边
edges_filtered = edges.filter(F.col("weight") > 0.5)
# 3. 缓存图
g.cache()
# 4. 预先聚合
g.degrees.cache()6.2 注意事项
┌─────────────────────────────────────────────────────────────────┐
│ 注意事项 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ⚠️ 悬挂节点: 没有出链的节点 │
│ → PageRank会从所有节点吸收PR值 │
│ → 解决方案: 使用damping处理 │
│ │
│ ⚠️ 循环: A→B→C→A │
│ → PR会在小团体中循环 │
│ → 阻尼因子可解决 │
│ │
│ ⚠️ 孤岛节点: 没有任何连接的节点 │
│ → 不会影响其他节点 │
│ → PR值趋近于初始值 │
│ │
└─────────────────────────────────────────────────────────────────┘7. 练习题
练习1
使用PageRank分析微博用户的影响力。
练习2
实现一个基于图算法的推荐系统。
练习3
对比不同阻尼因子对PageRank结果的影响。
順子の杂货铺


