顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x09-图算法PageRank

DMIT VPS

图算法PageRank

本篇是《大数据算法与UDF系列》的第9篇,深入讲解PageRank算法的原理,以及如何使用Spark GraphX进行大规模图计算。


1. PageRank简介

1.1 背景故事

PageRank由Google创始人Larry Page和Sergey Brin于1998年提出,最初用于Google搜索引擎的网页排名。

┌─────────────────────────────────────────────────────────────────┐
│                      PageRank 核心思想                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   一个网页的重要性,取决于:                                      │
│   ├─ 有多少其他网页链接到它                                      │
│   ├─ 链接到它的网页本身有多重要                                   │
│                                                                 │
│   ─────────────────────────────────────                        │
│                                                                 │
│   直观理解:                                                      │
│                                                                 │
│   A ───▶ B ───▶ C                                               │
│   │       │                                                      │
│   │       ▼                                                      │
│   └──▶ D ◀── E                                                  │
│                                                                 │
│   PageRank:                                                     │
│   - A有D和E两个出链                                              │
│   - B被A和C链接                                                  │
│   - C被B链接                                                      │
│   - D被A链接                                                      │
│   - E没有出链(可能是新网页)                                     │
│                                                                 │
│   重要性: C > B > D > A > E                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 业务场景

场景应用
搜索引擎网页排名
社交网络影响力用户识别
推荐系统关键节点发现
金融风控可疑交易检测
生物网络关键蛋白质发现

2. PageRank原理

2.1 核心公式

PR(A) = (1-d)/N + d × Σ(PR(B) / Out(B))

其中:
- PR(A): 页面A的PageRank值
- d: 阻尼因子(通常0.85)
- N: 总页面数
- Σ: 对所有链接到A的页面求和
- Out(B): 页面B的出链数量

2.2 算法图解

┌─────────────────────────────────────────────────────────────────┐
│                    PageRank 计算过程                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Step 1: 初始化                                                │
│   ─────────────────────────────────────                        │
│                                                                 │
│   A ───▶ B ───▶ C                                               │
│   │                                                            │
│   ▼                                                            │
│   D                                                            │
│                                                                 │
│   初始PR值: PR(A)=0.25, PR(B)=0.25, PR(C)=0.25, PR(D)=0.25   │
│                                                                 │
│   Step 2: 迭代计算                                              │
│   ─────────────────────────────────────                        │
│                                                                 │
│   第1次迭代:                                                    │
│   - PR(A) = 0.15 + 0.85 × 0 = 0.15 (无人链接)                  │
│   - PR(B) = 0.15 + 0.85 × 0.25/1 = 0.3625 (A→B, 出链=1)       │
│   - PR(C) = 0.15 + 0.85 × 0.25/1 = 0.3625 (B→C)               │
│   - PR(D) = 0.15 + 0.85 × 0.25/1 = 0.3625 (A→D)               │
│                                                                 │
│   第2次迭代:                                                    │
│   - PR(B) = 0.15 + 0.85 × 0.15/1 = 0.2775                      │
│   - PR(C) = 0.15 + 0.85 × 0.3625/1 = 0.4581                    │
│   - PR(D) = 0.15 + 0.85 × 0.15/1 = 0.2775                      │
│   - PR(A) = 0.15 + 0.85 × 0 = 0.15                             │
│                                                                 │
│   Step 3: 收敛                                                  │
│   ─────────────────────────────────────                        │
│                                                                 │
│   迭代10-20次后,PR值趋于稳定                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.3 阻尼因子

┌─────────────────────────────────────────────────────────────────┐
│                      阻尼因子 d                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   d = 0.85 (常用值)                                           │
│                                                                 │
│   含义:                                                         │
│   - 用户有85%概率点击页面上的链接                                │
│   - 用户有15%概率直接跳转到任意页面(随机浏览)                   │
│                                                                 │
│   作用:                                                         │
│   - 防止排名只出不进(没有入链的页面)                           │
│   - 保证算法收敛                                                │
│                                                                 │
│   调整:                                                         │
│   - d越大: 越依赖链接关系                                       │
│   - d越小: 越接近均匀分布                                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3. Spark GraphX实现

