-- Inspect the contents of the User Volume (per-user scratch storage)
SHOW USER VOLUME DIRECTORY;
-- List all files
LIST USER VOLUME;
-- Filter with a regular expression
LIST USER VOLUME REGEXP = '.*\.csv';
-- List a specific subdirectory only
LIST USER VOLUME SUBDIRECTORY 'temp/';
特点:
✅ 即开即用,无需配置
✅ 适合临时数据存储
✅ 用户默认具有管理权限
❌ 不支持DIRECTORY函数
❌ 不支持PIPE直接监控
Table Volume(表卷)
适用场景: 表级数据管理、ETL中间结果、表相关文件存储
-- Inspect a Table Volume's directory
SHOW TABLE VOLUME DIRECTORY table_name;
-- List the Table Volume's files
LIST TABLE VOLUME table_name;
-- Filter with a regular expression
LIST TABLE VOLUME table_name REGEXP = '.*\.parquet';
-- Inspect a specific subdirectory
LIST TABLE VOLUME table_name SUBDIRECTORY 'backup/';
特点:
✅ 每个表自动拥有独立Volume空间
✅ 与表生命周期和权限绑定
✅ 支持标准Volume操作
✅ 便于表级数据管理和备份
External Volume(外部卷)
适用场景: 正式数据湖、生产数据存储、跨系统数据共享
创建External Volume
-- Step 1: create a Storage Connection holding the object-store credentials
CREATE STORAGE CONNECTION my_oss_connection
TYPE = 'OSS' -- supported storage types: OSS/S3/COS/GCS
PROPERTIES = (
'access_key' = 'your_access_key',
'secret_key' = 'your_secret_key',
'endpoint' = 'oss-region.aliyuncs.com'
);
-- Step 2: create the External Volume on top of the connection
CREATE VOLUME my_external_volume
WITH CONNECTION = my_oss_connection
LOCATION = 'oss://bucket-name/path/'
DIRECTORY = (enable = TRUE) -- enable the DIRECTORY() table function for this volume
COMMENT = '外部存储卷';
-- Step 3: grant access to the relevant roles
GRANT READ VOLUME ON VOLUME my_schema.my_external_volume TO ROLE data_analyst;
GRANT WRITE VOLUME ON VOLUME my_schema.my_external_volume TO ROLE data_engineer;
使用External Volume
-- Show all external Volumes
SHOW VOLUMES;
-- Inspect a Volume's configuration
DESC VOLUME schema_name.volume_name;
-- List a Volume's files
LIST VOLUME schema_name.volume_name;
-- Advanced filtering
LIST VOLUME schema_name.volume_name
SUBDIRECTORY 'data/2024/'
REGEXP = '.*\.parquet';
-- Basic listing queries
LIST USER VOLUME;
LIST TABLE VOLUME my_table;
LIST VOLUME mcp_demo.data_volume;
-- Subdirectory queries
LIST USER VOLUME SUBDIRECTORY 'reports/2024/';
LIST TABLE VOLUME my_table SUBDIRECTORY 'backups/';
LIST VOLUME mcp_demo.data_volume SUBDIRECTORY 'csv_data/';
-- Regular-expression filtering
LIST USER VOLUME REGEXP = '.*\.parquet';
LIST TABLE VOLUME my_table REGEXP = '.*backup.*';
LIST VOLUME mcp_demo.data_volume REGEXP = '.*month=0[1-5].*';
-- Combined query: subdirectory + regular expression
-- NOTE(review): 't_search_log' has no trailing slash, unlike the other
-- SUBDIRECTORY examples — confirm both forms are accepted
LIST VOLUME mcp_demo.data_volume
SUBDIRECTORY 't_search_log'
REGEXP = '.*c000';
-- Parquet export (recommended; compression is built in)
COPY INTO USER VOLUME
SUBDIRECTORY 'parquet_data/'
FROM my_table
FILE_FORMAT = (TYPE = PARQUET); -- optimal compression applied automatically
-- TEXT export (plain text)
COPY INTO USER VOLUME
SUBDIRECTORY 'text_output/'
FROM my_table
FILE_FORMAT = (TYPE = TEXT);
-- Compression parameters for imports are configured via OPTIONS
COPY INTO target_table
FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES ('data.csv');
🔧 文件格式参数说明
-- Parameter settings for CSV import
COPY INTO target_table
FROM USER VOLUME
USING CSV
OPTIONS(
'header'='true', -- file contains a header row
'sep'='|' -- field separator
)
FILES ('special_format.csv');
-- Import error handling
-- On failure the system returns detailed error information by default
COPY INTO target_table
FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES ('import_data.csv');
-- For potentially malformed files, validate with a small sample first
COPY INTO target_table
FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES ('sample_data.csv');
🧹 PURGE自动清理功能
PURGE功能可以在数据导入成功后自动删除源文件,节省存储空间:
-- Automatically delete source files after a successful import
COPY INTO target_table
FROM USER VOLUME
(id INT, name STRING, value DOUBLE)
USING CSV
FILES ('import_data.csv')
PURGE = TRUE; -- ⚠️ source files are permanently deleted once the import succeeds
-- Full import example
COPY INTO sales_data
FROM VOLUME data_lake_volume
(order_id BIGINT, customer_name STRING, amount DECIMAL(10,2), order_date DATE)
USING CSV
OPTIONS('header'='true', 'sep'=',')
SUBDIRECTORY 'daily_sales/'
PURGE = TRUE; -- clean up after success
-- Force presigned URLs to be externally accessible
SET cz.sql.function.get.presigned.url.force.external=true;
-- Generate an external URL (recommended for sharing); third arg is TTL in seconds
SELECT GET_PRESIGNED_URL(USER VOLUME, 'report.pdf', 7200) AS external_url;
-- Generate URLs in bulk (e.g. for a file-sharing manifest)
SELECT
relative_path,
size,
GET_PRESIGNED_URL(USER VOLUME, relative_path,
CASE
WHEN relative_path LIKE '%temp%' THEN 1800 -- temp files: 30 minutes
WHEN relative_path LIKE '%archive%' THEN 86400 -- archived files: 24 hours
ELSE 3600 -- default: 1 hour
END
) AS access_url
FROM (SHOW USER VOLUME DIRECTORY)
WHERE relative_path LIKE '%.csv';
⏰ 有效期管理策略
-- Short-lived access (30 minutes) - ad-hoc downloads
SELECT GET_PRESIGNED_URL(USER VOLUME, 'temp.csv', 1800) AS short_url;
-- Standard access (1 hour) - routine operations
SELECT GET_PRESIGNED_URL(USER VOLUME, 'data.csv', 3600) AS standard_url;
-- Long access (12 hours) - large file downloads
SELECT GET_PRESIGNED_URL(USER VOLUME, 'large_archive.zip', 43200) AS long_url;
-- Extended access (7 days) - cross-timezone collaboration
SELECT GET_PRESIGNED_URL(USER VOLUME, 'shared_report.xlsx', 604800) AS week_url;
-- Basic file metadata
SELECT relative_path, size, last_modified_time
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
LIMIT 10;
-- Statistics grouped by file type
-- NOTE(review): the CASE expression is repeated verbatim in GROUP BY,
-- presumably because SELECT aliases cannot be referenced there —
-- keep the two copies in sync when editing
SELECT
CASE
WHEN relative_path LIKE '%.csv' THEN 'CSV'
WHEN relative_path LIKE '%.parquet' THEN 'Parquet'
WHEN relative_path LIKE '%.json' THEN 'JSON'
WHEN relative_path LIKE '%.txt' THEN 'TEXT'
ELSE 'Other'
END AS file_type,
COUNT(*) as file_count,
SUM(CAST(size AS BIGINT)) as total_size_bytes,
ROUND(SUM(CAST(size AS BIGINT))/1024/1024, 2) as total_size_mb,
AVG(CAST(size AS BIGINT)) as avg_size_bytes
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
GROUP BY
CASE
WHEN relative_path LIKE '%.csv' THEN 'CSV'
WHEN relative_path LIKE '%.parquet' THEN 'Parquet'
WHEN relative_path LIKE '%.json' THEN 'JSON'
WHEN relative_path LIKE '%.txt' THEN 'TEXT'
ELSE 'Other'
END
ORDER BY total_size_bytes DESC;
🚀 高级组合应用
-- Combine directory metadata with presigned URLs to build an access manifest
SELECT
relative_path,
ROUND(CAST(size AS BIGINT)/1024/1024, 2) as size_mb,
last_modified_time,
GET_PRESIGNED_URL(VOLUME mcp_demo.data_volume, relative_path, 3600) AS access_url
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
WHERE relative_path LIKE '%.csv'
AND CAST(size AS BIGINT) > 1000000 -- only files larger than ~1MB
ORDER BY last_modified_time DESC
LIMIT 10;
-- Data-catalog report: aggregate by top-level directory
SELECT
CASE
WHEN instr(relative_path, '/') > 0
THEN substr(relative_path, 1, instr(relative_path, '/') - 1)
ELSE 'root'
END as directory,
COUNT(*) as file_count,
SUM(CAST(size AS BIGINT)) as total_size,
MAX(last_modified_time) as latest_modified
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
WHERE relative_path LIKE '%/%'
GROUP BY
CASE
WHEN instr(relative_path, '/') > 0
THEN substr(relative_path, 1, instr(relative_path, '/') - 1)
ELSE 'root'
END
ORDER BY total_size DESC;
📈 数据治理与分析应用
-- File aging analysis: bucket files by how recently they were modified
SELECT
CASE
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '7' DAY THEN 'Recent (7d)'
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '30' DAY THEN 'Medium (30d)'
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '90' DAY THEN 'Old (90d)'
ELSE 'Very Old (>90d)'
END AS age_category,
COUNT(*) as file_count,
ROUND(SUM(CAST(size AS BIGINT))/1024/1024, 2) as size_mb,
ROUND(AVG(CAST(size AS BIGINT))/1024, 2) as avg_size_kb
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
GROUP BY
CASE
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '7' DAY THEN 'Recent (7d)'
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '30' DAY THEN 'Medium (30d)'
WHEN last_modified_time >= CURRENT_DATE - INTERVAL '90' DAY THEN 'Old (90d)'
ELSE 'Very Old (>90d)'
END
ORDER BY
CASE age_category
WHEN 'Recent (7d)' THEN 1
WHEN 'Medium (30d)' THEN 2
WHEN 'Old (90d)' THEN 3
ELSE 4
END;
-- Identify cleanup candidates: temp files not touched for 90+ days
SELECT
relative_path,
ROUND(CAST(size AS BIGINT)/1024/1024, 2) as size_mb,
last_modified_time,
datediff(CURRENT_DATE, DATE(last_modified_time)) AS days_old
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
WHERE last_modified_time < CURRENT_DATE - INTERVAL '90' DAY
AND (relative_path LIKE '%temp%' OR relative_path LIKE '%tmp%')
ORDER BY last_modified_time ASC;
-- 1. Check Virtual Cluster status
SHOW VCLUSTERS;
-- 2. Start the Virtual Cluster (if needed)
ALTER VCLUSTER DEFAULT RESUME;
-- 3. Confirm an External Volume is in use (only External Volumes support PIPE)
SHOW VOLUMES;
DESC VOLUME schema_name.volume_name;
🚰 创建PIPE管道
-- Create the target table
CREATE TABLE IF NOT EXISTS auto_import_target (
id INT,
name STRING,
data_content STRING,
ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
source_file STRING -- records the originating file
);
-- Create a basic PIPE (External Volumes only)
CREATE PIPE IF NOT EXISTS data_auto_import_pipe
VIRTUAL_CLUSTER = 'DEFAULT'
INGEST_MODE = 'LIST_PURGE'
COMMENT 'External Volume自动数据导入管道'
AS COPY INTO auto_import_target (id, name, data_content)
FROM VOLUME mcp_demo.data_volume
(id INT, name STRING, data_content STRING)
USING CSV
OPTIONS('header'='false')
PURGE = true;
-- Create a tuned PIPE (recommended for production)
CREATE PIPE optimized_auto_pipe
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = 60 -- batching interval
BATCH_SIZE_PER_KAFKA_PARTITION = 10000 -- batch size
INGEST_MODE = 'LIST_PURGE'
COMMENT '生产级自动导入管道'
AS COPY INTO auto_import_target (id, name, data_content)
FROM VOLUME mcp_demo.data_volume
(id INT, name STRING, data_content STRING)
USING CSV
OPTIONS('header'='true')
PURGE = true; -- clean up after processing
📊 PIPE管理与监控
-- List all PIPEs
SHOW PIPES;
-- Show a PIPE's details
DESC PIPE data_auto_import_pipe;
-- Pause a PIPE (correct syntax)
ALTER PIPE data_auto_import_pipe SET PIPE_EXECUTION_PAUSED = true;
-- Resume a PIPE (correct syntax)
ALTER PIPE data_auto_import_pipe SET PIPE_EXECUTION_PAUSED = false;
-- Drop a PIPE
DROP PIPE IF EXISTS data_auto_import_pipe;
⚠️ PIPE使用限制与注意事项
仅支持External Volume: User Volume和Table Volume不支持PIPE监控
-- Basic export to the User Volume
COPY INTO USER VOLUME
SUBDIRECTORY 'exports/2024/'
FROM my_table
FILE_FORMAT = (TYPE = CSV);
-- Export to a Table Volume (table-scoped storage)
COPY INTO TABLE VOLUME my_table
SUBDIRECTORY 'backups/'
FROM my_table
FILE_FORMAT = (TYPE = PARQUET);
-- Export the result of a complex query
COPY INTO USER VOLUME
SUBDIRECTORY 'reports/'
FROM (
SELECT
customer_id,
customer_name,
SUM(order_amount) as total_amount,
COUNT(*) as order_count,
MAX(order_date) as last_order_date
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY customer_id, customer_name
HAVING SUM(order_amount) > 10000
)
FILE_FORMAT = (TYPE = CSV);
-- Export in Parquet format (recommended)
COPY INTO USER VOLUME
SUBDIRECTORY 'compressed_exports/'
FROM my_table
FILE_FORMAT = (TYPE = PARQUET);
📥 数据导入高级应用
-- Import from the User Volume
COPY INTO target_table
FROM USER VOLUME
(id INT, name STRING, created_time TIMESTAMP_NTZ)
USING CSV
OPTIONS('header'='true')
FILES ('exports/data.csv')
PURGE = FALSE; -- keep source files for error analysis
-- Import from a Table Volume (specific files)
COPY INTO target_table
FROM TABLE VOLUME source_table
(id INT, name STRING, backup_time TIMESTAMP_NTZ)
USING PARQUET
FILES ('backups/backup_20240529.parquet');
-- The second file is imported in its own statement
-- NOTE(review): a multi-file FILES list is used elsewhere in this file —
-- confirm whether Table Volume imports also accept it
COPY INTO target_table
FROM TABLE VOLUME source_table
(id INT, name STRING, backup_time TIMESTAMP_NTZ)
USING PARQUET
FILES ('backups/backup_20240530.parquet');
-- Import from an External Volume (using a subdirectory)
COPY INTO target_table
FROM VOLUME mcp_demo.data_volume
(id INT, name STRING, value DOUBLE, load_date DATE)
USING CSV
SUBDIRECTORY 'processed/' -- processes the whole directory
OPTIONS('header'='true');
-- JSON import example
COPY INTO json_target_table
FROM VOLUME data_volume
USING JSON
SUBDIRECTORY 'json_data/';
🔄 批量数据处理工作流
-- Complete ETL workflow example
-- Step 1: export raw data for processing
COPY INTO USER VOLUME
SUBDIRECTORY 'etl/raw/'
FROM (SELECT * FROM raw_table WHERE status = 'pending' AND created_date >= CURRENT_DATE)
FILE_FORMAT = (TYPE = CSV);
-- Step 2: table-level backup into the Table Volume
COPY INTO TABLE VOLUME important_table
SUBDIRECTORY 'daily_backup/2024-05-29/'
FROM important_table
FILE_FORMAT = (TYPE = PARQUET);
-- Step 3: import after processing (the processing itself is usually done by an external ETL tool)
COPY INTO processed_table
FROM USER VOLUME
(id INT, name STRING, processed_value DOUBLE, processed_time TIMESTAMP_NTZ)
USING CSV
OPTIONS('header'='true')
SUBDIRECTORY 'etl/processed/'
PURGE = TRUE; -- remove the temp files once the import succeeds
-- Step 4: validate data quality
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT id) as unique_ids,
MAX(processed_time) as latest_processed,
MIN(processed_time) as earliest_processed
FROM processed_table
WHERE DATE(processed_time) = CURRENT_DATE;
-- Compare file statistics across volume types.
-- NOTE(review): unlike the other size reports in this file, the MB/KB values
-- below are not wrapped in ROUND() — results may carry long fractional parts
SELECT
'User Volume' as volume_type,
COUNT(*) as file_count,
SUM(CAST(size AS BIGINT)) / 1024 / 1024 as total_size_mb,
AVG(CAST(size AS BIGINT)) / 1024 as avg_file_size_kb,
MIN(CAST(size AS BIGINT)) as min_size,
MAX(CAST(size AS BIGINT)) as max_size
FROM (SHOW USER VOLUME DIRECTORY)
UNION ALL
SELECT
'External Volume' as volume_type,
COUNT(*) as file_count,
SUM(CAST(size AS BIGINT)) / 1024 / 1024 as total_size_mb,
AVG(CAST(size AS BIGINT)) / 1024 as avg_file_size_kb,
MIN(CAST(size AS BIGINT)) as min_size,
MAX(CAST(size AS BIGINT)) as max_size
FROM DIRECTORY(VOLUME mcp_demo.data_volume);
🔍 识别小文件问题
-- Find small files (candidates for consolidation)
SELECT
relative_path,
CAST(size AS BIGINT) as size_bytes,
ROUND(CAST(size AS BIGINT) / 1024, 2) as size_kb
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
WHERE CAST(size AS BIGINT) < 1048576 -- files smaller than 1MB
ORDER BY size_bytes ASC
LIMIT 20;
🔧 Volume文件合并策略
-- 1. Consolidate via export
-- Import the many small files, then export them back out as one large file
COPY INTO consolidated_table
FROM VOLUME mcp_demo.data_volume
(id INT, name STRING, value DOUBLE)
USING CSV
SUBDIRECTORY 'small_files/';
COPY INTO VOLUME mcp_demo.data_volume
SUBDIRECTORY 'consolidated/'
FROM consolidated_table
FILE_FORMAT = (TYPE = PARQUET);
-- 2. Periodic archive consolidation
-- Merge aging data into larger archive files
COPY INTO archive_table
FROM VOLUME mcp_demo.data_volume
(id INT, name STRING, date_field DATE, value DOUBLE)
USING CSV
SUBDIRECTORY 'daily_data/'
FILES ('day_20240101.csv', 'day_20240102.csv', 'day_20240103.csv');
COPY INTO VOLUME mcp_demo.data_volume
SUBDIRECTORY 'archive/2024/01/'
FROM archive_table
FILE_FORMAT = (TYPE = PARQUET);
-- User Volume permissions (each user manages their own by default)
-- No extra grants are needed; users have full control of their own User Volume
-- Table Volume permissions (tied to the table's privileges)
-- Operating on a Table Volume requires the corresponding table privileges:
-- SELECT privilege → SHOW/LIST/GET operations
-- INSERT/UPDATE/DELETE privileges → PUT/REMOVE operations
-- External Volume permissions (must be granted explicitly)
GRANT READ VOLUME ON VOLUME schema_name.external_volume_name TO USER username;
GRANT WRITE VOLUME ON VOLUME schema_name.external_volume_name TO USER username;
GRANT ALL ON VOLUME schema_name.external_volume_name TO ROLE data_engineer;
👥 权限管理最佳实践
-- Role-based permission management
CREATE ROLE data_reader;
CREATE ROLE data_writer;
CREATE ROLE data_admin;
-- Bulk privilege grants
GRANT READ VOLUME ON VOLUME schema_name.shared_volume TO ROLE data_reader;
GRANT READ VOLUME, WRITE VOLUME ON VOLUME schema_name.shared_volume TO ROLE data_writer;
GRANT ALL ON VOLUME schema_name.shared_volume TO ROLE data_admin;
-- Assign roles to users
GRANT ROLE data_reader TO USER analyst_team;
GRANT ROLE data_writer TO USER etl_team;
GRANT ROLE data_admin TO USER admin_user;
-- Inspect a Volume's grants
SHOW GRANTS ON VOLUME schema_name.volume_name;
-- Inspect a user's grants
SHOW GRANTS TO USER username;
-- Revoke a privilege
REVOKE WRITE VOLUME ON VOLUME schema_name.volume_name FROM USER username;
🛡️ 安全配置建议
-- Presigned-URL security setting
SET cz.sql.function.get.presigned.url.force.external=true; -- enable only when needed
-- Periodic permission audit
SHOW GRANTS ON VOLUME schema_name.volume_name;
-- Storage usage analysis with a rough monthly cost estimate
SELECT
volume_type,
file_format,
file_count,
ROUND(total_size_gb, 2) as size_gb,
ROUND(total_size_gb * 0.15, 2) as estimated_monthly_cost_usd -- assumes $0.15/GB/month — verify against actual pricing
FROM (
SELECT
'External Volume' as volume_type,
CASE
WHEN relative_path LIKE '%.parquet' THEN 'Parquet'
WHEN relative_path LIKE '%.csv' THEN 'CSV'
WHEN relative_path LIKE '%.json' THEN 'JSON'
ELSE 'Other'
END as file_format,
COUNT(*) as file_count,
SUM(CAST(size AS BIGINT)) / 1024 / 1024 / 1024 as total_size_gb
FROM DIRECTORY(VOLUME mcp_demo.data_volume)
GROUP BY
CASE
WHEN relative_path LIKE '%.parquet' THEN 'Parquet'
WHEN relative_path LIKE '%.csv' THEN 'CSV'
WHEN relative_path LIKE '%.json' THEN 'JSON'
ELSE 'Other'
END
) subq
ORDER BY total_size_gb DESC;
-- Fix 1: check your privileges
SHOW GRANTS TO USER your_username;
-- Fix 2: check the Volume's status
DESC VOLUME schema_name.volume_name;
-- Fix 3: request the required privilege
-- To be run by an administrator:
-- GRANT READ VOLUME ON VOLUME schema_name.volume_name TO USER username;
-- Fix: validate the file format against the table structure
-- 1. First inspect the files
LIST USER VOLUME REGEXP = '.*\.csv';
-- 2. Test the import with a small sample
COPY INTO test_target
FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES ('sample.csv');
问题: 文件意外被删除
-- Prevention: use the PURGE parameter with care
COPY INTO target_table
FROM VOLUME my_volume
USING CSV
PURGE = FALSE; -- keep the source files
-- Recommendation: back up important data
COPY INTO TABLE VOLUME backup_table
SUBDIRECTORY 'daily_backup/'
FROM source_table
FILE_FORMAT = (TYPE = PARQUET);