顺子の杂货铺
生命不息,折腾不止,且行且珍惜~

0x08-数据加密与编码

DMIT VPS

数据加密与编码

本篇是《大数据算法与UDF系列》的第8篇,讲解大数据场景下的数据加密、哈希、编码和脱敏技术。


1. 常用加密算法

1.1 算法分类

┌─────────────────────────────────────────────────────────────────┐
│                      加密算法分类                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   哈希函数(单向):                                             │
│   ├─ MD5: 128位,废弃(不安全)                                │
│   ├─ SHA-1: 160位,废弃(不安全)                              │
│   ├─ SHA-256: 256位,安全                                      │
│   └─ bcrypt: 带盐,安全                                         │
│                                                                 │
│   对称加密:                                                     │
│   ├─ AES-128: 128位密钥                                        │
│   ├─ AES-256: 256位密钥(推荐)                                │
│   └─ DES: 56位,废弃                                           │
│                                                                 │
│   非对称加密:                                                   │
│   ├─ RSA: 常用2048位                                           │
│   └─ ECC: 更短密钥                                             │
│                                                                 │
│   Base64编码:                                                  │
│   └─ 可逆编码,非加密                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 业务场景

场景需求推荐方案
密码存储单向不可逆bcrypt/scrypt
数据完整性防篡改SHA-256
敏感数据加密可解密AES-256
数据传输加密传输HTTPS/TLS
脱敏展示部分隐藏自定义掩码

2. 哈希函数

2.1 MD5

用途:快速校验、数据去重(不建议用于安全场景)

import hashlib

# MD5 哈希
def md5_hash(data):
    return hashlib.md5(str(data).encode()).hexdigest()

# 使用
print(md5_hash("hello"))  # 5d41402abc4b2a76b9719d911017c592

2.2 SHA-256

用途:安全校验、数字签名

import hashlib

def sha256_hash(data):
    return hashlib.sha256(str(data).encode()).hexdigest()

print(sha256_hash("hello"))
# 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824

2.3 PySpark实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# 创建MD5 UDF
@F.udf(StringType())
def md5_udf(data):
    import hashlib
    return hashlib.md5(str(data).encode()).hexdigest() if data else None

# 创建SHA256 UDF
@F.udf(StringType())
def sha256_udf(data):
    import hashlib
    return hashlib.sha256(str(data).encode()).hexdigest() if data else None

# 测试
data = [
    ("user1", "123456"),
    ("user2", "password"),
    ("user3", "123456"),  # 相同密码,相同hash
]
df = spark.createDataFrame(data, ["username", "password"])

result = df.withColumn("password_md5", md5_udf(F.col("password"))) \
           .withColumn("password_sha256", sha256_udf(F.col("password")))

result.show(truncate=False)

2.4 Scala实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.security.MessageDigest

object HashFunctions {
  def md5(data: String): String = {
    MessageDigest.getInstance("MD5").digest(data.getBytes).map("%02x".format(_)).mkString
  }

  def sha256(data: String): String = {
    MessageDigest.getInstance("SHA-256").digest(data.getBytes).map("%02x".format(_)).mkString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HashDemo").master("local[*]").getOrCreate()

    import spark.implicits._

    val md5UDF = udf(md5 _)
    val sha256UDF = udf(sha256 _)

    val data = Seq(("user1", "123456"), ("user2", "password"))
    val df = data.toDF("username", "password")

    df.withColumn("md5", md5UDF($"password"))
      .withColumn("sha256", sha256UDF($"password"))
      .show(false)

    spark.stop()
  }
}

3. Base64编码

3.1 原理

┌─────────────────────────────────────────────────────────────────┐
│                      Base64 原理                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   编码表:                                                       │
│   A-Z a-z 0-9 + /                                            │
│                                                                 │
│   原理: 每3个字节 → 4个Base64字符                               │
│                                                                 │
│   示例: "abc"                                                 │
│                                                                 │
│   原始:  97(01100001) 98(01100010) 99(01100011)              │
│   分组:  011000 010110 001001 100011                         │
│   索引:  24     22      9       35                            │
│   Base64:Y      W      J      j                               │
│                                                                 │
│   结果: "YWJj"                                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 PySpark实现

import base64

@F.udf(StringType())
def base64_encode(data):
    if data is None:
        return None
    return base64.b64encode(str(data).encode()).decode()

@F.udf(StringType())
def base64_decode(data):
    if data is None:
        return None
    return base64.b64decode(data.encode()).decode()

# 使用
df.withColumn("encoded", base64_encode("sensitive_data")) \
  .withColumn("decoded", base64_decode("encoded"))

4. AES对称加密

4.1 Python实现

from cryptography.fernet import Fernet
import base64
import hashlib

