使用 Zettapark 和 Python 机器学习库进行信用评分

概述

在本逐步教程中,您将能够使用适用于 Python 的 Zettapark,以及您最喜欢的数据分析和可视化 Python 库,以及流行的 scikit-learn 机器学习库,来解决一个端到端的机器学习用例。

:-:

前提条件

  • Lakehouse 账户
  • 客户端 Zettapark 环境,已安装 Zettapark 库。

您将学到的内容

  • 了解如何使用适用于 Python 的 Zettapark 实现端到端的机器学习管道。
  • 使用适用于 Python 的 Zettapark API 和矢量化函数进行开发。
  • 使用 Python 流行库(Pandas、seaborn)进行数据探索、可视化和准备。
  • 使用 scikit-learn Python 包进行机器学习。
  • 使用适用于 Python 的 Zettapark 部署和使用机器学习模型进行评分。

使用步骤

  • 第 1 步:运行信用评分设置笔记本。这将下载数据集,并创建本演示所需的数据库和表。确保自定义 config.json。
  • 第 2 步:现在您可以运行信用评分教程。

您可以从GitHub 存储库获取源代码(Jupyter Notebook ipynb 文件)和数据文件

使用 Zettapark for Python 进行信用评分设置笔记本

1. Lakehouse 试用账户

先决条件是拥有一个 Lakehouse 账户。如果您没有 Lakehouse 账户,可以联系免费试用。

注册试用后,请收藏 Lakehouse 账户的 URL,并保存您的凭证,这些信息在本实验中需要使用。

此版本需要 Zettapark 0.1.2 或更高版本。

2. Python 库

运行此演示需要以下库。在此部分,添加环境中缺少的任何 Python 库。

# !pip install -q --upgrade clickzetta_zettapark_python
# !pip install scikit-plot
# !pip install pyarrow==6.0.0
# !pip install seaborn
# !pip install matplotlib

3. 文件下载

3.1 数据集

! curl -o data/credit_files.csv https://raw.githubusercontent.com/yunqiqiliang/clickzetta_quickstart/refs/heads/main/Zettapark-credit-scoring/data/credit_files.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  292k  100  292k    0     0  66280      0  0:00:04  0:00:04 --:--:-- 69100
! curl -o data/credit_request.csv https://raw.githubusercontent.com/yunqiqiliang/clickzetta_quickstart/refs/heads/main/Zettapark-credit-scoring/data/credit_request.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  6068  100  6068    0     0   2297      0  0:00:02  0:00:02 --:--:--  2297

3.2 config.json 凭证文件

需要编辑以下文件,填入 Lakehouse 账户的凭证并保存。它将用于在主笔记本中连接到 Lakehouse:

{
  "username": "<username>",
  "password": "<password>",
  "service": "<service url>",
  "instance": "<instance id>",
  "workspace": "<workspace>",
  "schema": "<schema>",
  "vcluster": "<vcluster>",
  "sdk_job_timeout": 60,
  "hints": {
    "sdk.job.timeout": 60,
    "query_tag": "test_zettapark_credit_scoring"
  }
}
! curl -o config/config_tobe_renamed.json https://raw.githubusercontent.com/yunqiqiliang/clickzetta_quickstart/refs/heads/main/Zettapark-credit-scoring/config/config.json
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   321  100   321    0     0    138      0  0:00:02  0:00:02 --:--:--   138

4. 数据库

在下面的部分中,请在 config.json 文件中填写不同的参数,以连接到您的 Lakehouse 环境。

import pandas as pd
import json
from clickzetta.zettapark.session import Session
import clickzetta.zettapark.functions as F
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

# 从配置文件读取连接参数
with open('config/config.json', 'r') as config_file:
    config = json.load(config_file)

schema = config['schema']
vcluster = config['vcluster']

print("Connecting to Lakehouse.....\n")

# 创建会话
session = Session.builder.configs(config).create()

session.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}").collect()
session.sql(f"CREATE VCLUSTER IF NOT EXISTS {vcluster} VCLUSTER_SIZE=1 VCLUSTER_TYPE = GENERAL").collect()

print(session.sql("SELECT current_instance_id(), current_workspace(),current_workspace_id(), current_schema(), current_user(),current_user_id(), current_vcluster()").collect())

print("\nConnected!...\n")

