使用Python任务将gharchive网站数据同步到对象存储
Lakehouse Studio中的Python节点提供了Python代码的开发、测试运行和调度功能。依赖调度功能,通过一份代码实现全量数据的补数任务、周期性的调度任务。通过调度任务依赖,实现Python任务和SQL任务、Shell脚本、数据集成等其它任务类型的混合工作流编排。

编写python代码
import os,io
import subprocess
from datetime import datetime, timedelta
import oss2
# 阿里云OSS配置,ak\sk为自定义参数。ENDPOINT请根据OSS实际Region来修改。
ACCESS_KEY_ID = '${ak}'
ACCESS_KEY_SECRET = '${sk}'
BUCKET_NAME = 'YourBucketName'
ENDPOINT = 'oss-cn-shanghai-internal.aliyuncs.com'
ROOT_PATH = 'ghachive'
# 获取当前东八区时间
# beijing_time = datetime.now()
beijing_time = datetime.strptime('${datetime}', "%Y-%m-%d %H:%M:%S")
# 获取文件时间,北京时间偏差9个小时即可(时间8小时,gharchive网站产出数据文件晚1个小时,8+1)
ny_time = beijing_time - timedelta(hours=9)
# 格式化时间
year = ny_time.strftime('%Y')
month = ny_time.strftime('%m')
day = ny_time.strftime('%d')
hour = ny_time.strftime('%H')
# 打印转换后的时间
print(f"Converted to data file Time and -9 hour: {year}-{month}-{day} {hour}:00:00")
# 判断小时是否是'0x'格式,是的话去掉前导0
if hour.startswith('0') and len(hour) > 1:
# 去掉前导的'0'
hour = hour[1:]
try:
# 构建wget命令
url = f"https://data.gharchive.org/{year}-{month}-{day}-{hour}.json.gz"
cmd = ["wget", "-qO-", url]
print(f"wget cmd: {cmd}")
# 执行wget命令并捕获输出
wget_output = subprocess.check_output(cmd)
print(f"Wget file done...")
# 将输出转换为内存中的文件对象
file_obj = io.BytesIO(wget_output)
except Exception as e:
print(f"An error occurred: {e}")
file_obj = None
# 增加异常抛出,配合任务重试。调度设定为间隔10分钟、重试3次,防止源端文件没有准时产出,提高鲁棒性
raise
if file_obj:
try:
# 初始化阿里云OSS
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)
# 上传文件到OSS
oss_path = f"{ROOT_PATH}/{year}/{month}/{day}/{year}-{month}-{day}-{hour}.json.gz"
print(f"osspath: {oss_path}")
bucket.put_object(oss_path, file_obj)
print(f"Put file to oss done...")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# 关闭内存中的文件对象
file_obj.close()
运行测试
点击“运行”对代码进行测试,查看运行结果是否符合预期。
调度设置与任务发布
由于gharchive网站每一小时新生成一个文件,所以将调度周期设置为1小时即可。

然后点击“提交”完成发布,这样Python任务就可以周期调度完成gharchive文件到云对象存储OSS的数据同步了。
补数实现全量同步
周期任务是从指定的时间开始周期性执行从而获得数据的。为了获得在此之前的全量数据,可以直接用同样的代码和任务,执行“补数”任务就可以了,批量同步周期任务最早一个周期之前的所有数据,实现全量同步。这非常方便,且通过同一套代码保证了逻辑的一致性。
点击“运维”,进入周期任务的运维页面,然后点击“补数”。
gharchive上获取从2012-02-12日开始的文件,所以将补数任务开始时间设置为2012-02-12 00:00:00。
该任务的周期调度是从2024-06-18日的11点开始,所以将补数任务结束时间设置为2024-06-18 11:00:00

预览补数任务的实例,共要产生108251个任务任务实例,这意味着上述时间段总共有108251个小时,也意味着本次补数将从gharchive网站同步108251个文件到云对象存储上。

任务编排
在后续任务开发中,通过将本任务设置为依赖任务,实现工作流编排。