使用Python任务将gharchive网站数据同步到对象存储

Lakehouse Studio中的Python节点提供了Python代码的开发、测试运行和调度功能。依赖调度功能,通过一份代码实现全量数据的补数任务、周期性的调度任务。通过调度任务依赖,实现Python任务和SQL任务、Shell脚本、数据集成等其它任务类型的混合工作流编排。

编写python代码

import os,io

import subprocess

from datetime import datetime, timedelta

import oss2

# 阿里云OSS配置,ak\sk为自定义参数。ENDPOINT请根据OSS实际Region来修改。

ACCESS_KEY_ID = '${ak}'

ACCESS_KEY_SECRET = '${sk}'

BUCKET_NAME = 'YourBucketName'

ENDPOINT = 'oss-cn-shanghai-internal.aliyuncs.com'

ROOT_PATH = 'ghachive'

# 获取当前东八区时间

# beijing_time = datetime.now()

beijing_time = datetime.strptime('${datetime}', "%Y-%m-%d %H:%M:%S")

# 获取文件时间,北京时间偏差9个小时即可(时间8小时,gharchive网站产出数据文件晚1个小时,8+1)

ny_time = beijing_time - timedelta(hours=9)

# 格式化时间

year = ny_time.strftime('%Y')

month = ny_time.strftime('%m')

day = ny_time.strftime('%d')

hour = ny_time.strftime('%H')

# 打印转换后的时间

print(f"Converted to data file Time and -9 hour: {year}-{month}-{day} {hour}:00:00")

# 判断小时是否是'0x'格式,是的话去掉前导0

if hour.startswith('0') and len(hour) > 1:

# 去掉前导的'0'

hour = hour[1:]


try:

    # 构建wget命令
    
    url = f"https://data.gharchive.org/{year}-{month}-{day}-{hour}.json.gz"
    
    cmd = ["wget", "-qO-", url]
    
    print(f"wget cmd: {cmd}")
    
    # 执行wget命令并捕获输出
    
    wget_output = subprocess.check_output(cmd)
    
    print(f"Wget file done...")
    
    # 将输出转换为内存中的文件对象
    
    file_obj = io.BytesIO(wget_output)

except Exception as e:

    print(f"An error occurred: {e}")
    
    file_obj = None

    # 增加异常抛出,配合任务重试。调度设定为间隔10分钟、重试3次,防止源端文件没有准时产出,提高鲁棒性
    
    raise


if file_obj:

    try:
    
        # 初始化阿里云OSS
        
        auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
        
        bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)
        
        # 上传文件到OSS
        
        oss_path = f"{ROOT_PATH}/{year}/{month}/{day}/{year}-{month}-{day}-{hour}.json.gz"
        
        print(f"osspath: {oss_path}")
        
        bucket.put_object(oss_path, file_obj)
        
        print(f"Put file to oss done...")
        
    except Exception as e:
        
        print(f"An error occurred: {e}")
        
    finally:
        
        # 关闭内存中的文件对象
        
        file_obj.close()

运行测试

点击“运行”对代码进行测试,查看运行结果是否符合预期。

调度设置与任务发布

由于gharchive网站每一小时新生成一个文件,所以将调度周期设置为1小时即可。

然后点击“提交”完成发布,这样Python任务就可以周期调度完成gharchive文件到云对象存储OSS的数据同步了。

补数实现全量同步

周期任务是从指定的时间开始周期性执行从而获得数据的。为了获得在此之前的全量数据,可以直接用同样的代码和任务,执行“补数”任务就可以了,批量同步周期任务最早一个周期之前的所有数据,实现全量同步。这非常方便,且通过同一套代码保证了逻辑的一致性。 点击“运维”,进入周期任务的运维页面,然后点击“补数”。 gharchive上获取从2012-02-12日开始的文件,所以将补数任务开始时间设置为2012-02-12 00:00:00。 该任务的周期调度是从2024-06-18日的11点开始,所以将补数任务结束时间设置为2024-06-18 11:00:00

预览补数任务的实例,共要产生108251个任务任务实例,这意味着上述时间段总共有108251个小时,也意味着本次补数将从gharchive网站同步108251个文件到云对象存储上。

任务编排

在后续任务开发中,通过将本任务设置为依赖任务,实现工作流编排。

联系我们
预约咨询
微信咨询
电话咨询