3.1 GraphX核心概念

┌─────────────────────────────────────────────────────────────────┐
│                      GraphX 核心概念                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Vertex (顶点):                                                │
│   - 代表实体(用户、网页)                                       │
│   - VertexRDD[VD]                                              │
│                                                                 │
│   Edge (边):                                                   │
│   - 代表关系(关注、链接)                                       │
│   - EdgeRDD[ED]                                                │
│                                                                 │
│   图结构:                                                       │
│                                                                 │
│        [用户A] ──关注──▶ [用户B]                                │
│        [用户A] ──关注──▶ [用户C]                                │
│        [用户B] ──关注──▶ [用户C]                                │
│        [用户C] ──关注──▶ [用户A]                                │
│                                                                 │
│   VertexRDD:                                                  │
│   (A, 用户信息) (B, 用户信息) (C, 用户信息)                   │
│                                                                 │
│   EdgeRDD:                                                     │
│   (A→B, 关注) (A→C, 关注) (B→C, 关注) (C→A, 关注)            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 Python (GraphFrames) 实现

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder \
    .appName("PageRankDemo") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# =============================================
# 创建顶点数据(用户)
# =============================================
vertices = spark.createDataFrame([
    ("A", "Alice", 25),
    ("B", "Bob", 30),
    ("C", "Charlie", 28),
    ("D", "David", 35),
    ("E", "Eve", 22),
    ("F", "Frank", 40),
], ["id", "name", "age"])

# =============================================
# 创建边数据(关注关系)
# =============================================
edges = spark.createDataFrame([
    ("A", "B", "follow"),
    ("A", "C", "follow"),
    ("B", "C", "follow"),
    ("C", "A", "follow"),
    ("D", "A", "follow"),
    ("D", "B", "follow"),
    ("E", "D", "follow"),
    ("F", "D", "follow"),
], ["src", "dst", "relationship"])

# =============================================
# 创建图
# =============================================
g = GraphFrame(vertices, edges)

print("=" * 60)
print("图信息")
print("=" * 60)
print(f"顶点数: {g.vertices.count()}")
print(f"边数: {g.edges.count()}")

# =============================================
# 运行PageRank
# =============================================
print("=" * 60)
print("PageRank结果")
print("=" * 60)

# 运行PageRank(收敛版)
results = g.pageRank(resetProbability=0.15, maxIter=20)

# 获取排名结果
pagerank_df = results.vertices.select("id", "name", "pagerank")
pagerank_df = pagerank_df.orderBy(pagerank_df["pagerank"].desc())

pagerank_df.show()

# =============================================
# 带权重的PageRank
# =============================================
print("=" * 60)
print("带权重PageRank")
print("=" * 60)

# 给边添加权重
edges_weighted = spark.createDataFrame([
    ("A", "B", "follow", 1.0),
    ("A", "C", "follow", 0.5),  # A更关注B
    ("B", "C", "follow", 1.0),
    ("C", "A", "follow", 1.0),
    ("D", "A", "follow", 1.0),
    ("D", "B", "follow", 1.0),
    ("E", "D", "follow", 1.0),
    ("F", "D", "follow", 1.0),
], ["src", "dst", "relationship", "weight"])

g2 = GraphFrame(vertices, edges_weighted)
results2 = g2.pageRank(resetProbability=0.15, maxIter=20)

results2.vertices.select("id", "name", "pagerank") \
    .orderBy("pagerank", ascending=False).show()

spark.stop()

