数据加密与编码
本篇是《大数据算法与UDF系列》的第8篇,讲解大数据场景下的数据加密、哈希、编码和脱敏技术。
1. 常用加密算法
1.1 算法分类
┌─────────────────────────────────────────────────────────────────┐
│ 加密算法分类 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 哈希函数(单向): │
│ ├─ MD5: 128位,废弃(不安全) │
│ ├─ SHA-1: 160位,废弃(不安全) │
│ ├─ SHA-256: 256位,安全 │
│ └─ bcrypt: 带盐,安全 │
│ │
│ 对称加密: │
│ ├─ AES-128: 128位密钥 │
│ ├─ AES-256: 256位密钥(推荐) │
│ └─ DES: 56位,废弃 │
│ │
│ 非对称加密: │
│ ├─ RSA: 常用2048位 │
│ └─ ECC: 更短密钥 │
│ │
│ Base64编码: │
│ └─ 可逆编码,非加密 │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 业务场景
| 场景 | 需求 | 推荐方案 |
|---|---|---|
| 密码存储 | 单向不可逆 | bcrypt/scrypt |
| 数据完整性 | 防篡改 | SHA-256 |
| 敏感数据加密 | 可解密 | AES-256 |
| 数据传输 | 加密传输 | HTTPS/TLS |
| 脱敏展示 | 部分隐藏 | 自定义掩码 |
2. 哈希函数
2.1 MD5
用途:快速校验、数据去重(不建议用于安全场景)
import hashlib
# MD5 哈希
def md5_hash(data):
return hashlib.md5(str(data).encode()).hexdigest()
# 使用
print(md5_hash("hello")) # 5d41402abc4b2a76b9719d911017c5922.2 SHA-256
用途:安全校验、数字签名
import hashlib
def sha256_hash(data):
return hashlib.sha256(str(data).encode()).hexdigest()
print(sha256_hash("hello"))
# 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b98242.3 PySpark实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
spark = SparkSession.builder.getOrCreate()
# 创建MD5 UDF
@F.udf(StringType())
def md5_udf(data):
import hashlib
return hashlib.md5(str(data).encode()).hexdigest() if data else None
# 创建SHA256 UDF
@F.udf(StringType())
def sha256_udf(data):
import hashlib
return hashlib.sha256(str(data).encode()).hexdigest() if data else None
# 测试
data = [
("user1", "123456"),
("user2", "password"),
("user3", "123456"), # 相同密码,相同hash
]
df = spark.createDataFrame(data, ["username", "password"])
result = df.withColumn("password_md5", md5_udf(F.col("password"))) \
.withColumn("password_sha256", sha256_udf(F.col("password")))
result.show(truncate=False)2.4 Scala实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.security.MessageDigest
object HashFunctions {
def md5(data: String): String = {
MessageDigest.getInstance("MD5").digest(data.getBytes).map("%02x".format(_)).mkString
}
def sha256(data: String): String = {
MessageDigest.getInstance("SHA-256").digest(data.getBytes).map("%02x".format(_)).mkString
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("HashDemo").master("local[*]").getOrCreate()
import spark.implicits._
val md5UDF = udf(md5 _)
val sha256UDF = udf(sha256 _)
val data = Seq(("user1", "123456"), ("user2", "password"))
val df = data.toDF("username", "password")
df.withColumn("md5", md5UDF($"password"))
.withColumn("sha256", sha256UDF($"password"))
.show(false)
spark.stop()
}
}3. Base64编码
3.1 原理
┌─────────────────────────────────────────────────────────────────┐
│ Base64 原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 编码表: │
│ A-Z a-z 0-9 + / │
│ │
│ 原理: 每3个字节 → 4个Base64字符 │
│ │
│ 示例: "abc" │
│ │
│ 原始: 97(01100001) 98(01100010) 99(01100011) │
│ 分组: 011000 010110 001001 100011 │
│ 索引: 24 22 9 35 │
│ Base64:Y W J j │
│ │
│ 结果: "YWJj" │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 PySpark实现
import base64
@F.udf(StringType())
def base64_encode(data):
if data is None:
return None
return base64.b64encode(str(data).encode()).decode()
@F.udf(StringType())
def base64_decode(data):
if data is None:
return None
return base64.b64decode(data.encode()).decode()
# 使用
df.withColumn("encoded", base64_encode("sensitive_data")) \
.withColumn("decoded", base64_decode("encoded"))4. AES对称加密
4.1 Python实现
from cryptography.fernet import Fernet
import base64
import hashlib
class AESCrypto:
"""AES加密工具"""
def __init__(self, key):
# SHA256生成32字节密钥
key_hash = hashlib.sha256(key.encode()).digest()
self.cipher = Fernet(base64.urlsafe_b64encode(key_hash))
def encrypt(self, data):
"""加密"""
if data is None:
return None
return self.cipher.encrypt(str(data).encode()).decode()
def decrypt(self, encrypted_data):
"""解密"""
if encrypted_data is None:
return None
return self.cipher.decrypt(encrypted_data.encode()).decode()
# 使用
crypto = AESCrypto("my-secret-key")
encrypted = crypto.encrypt("Hello World")
print(f"加密: {encrypted}")
decrypted = crypto.decrypt(encrypted)
print(f"解密: {decrypted}")4.2 PySpark UDF
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
class AESCrypto:
def __init__(self, key):
from cryptography.fernet import Fernet
import hashlib
import base64
key_hash = hashlib.sha256(key.encode()).digest()
self.cipher = Fernet(base64.urlsafe_b64encode(key_hash))
def encrypt(self, data):
if data: return self.cipher.encrypt(str(data).encode()).decode()
return None
def decrypt(self, data):
if data: return self.cipher.decrypt(data.encode()).decode()
return None
# 创建UDF
crypto = AESCrypto("my-secret-key")
encrypt_udf = F.udf(crypto.encrypt, StringType())
decrypt_udf = F.udf(crypto.decrypt, StringType())
# 使用
df.withColumn("encrypted", encrypt_udf("sensitive_data")) \
.withColumn("decrypted", decrypt_udf("encrypted"))5. 数据脱敏
5.1 常见脱敏规则
┌─────────────────────────────────────────────────────────────────┐
│ 脱敏规则示例 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 手机号: 13812345678 → 138****5678 │
│ 身份证: 110101199001011234 → 110101********1234 │
│ 银行卡: 6222021234567890123 → ************0123 │
│ 姓名: 张三 → 张* │
│ 地址: 北京市朝阳区xxx → 北京市** │
│ 邮箱: test@example.com → t***@example.com │
│ │
└─────────────────────────────────────────────────────────────────┘5.2 脱敏函数实现
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
@F.udf(StringType())
def mask_phone(phone):
"""手机号脱敏: 13812345678 → 138****5678"""
if phone is None or len(phone) != 11:
return phone
return phone[:3] + "****" + phone[7:]
@F.udf(StringType())
def mask_id_card(id_card):
"""身份证脱敏: 110101199001011234 → 110101********1234"""
if id_card is None or len(id_card) < 8:
return id_card
return id_card[:6] + "********" + id_card[-4:]
@F.udf(StringType())
def mask_bank_card(card_no):
"""银行卡脱敏: 6222021234567890123 → ************0123"""
if card_no is None or len(card_no) < 8:
return card_no
return "*" * (len(card_no) - 4) + card_no[-4:]
@F.udf(StringType())
def mask_name(name):
"""姓名脱敏: 张三 → 张*"""
if name is None or len(name) < 2:
return name
return name[0] + "*" * (len(name) - 1)
@F.udf(StringType())
def mask_email(email):
"""邮箱脱敏: test@example.com → t***t@example.com"""
if email is None or "@" not in email:
return email
parts = email.split("@")
username = parts[0]
if len(username) <= 2:
masked = username[0] + "***"
else:
masked = username[0] + "***" + username[-1]
return masked + "@" + parts[1]
# 使用
df.withColumn("phone_masked", mask_phone("phone")) \
.withColumn("id_card_masked", mask_id_card("id_card")) \
.withColumn("bank_card_masked", mask_bank_card("bank_card")) \
.withColumn("name_masked", mask_name("name")) \
.withColumn("email_masked", mask_email("email"))6. 实战:用户数据保护
6.1 完整示例
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import hashlib
import base64
spark = SparkSession.builder.getOrCreate()
# =============================================
# 模拟敏感数据
# =============================================
users = [
(1, "张三", "13812345678", "110101199001011234", "zhangsan@example.com", "password123"),
(2, "李四", "13998765432", "310101198505052345", "lisi@example.com", "pass456"),
(3, "王五", "13711112222", "440101199203033456", "wangwu@example.com", "pwd789"),
]
df = spark.createDataFrame(users, ["id", "name", "phone", "id_card", "email", "password"])
# =============================================
# 1. 基础脱敏
# =============================================
print("=" * 60)
print("1. 基础脱敏")
print("=" * 60)
@F.udf(StringType())
def mask_phone(phone):
if phone and len(phone) == 11:
return phone[:3] + "****" + phone[7:]
return phone
@F.udf(StringType())
def mask_id_card(id_card):
if id_card and len(id_card) >= 8:
return id_card[:6] + "********" + id_card[-4:]
return id_card
masked_df = df.withColumn("phone", mask_phone("phone")) \
.withColumn("id_card", mask_id_card("id_card"))
masked_df.show(truncate=False)
# =============================================
# 2. 哈希加密(不可逆)
# =============================================
print("=" * 60)
print("2. 哈希加密(密码存储)")
print("=" * 60)
@F.udf(StringType())
def hash_password(password):
if password:
# 使用SHA256 + 盐
salt = "my_salt"
return hashlib.sha256((password + salt).encode()).hexdigest()
return None
hashed_df = df.withColumn("password_hash", hash_password("password"))
hashed_df.select("id", "password_hash").show(truncate=False)
# =============================================
# 3. 可逆加密(AES)
# =============================================
print("=" * 60)
print("3. 可逆加密(临时加密)")
print("=" * 60)
from cryptography.fernet import Fernet
import hashlib
import base64
# 创建加密器
key = "my-secret-key"
key_hash = hashlib.sha256(key.encode()).digest()
cipher = Fernet(base64.urlsafe_b64encode(key_hash))
@F.udf(StringType())
def encrypt_data(data):
if data:
return cipher.encrypt(str(data).encode()).decode()
return None
@F.udf(StringType())
def decrypt_data(data):
if data:
return cipher.decrypt(data.encode()).decode()
return None
encrypted_df = df.withColumn("phone_encrypted", encrypt_data("phone")) \
.withColumn("phone_decrypted", decrypt_data("phone_encrypted"))
encrypted_df.select("phone", "phone_encrypted", "phone_decrypted").show(truncate=False)
# =============================================
# 4. 组合使用
# =============================================
print("=" * 60)
print("4. 组合:脱敏 + 哈希 + 展示")
print("=" * 60)
final_df = df.withColumn("phone_masked", mask_phone("phone")) \
.withColumn("id_card_masked", mask_id_card("id_card")) \
.withColumn("password_hash", hash_password("password")) \
.drop("phone", "id_card", "password")
final_df.show(truncate=False)
spark.stop()7. Scala实现
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.security.MessageDigest
object DataMasking {
// MD5哈希
def md5(data: String): String = {
MessageDigest.getInstance("MD5").digest(data.getBytes).map("%02x".format(_)).mkString
}
// SHA256哈希
def sha256(data: String): String = {
MessageDigest.getInstance("SHA-256").digest(data.getBytes).map("%02x".format(_)).mkString
}
// 手机号脱敏
def maskPhone(phone: String): String = {
if (phone != null && phone.length == 11) {
phone.substring(0, 3) + "****" + phone.substring(7)
} else phone
}
// 身份证脱敏
def maskIdCard(idCard: String): String = {
if (idCard != null && idCard.length >= 8) {
idCard.substring(0, 6) + "********" + idCard.substring(idCard.length - 4)
} else idCard
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DataMasking")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 创建UDF
val md5UDF = udf(md5 _)
val sha256UDF = udf(sha256 _)
val maskPhoneUDF = udf(maskPhone _)
val maskIdCardUDF = udf(maskIdCard _)
// 测试数据
val data = Seq(
(1, "张三", "13812345678", "110101199001011234"),
(2, "李四", "13998765432", "310101198505052345")
).toDF("id", "name", "phone", "id_card")
// 应用脱敏
val result = data
.withColumn("phone_masked", maskPhoneUDF($"phone"))
.withColumn("id_card_masked", maskIdCardUDF($"id_card"))
result.show(false)
spark.stop()
}
}8. 注意事项
8.1 安全建议
┌─────────────────────────────────────────────────────────────────┐
│ 安全最佳实践 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ❌ 禁止: │
│ ├─ 使用MD5存储密码 │
│ ├─ 硬编码加密密钥 │
│ ├─ 传输明文密码 │
│ └─ 日志记录敏感数据 │
│ │
│ ✅ 推荐: │
│ ├─ 密码使用bcrypt/scrypt │
│ ├─ 敏感数据使用AES-256 │
│ ├─ 密钥使用密钥管理服务(KMS) │
│ └─ 日志脱敏 │
│ │
└─────────────────────────────────────────────────────────────────┘8.2 性能考虑
# 大数据量加密时:
# 1. 使用广播小表
# 2. 避免频繁创建加密对象
# 3. 考虑硬件加速9. 练习题
练习1
实现一个完整的用户数据保护方案。
练习2
对身份证号进行脱敏,但保留出生日期信息。
练习3
使用AES加密实现数据的加密和解密。
順子の杂货铺