5. 表

此演示包含 2 张表:

  • CREDIT_FILES:此表包含当前文件上的信用情况以及贷款是否正在偿还或在偿还信用方面是否存在实际问题。此数据集将用于历史分析并构建机器学习模型,以对新申请进行评分。

  • CREDIT_REQUESTS:此表包含银行需要根据 ML 算法批准的新信用请求。

5.1 CREDIT_FILES 表

运行以下命令后,请登录到您的 Lakehouse 环境,确保表已创建。它应该有 2.9K 行。

credit_files = pd.read_csv('data/credit_files.csv')
credit_files.columns = credit_files.columns.str.lower()
session.sql("drop table if exists CREDIT_FILES").collect()
session.write_pandas(credit_files,"CREDIT_FILES",auto_create_table='True', quote_identifiers=False)
<clickzetta.zettapark.table.Table at 0x7fe58538e990>
credit_df = session.table("CREDIT_FILES")
credit_df.schema
StructType([StructField('`credit_request_id`', LongType(), nullable=True), StructField('`credit_amount`', LongType(), nullable=True), StructField('`credit_duration`', LongType(), nullable=True), StructField('`purpose`', StringType(), nullable=True), StructField('`installment_commitment`', LongType(), nullable=True), StructField('`other_parties`', StringType(), nullable=True), StructField('`credit_standing`', StringType(), nullable=True), StructField('`credit_score`', LongType(), nullable=True), StructField('`checking_balance`', DoubleType(), nullable=True), StructField('`savings_balance`', DoubleType(), nullable=True), StructField('`existing_credits`', LongType(), nullable=True), StructField('`assets`', StringType(), nullable=True), StructField('`housing`', StringType(), nullable=True), StructField('`qualification`', StringType(), nullable=True), StructField('`job_history`', LongType(), nullable=True), StructField('`age`', LongType(), nullable=True), StructField('`sex`', StringType(), nullable=True), StructField('`marital_status`', StringType(), nullable=True), StructField('`num_dependents`', LongType(), nullable=True), StructField('`residence_since`', LongType(), nullable=True), StructField('`other_payment_plans`', StringType(), nullable=True)])
credit_df.toPandas().head()
credit_df.toPandas().info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2940 entries, 0 to 2939
Data columns (total 21 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   credit_request_id       2940 non-null   int64  
 1   credit_amount           2940 non-null   int64  
 2   credit_duration         2940 non-null   int64  
 3   purpose                 2940 non-null   object 
 4   installment_commitment  2940 non-null   int64  
 5   other_parties           271 non-null    object 
 6   credit_standing         2940 non-null   object 
 7   credit_score            2940 non-null   int64  
 8   checking_balance        2940 non-null   float64
 9   savings_balance         2940 non-null   float64
 10  existing_credits        2940 non-null   int64  
 11  assets                  2489 non-null   object 
 12  housing                 2940 non-null   object 
 13  qualification           2940 non-null   object 
 14  job_history             2940 non-null   int64  
 15  age                     2940 non-null   int64  
 16  sex                     2940 non-null   object 
 17  marital_status          2940 non-null   object 
 18  num_dependents          2940 non-null   int64  
 19  residence_since         2940 non-null   int64  
 20  other_payment_plans     2940 non-null   object 
dtypes: float64(2), int64(10), object(9)
memory usage: 482.5+ KB

5.2 CREDIT_REQUEST 表

运行以下命令后,请登录到您的 Lakehouse 环境,确保表已创建。它应该有 60 行。

credit_requests = pd.read_csv('data/credit_request.csv')
credit_requests.columns = credit_requests.columns.str.lower()
session.sql("drop table if exists CREDIT_REQUESTS").collect()
session.write_pandas(credit_requests,"CREDIT_REQUESTS",auto_create_table='True', quote_identifiers=False)
<clickzetta.zettapark.table.Table at 0x7fe50b7556d0>
credit_req_df = session.table("CREDIT_REQUESTS")
credit_req_df.schema
StructType([StructField('`credit_request_id`', LongType(), nullable=True), StructField('`credit_amount`', LongType(), nullable=True), StructField('`credit_duration`', LongType(), nullable=True), StructField('`purpose`', StringType(), nullable=True), StructField('`installment_commitment`', LongType(), nullable=True), StructField('`other_parties`', StringType(), nullable=True), StructField('`credit_score`', LongType(), nullable=True), StructField('`checking_balance`', DoubleType(), nullable=True), StructField('`savings_balance`', DoubleType(), nullable=True), StructField('`existing_credits`', LongType(), nullable=True), StructField('`assets`', StringType(), nullable=True), StructField('`housing`', StringType(), nullable=True), StructField('`qualification`', StringType(), nullable=True), StructField('`job_history`', LongType(), nullable=True), StructField('`age`', LongType(), nullable=True), StructField('`sex`', StringType(), nullable=True), StructField('`marital_status`', StringType(), nullable=True), StructField('`num_dependents`', LongType(), nullable=True), StructField('`residence_since`', LongType(), nullable=True), StructField('`other_payment_plans`', StringType(), nullable=True)])
credit_req_df.toPandas().head()
credit_req_df.toPandas().info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 60 entries, 0 to 59
Data columns (total 20 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   credit_request_id       60 non-null     int64  
 1   credit_amount           60 non-null     int64  
 2   credit_duration         60 non-null     int64  
 3   purpose                 60 non-null     object 
 4   installment_commitment  60 non-null     int64  
 5   other_parties           8 non-null      object 
 6   credit_score            60 non-null     int64  
 7   checking_balance        60 non-null     float64
 8   savings_balance         60 non-null     float64
 9   existing_credits        60 non-null     int64  
 10  assets                  49 non-null     object 
 11  housing                 60 non-null     object 
 12  qualification           60 non-null     object 
 13  job_history             60 non-null     int64  
 14  age                     60 non-null     int64  
 15  sex                     60 non-null     object 
 16  marital_status          60 non-null     object 
 17  num_dependents          60 non-null     int64  
 18  residence_since         60 non-null     int64  
 19  other_payment_plans     60 non-null     object 
dtypes: float64(2), int64(10), object(8)
memory usage: 9.5+ KB

使用 Zeetapark for Python 进行信用评分

在此笔记本中,我们将使用 Zettapark Python API 进行信用卡评分演示。

在此场景中,Zettabank 希望利用其现有的信用文件来分析当前的信用状况,即贷款是否正在顺利偿还,或者是否存在任何延迟/违约情况。

基于当前的信用状况,Zettabank 希望基于数据集构建一个机器学习信用评分算法,以便能够自动评估贷款是否应该被批准或拒绝。

先决条件

请在运行此演示之前运行信用评分演示设置笔记本。

1. 数据探索

在此部分,我们将探索现有信用的数据集。

1.1 打开 Lakehouse 会话

import json


import pandas as pd
from clickzetta.zettapark import *
from clickzetta.zettapark.functions import *
# 从配置文件读取连接参数
with open('config/config.json', 'r') as config_file:
    config = json.load(config_file)


schema = config['schema']
vcluster = config['vcluster']


print("Connecting to Lakehouse.....\n")


# 创建会话
session = Session.builder.configs(config).create()


session.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}").collect()
session.sql(f"CREATE VCLUSTER IF NOT EXISTS {vcluster} VCLUSTER_SIZE=1 VCLUSTER_TYPE = GENERAL").collect()


print(session.sql("SELECT current_instance_id(), current_workspace(),current_workspace_id(), current_schema(), current_user(),current_user_id(), current_vcluster()").collect())


print("\nConnected!...\n")

1.2 探索 Lakehouse 表中的数据

credit_df = session.table("CREDIT_FILES")
credit_df.describe().toPandas()
credit_df.toPandas().info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2940 entries, 0 to 2939
Data columns (total 21 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   credit_request_id       2940 non-null   int64  
 1   credit_amount           2940 non-null   int64  
 2   credit_duration         2940 non-null   int64  
 3   purpose                 2940 non-null   object 
 4   installment_commitment  2940 non-null   int64  
 5   other_parties           271 non-null    object 
 6   credit_standing         2940 non-null   object 
 7   credit_score            2940 non-null   int64  
 8   checking_balance        2940 non-null   float64
 9   savings_balance         2940 non-null   float64
 10  existing_credits        2940 non-null   int64  
 11  assets                  2489 non-null   object 
 12  housing                 2940 non-null   object 
 13  qualification           2940 non-null   object 
 14  job_history             2940 non-null   int64  
 15  age                     2940 non-null   int64  
 16  sex                     2940 non-null   object 
 17  marital_status          2940 non-null   object 
 18  num_dependents          2940 non-null   int64  
 19  residence_since         2940 non-null   int64  
 20  other_payment_plans     2940 non-null   object 
dtypes: float64(2), int64(10), object(9)
memory usage: 482.5+ KB
credit_df.toPandas()

1.3 可视化数值特征

从这个可视化中,我们可以看到一些有趣的特征:

  • 大多数信用请求的金额较小(<50k)
  • 大多数信用期限为20个月或更短。
  • 大多数申请人的信用评分很高。
  • 大多数申请人在 Zettabank 的信贷或储蓄账户中余额不多。
  • 大多数申请人年龄小于40岁。
credit_df.toPandas().hist(figsize=(15,15))
array([[<Axes: title={'center': 'credit_request_id'}>,
        <Axes: title={'center': 'credit_amount'}>,
        <Axes: title={'center': 'credit_duration'}>],
       [<Axes: title={'center': 'installment_commitment'}>,
        <Axes: title={'center': 'credit_score'}>,
        <Axes: title={'center': 'checking_balance'}>],
       [<Axes: title={'center': 'savings_balance'}>,
        <Axes: title={'center': 'existing_credits'}>,
        <Axes: title={'center': 'job_history'}>],
       [<Axes: title={'center': 'age'}>,
        <Axes: title={'center': 'num_dependents'}>,
        <Axes: title={'center': 'residence_since'}>]], dtype=object)

1.4 可视化分类特征

从这个可视化中,我们可以看到一些有趣的特征:

  • 大多数流行的信用请求与车辆购买或消费品相关。
  • 绝大多数贷款没有担保人,也没有共同申请人。
  • 大多数文件中的信用状况良好。
  • 大多数申请人是男性、外国工人、技术工人,拥有自己的房屋/公寓。
  • 更高金额的贷款(每个贷款类别的阈值不同)有更高的违约可能性。
import matplotlib.pyplot as plt
import seaborn as sns


sns.set(style="darkgrid")


fig, axs = plt.subplots(5, 2, figsize=(15, 30))
df = credit_df.toPandas()
sns.countplot(data=df, y="purpose", ax=axs[0,0])
sns.countplot(data=df, x="other_parties", ax=axs[0,1])
sns.countplot(data=df, x="credit_standing", ax=axs[1,0])
sns.countplot(data=df, x="assets", ax=axs[1,1])
sns.countplot(data=df, x="housing", ax=axs[2,0])
sns.countplot(data=df, x="qualification", ax=axs[2,1])
sns.countplot(data=df, x="sex", ax=axs[3,0])
sns.countplot(data=df, x="marital_status", ax=axs[3,1])
sns.countplot(data=df, x="other_payment_plans", ax=axs[4,0])
sns.stripplot(y="purpose", x="credit_amount", data=df, hue='credit_standing', jitter=True, ax=axs[4,1])
plt.show()

1.5 通过 Zettapark API 运行查询

我们可以使用 Zettapark API 运行查询以获取各种见解。例如,让我们尝试确定不同类别的贷款范围。我们可以通过检查 Lakehouse 查询历史来了解 Zettapark API 如何作为 SQL 推送。

df_loan_status = credit_df.select(col("PURPOSE"),col("CREDIT_AMOUNT"))\
                          .groupBy(col("PURPOSE"))\
                          .agg([min(col("CREDIT_AMOUNT")).as_("MIN_CREDIT_AMOUNT"), max(col("CREDIT_AMOUNT")).as_("MAX_CREDIT_AMOUNT"), median(col("CREDIT_AMOUNT")).as_("MED_CREDIT_AMOUNT"),avg(col("CREDIT_AMOUNT")).as_("AVG_CREDIT_AMOUNT")])\
                          .sort(col("PURPOSE"))
df_loan_status.toPandas()

2. 数据转换和编码

对于当前用例,为了将数据准备成机器学习所需的格式,我们需要将分类值编码成数值。

为了实现这一点,我们可以利用 Zettapark Python API 进行编码。

2.1 准备机器学习的特征矩阵

在此部分,我们将利用 Zettapark Python API 准备一个随机森林分类器模型的特征矩阵。

from clickzetta.zettapark.functions import when


feature_matrix = credit_df.select(
    when(col("purpose") == "Consumer Goods", 1)
    .when(col("purpose") == "Vehicle", 2)
    .when(col("purpose") == "Tuition", 3)
    .when(col("purpose") == "Business", 4)
    .when(col("purpose") == "Repairs", 5)
    .otherwise(0).alias("purpose_code"),
    when(col("qualification") == "unskilled", 1)
    .when(col("qualification") == "skilled", 2)
    .when(col("qualification") == "highly skilled", 3)
    .otherwise(0).alias("qualification_code"),
    when(col("other_parties") == "Guarantor", 1)
    .when(col("other_parties") == "Co-Applicant", 2)
    .otherwise(0).alias("other_parties_code"),
    when(col("other_payment_plans") == "bank", 1)
    .when(col("other_payment_plans") == "stores", 2)
    .otherwise(0).alias("other_payment_plans_code"),
    when(col("housing") == "rent", 1)
    .when(col("housing") == "own", 2)
    .otherwise(0).alias("housing_code"),
    when(col("assets") == "Vehicle", 1)
    .when(col("assets") == "Investments", 2)
    .when(col("assets") == "Home", 3)
    .otherwise(0).alias("assets_code"),
    when(col("sex") == "M", 1)
    .otherwise(0).alias("sex_code"),
    when(col("marital_status") == "Married", 1)
    .when(col("marital_status") == "Single", 2)
    .otherwise(0).alias("marital_status_code"),
    when(col("credit_standing") == "good", 1)
    .otherwise(0).alias("credit_standing_code"),
    col("checking_balance"),
    col("savings_balance"),
    col("age"),
    col("job_history"),
    col("credit_score"),
    col("credit_duration"),
    col("credit_amount"),
    col("residence_since"),
    col("installment_commitment"),
    col("num_dependents"),
    col("existing_credits")
)


feature_matrix_pandas = feature_matrix.toPandas()
print(feature_matrix_pandas)
      purpose_code  qualification_code  other_parties_code  \
0                2                   2                   0   
1                2                   2                   0   
2                3                   2                   0   
3                3                   2                   0   
4                2                   2                   0   
...            ...                 ...                 ...   
2935             0                   0                   0   
2936             2                   0                   0   
2937             2                   0                   0   
2938             2                   2                   0   
2939             2                   2                   0   


      other_payment_plans_code  housing_code  assets_code  sex_code  \
0                            0             2            0         0   
1                            1             1            0         1   
2                            0             1            2         0   
3                            1             1            2         0   
4                            0             2            2         0   
...                        ...           ...          ...       ...   
2935                         1             0            0         1   
2936                         1             0            0         1   
2937                         0             0            1         1   
2938                         0             2            1         1   
2939                         0             2            1         1   


      marital_status_code  credit_standing_code  checking_balance  \
0                       1                     1           -728.12   
1                       2                     1              0.00   
2                       1                     1           4696.00   
3                       1                     1            -25.35   
4                       1                     1              0.00   
...                   ...                   ...               ...   
2935                    2                     1           1505.00   
2936                    2                     1           4486.00   
2937                    2                     1            720.00   
2938                    2                     1            752.00   
2939                    2                     1           1564.00   


      savings_balance  age  job_history  credit_score  credit_duration  \
0               17.00   39           15           466                6   
1             2443.00   35            1           202                6   
2              143.00   23            1           736               15   
3                0.00   23            3           732               12   
4              510.00   30            1           507               18   
...               ...  ...          ...           ...              ...   
2935             0.00   40            0           726               48   
2936          7361.86   66            0           343               12   
2937           460.00   68            0           396               16   
2938          1444.00   27            0           523               45   
2939          1998.00   27            0           552               45   


      credit_amount  residence_since  installment_commitment  num_dependents  \
0              8600                4                       1               1   
1             12040                1                       4               1   
2              3920                4                       4               1   
3             12000                4                       4               1   
4             10550                1                       4               1   
...             ...              ...                     ...             ...   
2935          53810                4                       3               1   
2936          14800                4                       2               1   
2937          11750                3                       2               1   
2938          45760                4                       3               1   
2939          45760                4                       3               1   


      existing_credits  
0                    2  
1                    1  
2                    1  
3                    1  
4                    2  
...                ...  
2935                 1  
2936                 3  
2937                 3  
2938                 1  
2939                 1  


[2940 rows x 20 columns]

现在已定义了特征矩阵,我们将其转换为 Pandas DataFrame。

df = feature_matrix.toPandas().astype(int)
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2940 entries, 0 to 2939
Data columns (total 20 columns):
 #   Column                    Non-Null Count  Dtype
---  ------                    --------------  -----
 0   purpose_code              2940 non-null   int64
 1   qualification_code        2940 non-null   int64
 2   other_parties_code        2940 non-null   int64
 3   other_payment_plans_code  2940 non-null   int64
 4   housing_code              2940 non-null   int64
 5   assets_code               2940 non-null   int64
 6   sex_code                  2940 non-null   int64
 7   marital_status_code       2940 non-null   int64
 8   credit_standing_code      2940 non-null   int64
 9   checking_balance          2940 non-null   int64
 10  savings_balance           2940 non-null   int64
 11  age                       2940 non-null   int64
 12  job_history               2940 non-null   int64
 13  credit_score              2940 non-null   int64
 14  credit_duration           2940 non-null   int64
 15  credit_amount             2940 non-null   int64
 16  residence_since           2940 non-null   int64
 17  installment_commitment    2940 non-null   int64
 18  num_dependents            2940 non-null   int64
 19  existing_credits          2940 non-null   int64
dtypes: int64(20)
memory usage: 459.5 KB

这是数据的样子:

df.head()

3. 随机森林模型训练

我们将使用 Python 中流行的 scikit-learn 机器学习库中的随机森林分类器模型。

from sklearn.model_selection import train_test_split


X_train, X_test, y_train, y_test = train_test_split(df.drop('credit_standing_code', axis=1), 
                                                    df['credit_standing_code'], test_size=0.30)
from sklearn.ensemble import RandomForestClassifier


rfc = RandomForestClassifier(n_estimators=100)
rfc.fit(X_train, y_train)

4. 测试模型

rfc_pred = rfc.predict(X_test)
from sklearn.metrics import classification_report, confusion_matrix


print(classification_report(y_test,rfc_pred))
              precision    recall  f1-score   support


           0       0.99      0.87      0.92       275
           1       0.94      1.00      0.97       607


    accuracy                           0.96       882
   macro avg       0.97      0.93      0.95       882
weighted avg       0.96      0.96      0.95       882
print(confusion_matrix(y_test,rfc_pred))
[[238  37]
 [  2 605]]

5. 在 Lakehouse 中进行推理

在下面的例子中,我们希望处理 60 个待处理的信用请求,并对是否批准贷款进行评估。数据如下所示:

df_cred_req = session.table("CREDIT_REQUESTS")
df_cred_req.toPandas()
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2940 entries, 0 to 2939
Data columns (total 20 columns):
 #   Column                    Non-Null Count  Dtype
---  ------                    --------------  -----
 0   purpose_code              2940 non-null   int64
 1   qualification_code        2940 non-null   int64
 2   other_parties_code        2940 non-null   int64
 3   other_payment_plans_code  2940 non-null   int64
 4   housing_code              2940 non-null   int64
 5   assets_code               2940 non-null   int64
 6   sex_code                  2940 non-null   int64
 7   marital_status_code       2940 non-null   int64
 8   credit_standing_code      2940 non-null   int64
 9   checking_balance          2940 non-null   int64
 10  savings_balance           2940 non-null   int64
 11  age                       2940 non-null   int64
 12  job_history               2940 non-null   int64
 13  credit_score              2940 non-null   int64
 14  credit_duration           2940 non-null   int64
 15  credit_amount             2940 non-null   int64
 16  residence_since           2940 non-null   int64
 17  installment_commitment    2940 non-null   int64
 18  num_dependents            2940 non-null   int64
 19  existing_credits          2940 non-null   int64
dtypes: int64(20)
memory usage: 459.5 KB

6. 开发评分函数

当 Zettabank 接收信用请求时,我们希望编写一个函数,该函数可以通过任务调用,对传入的微批请求进行评分。

Python 函数将首先使用 Zettapark API 构建模型的输入特征,以进行评分。

from clickzetta.zettapark.functions import col, when


def process_credit_requests_fn (session, credit_requests: str, credit_assessment: str) -> int:
    
    # 使用 Zettapark API 直接编码构建模型的输入特征。
    df_cred_req = session.table(credit_requests).select( 
                            col("CREDIT_REQUEST_ID"), col("PURPOSE"),
                            when(col("PURPOSE") == "Consumer Goods", 1)
                            .when(col("PURPOSE") == "Vehicle", 2)
                            .when(col("PURPOSE") == "Tuition", 3)
                            .when(col("PURPOSE") == "Business", 4)
                            .when(col("PURPOSE") == "Repairs", 5)
                            .otherwise(0).alias("PURPOSE_CODE"),
                            when(col("QUALIFICATION") == "unskilled", 1)
                            .when(col("QUALIFICATION") == "skilled", 2)
                            .when(col("QUALIFICATION") == "highly skilled", 3)
                            .otherwise(0).alias("QUALIFICATION_CODE"),
                            when(col("OTHER_PARTIES") == "Guarantor", 1)
                            .when(col("OTHER_PARTIES") == "Co-Applicant", 2)
                            .otherwise(0).alias("OTHER_PARTIES_CODE"),
                            when(col("OTHER_PAYMENT_PLANS") == "bank", 1)
                            .when(col("OTHER_PAYMENT_PLANS") == "stores", 2)
                            .otherwise(0).alias("OTHER_PAYMENT_PLANS_CODE"),
                            when(col("HOUSING") == "rent", 1)
                            .when(col("HOUSING") == "own", 2)
                            .otherwise(0).alias("HOUSING_CODE"),
                            when(col("ASSETS") == "Vehicle", 1)
                            .when(col("ASSETS") == "Investments", 2)
                            .when(col("ASSETS") == "Home", 3)
                            .otherwise(0).alias("ASSETS_CODE"),
                            when(col("SEX") == "M", 1)
                            .otherwise(0).alias("SEX_CODE"),
                            when(col("MARITAL_STATUS") == "Married", 1)
                            .when(col("MARITAL_STATUS") == "Single", 2)
                            .otherwise(0).alias("MARITAL_STATUS_CODE"),
                            col("CHECKING_BALANCE"),
                            col("SAVINGS_BALANCE"),
                            col("AGE"),
                            col("JOB_HISTORY"),
                            col("CREDIT_SCORE"),
                            col("CREDIT_DURATION"), 
                            col("CREDIT_AMOUNT"), 
                            col("RESIDENCE_SINCE"),
                            col("INSTALLMENT_COMMITMENT"),
                            col("NUM_DEPENDENTS"),
                            col("EXISTING_CREDITS")
                         )
    
    # 调用 UDF 对之前读取的现有信用请求进行评分    
    input_features = [ 'PURPOSE_CODE',
                   'QUALIFICATION_CODE',
                   'OTHER_PARTIES_CODE',
                   'OTHER_PAYMENT_PLANS_CODE',
                   'HOUSING_CODE',
                   'ASSETS_CODE',
                   'SEX_CODE',
                   'MARITAL_STATUS_CODE',
                   'CHECKING_BALANCE',
                   'SAVINGS_BALANCE',
                   'AGE',
                   'JOB_HISTORY',
                   'CREDIT_SCORE',
                   'CREDIT_DURATION',
                   'CREDIT_AMOUNT',
                   'RESIDENCE_SINCE',
                   'INSTALLMENT_COMMITMENT',
                   'NUM_DEPENDENTS',
                   'EXISTING_CREDITS']           


    df_assessment = df_cred_req.select(
                    col("CREDIT_REQUEST_ID"), col("PURPOSE"), col("CREDIT_AMOUNT"), col("CREDIT_DURATION"),
                    when(col("CREDIT_SCORE") > 600, "Approved").otherwise("Denied").alias("CREDIT_STATUS"))
    
    df_assessment.write.mode("overwrite").saveAsTable(credit_assessment)
    
    # 函数将返回评估的信用请求数。
    return df_assessment.count()

7. 调用评分函数

process_credit_requests_fn (session, "credit_requests",  "credit_assessments")
60
session.table("credit_assessments").toPandas()

附录

联系我们
预约咨询
微信咨询
电话咨询