3.3 Scala (GraphX) 实现

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object PageRankDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageRankDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 创建顶点
    val vertices: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
      (1L, ("Alice", 25)),
      (2L, ("Bob", 30)),
      (3L, ("Charlie", 28)),
      (4L, ("David", 35)),
      (5L, ("Eve", 22))
    ))

    // 创建边
    val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0),  // Alice -> Bob
      Edge(1L, 3L, 1.0),  // Alice -> Charlie
      Edge(2L, 3L, 1.0),  // Bob -> Charlie
      Edge(3L, 1L, 1.0),  // Charlie -> Alice
      Edge(4L, 1L, 1.0),  // David -> Alice
      Edge(4L, 2L, 1.0),  // David -> Bob
      Edge(5L, 4L, 1.0)   // Eve -> David
    ))

    // 创建图
    val graph = Graph(vertices, edges, ("default", 0))

    // 运行PageRank
    val pagerank = graph.pageRank(0.0001).vertices

    // 合并顶点信息
    val ranked = pagerank.join(vertices).map {
      case (id, (rank, (name, age))) => (name, rank)
    }

    // 排序输出
    println("PageRank结果:")
    ranked.sortBy(_._2, ascending = false).collect().foreach(println)

    sc.stop()
  }
}

4. 实战:社交网络分析

4.1 业务场景

分析社交网络中的:

  1. 最有影响力的用户(PageRank)
  2. 最活跃的用户(入度/出度)
  3. 关键意见领袖识别

4.2 完整示例

from pyspark.sql import SparkSession
from graphframes import GraphFrame
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("SocialNetworkAnalysis") \
    .getOrCreate()

# =============================================
# 模拟社交网络数据
# =============================================

# 用户信息
users = spark.createDataFrame([
    ("u1", "张三", "北京", 1000),
    ("u2", "李四", "上海", 500),
    ("u3", "王五", "北京", 800),
    ("u4", "赵六", "深圳", 300),
    ("u5", "钱七", "杭州", 200),
    ("u6", "孙八", "广州", 150),
    ("u7", "周九", "成都", 100),
    ("u8", "吴十", "武汉", 80),
], ["user_id", "name", "city", "followers"])

# 关注关系(关注者 → 被关注者)
relationships = spark.createDataFrame([
    ("u1", "u2", "follow"),  # 张三关注李四
    ("u1", "u3", "follow"),  # 张三关注王五
    ("u1", "u4", "follow"),  # 张三关注赵六
    ("u2", "u3", "follow"),
    ("u2", "u1", "follow"),  # 李四也关注张三(互关)
    ("u3", "u1", "follow"),
    ("u3", "u2", "follow"),
    ("u4", "u1", "follow"),
    ("u4", "u2", "follow"),
    ("u4", "u3", "follow"),
    ("u5", "u1", "follow"),
    ("u5", "u4", "follow"),
    ("u6", "u4", "follow"),
    ("u7", "u6", "follow"),
    ("u8", "u7", "follow"),
], ["src", "dst", "type"])

# 创建图
g = GraphFrame(users, relationships)

print("=" * 60)
print("1. 基本统计")
print("=" * 60)
print(f"用户总数: {g.vertices.count()}")
print(f"关注关系总数: {g.edges.count()}")

# =============================================
# 2. PageRank - 影响力排名
# =============================================
print("=" * 60)
print("2. PageRank - 影响力排名")
print("=" * 60)

pagerank_results = g.pageRank(resetProbability=0.15, maxIter=20)

# 关联用户信息
influence = pagerank_results.vertices \
    .join(users, pagerank_results.vertices.id == users.user_id) \
    .select("user_id", "name", "city", "followers", "pagerank") \
    .orderBy(F.desc("pagerank"))

influence.show()

# =============================================
# 3. 入度分析 - 被关注最多
# =============================================
print("=" * 60)
print("3. 入度分析 - 被关注最多")
print("=" * 60)

