Use a Python Task to Fetch GitHub Events in Real Time and Load Them into a Lakehouse Table

The https://api.github.com/events API serves public GitHub events in near real time, with roughly a five-minute delay. Combining the real-time data from this API with the offline data synchronized as files from the gharchive website raises data timeliness from hour level to minute level, further improving freshness for applications with stricter freshness requirements. This example also demonstrates how to write data into a table in bulkload mode through the 云器 Lakehouse (ClickZetta) Python connector.

Create the target table

Run the following table creation statement in a SQL script task node.

CREATE TABLE IF NOT EXISTS `github_timeline_realtime_events` (
    `id` STRING,
    `type` STRING,
    `repo_id` STRING,
    `repo_name` STRING,
    `minute` STRING,
    `second` STRING,
    `created_at` TIMESTAMP,
    `data` STRING COMMENT 'record data, json format',
    `__last_modified__` STRING COMMENT '__last_modified__'
) PARTITIONED BY (`date` STRING, `hour` STRING)
COMMENT 'public GitHub timeline record, real time events, ingested from https://api.github.com/events';

Write the Python code

import requests
import json
import time
import pytz
import pandas as pd
from requests.exceptions import RequestException
from datetime import datetime, timedelta
from clickzetta import connect
from clickzetta.bulkload.bulkload_enums import BulkLoadOptions, BulkLoadOperation, BulkLoadCommitOptions

def get_lakehouse_connect():
    # Fill in the credentials of your own Lakehouse instance.
    conn = connect(
        username='',
        password='',
        service='api.clickzetta.com',
        instance='',
        workspace='gharchive',
        schema='public',
        vcluster='default')
    return conn

def get_lakehouse_queries_data_hints(sql_query, query_tag):
    conn = get_lakehouse_connect()
    # Execute the SQL statement with a query_tag hint and return the result as a DataFrame.
    cursor = conn.cursor()
    my_param = dict()
    my_param["hints"] = dict()
    my_param["hints"]["query_tag"] = query_tag
    cursor.execute(sql_query, parameters=my_param)
    df = pd.DataFrame(cursor.fetchall(), columns=[i[0] for i in cursor.description])
    return df
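
A quick way to verify that data is landing in the table is to reuse the helper above to run a freshness query. A minimal sketch, assuming the table created earlier and valid connection settings; the function name and query_tag value here are illustrative and not part of the original example.

def check_latest_ingestion():
    # Count rows per partition for the most recent partitions; purely illustrative.
    sql = """
        SELECT `date`, `hour`, COUNT(*) AS cnt
        FROM github_timeline_realtime_events
        GROUP BY `date`, `hour`
        ORDER BY `date` DESC, `hour` DESC
        LIMIT 5
    """
    df = get_lakehouse_queries_data_hints(sql, 'github_realtime_freshness_check')
    print(df)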

def get_data(url, headers, params):
    # Retry with increasing back-off intervals when the GitHub API request fails.
    retry_times = 5
    intervals = [10, 30, 60, 300, 600]

    for i in range(retry_times):
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response
        except RequestException as e:
            print(f"GitHub API request {url} failed, attempt {i + 1}: {e}")
            if i < retry_times - 1:
                time.sleep(intervals[i])
    return None

def bulk_load_data(real_time_events):
    try:
        bulkload_stream_conn = get_lakehouse_connect()
        bulkload_stream = bulkload_stream_conn.create_bulkload_stream(
            schema='public',
            table='github_timeline_realtime_events',
            record_keys=['id'],
            operation=BulkLoadOperation.UPSERT)
        if not bulkload_stream:
            print("Failed to create bulkload stream")
            return
        writer = bulkload_stream.open_writer(0)
        print("Successfully connected to the ClickZetta Lakehouse")
        for event in real_time_events:
            event_json = json.dumps(event)
            event_dict = json.loads(event_json)
            # print(f"event_dict is:\n\n{event_dict}")
            row = writer.create_row()
            row.set_value('id', event_dict['id'])
            row.set_value('type', event_dict['type'])
            row.set_value('repo_id', event_dict['repo']['id'])
            row.set_value('repo_name', event_dict['repo']['name'])
            # GitHub returns created_at in UTC; shift to UTC+8 before deriving the partition values.
            created_at_utc = datetime.strptime(event_dict['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            created_at_e8 = created_at_utc + timedelta(hours=8)
            date_e8 = created_at_e8.strftime('%Y-%m-%d')  # date as a string
            hour_e8 = created_at_e8.strftime('%H')         # hour as a string
            minute_e8 = created_at_e8.strftime('%M')       # minute as a string
            second_e8 = created_at_e8.strftime('%S')       # second as a string
            row.set_value('date', date_e8)
            row.set_value('hour', hour_e8)
            row.set_value('minute', minute_e8)
            row.set_value('second', second_e8)
            row.set_value('created_at', created_at_e8)
            row.set_value('data', event_json)
            row.set_value('__last_modified__', created_at_e8)
            writer.write(row)
        writer.close()
        bulkload_stream.commit()
        print(f"{len(real_time_events)} events have been written to ClickZetta Lakehouse.")
    except Exception as e:
        print("Error while connecting to ClickZetta Lakehouse, Exception is ", e)
    finally:
        print("finally ClickZetta Lakehouse connection is closed")

# tz = pytz.timezone('Asia/Shanghai')
# Initialize the ETag and the event buffer
etag = None
events = []
# Create a dictionary to store unique event IDs
seen_event_ids = {}
response = None
headers = {
    # Replace with your own GitHub personal access token
    'Authorization': 'Bearer <your-github-token>'
}
params = {'per_page': 100}
url = 'https://api.github.com/events'

while True:
    if etag:
        headers['If-None-Match'] = etag

    response = get_data(url, headers=headers, params=params)
    if response is not None:
        remaining = response.headers.get('X-RateLimit-Remaining')
        print(f"X-RateLimit-Remaining: {remaining}")
        print(f"ETag: {response.headers.get('ETag')}")
        # Check the response status
        if response.status_code == 200:
            # Update the ETag
            etag = response.headers.get('ETag')

            # Append the new events to the buffer, de-duplicated by event id
            new_events = response.json()
            for event in new_events:
                event_id = event.get('id')
                if event_id not in seen_event_ids:
                    events.append(event)
                    seen_event_ids[event_id] = True

            # Print the number of buffered events
            print(f'Number of events: {len(events)}')

        elif response.status_code == 304:
            print('response.status_code == 304, no new events.')
        else:
            print(f'response.status_code is {response.status_code}')

        # Sleep according to the X-Poll-Interval header returned by GitHub
        # sleep_time = int(response.headers.get('X-Poll-Interval', 0))
        sleep_time = 0
        # If the 'Link' header has no 'next' relation, the last page has been reached
        if 'next' not in response.links:
            url = 'https://api.github.com/events'
            # time.sleep(sleep_time)
            if len(events) >= 700:
                bulk_load_data(events)
                events.clear()
                seen_event_ids = {}
        else:
            # Move to the next page
            url = response.links['next']['url']

Run the task

Because this task runs in an infinite loop, clicking "Run" keeps it resident on the Python node: it continuously fetches the latest GitHub events and writes them into the Lakehouse table until it is cancelled manually.
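
One limitation of the loop above is that events still buffered in memory (fewer than 700) are lost when the task is cancelled. A minimal sketch of one way to flush the remainder on cancellation, assuming the globals and functions defined above; the loop body is elided and stands for the polling logic already shown.

try:
    while True:
        # ... the polling body shown in the script above ...
        pass
except KeyboardInterrupt:
    # Flush whatever is still buffered before the task exits.
    if events:
        bulk_load_data(events)
        events.clear()
        seen_event_ids = {}
    print("Task cancelled; remaining buffered events flushed.")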