class AESCrypto:
    """AES加密工具"""

    def __init__(self, key):
        # SHA256生成32字节密钥
        key_hash = hashlib.sha256(key.encode()).digest()
        self.cipher = Fernet(base64.urlsafe_b64encode(key_hash))

    def encrypt(self, data):
        """加密"""
        if data is None:
            return None
        return self.cipher.encrypt(str(data).encode()).decode()

    def decrypt(self, encrypted_data):
        """解密"""
        if encrypted_data is None:
            return None
        return self.cipher.decrypt(encrypted_data.encode()).decode()

# 使用
crypto = AESCrypto("my-secret-key")

encrypted = crypto.encrypt("Hello World")
print(f"加密: {encrypted}")

decrypted = crypto.decrypt(encrypted)
print(f"解密: {decrypted}")

4.2 PySpark UDF

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

class AESCrypto:
    def __init__(self, key):
        from cryptography.fernet import Fernet
        import hashlib
        import base64
        key_hash = hashlib.sha256(key.encode()).digest()
        self.cipher = Fernet(base64.urlsafe_b64encode(key_hash))

    def encrypt(self, data):
        if data: return self.cipher.encrypt(str(data).encode()).decode()
        return None

    def decrypt(self, data):
        if data: return self.cipher.decrypt(data.encode()).decode()
        return None

# 创建UDF
crypto = AESCrypto("my-secret-key")

encrypt_udf = F.udf(crypto.encrypt, StringType())
decrypt_udf = F.udf(crypto.decrypt, StringType())

# 使用
df.withColumn("encrypted", encrypt_udf("sensitive_data")) \
  .withColumn("decrypted", decrypt_udf("encrypted"))

5. 数据脱敏

5.1 常见脱敏规则

┌─────────────────────────────────────────────────────────────────┐
│                      脱敏规则示例                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   手机号: 13812345678 → 138****5678                           │
│   身份证: 110101199001011234 → 110101********1234            │
│   银行卡: 6222021234567890123 → ************0123              │
│   姓名: 张三 → 张*                                              │
│   地址: 北京市朝阳区xxx → 北京市**                              │
│   邮箱: test@example.com → t***@example.com                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

5.2 脱敏函数实现

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(StringType())
def mask_phone(phone):
    """手机号脱敏: 13812345678 → 138****5678"""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + "****" + phone[7:]

@F.udf(StringType())
def mask_id_card(id_card):
    """身份证脱敏: 110101199001011234 → 110101********1234"""
    if id_card is None or len(id_card) < 8:
        return id_card
    return id_card[:6] + "********" + id_card[-4:]

@F.udf(StringType())
def mask_bank_card(card_no):
    """银行卡脱敏: 6222021234567890123 → ************0123"""
    if card_no is None or len(card_no) < 8:
        return card_no
    return "*" * (len(card_no) - 4) + card_no[-4:]

@F.udf(StringType())
def mask_name(name):
    """姓名脱敏: 张三 → 张*"""
    if name is None or len(name) < 2:
        return name
    return name[0] + "*" * (len(name) - 1)

@F.udf(StringType())
def mask_email(email):
    """邮箱脱敏: test@example.com → t***t@example.com"""
    if email is None or "@" not in email:
        return email
    parts = email.split("@")
    username = parts[0]
    if len(username) <= 2:
        masked = username[0] + "***"
    else:
        masked = username[0] + "***" + username[-1]
    return masked + "@" + parts[1]

# 使用
df.withColumn("phone_masked", mask_phone("phone")) \
  .withColumn("id_card_masked", mask_id_card("id_card")) \
  .withColumn("bank_card_masked", mask_bank_card("bank_card")) \
  .withColumn("name_masked", mask_name("name")) \
  .withColumn("email_masked", mask_email("email"))

6. 实战:用户数据保护

6.1 完整示例

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import hashlib
import base64

spark = SparkSession.builder.getOrCreate()

# =============================================
# 模拟敏感数据
# =============================================
users = [
    (1, "张三", "13812345678", "110101199001011234", "zhangsan@example.com", "password123"),
    (2, "李四", "13998765432", "310101198505052345", "lisi@example.com", "pass456"),
    (3, "王五", "13711112222", "440101199203033456", "wangwu@example.com", "pwd789"),
]

df = spark.createDataFrame(users, ["id", "name", "phone", "id_card", "email", "password"])

# =============================================
# 1. 基础脱敏
# =============================================
print("=" * 60)
print("1. 基础脱敏")
print("=" * 60)

@F.udf(StringType())
def mask_phone(phone):
    if phone and len(phone) == 11:
        return phone[:3] + "****" + phone[7:]
    return phone

@F.udf(StringType())
def mask_id_card(id_card):
    if id_card and len(id_card) >= 8:
        return id_card[:6] + "********" + id_card[-4:]
    return id_card

