将数据导入云器Lakehouse的完整指南

数据入湖:通过ZettaPark PUT文件实现数据入湖的方式

概述

通过云器Lakehouse提供的Zettapark Python库,以Python编程的方式将在测试数据生成步骤生成的数据上传到云器Lakehouse管理的数据湖中,实现数据入湖。

数据湖操作需要新建一个数据湖连接和Volume,然后就可以将数据Put到数据湖中。

使用场景

适合熟悉Python编程,可以借助Python强大的编程能力和灵活性,通过Python和Dataframe进行数据清洗转换等数据工程、数据准备的工作,特别是和AI分析紧密相关的数据工作。

实现步骤

你也可以直接下载文件到本地。

将本地文件通过Zettapark Put到云器Lakehouse管理的数据湖(Volume
# !pip install clickzetta_zettapark_python  -i https://pypi.tuna.tsinghua.edu.cn/simple
from clickzetta.zettapark.session import Session
import json,requests
import os
from datetime import datetime
创建到云器Lakehouse的会话
# 从配置文件中读取参数
with open('config/config-ingest.json', 'r') as config_file:
    config = json.load(config_file)

print("正在连接到云器Lakehouse.....\n")

# 创建会话
session = Session.builder.configs(config).create()

print("连接成功!...\n")

正在连接到云器Lakehouse.....

连接成功!...

将文件PUT到云器Lakehouse数据湖Volume

请将'data/'改为在‘测试数据生成’步骤生成数据存放的目录。

for filename in os.listdir("data/"):
        if filename.endswith(".gz"):
            file_path = os.path.join("data/", filename)
            session.file.put(file_path,"volume://ingest_demo/gz/")
        if filename.endswith(".csv"):
            file_path = os.path.join("data/", filename)
            session.file.put(file_path,"volume://ingest_demo/csv/")
        if filename.endswith(".json"):
            file_path = os.path.join("data/", filename)
            session.file.put(file_path,"volume://ingest_demo/json/")
# 或者上传目录下所有的文件# session.file.put("../data/","volume://ingest_demo/gz/")
再次同步数据湖Volume的目录到Lakehouse
session.sql(alter_datalake_sql).show()

---------------------

|result_message |

---------------------

|OPERATION SUCCEED |

---------------------

再次查看云器Lakehouse数据湖Volume上的文件,数据入湖成功了
results = session.sql("select * from directory(volume ingest_demo)").show()

----------------------------------------------------------------------------------------------------------------------------

|relative_path |url |size |last_modified_time |

----------------------------------------------------------------------------------------------------------------------------

|gz/lift_tickets_data.csv.gz |oss://yourbucketname/ingest_demo/gz/lift_ticket... |9717050 |2024-12-27 19:24:21+08:00 |

|gz/lift_tickets_data.json.gz |oss://yourbucketname/ingest_demo/gz/lift_ticket... |11146044 |2024-12-27 19:24:19+08:00 |

----------------------------------------------------------------------------------------------------------------------------

测试将数据湖上文件再拉回到本地
session.file.get("volume://ingest_demo/gz/lift_tickets_data.json.gz","tmp/gz/")

[GetResult(file='tmp/gz/lift_tickets_data.json.gz', size=11146044, status='DOWNLOADED', message='')]

校验数据湖文件的行数

数据校验,检查文件里的行数。查询结果是100000,和原文件的行数一样,简单的从行数看数据入湖正确。

datalake_data_verify_sql = """
select count() from volume ingest_demo (txid string) using csv
 options(
    'header'='true',
    'sep'=',',
    'compression' = 'gzip'
 ) files('gz/lift_tickets_data.csv.gz')
 limit 10
"""
session.sql(datalake_data_verify_sql).show()

-------------

|count() |

-------------

|100000 |

-------------

查询数据湖文件里的数据
datalake_data_analytics_sql = """
select * from volume ingest_demo (txid string,name string, address_state string) using csv
 options(
    'header'='true',
    'sep'=',',
    'compression' = 'gzip'
 ) files('gz/lift_tickets_data.csv.gz')
 limit 10
"""
session.sql(datalake_data_analytics_sql).show()

-------------------------------------------------------------------------------------

|txid |name |address_state |

-------------------------------------------------------------------------------------

|80a7a77b-4941-46f3-bf1a-760bb46f12da |0xbb6eabaf2eb3c3d2ea164eba |新荣记 |

|976b4512-1b07-43f4-a8e4-1fe86a7e1ee4 |0xa08ab7945cf87fc0b5095dc |大董烤鸭 |

|4c49f5cc-0bd4-4a7e-8f61-f4a501a0dd24 |0xdf7bd805b890815a4e0a008c |京雅堂 |

|8579071f-1c8b-4214-9a4d-096e6403bc52 |0x3113aa5ae86c522f3176829e |新大陆中餐厅 |

|31962471-ad3b-463d-ab36-d1b1ab041a36 |0x28c6168f44e09cacd82ecfe9 |顺峰海鲜酒家 |

|f253d271-092d-4261-8703-a440cc149c39 |0xab306bea9de6a13426361153 |长安壹号 |

|5e52e443-2c03-4ce2-a95d-992d7cb3f54e |0x52000c48116d3a4667c3b607 |御宝轩 |

|e45f3806-972c-4617-b4ab-f2cbfc449de1 |0x247dd8c03cab559125a63d1b |店客店来 |

|9abeadfa-ecac-42fb-9dd7-33377e2e5387 |0x9824bf4d4f7e12590f692148 |川办餐厅 |

|c8938377-27a0-4f1f-9800-00c169729fd3 |0x4b65182989de9a3d13943b10 |南门火锅 |

-------------------------------------------------------------------------------------

关闭Zettapark会话
session.close()

下一步建议

  • 通过Zettapark以Dataframe的方式清洗和转化数据
  • 在Python代码中调用ML、LLM相关的接口,深度整合Data+AI
  • 在云器Lakehouse Studio里以SQL方式分析数据湖文件里的数据

资料

Zettapark快速上手

联系我们
预约咨询
微信咨询
电话咨询