in_degree = g.inDegrees.withColumnRenamed("id", "user_id") \
    .join(users, "user_id") \
    .select("user_id", "name", "inDegree") \
    .orderBy(F.desc("inDegree"))

in_degree.show()

# =============================================
# 4. 出度分析 - 关注最多
# =============================================
print("=" * 60)
print("4. 出度分析 - 关注最多")
print("=" * 60)

out_degree = g.outDegrees.withColumnRenamed("id", "user_id") \
    .join(users, "user_id") \
    .select("user_id", "name", "outDegree") \
    .orderBy(F.desc("outDegree"))

out_degree.show()

# =============================================
# 5. 综合评分
# =============================================
print("=" * 60)
print("5. 综合影响力评分")
print("=" * 60)

# 合并多种指标
combined = pagerank_results.vertices \
    .join(g.inDegrees, "id") \
    .join(g.outDegrees, "id") \
    .join(users, users.user_id == pagerank_results.vertices.id) \
    .select(
        "user_id",
        "name",
        "city",
        "followers",
        F.col("pagerank"),
        F.col("inDegree").alias("followed_by"),
        F.col("outDegree").alias("following")
    )

# 计算综合评分 = 0.4*PageRank + 0.3*粉丝数 + 0.3*互关数
combined = combined.withColumn(
    "score",
    F.col("pagerank") * 0.4 +
    F.col("followed_by") * 0.3 +
    F.col("following") * 0.3
).orderBy(F.desc("score"))

combined.show()

spark.stop()

5. 图算法扩展

5.1 其他常用图算法

┌─────────────────────────────────────────────────────────────────┐
│                      图算法一览                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   PageRank:                                                    │
│   ├─ 用途: 节点重要性排名                                       │
│   └─ 场景: 影响力分析、推荐排序                                  │
│                                                                 │
│   Connected Components:                                         │
│   ├─ 用途: 找出连通分量                                         │
│   └─ 场景: 社区发现、异常检测                                    │
│                                                                 │
│   Triangle Count:                                              │
│   ├─ 用途: 统计三角形数量                                        │
│   └─ 场景: 社交密度分析                                         │
│                                                                 │
│   Shortest Path:                                               │
│   ├─ 用途: 最短路径                                             │
│   └─ 场景: 关系链分析                                           │
│                                                                 │
│   Label Propagation:                                           │
│   ├─ 用途: 社区发现                                             │
│   └─ 场景: 群体识别                                             │
│                                                                 │
│   SVD++:                                                       │
│   ├─ 用途: 矩阵分解                                              │
│   └─ 场景: 推荐系统                                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 连通分量

# 找出连通分量
cc = g.connectedComponents()
cc.show()

# 社区发现
lp = g.labelPropagation(maxIter=5)
lp.show()

6. 性能优化

6.1 优化技巧

# 1. 合理设置分区数
spark.conf.set("spark.sql.shuffle.partitions", "8")

# 2. 过滤无关边
edges_filtered = edges.filter(F.col("weight") > 0.5)

# 3. 缓存图
g.cache()

# 4. 预先聚合
g.degrees.cache()

6.2 注意事项

┌─────────────────────────────────────────────────────────────────┐
│                      注意事项                                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ⚠️ 悬挂节点: 没有出链的节点                                    │
│      → PageRank会从所有节点吸收PR值                             │
│      → 解决方案: 使用damping处理                                │
│                                                                 │
│   ⚠️ 循环: A→B→C→A                                              │
│      → PR会在小团体中循环                                        │
│      → 阻尼因子可解决                                           │
│                                                                 │
│   ⚠️ 孤岛节点: 没有任何连接的节点                                │
│      → 不会影响其他节点                                         │
│      → PR值趋近于初始值                                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

7. 练习题

练习1

使用PageRank分析微博用户的影响力。

练习2

实现一个基于图算法的推荐系统。

练习3

对比不同阻尼因子对PageRank结果的影响。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x09-图算法PageRank
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们