COPY INTO

Overview

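Batch-loads data from files in object storage into a table. OSS, COS, and S3 are supported, and the files are accessed through a mounted Volume. COPY INTO suits one-off or scheduled batch ingestion into the lakehouse. To load newly arriving files continuously and automatically, use a Pipe instead. Data can also be transformed during the load in a FROM (SELECT ...) subquery; for details, see Structured and Semi-Structured Data Analysis.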

Syntax

-- Load directly from a VOLUME
COPY INTO|OVERWRITE <table_name>
  [PARTITION (<partition_column> = <partition_value>)]
  FROM VOLUME <volume_name> [(<column_name> <column_type>, ...)]
  USING CSV | PARQUET | ORC | BSON | JSON
  [OPTIONS(<key> = <value>, ...)]
  [FILES('<file1>', '<file2>', ...) | SUBDIRECTORY '<path>' | REGEXP '<pattern>']
  [PURGE = TRUE]
  [ON_ERROR = CONTINUE | ABORT];
-- Transform data during the load (column names and types must be declared explicitly in the VOLUME clause)
COPY INTO|OVERWRITE <table_name> FROM (
  SELECT <expr>, ...
  FROM VOLUME <volume_name>(<column_name> <column_type>, ...)
  USING CSV | PARQUET | ORC
  [OPTIONS(<key> = <value>, ...)]
  [FILES('<file>') | SUBDIRECTORY '<path>' | REGEXP '<pattern>']
)
[PURGE = TRUE]
[ON_ERROR = CONTINUE | ABORT];

Parameters

  • INTO: Append mode. New data is appended to the target table and existing data is left untouched.

  • OVERWRITE: Overwrite mode. The target table is emptied first, then the new data is loaded.

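  • PARTITION (column = value): Sets the partition column values directly, e.g. PARTITION (dt='2024-01'). The PARTITION clause immediately follows the table name, before FROM. The files need only contain the non-partition columns; the partition values come from this clause.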

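  • column_name / column_type: Optional. Declares the column names and types of the files in the Volume. Omitting them is recommended, because Lakehouse infers them automatically:

    • CSV: each line is split on the separator and field names are generated automatically, starting from f0; types are inferred as int, double, string, or bool. With header='true', the header row of the CSV file supplies the column names.
    • Parquet / ORC: names and types come from the field names and types stored in the file. If files have different numbers of columns, Lakehouse tries to merge their schemas and reports an error if they cannot be merged.
    • When a FROM (SELECT ...) subquery references columns, their names and types must be declared explicitly in the VOLUME clause; otherwise the column names cannot be resolved.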
  • USING: The file format: CSV, PARQUET, ORC, BSON, or JSON.

  • OPTIONS: Format-specific options, written as 'key'='value' pairs separated by commas.

    CSV format options:

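    +-------------+-------------------------------------------------------+------------+
    | Parameter   | Description                                           | Default    |
    +-------------+-------------------------------------------------------+------------+
    | sep         | Column separator, up to 2 characters                  | ,          |
    | header      | Whether to treat the first row as column names        | false      |
    | compression | Compression format: gzip, zstd, or zlib               |            |
    | lineSep     | Row separator, up to 2 characters                     | \r\n or \n |
    | quote       | Quote character wrapping fields that contain special  | "          |
    |             | characters; if a field itself contains double quotes, |            |
    |             | use another character, e.g. r'\0'                     |            |
    | escape      | Escape character; single byte only                    | \          |
    | nullValue   | String read as NULL; prefix it with r to avoid escape | ""         |
    |             | ambiguity, e.g. r'\N'. Empty fields are read as NULL  |            |
    |             | by default, with no extra configuration               |            |
    | timeZone    | Time zone for time fields, e.g. 'Asia/Shanghai'       |            |
    | multiLine   | Whether a CSV record may span multiple lines          | false      |
    +-------------+-------------------------------------------------------+------------+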

    JSON format options:

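    +--------------+------------------------------------------------------+---------+
    | Parameter    | Description                                          | Default |
    +--------------+------------------------------------------------------+---------+
    | compression  | Compression format: gzip                             |         |
    | explodeArray | If the JSON content begins with an array: true means | true    |
    |              | the schema is that of a single array element; false  |         |
    |              | means the schema is the array itself                 |         |
    +--------------+------------------------------------------------------+---------+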

    Parquet, ORC, and BSON take no additional options.

  • FILES: Lists the specific files to load; several may be given, e.g. FILES('a.parquet', 'b.parquet').

  • SUBDIRECTORY: Loads every file under the given subdirectory, recursively, e.g. SUBDIRECTORY 'month=02'.

  • REGEXP: Selects files by regular expression. The pattern is matched against each file's full object-storage path (e.g. s3://bucket/path/file.csv), not its path relative to the Volume; run SHOW VOLUME DIRECTORY <volume> to see the full paths. Example: REGEXP 'part-.*.parquet'. Note that an unescaped . matches any character; write \. to match a literal dot.

  • PURGE = TRUE: Deletes the source files from object storage after a successful load. The files are not deleted if the load fails.

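  • ON_ERROR: How errors are handled. When ON_ERROR is specified, the command returns the load status of each file.

    • CONTINUE: skips files that do not match the format and continues loading the remaining files
    • ABORT: terminates the entire COPY operation on the first error

    The returned status has these columns:

    +-------------+------------------------------------+
    | Column      | Description                        |
    +-------------+------------------------------------+
    | file        | Full path of the file              |
    | status      | SUCCESS or LOADED_FAILED           |
    | rows_loaded | Number of rows loaded successfully |
    | first_error | First error message                |
    +-------------+------------------------------------+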
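The automatic CSV schema inference described under column_name / column_type can be illustrated with a small Python sketch. It is not Lakehouse's implementation: the candidate order (int, then double, then bool, with string as the fallback), the rule that empty fields do not constrain a column's type, and the helper name infer_csv_schema are all assumptions made for illustration.

```python
import csv
import io

# Candidate types in the order they are tried (assumed order, not confirmed).
CANDIDATES = ("int", "double", "bool", "string")


def fits(value: str, typ: str) -> bool:
    """Return True if a non-empty CSV field can be read as `typ`."""
    if typ in ("int", "double"):
        try:
            (int if typ == "int" else float)(value)
            return True
        except ValueError:
            return False
    if typ == "bool":
        return value.lower() in ("true", "false")
    return True  # every value fits "string"


def infer_csv_schema(text: str, sep: str = ",", header: bool = False):
    """Infer (name, type) pairs; names are f0, f1, ... unless header is True."""
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    if header:
        names, rows = rows[0], rows[1:]
    else:
        names = [f"f{i}" for i in range(max(len(r) for r in rows))]
    schema = []
    for i, name in enumerate(names):
        # Empty fields are treated as NULL and do not constrain the type.
        values = [r[i] for r in rows if i < len(r) and r[i] != ""]
        typ = next(t for t in CANDIDATES if all(fits(v, t) for v in values))
        schema.append((name, typ))
    return schema


print(infer_csv_schema("1,Alice,95.5,true\n2,Bob,87,false\n"))
# → [('f0', 'int'), ('f1', 'string'), ('f2', 'double'), ('f3', 'bool')]
```

Because string always fits, every column receives some type; a real engine may also treat widening, dates, or mixed files differently.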

Notes

  • Use a general-purpose compute cluster (GENERAL PURPOSE VIRTUAL CLUSTER) to load data; it is better suited to batch jobs and data loading.
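  • Load data within the same region to avoid public-network transfer fees. Within the same region and the same cloud provider, data is transferred over the internal network.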

Examples

1. Load from a User Volume

CREATE TABLE birds (
  id INT,
  name VARCHAR(50),
  wingspan_cm FLOAT,
  colors STRING
);
PUT '/Users/Downloads/data.csv' TO USER VOLUME FILE 'data.csv';
COPY INTO birds FROM USER VOLUME
USING csv
OPTIONS('header'='true')
FILES ('data.csv')
PURGE = TRUE;

2. Load from a Table Volume

CREATE TABLE birds (
  id INT,
  name VARCHAR(50),
  wingspan_cm FLOAT,
  colors STRING
);
PUT '/Users/Downloads/data.csv' TO TABLE VOLUME birds FILE 'data.csv';
COPY INTO birds FROM TABLE VOLUME birds
USING csv
OPTIONS('header'='true')
FILES ('data.csv')
PURGE = TRUE;

3. Load from OSS

CREATE TABLE birds (
  id INT,
  name VARCHAR(50),
  wingspan_cm FLOAT,
  colors STRING
);
CREATE STORAGE CONNECTION catalog_storage_oss
  TYPE OSS
  ACCESS_ID = 'xxxx'
  ACCESS_KEY = 'xxxxxxx'
  ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com';
CREATE EXTERNAL VOLUME my_volume
  LOCATION 'oss://mybucket/test_insert/'
  USING CONNECTION catalog_storage_oss
  DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE);
COPY INTO birds FROM VOLUME my_volume
USING csv
SUBDIRECTORY 'dau_unload/read/';

4. Load from COS

CREATE TABLE birds (
  id INT,
  name VARCHAR(50),
  wingspan_cm FLOAT,
  colors STRING
);
CREATE STORAGE CONNECTION my_conn
  TYPE COS
  ACCESS_KEY = '<access_key>'
  SECRET_KEY = '<secret_key>'
  REGION = 'ap-shanghai'
  APP_ID = '1310000503';
CREATE EXTERNAL VOLUME my_volume
  LOCATION 'cos://mybucket/test_insert/'
  USING CONNECTION my_conn
  DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE);
COPY INTO birds FROM VOLUME my_volume
USING csv
SUBDIRECTORY 'dau_unload/read/';

5. Load from S3

CREATE TABLE birds (
  id INT,
  name VARCHAR(50),
  wingspan_cm FLOAT,
  colors STRING
);
CREATE STORAGE CONNECTION aws_bj_conn
  TYPE S3
  ACCESS_KEY = '<access_key>'
  SECRET_KEY = '<secret_key>'
  ENDPOINT = 's3.cn-north-1.amazonaws.com.cn'
  REGION = 'cn-north-1';
CREATE EXTERNAL VOLUME my_volume
  LOCATION 's3://mybucket/test_insert/'
  USING CONNECTION aws_bj_conn
  DIRECTORY = (ENABLE = TRUE, AUTO_REFRESH = TRUE);
COPY INTO birds FROM VOLUME my_volume
USING csv
SUBDIRECTORY 'dau_unload/read/';

6. Transform data during the load (type casts and function calls)

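Transform the data in a FROM (SELECT ...) subquery. Column names and types must be declared explicitly in the VOLUME clause; otherwise the columns cannot be referenced by name in the SELECT.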

CREATE TABLE IF NOT EXISTS doc_copy_transform (id INT, name_upper STRING);
-- Load a pipe-delimited CSV, casting types and upper-casing names in the same step
-- Note: the VOLUME clause must declare the column names and types
COPY INTO doc_copy_transform FROM (
  SELECT CAST(col0 AS INT), UPPER(col1)
  FROM USER VOLUME(col0 STRING, col1 STRING, col2 STRING)
  USING CSV
  OPTIONS('sep'='|')
  FILES('data.csv')
);
SELECT * FROM doc_copy_transform ORDER BY id;
+----+------------+
| id | name_upper |
+----+------------+
| 1  | ALICE      |
| 2  | BOB        |
| 3  | CAROL      |
+----+------------+
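Row by row, the SELECT above amounts to the following Python sketch: every column is first read as a string, as the VOLUME clause declares them, and then cast and upper-cased. The function name transform_rows and the sample rows are hypothetical, since the actual content of data.csv is not shown in this example; the final sort only mirrors the ORDER BY of the follow-up query.

```python
import csv
import io


def transform_rows(text: str):
    """Mimic SELECT CAST(col0 AS INT), UPPER(col1), sorted by id for display."""
    out = []
    for row in csv.reader(io.StringIO(text), delimiter="|"):
        # All three columns arrive as strings (col0 STRING, col1 STRING, col2 STRING);
        # col2 is declared but not selected, so it is simply ignored.
        col0, col1 = row[0], row[1]
        out.append((int(col0), col1.upper()))
    return sorted(out)


sample = "2|bob|x\n1|alice|y\n3|carol|z\n"  # hypothetical file content
print(transform_rows(sample))  # → [(1, 'ALICE'), (2, 'BOB'), (3, 'CAROL')]
```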

7. Filter data by joining a dimension table during the load

CREATE TABLE departments (
  dept_id int,
  dept_name varchar,
  location varchar
);
INSERT INTO departments VALUES
  (10, 'Sales', 'Beijing'),
  (20, 'R&D', 'Shanghai'),
  (30, 'Finance', 'Guangzhou'),
  (40, 'HR', 'Shenzhen');
CREATE TABLE employees (
  emp_id int,
  emp_name varchar,
  dept_id int,
  salary int
);
-- Column order of the CSV file in the Volume: emp_id, emp_name, dept_id, salary
-- Column names must be declared in the VOLUME clause so the JOIN condition can reference them
-- Load only the Sales department (dept_id = 10)
COPY OVERWRITE employees FROM (
  SELECT c0::int, c1, c2::int, c3::int
  FROM VOLUME my_volume(c0 STRING, c1 STRING, c2 STRING, c3 STRING)
  USING csv
  FILES('employees/part00001.csv')
  JOIN departments ON c2 = dept_id::STRING
  WHERE dept_name = 'Sales'
);
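The filtering effect of the inner JOIN plus WHERE can be sketched in Python. It models only the semantics: rows whose c2 has no matching department, or whose department has a different name, are dropped, and the surviving rows replace the table contents. The function name load_department, the sample data, and the assumption that dept_id is unique are illustrative.

```python
def load_department(csv_rows, departments, dept_name):
    """Mimic COPY OVERWRITE ... JOIN departments ON c2 = dept_id::STRING WHERE dept_name = ...

    csv_rows: (c0, c1, c2, c3) string tuples as read from the CSV file.
    departments: (dept_id, dept_name, location) tuples; dept_id assumed unique.
    Returns the rows that would replace the contents of employees.
    """
    # ON c2 = dept_id::STRING: key the lookup by the department id as a string.
    name_by_id = {str(dept_id): name for dept_id, name, _location in departments}
    return [
        (int(c0), c1, int(c2), int(c3))  # c0::int, c1, c2::int, c3::int
        for c0, c1, c2, c3 in csv_rows
        # Inner join drops unmatched rows (get() returns None); WHERE filters by name.
        if name_by_id.get(c2) == dept_name
    ]


departments = [(10, "Sales", "Beijing"), (20, "R&D", "Shanghai")]  # hypothetical
rows = [("1", "Ann", "10", "5000"), ("2", "Ben", "20", "6000"), ("3", "Cy", "99", "100")]
print(load_department(rows, departments, "Sales"))  # → [(1, 'Ann', 10, 5000)]
```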

8. Match Parquet files with a regular expression

COPY INTO hz_parquet_table FROM VOLUME hz_parquet_volume USING parquet REGEXP 'month=0[1-5].*.parquet';
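Because REGEXP is applied to the full object-storage path, a pattern written for part of the path matches only if it can be found somewhere inside the full path. The Python sketch below assumes search semantics (like re.search). This is inferred from the examples, whose patterns do not start with the scheme and bucket, and is not confirmed by this reference. The paths are hypothetical. The sketch also shows that the unescaped . before parquet matches any character.

```python
import re


def select_files(full_paths, pattern):
    """Keep the paths in which `pattern` is found (assumed re.search semantics)."""
    rx = re.compile(pattern)
    return [p for p in full_paths if rx.search(p)]


# Hypothetical full paths, as SHOW VOLUME DIRECTORY would list them.
paths = [
    "oss://bucket/data/month=01/part-0.parquet",
    "oss://bucket/data/month=06/part-0.parquet",
    "oss://bucket/data/month=03/part-0_parquet",  # "_" where a "." would be
]

print(select_files(paths, "month=0[1-5].*.parquet"))    # paths[0] and paths[2]
print(select_files(paths, r"month=0[1-5].*\.parquet"))  # only paths[0]
print(select_files(paths, r"^month=01"))                # []: the path starts with oss://
```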

9. Load into a specific partition of a partitioned table

The PARTITION clause writes the data from the files into the specified partition; the files need only contain the non-partition columns.

-- Create a table partitioned by date
CREATE TABLE IF NOT EXISTS events_partitioned (
  id INT,
  event_type STRING
) PARTITIONED BY (dt STRING);
-- The file has only 2 columns (id and event_type); dt comes from the PARTITION clause
COPY INTO events_partitioned PARTITION (dt='2024-01')
FROM USER VOLUME
USING CSV
FILES('events_jan.csv');
SHOW PARTITIONS events_partitioned;
+------------+
| dt         |
+------------+
| 2024-01    |
+------------+
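Conceptually, the PARTITION clause stamps the same partition value onto every row read from the file, so the file itself never carries dt. A minimal Python sketch of that idea; the helper rows_for_partition and the sample content are hypothetical, since the real events_jan.csv is not shown.

```python
import csv
import io


def rows_for_partition(text, columns, partition):
    """Combine file rows (non-partition columns only) with a static partition value."""
    rows = []
    for values in csv.reader(io.StringIO(text)):
        row = dict(zip(columns, values))
        row.update(partition)  # every row from this COPY lands in the same partition
        rows.append(row)
    return rows


sample = "1,login\n2,logout\n"  # hypothetical content of events_jan.csv
print(rows_for_partition(sample, ["id", "event_type"], {"dt": "2024-01"}))
# → [{'id': '1', 'event_type': 'login', 'dt': '2024-01'}, {'id': '2', 'event_type': 'logout', 'dt': '2024-01'}]
```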

10. Load a BSON file

COPY INTO t_bson FROM VOLUME my_external_vol(
  name string,
  age bigint,
  city string,
  interests array<string>
)
USING BSON
FILES('data.bson');

11. Handle errors with ON_ERROR

-- ABORT mode: terminate the whole COPY at the first error
COPY INTO test_data FROM VOLUME on_error_pipe
USING csv
OPTIONS('sep'='|', 'quote'='\0')
ON_ERROR = ABORT;
+-------------------------------------------------+---------+-------------+-------------+
| file                                            | status  | rows_loaded | first_error |
+-------------------------------------------------+---------+-------------+-------------+
| oss://lakehouse-perf-test/tmp/tmp_pipe/copy.csv | SUCCESS | 2           |             |
+-------------------------------------------------+---------+-------------+-------------+
-- CONTINUE mode: skip files that do not match the format
COPY INTO test_data FROM VOLUME on_error_pipe
USING csv
OPTIONS('sep'='|', 'quote'='\0')
ON_ERROR = CONTINUE;
+-----------------------------------------------------+---------------+-------------+----------------------------------------------------------------------------------------------------------------------+
| file                                                | status        | rows_loaded | first_error                                                                                                          |
+-----------------------------------------------------+---------------+-------------+----------------------------------------------------------------------------------------------------------------------+
| oss://lakehouse-perf-test/tmp/tmp_pipe/copy.csv     | SUCCESS       | 2           |                                                                                                                      |
| oss://lakehouse-perf-test/tmp/tmp_pipe/copy.csv.zip | LOADED_FAILED | 0           | csv file: oss://lakehouse-perf-test/tmp/tmp_pipe/copy.csv.zip, line: 0: eatString throws quote(0) in unquote string, |
| oss://lakehouse-perf-test/tmp/tmp_pipe/new_copy.csv | SUCCESS       | 2           |                                                                                                                      |
+-----------------------------------------------------+---------------+-------------+----------------------------------------------------------------------------------------------------------------------+
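The per-file behavior of the two ON_ERROR modes can be sketched in Python. Consistent with the output above, it assumes that under CONTINUE a file that fails to parse contributes 0 rows and is reported as LOADED_FAILED. Under ABORT the sketch simply raises; it does not model whether files processed earlier are rolled back. All names (copy_files, FileResult, CopyAborted, parse_pipe_csv) and file contents are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class FileResult:
    file: str
    status: str
    rows_loaded: int
    first_error: str


class CopyAborted(Exception):
    """Raised when ON_ERROR = ABORT stops the whole operation."""


def copy_files(files, parse, on_error="CONTINUE"):
    """files: (path, content) pairs; parse raises ValueError on malformed content."""
    results = []
    for path, content in files:
        try:
            rows = parse(content)
        except ValueError as exc:
            if on_error == "ABORT":
                raise CopyAborted(f"{path}: {exc}") from exc
            # CONTINUE: record the failure and move on to the next file.
            results.append(FileResult(path, "LOADED_FAILED", 0, str(exc)))
            continue
        results.append(FileResult(path, "SUCCESS", len(rows), ""))
    return results


def parse_pipe_csv(content):
    """Toy parser: every line must have exactly three '|'-separated fields."""
    rows = [line.split("|") for line in content.splitlines()]
    if any(len(r) != 3 for r in rows):
        raise ValueError("unexpected column count")
    return rows


files = [("copy.csv", "1|a|x\n2|b|y"), ("copy.csv.zip", "PK\x03\x04"), ("new_copy.csv", "3|c|z\n4|d|w")]
for r in copy_files(files, parse_pipe_csv):
    print(r.file, r.status, r.rows_loaded)
```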

12. Load a pipe-delimited CSV file

CREATE TABLE IF NOT EXISTS orders (id INT, customer STRING, amount INT);
-- Sample file content:
-- 1|Alice|500
-- 2|Bob|300
COPY INTO orders FROM USER VOLUME
USING CSV
OPTIONS('sep'='|')
FILES('orders.csv');
SELECT * FROM orders ORDER BY id;
+----+----------+--------+
| id | customer | amount |
+----+----------+--------+
| 1  | Alice    | 500    |
| 2  | Bob      | 300    |
+----+----------+--------+
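The 'sep'='|' option plays the same role as the delimiter argument of Python's csv module. The sketch below parses the sample content from this example and shows that reading it with the default comma separator yields a single field per line, which cannot fill three columns. Python's csv module accepts only one-character delimiters, so this sketch does not cover the two-character separators that sep allows. The name parse_orders is hypothetical.

```python
import csv
import io

ORDERS = "1|Alice|500\n2|Bob|300\n"  # sample content from the example above


def parse_orders(text, sep="|"):
    """Split each line on `sep` and cast to the table types (INT, STRING, INT)."""
    return [
        (int(id_), customer, int(amount))
        for id_, customer, amount in csv.reader(io.StringIO(text), delimiter=sep)
    ]


print(parse_orders(ORDERS))  # → [(1, 'Alice', 500), (2, 'Bob', 300)]

try:
    parse_orders(ORDERS, sep=",")  # the default separator sees one field per line
except ValueError as exc:
    print("cannot load with sep=',':", exc)
```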

13. Load a CSV file with a header (the header row is skipped automatically)

CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, score INT);
-- Sample file content (the first line is the header):
-- id,name,score
-- 1,Alice,95
-- 2,Bob,87
COPY INTO employees FROM USER VOLUME
USING CSV
OPTIONS('header'='true')
FILES('employees.csv');
SELECT * FROM employees ORDER BY id;
+----+-------+-------+
| id | name  | score |
+----+-------+-------+
| 1  | Alice | 95    |
| 2  | Bob   | 87    |
+----+-------+-------+
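With 'header'='true', the first line supplies column names instead of data. Python's csv.DictReader behaves similarly, as this sketch over the sample content shows; without header handling, the first line would be read as data and casting "id" to an integer would fail. How Lakehouse maps header names onto table columns (by name or by position) is not stated here, so the sketch makes no claim about it. Both helper names are hypothetical.

```python
import csv
import io

EMPLOYEES = "id,name,score\n1,Alice,95\n2,Bob,87\n"  # sample content from the example above


def parse_with_header(text):
    """Use the first line as column names and return (names, typed rows)."""
    reader = csv.DictReader(io.StringIO(text))
    names = reader.fieldnames  # consumes the header row
    rows = [(int(r["id"]), r["name"], int(r["score"])) for r in reader]
    return names, rows


def parse_without_header(text):
    """Treat every line as data, as if header were left at its default of false."""
    return [(int(i), name, int(score)) for i, name, score in csv.reader(io.StringIO(text))]


print(parse_with_header(EMPLOYEES))
# → (['id', 'name', 'score'], [(1, 'Alice', 95), (2, 'Bob', 87)])
try:
    parse_without_header(EMPLOYEES)
except ValueError:
    print("the header line 'id,name,score' cannot be cast to INT")
```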

14. Load gzip-compressed CSV files

CREATE TABLE IF NOT EXISTS logs (ts STRING, level STRING, message STRING);
COPY INTO logs FROM VOLUME my_volume
USING CSV
OPTIONS('header'='true', 'compression'='gzip')
SUBDIRECTORY 'logs/2024/';
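'compression'='gzip' means the file bytes must be decompressed before CSV parsing. A Python sketch of that order of operations, using a hypothetical in-memory sample instead of real files under logs/2024/ and assuming UTF-8 text; the helper read_gzip_csv is also hypothetical.

```python
import csv
import gzip
import io


def read_gzip_csv(blob: bytes, header: bool = True):
    """Gunzip a CSV payload, then parse it; returns (header_row_or_None, data_rows)."""
    text = gzip.decompress(blob).decode("utf-8")  # decompress first (UTF-8 assumed) ...
    rows = list(csv.reader(io.StringIO(text)))     # ... then split into fields
    if header:
        return rows[0], rows[1:]
    return None, rows


# Hypothetical log file, compressed in memory instead of read from logs/2024/.
sample = gzip.compress(b"ts,level,message\n2024-01-01T00:00:00,INFO,started\n")
print(read_gzip_csv(sample))
# → (['ts', 'level', 'message'], [['2024-01-01T00:00:00', 'INFO', 'started']])
```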

15. Load a JSON array

CREATE TABLE IF NOT EXISTS events (id INT, name STRING);
-- Sample file content (JSON array):
-- [{"id":1,"name":"login"},{"id":2,"name":"logout"}]
COPY INTO events FROM USER VOLUME
USING JSON
OPTIONS('explodeArray'='true')
FILES('events.json');
SELECT * FROM events ORDER BY id;
+----+--------+
| id | name   |
+----+--------+
| 1  | login  |
| 2  | logout |
+----+--------+
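The effect of explodeArray on a top-level JSON array can be sketched in Python. With true, each array element becomes one record, matching the two rows above; with false, the whole array is a single record. The helper json_records is hypothetical and handles only a single JSON document per file.

```python
import json


def json_records(text, explode_array=True):
    """Return the records one JSON document yields under the explodeArray setting."""
    doc = json.loads(text)
    if isinstance(doc, list) and explode_array:
        return doc  # one record per array element
    return [doc]    # the whole document (array or object) is one record


sample = '[{"id":1,"name":"login"},{"id":2,"name":"logout"}]'  # from the example above
print(json_records(sample))                            # → [{'id': 1, 'name': 'login'}, {'id': 2, 'name': 'logout'}]
print(len(json_records(sample, explode_array=False)))  # → 1
```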

