使用Python任务实时获取github的事件并导入Lakehouse Table
https://api.github.com/events 提供API服务,可以实时的获取5分钟前的事件数据,通过API获得实时数据,加上从gharchive网站通过文件方式同步的离线数据,则将数据的时效性从小时级别提升了到了分钟级,进一步提升数据的新鲜度,更好的服务对数据新鲜度有更高要求的应用。 本示例也演示了如何通过云器Lakehouse的Python connect将数据以bulkload的方式写入表中。
创建目标表
在SQL脚本的任务节点里执行如下建表语句。
CREATE TABLE IF NOT EXISTS `github_timeline_realtime_events` ( `id` STRING, `type` STRING, `repo_id` STRING, `repo_name` STRING, `minute` STRING, `second` STRING, `created_at` TIMESTAMP, `data` STRING COMMENT 'record data, json format', `__last_modified__` STRING COMMENT '__last_modified__' ) PARTITIONED BY (DATE STRING,HOUR STRING) COMMENT 'public GitHub timeline record,real time events, ingested from https://api.github.com/events';
编写Python代码
import requests,json import time import pytz import pandas as pd from requests.exceptions import RequestException from datetime import datetime, timedelta from clickzetta import connect from clickzetta.bulkload.bulkload_enums import BulkLoadOptions,BulkLoadOperation,BulkLoadCommitOptions def get_lakehouse_connect(): conn = connect( username='', password='', service='api.clickzetta.com', instance='', workspace='gharchive', schema='public', vcluster='default') return conn def get_lakehouse_queries_data_hints(sql_query,query_tag): conn = get_lakehouse_connect() # 执行 SQL cursor = conn.cursor() my_param = dict() my_param["hints"] = dict() my_param["hints"]["query_tag"] =query_tag cursor.execute(sql_query, parameters=my_param) df = pd.DataFrame(cursor.fetchall(), columns=[i[0] for i in cursor.description]) return df def get_data(url, headers, params): retry_times = 5 intervals = [10,30,60, 300, 600] for i in range(retry_times): try: response = requests.get(url, headers=headers, params=params) response.raise_for_status() return response except RequestException as e: print(f"Github API请求 {url} 失败,第{i+1}次: {e}") if response is None or response.status_code != 200: return None if i < retry_times - 1: time.sleep(intervals[i]) else: return None def bulk_load_data(real_time_events): try: bulkload_stream_conn = get_lakehouse_connect() bulkload_stream = bulkload_stream_conn.create_bulkload_stream(schema='public', table='github_timeline_realtime_events',record_keys=['id'],operation=BulkLoadOperation.UPSERT) if bulkload_stream: writer = bulkload_stream.open_writer(0) print("Successfully connected to the ClickZetta Lakehouse") for event in real_time_events: event_json = json.dumps(event) event_dict = json.loads(event_json) # print(f"event_dict is:\n\n{event_dict}") row = writer.create_row() row.set_value('id', event_dict['id']) row.set_value('type', event_dict['type']) row.set_value('repo_id', event_dict['repo']['id']) row.set\_value('repo_name', event_dict['repo']['name']) created_at_utc = datetime.strptime(event_dict['created_at'], '%Y-%m-%dT%H:%M:%SZ') created_at_e8 = created_at_utc + timedelta(hours=8) # 提取日期(字符串) date_e8 = created_at_e8.strftime('%Y-%m-%d') # 提取小时(字符串) hour_e8 = created_at_e8.strftime('%H') # 提取分钟(字符串) minute_e8 = created_at_e8.strftime('%M') # 提取秒(字符串) second_e8 = created_at_e8.strftime('%S') row.set\_value('date', date_e8) row.set_value('hour', hour_e8) row.set_value('minute', minute_e8) row.set_value('second', second_e8) row.set_value('created_at', created_at_e8) row.set_value('data', event_json) row.set_value('__last\_modified__', created_at_e8) writer.write(row) writer.close() bulkload_stream.commit() print(f"{len(events)} events have been written to ClickZetta Lakehouse.") except Exception as e: print("Error while connecting to ClickZetta Lakehouse,Exception is ", e) finally: print("finally ClickZetta Lakehouse connection is closed") # tz = pytz.timezone('Asia/Shanghai') # 初始化ETag和事件列表 etag = None events = [] # Create a dictionary to store unique event IDs seen_event_ids = {} response = None headers = { 'Authorization': f'please replace your github token' } params = {'per_page': 100} url = 'https://api.github.com/events' while True: if etag: headers['If-None-Match'] = etag response = get_data(url, headers=headers, params = params) if response is not None: remaining = response.headers.get('X-RateLimit-Remaining') print(f"X-RateLimit-Remaining: {remaining}") ETag = response.headers.get('ETag') print(f"ETag: {ETag}") # 检查响应状态 if response.status_code == 200: # 更新ETag etag = response.headers.get('ETag') # 获取事件并添加到列表中 new_events = response.json() for event in new_events: event_id = event.get('id') if event_id not in seen_event_ids: events.append(event) seen_event_ids[event_id] = True # 打印事件数量 print(f'事件数量: {len(events)}') elif response.status_code == 304: print('response.status_code == 304, 没有新事件。') else: print(f'response.status_code is {response.status_code}') # 根据GitHub的X-Poll-Interval头部信息进行休眠 # sleep_time = int(response.headers.get('X-Poll-Interval', 0)) sleep_time = 0 # 检查'Link'头部是否有'next'关键字,如果没有则表示已到最后一页 if 'next' not in response.links: url = 'https://api.github.com/events' # time.sleep(sleep_time) if len(events)>=700: bulk_load_data(events) events.clear() seen_event_ids = {} else: # 更新URL为下一页的链接 url = response.links['next']['url']
运行任务
由于该任务为循环执行,点击“运行”后,该任务会在python节点常驻运行,持续获取github的最新事件并写入到Lakehouse的表中,直到人为取消。
Yunqi © 2024 Yunqi, Inc. All Rights Reserved.
联系我们