A Complete Guide to Ingesting Data into 云器Lakehouse
Data Ingestion: Multi-Table Real-Time Sync via 云器Lakehouse Studio (CDC over a Public Network Connection)
Overview
Use Case
Use this approach when the existing data sources (databases or data warehouses) are reachable at a public address (for example, through a public NAT mapping), multiple tables need to be synchronized at the same time, data freshness requirements are high (often minute-level or even second-level), and a relatively high synchronization cost is acceptable. Data is synchronized from the source tables into tables in Lakehouse.
Multi-table real-time sync in 云器Lakehouse Studio also supports table merging: data from multiple source tables with identical schemas can be merged into a single 云器Lakehouse target table.
Implementation Steps
Create a Multi-Table Real-Time Sync Task
Navigate to Development -> Tasks, click “+”, and select “Multi-Table Real-Time Sync” to create a new multi-table real-time sync task.

Task name: 06_multi_table_rt_sync_from_pg
Source data: select “PostgreSQL”

Select the Sync Objects
云器Lakehouse Studio automatically reads the source database schema so you can pick the tables to synchronize:

Database CDC Configuration
Create a replication slot:

Plugin type: pgoutput
Slot name: slot_for_multi_table_ingest_demo
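Optionally, you can verify the CDC prerequisites from the PostgreSQL side before submitting the task. The following is a minimal sketch using psycopg2 that checks that wal_level is set to logical and lists existing replication slots; the connection parameters are assumptions matching the demo setup used later in this guide:

import psycopg2

# Assumed demo connection parameters; adjust to your environment.
conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
conn.autocommit = True
cur = conn.cursor()

# Logical decoding (required by pgoutput) needs wal_level = 'logical'.
cur.execute("SHOW wal_level;")
print("wal_level:", cur.fetchone()[0])

# After the task is configured, slot_for_multi_table_ingest_demo should appear here.
cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
for row in cur.fetchall():
    print(row)

cur.close()
conn.close()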
Target Configuration
Rule expression: multitable_sync_{SOURCE_TABLE}. This gives each target table the prefix multitable_sync_, distinguishing these tables from tables synchronized in by other methods.
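To illustrate what the rule expression does, here is a purely illustrative Python sketch of the substitution (Studio applies the rule internally; the helper name below is hypothetical):

# Hypothetical helper, for illustration only; Studio performs this substitution itself.
def target_table_name(rule: str, source_table: str) -> str:
    return rule.replace("{SOURCE_TABLE}", source_table)

print(target_table_name("multitable_sync_{SOURCE_TABLE}", "lift_tickets_data"))
# -> multitable_sync_lift_tickets_data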
Review the field mappings and make sure every field maps successfully:

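If any field fails to map, dumping the source table's schema can help locate the mismatch. A small sketch, assuming the demo source table ingest_demo.lift_tickets_data and the same connection parameters as the insertion script below:

import psycopg2

conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
cur = conn.cursor()
# Column names and types of the source table, in definition order.
cur.execute("""
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'ingest_demo' AND table_name = 'lift_tickets_data'
    ORDER BY ordinal_position;
""")
for name, dtype in cur.fetchall():
    print(f"{name}: {dtype}")
cur.close()
conn.close()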
Configure the Sync Rules

Submit
After configuration is complete, submit the multi-table real-time sync task:

Operations
Manage the multi-table real-time sync task in Operations:

Start
Start the multi-table real-time sync task:

When starting this multi-table real-time sync task for the first time, select “Stateless Start” and “Full Data Sync”:

Once the task has started, you can view its detailed run status. After the full sync completes, the system automatically begins incremental real-time synchronization.
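One simple way to confirm that the full sync copied everything is to compare row counts on both sides. Below is a sketch for the source-side count, assuming the demo connection parameters; run the corresponding SELECT COUNT(*) against the target table in Studio:

import psycopg2

conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
cur = conn.cursor()
# Source-side baseline; compare with COUNT(*) on multitable_sync_lift_tickets_data in Studio.
cur.execute("SELECT COUNT(*) FROM ingest_demo.lift_tickets_data;")
print("source rows:", cur.fetchone()[0])
cur.close()
conn.close()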
Check Progress
Full sync in progress:

Full sync completed.
Real-time sync begins:

Incremental Real-Time Sync
Insert incremental data into the PostgreSQL database to drive incremental real-time sync. In VS Code, create a new file named “rt_data_generate_insert_into_pg.py” and copy the following code into it:
import os
import sys
import uuid
import random
import time
import psycopg2
from faker import Faker
from datetime import date, datetime, timedelta
from dotenv import load_dotenv

load_dotenv()
fake = Faker('zh_CN')  # Chinese locale for realistic names, addresses, and phone numbers

resorts = ["大董烤鸭", "京雅堂", "新荣记", "仿膳饭庄", "全聚德",
           "利群烤鸭店", "鼎泰丰", "海底捞", "江苏会所", "店客店来",
           "周黑鸭", "夜上海", "香宫", "长安壹号", "翡翠餐厅", "北京饭店",
           "四川豆花饭庄", "海底捞火锅", "川办餐厅", "南门火锅",
           "胡同", "翠园", "利苑酒家", "御宝轩", "金鼎轩",
           "外婆家", "大董", "顺峰海鲜酒家", "小龙坎火锅",
           "新大陆中餐厅", "京兆尹", "鼎泰丰(台湾)", "滇池来客",
           "绿波廊", "南美时光"]

# Load database credentials from environment variables (with local-demo defaults)
DB_NAME = os.getenv('DB_NAME', 'postgres')
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD', 'postgres')
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = os.getenv('DB_PORT', '5432')

def connect_db():
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    return conn

def random_date_in_2025():
    start_date = date(2025, 1, 1)
    end_date = date(2025, 12, 31)
    return start_date + timedelta(days=random.randint(0, (end_date - start_date).days))

def random_datetime_between(start_year, end_year):
    start_datetime = datetime(start_year, 1, 1)
    end_datetime = datetime(end_year, 12, 31, 23, 59, 59)
    random_seconds = random.randint(0, int((end_datetime - start_datetime).total_seconds()))
    return start_datetime + timedelta(seconds=random_seconds)

def insert_lift_ticket(cursor, lift_ticket):
    cursor.execute("""
        INSERT INTO ingest_demo.lift_tickets_data (txid, rfid, resort, purchase_time, expiration_time, days, name, address_street, address_city, address_state, address_postalcode, phone, email, emergency_contact_name, emergency_contact_phone)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        lift_ticket['txid'], lift_ticket['rfid'], lift_ticket['resort'],
        lift_ticket['purchase_time'], lift_ticket['expiration_time'],
        lift_ticket['days'], lift_ticket['name'], lift_ticket['address_street'],
        lift_ticket['address_city'], lift_ticket['address_state'],
        lift_ticket['address_postalcode'], lift_ticket['phone'],
        lift_ticket['email'], lift_ticket['emergency_contact_name'],
        lift_ticket['emergency_contact_phone']
    ))

def generate_lift_ticket():
    # Build one synthetic lift-ticket record with randomized fields
    lift_ticket = {
        'txid': str(uuid.uuid4()),
        'rfid': hex(random.getrandbits(96)),
        'resort': fake.random_element(elements=resorts),
        'purchase_time': random_datetime_between(2021, 2024),
        'expiration_time': random_date_in_2025(),
        'days': fake.random_int(min=1, max=7),
        'name': fake.name(),
        'address_street': fake.street_address(),
        'address_city': fake.city(),
        'address_state': fake.province(),
        'address_postalcode': fake.postcode(),
        'phone': fake.phone_number(),
        'email': fake.email(),
        'emergency_contact_name': fake.name(),
        'emergency_contact_phone': fake.phone_number(),
    }
    return lift_ticket

def main(total_count, batch_size, sleep_time):
    conn = connect_db()
    cursor = conn.cursor()
    batch_data = []
    for _ in range(total_count):
        batch_data.append(generate_lift_ticket())
        # Insert and commit one batch at a time, sleeping between batches to spread the load
        if len(batch_data) >= batch_size:
            for ticket in batch_data:
                insert_lift_ticket(cursor, ticket)
            conn.commit()
            batch_data = []
            time.sleep(sleep_time)
    # Insert any remaining data
    if batch_data:
        for ticket in batch_data:
            insert_lift_ticket(cursor, ticket)
        conn.commit()
    cursor.close()
    conn.close()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Please provide total rows, rows per batch, and sleep seconds per batch. Example: python rt_data_generate_insert_into_pg.py 1000 100 10")
        sys.exit(1)
    total_count = int(sys.argv[1])
    batch_size = int(sys.argv[2])
    sleep_time = int(sys.argv[3])
    main(total_count, batch_size, sleep_time)
In VS Code, open a new terminal and run the following command to activate the Python environment created in the “Environment Setup” step. If you are already in the cz-ingest-examples environment, skip this step.
conda activate cz-ingest-examples
Then run the following command in the same terminal. It inserts 100,000 rows into the ingest_demo.lift_tickets_data table, 100 rows per batch, sleeping 10 seconds between batches:
python rt_data_generate_insert_into_pg.py 100000 100 10
Check the real-time sync progress in 云器Lakehouse Studio:

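Progress can also be watched from the PostgreSQL side by measuring how far the replication slot lags behind the current WAL position. A minimal sketch, again assuming the demo connection parameters:

import psycopg2

conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
cur = conn.cursor()
# Per-slot lag: WAL bytes produced but not yet confirmed by the consumer.
cur.execute("""
    SELECT slot_name, active,
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
    FROM pg_replication_slots;
""")
for row in cur.fetchall():
    print(row)
cur.close()
conn.close()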
Next Steps
Insert new data into the data source and check the results of incremental sync (a quick sketch follows this list).
“Stop” the sync task. The virtual compute cluster that runs this task is configured with auto-stop: it shuts down within the configured “auto-stop” number of seconds after the job stops, saving cost and running strictly on demand.
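For the first suggestion, a quick manual check is to insert one recognizable row and watch for its txid to appear in the target table. A sketch that reuses the functions from rt_data_generate_insert_into_pg.py, assuming it is run from the same directory:

from rt_data_generate_insert_into_pg import connect_db, generate_lift_ticket, insert_lift_ticket

# Insert exactly one synthetic row, then search for its txid in the target table.
conn = connect_db()
cur = conn.cursor()
ticket = generate_lift_ticket()
insert_lift_ticket(cur, ticket)
conn.commit()
print("inserted txid:", ticket['txid'])
cur.close()
conn.close()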
References
Writing Data in Real Time
Multi-Table Real-Time Sync
Implementing Change Data Capture (CDC) and Data Processing with 云器Lakehouse Multi-Table Real-Time Sync and Dynamic Tables