Supplementary SQL Syntax for Big Data Platforms
This post covers syntax and techniques commonly used on the major big data platforms (MaxCompute / Hive / Hologres).
I. Hive-Specific Syntax
1. Managed (Internal) vs. External Tables
-- Managed table: Hive owns both the metadata and the data files
CREATE TABLE internal_table (
id INT,
name STRING,
amount DECIMAL(10,2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC;
-- (no LOCATION clause needed: managed-table data lives under the warehouse directory)
-- External table: Hive manages only the metadata; DROP TABLE leaves the files in place
CREATE EXTERNAL TABLE external_table (
id INT,
name STRING,
amount DECIMAL(10,2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/data/external_table';
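The practical difference shows up on DROP:
-- Dropping a managed table deletes both the metadata and the data files
DROP TABLE internal_table;
-- Dropping an external table deletes only the metadata;
-- the files under /user/data/external_table remain intact
DROP TABLE external_table;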
2. Bucketed Tables
-- Hash-distribute rows into buckets by a column
CREATE TABLE users_bucketed (
user_id BIGINT,
name STRING,
age INT
)
CLUSTERED BY (user_id) INTO 32 BUCKETS;
-- Sampling a bucketed table
SELECT * FROM users_bucketed TABLESAMPLE (BUCKET 1 OUT OF 32 ON user_id);
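One caveat: on Hive 1.x, inserts only honor the bucket definition when enforcement is switched on (Hive 2.x always enforces it). A minimal load sketch, with users_raw as a hypothetical source table:
SET hive.enforce.bucketing = true;  -- needed on Hive 1.x only
INSERT OVERWRITE TABLE users_bucketed
SELECT user_id, name, age FROM users_raw;  -- users_raw is hypothetical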
3. CTAS (CREATE TABLE AS SELECT)
CREATE TABLE new_table AS
SELECT col1, col2, SUM(col3) AS total
FROM source_table
GROUP BY col1, col2;
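When only the schema is needed, CREATE TABLE ... LIKE copies the definition without copying any data:
-- Same columns and storage format as source_table, but empty
CREATE TABLE empty_copy LIKE source_table;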
4. INSERT OVERWRITE
-- Overwrite the target table
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table;
-- Dynamic-partition overwrite
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE target_table PARTITION (dt)
SELECT * FROM source_table;  -- the dynamic partition column dt must come last in the SELECT
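Static and dynamic partition columns can also be mixed; static ones are listed with values, and each dynamic value is taken from the trailing SELECT columns in order. A sketch with a hypothetical target_multi table partitioned by (country, dt):
INSERT OVERWRITE TABLE target_multi PARTITION (country='CN', dt)
SELECT col1, col2, dt FROM source_table;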
II. MaxCompute-Specific Syntax
1. Table Lifecycle
CREATE TABLE sales_mc (
order_id BIGINT,
amount DECIMAL(10,2),
order_time DATETIME
)
PARTITIONED BY (dt STRING)
LIFECYCLE 30; -- partitions are dropped automatically 30 days after their last modification
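The lifecycle can be changed later without recreating the table:
-- Extend retention to 60 days
ALTER TABLE sales_mc SET LIFECYCLE 60;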
2. Tunnel Upload and Download
-- Tunnel command (run inside the odpscmd console)
tunnel upload data.txt project_name.table_name;
# PyODPS upload (a minimal sketch using the table-writer API)
from odps import ODPS
o = ODPS(access_id, secret_access_key, project, endpoint)
with o.get_table('table_name').open_writer() as writer:
    writer.write(records)  # records: list of rows matching the table schema
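The reverse direction works the same way from the odpscmd console:
tunnel download project_name.table_name data.txt;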
3. MAP JOIN
-- Broadcast the small table to every worker
SELECT /*+ MAPJOIN(small_table) */ *
FROM large_table l
JOIN small_table s ON l.id = s.id;
III. Hologres-Specific Syntax
1. Real-Time Writes
-- Real-time write with upsert semantics
INSERT INTO table_name (col1, col2)
VALUES (v1, v2)
ON CONFLICT (col1) DO UPDATE SET col2 = EXCLUDED.col2;
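ON CONFLICT requires the conflict target to be the table's primary key, so the table must declare one; a minimal sketch:
CREATE TABLE table_name (
    col1 INT PRIMARY KEY,  -- the conflict target used by the upsert above
    col2 TEXT
);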
2. Materialized Views
-- Create a materialized view
CREATE MATERIALIZED VIEW mv_sales AS
SELECT
DATE_TRUNC('day', order_time) AS day,
SUM(amount) AS daily_sales
FROM orders
GROUP BY DATE_TRUNC('day', order_time);
-- Refresh the materialized view
REFRESH MATERIALIZED VIEW mv_sales;
3. Pivoting Rows to Columns
-- Using crosstab (from the PostgreSQL tablefunc extension)
SELECT *
FROM crosstab(
'SELECT region, product, SUM(amount)
FROM sales
GROUP BY region, product
ORDER BY 1, 2'
) AS ct (region TEXT, product_a NUMERIC, product_b NUMERIC);
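Where the tablefunc extension is unavailable, conditional aggregation produces the same pivot in portable SQL (the product names 'A' and 'B' are illustrative):
SELECT
    region,
    SUM(CASE WHEN product = 'A' THEN amount END) AS product_a,
    SUM(CASE WHEN product = 'B' THEN amount END) AS product_b
FROM sales
GROUP BY region;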
IV. Handling Data Skew
1. Isolate the Skewed Keys
-- Identify the heavy-hitter keys
SELECT key, COUNT(*) AS cnt
FROM large_table
GROUP BY key
ORDER BY cnt DESC
LIMIT 10;
-- Aggregate the skewed key separately: salt it so its rows spread
-- across reducers, pre-aggregate per salt, then merge the partials
SELECT
    key,
    SUM(amount) AS total
FROM (
    -- non-skewed keys: aggregate as usual
    SELECT key, amount
    FROM large_table
    WHERE key != 'skewed_value'
    UNION ALL
    -- skewed key: pre-aggregate by a random salt to break up the hotspot
    SELECT key, SUM(amount) AS amount
    FROM (
        SELECT key, amount, CAST(FLOOR(RAND() * 100) AS INT) AS salt
        FROM large_table
        WHERE key = 'skewed_value'
    ) s
    GROUP BY key, salt
) t
GROUP BY key;
2. Enable Built-In Skew Optimizations
-- Hive
SET hive.groupby.skewindata=true;
SET hive.optimize.skewjoin=true;
-- Automatically convert joins on small tables to map joins
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000;
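MaxCompute has an equivalent switch for skewed GROUP BY (flag name per the Alibaba Cloud docs; verify against your version):
-- MaxCompute
SET odps.sql.groupby.skewindata=true;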
V. Concurrency Control
1. Pessimistic Locking
-- Hive (requires hive.support.concurrency=true)
LOCK TABLE table_name EXCLUSIVE;
UNLOCK TABLE table_name;
2. Optimistic Locking (Application Layer)
-- Compare-and-swap on a version column (needs row-level UPDATE, e.g. Hologres or Hive ACID tables)
UPDATE table_name
SET col = 'new_value', version = version + 1
WHERE id = 1 AND version = 1;
VI. Access Control
1. Hive Grants
-- Role management
CREATE ROLE analyst;
GRANT SELECT, INSERT ON DATABASE analytics TO ROLE analyst;
GRANT ROLE analyst TO USER data_analyst;
-- Column-level privileges (support depends on the authorization mode in use)
GRANT SELECT (col1, col2) ON TABLE sensitive_data TO ROLE analyst;
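To audit what a role has been granted:
SHOW GRANT ROLE analyst ON DATABASE analytics;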
2. MaxCompute ACL
-- Project-level grant
GRANT Read, List ON PROJECT project_name TO USER alice@aliyun.com;
-- Table-level grant
GRANT Read, Write ON TABLE table_name TO USER bob@aliyun.com;
-- Column-level grant
GRANT Read ON TABLE table_name (col1, col2) TO USER carol@aliyun.com;
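MaxCompute can likewise list a user's privileges:
SHOW GRANTS FOR alice@aliyun.com;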
VII. Performance Tuning
1. Analyzing Execution Plans
-- Hive
EXPLAIN
SELECT * FROM table_name WHERE condition;
EXPLAIN EXTENDED
SELECT * FROM table_name WHERE condition;
-- MaxCompute
EXPLAIN SELECT * FROM table_name WHERE condition;
2. Vectorized Execution
-- Hive vectorization
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
3. Parallel Execution
-- Run independent Hive stages in parallel
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
VIII. Data Types
1. Complex Types
-- ARRAY
SELECT arr[0] FROM t;    -- first element (arrays are 0-indexed)
SELECT size(arr) FROM t; -- number of elements
-- MAP
SELECT map_col['key'] FROM t;
SELECT map_keys(map_col) FROM t;
SELECT map_values(map_col) FROM t;
-- STRUCT
SELECT struct_col.field1 FROM t;
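To flatten an ARRAY into one row per element, Hive-style SQL pairs LATERAL VIEW with explode (assuming t has an id column):
SELECT t.id, e.item
FROM t
LATERAL VIEW explode(arr) e AS item;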
2. Converting Between Types
-- STRING to ARRAY
SELECT split('a,b,c', ',') FROM t; -- ["a","b","c"]
-- ARRAY to STRING
SELECT concat_ws(',', array_col) FROM t;
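A related built-in, str_to_map, parses a delimited string straight into a MAP:
-- str_to_map(text, pair_delimiter, key_value_delimiter)
SELECT str_to_map('a:1,b:2', ',', ':') FROM t;  -- {"a":"1","b":"2"}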
IX. Handy Techniques
1. Data Sampling
-- Random sample (note: ORDER BY RAND() sorts the entire table)
SELECT * FROM table_name ORDER BY RAND() LIMIT 1000;
-- Bucket sampling
SELECT * FROM bucketed_table TABLESAMPLE (BUCKET 1 OUT OF 32 ON RAND());
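Since ORDER BY RAND() forces a global sort, a RAND() filter is far cheaper on large inputs when an approximate sample size is acceptable:
-- Roughly a 0.1% sample, no global sort
SELECT * FROM table_name WHERE RAND() < 0.001;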
2. Deduplication
-- Exact distinct count
SELECT COUNT(DISTINCT col) FROM t;
-- Approximate distinct count for very large inputs
-- (built into MaxCompute; Hive typically needs a sketch UDF such as HyperLogLog)
SELECT APPROX_COUNT_DISTINCT(col) FROM t;
3. Collapsing Multiple Rows into One
-- Hive and MaxCompute (identical syntax)
SELECT user_id, concat_ws(',', collect_set(product_name)) AS products
FROM orders
GROUP BY user_id;
X. Using UDFs
1. Built-In Functions
-- String functions
SELECT length('hello');
SELECT substr('hello', 1, 3);
SELECT regexp_extract('hello', '(hel)lo', 1);
-- Date functions
SELECT date_add('2024-01-14', 7);
SELECT datediff('2024-01-14', '2024-01-01');
-- Aggregate functions
SELECT variance(col) FROM t;
SELECT stddev(col) FROM t;
2. Custom UDFs
-- Register a UDF (the implementing class must be compiled and on the classpath)
CREATE FUNCTION my_lower AS 'com.example.LowerUDF';
-- Call it like a built-in
SELECT my_lower(name) FROM t;
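For a session-scoped function, add the jar first and register it as temporary (the jar path is illustrative):
ADD JAR hdfs:///path/to/udf.jar;  -- illustrative path
CREATE TEMPORARY FUNCTION my_lower_tmp AS 'com.example.LowerUDF';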
XI. Scheduling
1. Task Dependencies
# Airflow DAG example (process_data is assumed to be defined elsewhere)
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG('daily_report', schedule='@daily', start_date=datetime(2024, 1, 1)) as dag:
    task1 = BashOperator(task_id='hive_query', bash_command='hive -f query.sql')
    task2 = PythonOperator(task_id='run_python', python_callable=process_data)
    task1 >> task2  # task2 runs after task1 succeeds
2. Incremental Sync
-- Incremental load (${bizdate} is a scheduler parameter)
INSERT OVERWRITE TABLE target_table PARTITION (dt)
SELECT *
FROM source_table
WHERE dt >= '${bizdate}';
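On engines without row-level UPDATE, an increment is usually merged into a full snapshot by rebuilding it with a FULL OUTER JOIN (full_table and incr_table are hypothetical; the increment's values win):
INSERT OVERWRITE TABLE full_table
SELECT
    COALESCE(i.id, f.id)     AS id,
    COALESCE(i.name, f.name) AS name  -- take the newer value where present
FROM full_table f
FULL OUTER JOIN incr_table i
    ON f.id = i.id;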
XII. Troubleshooting
1. Inspecting Table and Job State
-- Hive
SHOW PARTITIONS table_name;
DESCRIBE FORMATTED table_name;
-- MaxCompute
SHOW TABLES;
DESC table_name;
2. Viewing Logs
-- Default Hive client log location
-- /tmp/${user}/hive.log
-- YARN application logs
-- yarn logs -applicationId application_xxx
3. Common Errors
| Error | Cause | Fix |
|---|---|---|
| GC overhead limit exceeded | Insufficient memory | Increase memory settings |
| Container killed by YARN | Insufficient resources | Raise the resource quota |
| Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES | Data skew | Apply the skew-handling techniques above |
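For the two memory-related errors, a typical first step is raising container memory and the JVM heap (values illustrative; keep the heap below the container limit):
SET mapreduce.map.memory.mb=4096;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts=-Xmx6554m;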