masked_df = df.withColumn("phone", mask_phone("phone")) \
             .withColumn("id_card", mask_id_card("id_card"))

masked_df.show(truncate=False)

# =============================================
# 2. 哈希加密(不可逆)
# =============================================
print("=" * 60)
print("2. 哈希加密(密码存储)")
print("=" * 60)

@F.udf(StringType())
def hash_password(password):
    if password:
        # 使用SHA256 + 盐
        salt = "my_salt"
        return hashlib.sha256((password + salt).encode()).hexdigest()
    return None

hashed_df = df.withColumn("password_hash", hash_password("password"))
hashed_df.select("id", "password_hash").show(truncate=False)

# =============================================
# 3. 可逆加密(AES)
# =============================================
print("=" * 60)
print("3. 可逆加密(临时加密)")
print("=" * 60)

from cryptography.fernet import Fernet
import hashlib
import base64

# 创建加密器
key = "my-secret-key"
key_hash = hashlib.sha256(key.encode()).digest()
cipher = Fernet(base64.urlsafe_b64encode(key_hash))

@F.udf(StringType())
def encrypt_data(data):
    if data:
        return cipher.encrypt(str(data).encode()).decode()
    return None

@F.udf(StringType())
def decrypt_data(data):
    if data:
        return cipher.decrypt(data.encode()).decode()
    return None

encrypted_df = df.withColumn("phone_encrypted", encrypt_data("phone")) \
                .withColumn("phone_decrypted", decrypt_data("phone_encrypted"))

encrypted_df.select("phone", "phone_encrypted", "phone_decrypted").show(truncate=False)

# =============================================
# 4. 组合使用
# =============================================
print("=" * 60)
print("4. 组合:脱敏 + 哈希 + 展示")
print("=" * 60)

final_df = df.withColumn("phone_masked", mask_phone("phone")) \
             .withColumn("id_card_masked", mask_id_card("id_card")) \
             .withColumn("password_hash", hash_password("password")) \
             .drop("phone", "id_card", "password")

final_df.show(truncate=False)

spark.stop()

7. Scala实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.security.MessageDigest

object DataMasking {
  // MD5哈希
  def md5(data: String): String = {
    MessageDigest.getInstance("MD5").digest(data.getBytes).map("%02x".format(_)).mkString
  }

  // SHA256哈希
  def sha256(data: String): String = {
    MessageDigest.getInstance("SHA-256").digest(data.getBytes).map("%02x".format(_)).mkString
  }

  // 手机号脱敏
  def maskPhone(phone: String): String = {
    if (phone != null && phone.length == 11) {
      phone.substring(0, 3) + "****" + phone.substring(7)
    } else phone
  }

  // 身份证脱敏
  def maskIdCard(idCard: String): String = {
    if (idCard != null && idCard.length >= 8) {
      idCard.substring(0, 6) + "********" + idCard.substring(idCard.length - 4)
    } else idCard
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataMasking")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // 创建UDF
    val md5UDF = udf(md5 _)
    val sha256UDF = udf(sha256 _)
    val maskPhoneUDF = udf(maskPhone _)
    val maskIdCardUDF = udf(maskIdCard _)

    // 测试数据
    val data = Seq(
      (1, "张三", "13812345678", "110101199001011234"),
      (2, "李四", "13998765432", "310101198505052345")
    ).toDF("id", "name", "phone", "id_card")

    // 应用脱敏
    val result = data
      .withColumn("phone_masked", maskPhoneUDF($"phone"))
      .withColumn("id_card_masked", maskIdCardUDF($"id_card"))

    result.show(false)

    spark.stop()
  }
}

8. 注意事项

8.1 安全建议

┌─────────────────────────────────────────────────────────────────┐
│                      安全最佳实践                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ❌ 禁止:                                                     │
│   ├─ 使用MD5存储密码                                           │
│   ├─ 硬编码加密密钥                                            │
│   ├─ 传输明文密码                                              │
│   └─ 日志记录敏感数据                                           │
│                                                                 │
│   ✅ 推荐:                                                     │
│   ├─ 密码使用bcrypt/scrypt                                    │
│   ├─ 敏感数据使用AES-256                                       │
│   ├─ 密钥使用密钥管理服务(KMS)                                │
│   └─ 日志脱敏                                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

8.2 性能考虑

# 大数据量加密时:
# 1. 使用广播小表
# 2. 避免频繁创建加密对象
# 3. 考虑硬件加速

9. 练习题

练习1

实现一个完整的用户数据保护方案。

练习2

对身份证号进行脱敏,但保留出生日期信息。

练习3

使用AES加密实现数据的加密和解密。


赞(0)
未经允许不得转载:順子の杂货铺 » 0x08-数据加密与编码
搬瓦工VPS

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

分享创造快乐

联系我们联系我们