-- 查询近 7 天所有作业执行记录
SELECT
job_id,
workspace_name,
job_creator AS operator,
job_type,
status,
start_time,
end_time,
execution_time,
LEFT(job_text, 200) AS sql_preview
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 7 DAY
ORDER BY start_time DESC
LIMIT 100;
DDL 变更审计
-- 查询近 30 天所有 DDL 操作(CREATE / DROP / ALTER)
SELECT
job_id,
workspace_name,
job_creator AS operator,
job_type,
start_time,
LEFT(job_text, 300) AS ddl_statement
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 30 DAY
AND job_type IN ('CREATE', 'DROP', 'ALTER')
ORDER BY start_time DESC;
按用户审计操作记录
-- 统计每个用户的作业数量和成功率
SELECT
job_creator AS user_name,
COUNT(*) AS total_jobs,
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_jobs,
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_jobs,
ROUND(SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS success_rate
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY job_creator
ORDER BY total_jobs DESC;
数据访问审计(谁看了什么数据)
敏感表访问记录
-- 查询访问特定表的作业记录
SELECT
job_id,
job_creator AS operator,
workspace_name,
start_time,
status,
execution_time,
LEFT(job_text, 200) AS sql_preview
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 7 DAY
AND input_tables LIKE '%sensitive_table_name%'
ORDER BY start_time DESC;
数据导出审计
-- 查询 COPY INTO 导出操作
SELECT
job_id,
job_creator AS operator,
workspace_name,
start_time,
execution_time,
LEFT(job_text, 300) AS export_statement
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 30 DAY
AND job_text LIKE '%COPY INTO%'
AND (job_text LIKE '%VOLUME%' OR job_text LIKE '%LOCATION%')
ORDER BY start_time DESC;
异常访问模式检测
-- 检测非工作时间(22:00-06:00)的大批量数据读取
SELECT
job_id,
job_creator AS operator,
workspace_name,
start_time,
HOUR(start_time) AS access_hour,
CAST(input_bytes AS BIGINT) / 1024 / 1024 AS data_read_mb,
LEFT(job_text, 200) AS sql_preview
FROM sys.information_schema.job_history
WHERE start_time >= CURRENT_DATE() - INTERVAL 30 DAY
AND HOUR(start_time) NOT BETWEEN 6 AND 22
AND CAST(input_bytes AS BIGINT) > 1024 * 1024 * 1024 -- 读取超过 1GB
ORDER BY start_time DESC;
跨工作空间数据访问
-- 查询跨工作空间访问数据的作业
SELECT DISTINCT
j.workspace_name AS source_workspace,
GET_JSON_OBJECT(j.input_tables, '$.table[0].namespace[0]') AS accessed_workspace,
GET_JSON_OBJECT(j.input_tables, '$.table[0].tableName') AS accessed_table,
j.job_creator AS operator,
j.start_time
FROM sys.information_schema.job_history j
WHERE j.start_time >= CURRENT_DATE() - INTERVAL 7 DAY
AND j.input_tables IS NOT NULL
AND j.input_tables != '{"table":[]}'
AND GET_JSON_OBJECT(j.input_tables, '$.table[0].namespace[0]') != j.workspace_name
ORDER BY j.start_time DESC;
数据变更审计(数据被怎么改了)
表结构变更历史
-- 查看表的变更历史(DDL 操作记录)
DESC HISTORY table_name;
返回结果包含:
操作类型(CREATE / ALTER / DROP)
操作时间
操作人
变更内容
数据变更回溯(Time Travel)
-- 查看表在某个时间点的数据快照
SELECT * FROM table_name TIMESTAMP AS OF '2025-06-01 10:00:00';
-- 对比两个时间点的数据差异(时间戳必须是字面量,不支持表达式)
SELECT * FROM table_name TIMESTAMP AS OF '2025-06-01 10:00:00'
WHERE id NOT IN (
SELECT id FROM table_name TIMESTAMP AS OF '2025-06-01 09:00:00'
);
⚠️
TIMESTAMP AS OF
TIMESTAMP AS OF
只接受字符串字面量,不支持
CURRENT_DATE() - INTERVAL 1 DAY
CURRENT_DATE() - INTERVAL 1 DAY
等表达式。
误操作恢复审计
-- 查看表的所有历史版本
DESC HISTORY table_name;
-- 恢复到指定版本
RESTORE TABLE table_name TO VERSION version_number;
权限变更审计
通过账户中心查看权限变更
登录账户中心,进入 审计 → 权限审计,可查看:
GRANT / REVOKE 操作记录
角色创建 / 删除记录
用户加入 / 移出工作空间记录
按时间范围、操作人、目标用户筛选
当前权限状态快照
-- 查看所有用户的角色分配
SELECT
user_name,
role_names,
workspace_name
FROM information_schema.users
WHERE role_names IS NOT NULL AND role_names != ''
ORDER BY user_name;
-- 查看所有角色的分配情况
SELECT
role_name,
user_names,
comment
FROM information_schema.roles
ORDER BY role_name;
高权限用户审计
-- 查询拥有 admin 权限的用户
SELECT
user_name,
role_names,
workspace_name
FROM information_schema.users
WHERE role_names LIKE '%admin%'
ORDER BY workspace_name, user_name;
-- 生成月度安全审计摘要(时间范围用字面量填写,如 '2025-05-01')
SELECT
'审计周期' AS item, '2025-05-01 至 2025-05-31' AS value
UNION ALL
SELECT '总作业数', CAST(COUNT(*) AS STRING)
FROM sys.information_schema.job_history
WHERE start_time >= '2025-05-01' AND start_time < '2025-06-01'
UNION ALL
SELECT '成功作业数', CAST(SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS STRING)
FROM sys.information_schema.job_history
WHERE start_time >= '2025-05-01' AND start_time < '2025-06-01'
UNION ALL
SELECT '失败作业数', CAST(SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS STRING)
FROM sys.information_schema.job_history
WHERE start_time >= '2025-05-01' AND start_time < '2025-06-01'
UNION ALL
SELECT '活跃用户数', CAST(COUNT(DISTINCT job_creator) AS STRING)
FROM sys.information_schema.job_history
WHERE start_time >= '2025-05-01' AND start_time < '2025-06-01'
UNION ALL
SELECT 'DDL 操作数', CAST(COUNT(*) AS STRING)
FROM sys.information_schema.job_history
WHERE start_time >= '2025-05-01' AND start_time < '2025-06-01'
AND job_type IN ('CREATE', 'DROP', 'ALTER')
UNION ALL
SELECT '权限变更次数', '通过账户中心审计功能查看';
⚠️
job_history
job_history
的时间条件必须使用字符串字面量(如
'2025-05-01'
'2025-05-01'
),不支持
CURRENT_DATE() - INTERVAL N DAY
CURRENT_DATE() - INTERVAL N DAY
表达式。
审计数据保留建议
数据类型
建议保留周期
说明
作业历史
90 天
sys.information_schema.job_history
sys.information_schema.job_history
默认保留
表版本历史
按
data_retention_days
data_retention_days
设置
默认 1 天,可调整至 90 天
账户中心审计记录
按账户中心配置
建议至少保留 180 天
自动化审计实践
创建审计监控视图
-- 创建每日审计摘要视图
CREATE VIEW daily_audit_summary AS
SELECT
DATE(start_time) AS audit_date,
workspace_name,
COUNT(*) AS total_jobs,
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_jobs,
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_jobs,
COUNT(DISTINCT job_creator) AS active_users,
SUM(CASE WHEN job_type IN ('CREATE', 'DROP', 'ALTER') THEN 1 ELSE 0 END) AS ddl_operations,
SUM(CAST(input_bytes AS BIGINT)) / 1024 / 1024 / 1024 AS total_data_read_gb
FROM sys.information_schema.job_history
GROUP BY DATE(start_time), workspace_name;
定期审计任务调度
在 Studio 中创建周期性 SQL 任务,每日执行审计查询并导出结果:
-- 每日安全审计任务
INSERT INTO audit_log
SELECT
CURRENT_TIMESTAMP() AS audit_time,
'daily_security_check' AS check_type,
COUNT(*) AS anomaly_count,
'非工作时间大批量数据读取' AS description
FROM sys.information_schema.job_history
WHERE DATE(start_time) = CURRENT_DATE() - INTERVAL 1 DAY
AND HOUR(start_time) NOT BETWEEN 6 AND 22
AND CAST(input_bytes AS BIGINT) > 1024 * 1024 * 1024;