将数据导入云器Lakehouse的完整指南

数据入仓:通过云器Lakehouse Studio 多表实时同步(CDC,公网连接)

概述

使用场景

已有数据源(包括数据库、数据仓库)具备公网可访问的地址(比如做了公网NAT映射),需要多表同时同步,对数据新鲜度要求高(往往是分钟级别甚至秒级别),能够接受较高的同步成本的情况下,将数据从数据源的表同步到Lakehouse的表中。

云器Lakehouse Studio 多表实时同步支持多表合并,可以将结构系统的多个源表的数据合并到一张云器Lakehouse目标表中。

实现步骤

新建多表实时同步任务

导航到开发->任务,点击“+”,选择“多表实时同步”,新建一个多表实时同步任务。

任务名:06_multi_table_rt_sync_from_pg

来源数据选择“PostgreSQL”

选择同步对象

云器Lakehouse Studio会自动同步源库结构供选择:

数据库CDC配置

创建一个Slot

插件类型:pgoutput

Slot名:slot_for_multi_table_ingest_demo

目标配置

规则表达式:multitable_sync_{SOURCE_TABLE}。给目标表加上前缀multitable_sync_{SOURCE_TABLE},区分和其它方式同步进来的表。

查看字段映射,确保都映射成功:

配置同步规则

提交

配置完成后提交多表实时同步任务:

运维

运维多表实时同步任务:

启动

启动多表实时同步任务:

第一次启动该多表实时同步任务,选择“无状态启动”和“全量数据同步”:

启动后就可以查看任务运行的详细情况了,等待全量同步完成后,系统会自动开始进行增量的实时同步。

进度查看

全量同步进展中:

全量同步完成。

开始进行实时同步:

增量实时同步

往PG数据库插入增量数据,进行增量实时同步。在VS Code里新建一个文件“rt_data_generate_insert_into_pg.py”,将如下代码复制进去:

import os
import sys
import rapidjson as json
import optional_faker as _
import uuid
import random
import time
import psycopg2
from faker import Faker
from datetime import date, datetime, timedelta
from dotenv import load_dotenv

load_dotenv()
fake = Faker('zh_CN')  # 使用中文区域
resorts = ["大董烤鸭", "京雅堂", "新荣记", "仿膳饭庄", "全聚德", 
           "利群烤鸭店", "鼎泰丰", "海底捞", "江苏会所", "店客店来",
           "周黑鸭", "夜上海", "香宫", "长安壹号", "翡翠餐厅", "北京饭店",
           "四川豆花饭庄", "海底捞火锅", "川办餐厅", "南门火锅",
           "胡同", "翠园", "利苑酒家", "御宝轩", "金鼎轩", 
           "外婆家", "大董", "顺峰海鲜酒家", "小龙坎火锅",
           "新大陆中餐厅", "京兆尹", "鼎泰丰(台湾)", "滇池来客", 
           "绿波廊", "南美时光"]

# Load database credentials from environment variables
DB_NAME = 'postgres'
DB_USER = 'postgres'
DB_PASSWORD = 'postgres'
DB_HOST = 'localhost'
DB_PORT = '5432'

def connect_db():
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    return conn

def random_date_in_2025():
    start_date = date(2025, 1, 1)
    end_date = date(2025, 12, 31)
    return start_date + timedelta(days=random.randint(0, (end_date - start_date).days))

def random_datetime_between(start_year, end_year):
    start_datetime = datetime(start_year, 1, 1)
    end_datetime = datetime(end_year, 12, 31, 23, 59, 59)
    random_seconds = random.randint(0, int((end_datetime - start_datetime).total_seconds()))
    return start_datetime + timedelta(seconds=random_seconds)

def insert_lift_ticket(cursor, lift_ticket):
    cursor.execute("""
        INSERT INTO ingest_demo.lift_tickets_data (txid, rfid, resort, purchase_time, expiration_time, days, name, address_street, address_city, address_state, address_postalcode, phone, email, emergency_contact_name, emergency_contact_phone)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        lift_ticket['txid'], lift_ticket['rfid'], lift_ticket['resort'],
        lift_ticket['purchase_time'], lift_ticket['expiration_time'],
        lift_ticket['days'], lift_ticket['name'], lift_ticket['address_street'],
        lift_ticket['address_city'], lift_ticket['address_state'],
        lift_ticket['address_postalcode'], lift_ticket['phone'],
        lift_ticket['email'], lift_ticket['emergency_contact_name'],
        lift_ticket['emergency_contact_phone']
    ))

def generate_lift_ticket():
    global resorts, fake
    lift_ticket = {
        'txid': str(uuid.uuid4()),
        'rfid': hex(random.getrandbits(96)),
        'resort': fake.random_element(elements=resorts),
        'purchase_time': random_datetime_between(2021, 2024),
        'expiration_time': random_date_in_2025(),
        'days': fake.random_int(min=1, max=7),
        'name': fake.name(),
        'address_street': fake.street_address(),
        'address_city': fake.city(),
        'address_state': fake.province(),
        'address_postalcode': fake.postcode(),
        'phone': fake.phone_number(),
        'email': fake.email(),
        'emergency_contact_name': fake.name(),
        'emergency_contact_phone': fake.phone_number(),
    }
    return lift_ticket

def main(total_count, batch_size, sleep_time):
    conn = connect_db()
    cursor = conn.cursor()
    
    batch_data = []
    for _ in range(total_count):
        lift_ticket = generate_lift_ticket()
        batch_data.append(lift_ticket)
        
        if len(batch_data) >= batch_size:
            for ticket in batch_data:
                insert_lift_ticket(cursor, ticket)
            conn.commit()
            batch_data = []
            time.sleep(sleep_time)
    
    # Insert any remaining data
    if batch_data:
        for ticket in batch_data:
            insert_lift_ticket(cursor, ticket)
        conn.commit()

    cursor.close()
    conn.close()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("请提供总行数, 每批次行数, 每批次休眠秒数。例如:python rt_data_generate_insert_into_pg.py 1000 100 10")
        sys.exit(1)
    
    total_count = int(sys.argv[1])
    batch_size = int(sys.argv[2])
    sleep_time = int(sys.argv[3])
    
    main(total_count, batch_size, sleep_time)

在VS Code里新建一个“终端”,并运行如下命令激活在“环境设置”步骤创建的Python环境。如果已在cz-ingest-examples环境里,请跳过。

conda activate cz-ingest-examples

然后在同一终端里运行如下命令:

将100000条数据插入到ingest_demo.lift_tickets_data表中,每次插入100行,每批次休眠10秒。

python rt_data_generate_insert_into_pg.py 100000 100 10

在云器Lakehouse Studio查看实时同步进展:

下一步建议

在数据源中插入新数据,查看增量同步的结果。

“停止”该同步任务的运行,运行该任务的虚拟计算集群设置了自动停止,会在作业停止后的“自动停止”的秒数内停止运行,从而节省费用,实现按需运行。

资料

实时写入数据

多表实时同步

通过云器Lakehouse的多表实时同步和动态表实现变化数据捕获(CDC)及数据处理

联系我们
预约咨询
微信咨询
电话咨询