python|【Python】基于逻辑回归的金融风控评分卡

【导语】 信用风险计量模型可以包括跟个人信用评级,企业信用评级和国家信用评级。人信用评级有一系列评级模型组成,常见是A卡(申请评分卡)、B卡(行为模型)、C卡(催收模型)和F卡(反欺诈模型)。

  • A卡(Application score card):目的在于预测申请时(申请信用卡、申请贷款)对申请人进行量化评估。
  • B卡(Behavior score card):目的在于预测使用时点(获得贷款、信用卡的使用期间)未来一定时间内逾期的概率。
  • C卡(Collection score card):目的在于预测已经逾期并进入催收阶段后未来一定时间内还款的概率。
【优缺点】 优点:
  • 易于使用。业务人员在操作时,只需要按照评分卡每样打分然后算个总分就能操作,不需要接受太多专业训练
  • 直观透明。客户和审核人员都能知道看到结果,以及结果是如何产生的。
  • 应用范围广。例如:支付宝的芝麻信用分。
缺点:
  • 利润的信息维度不高。简单是优点,但在日益增长的数据前,就变成缺点。有着大量数据资源却使用有限,造成数据资源的浪费。
  • 当信息维度高时,评分卡建模会变得非常困难。
  • 某些不重要的特征,在另一些时刻会变得重要。例如在疫情期间,和收入相关的特征重要度会上升。
【构建流程】
  1. 数据探究。研究数据都包含哪些信息,
  2. 样本选取。选取一定时间周期内该平台上的信贷样本数据,划分训练集和测试集。
  3. 变量选取。也就是特征筛选。需要一定的业务理解。一般这部分费时较久
  4. 逻辑回归。根据筛选后的特征,构建逻辑回归模型。
  5. 评分卡转换。根据一定的公式转换。
  6. 验证并上线。验证评分卡效果,并上线持续监测。

【源程序】 数据集下载:givemesomecredit金融风控评分卡数据集
class CreditScore:def __init__(self, max_depth=None,# 决策树的深度 max_leaf_nodes=4,# 决策树的子节点数 min_samples_leaf=0.05,# 分节点最小划分比例 base_score=600,# 基础分 increase_score=50,# 增加分(概率增加一份所增加的分数) min_iv=0.1,# 特征筛选(选取IV值大于0.1的值) max_corr=0.6,# 特征筛选(选取相关性小于0.6的值) C=0.01,# 特征筛选(L1正则化系数) penalty='l2',# 特征筛选(L2正则化) round_num=2# 所有变量保留两位有效数 ):# 参数选择 self.max_depth = max_depth self.max_leaf_nodes = max_leaf_nodes self.min_samples_leaf = min_samples_leaf self.base_score = base_score self.increase_score = increase_score self.min_iv = min_iv self.max_corr = max_corr self.C = C self.penalty = penalty self.round_num = round_num# 保存变量结果 self.col_type_iv = None# 各变量类型以及IV值 self.col_continuous_cut_points = None# 连续变量的切分点,按小于等于,大于切分,空值单独归位一类,例如:['scorecashon', [-inf, 654.0, 733.0, 754.0, inf]] self.col_result = None# 最终评分卡选择的变量 self.score_card = None# 评分卡# 读取原始数据 # 划分训练集和测试集 def get_data_train_test(self, data, test_size=0.25, random_state=1234): x_train, x_test, y_train, y_test = train_test_split(data[[col for col in data.columns if col != 'y']], data['y'], test_size=test_size, random_state=random_state)# 随机选择25%作为测试集,剩余作为训练集 data_train = x_train.reset_index() del data_train['index'] data_train['y'] = y_train.reset_index()['y'] data_test = x_test.reset_index() del data_test['index'] data_test['y'] = y_test.reset_index()['y'] return data_train, data_test# 划分离散和连续变量(连续变量:int64,float64;离散变量:其他) def get_col_discrete_continue(self, data): col_all = data.columns col_all_type = data.dtypes col_type = [] for i in range(len(col_all)): if col_all[i] != 'y': if str(col_all_type[i]) in ('int64', 'float64'): col_type.append([col_all[i], 'continuous']) else: col_type.append([col_all[i], 'discrete']) return col_type# 基于决策树的分箱操作 def get_cut_point(self, data, col, max_depth=None, max_leaf_nodes=4, min_samples_leaf=0.05,round_num=2): data_notnull = data[[col, 'y']][data[col].notnull()]# 删除空值 cut_point = [] if len(np.unique(data_notnull[col])) > 1: x = data_notnull[col].values.reshape(-1, 1) y = data_notnull['y'].valuesclf = DecisionTreeClassifier(criterion='entropy',# “信息熵”最小化准则划分 max_depth=max_depth,# 树的深度 max_leaf_nodes=max_leaf_nodes,# 最大叶子节点数 min_samples_leaf=min_samples_leaf)# 叶子节点样本数量最小占比 clf.fit(x, y)# 训练决策树threshold = np.unique(clf.tree_.threshold)#阈值 x_num = np.unique(x)for i in threshold: if i != -2: point = np.round(max(x_num[x_num < i]), round_num)# 取切分点左边的数 cut_point.extend([point]) cut_point = [float(str(i)) for i in list(np.unique(cut_point))] cut_point = [-np.inf] + cut_point + [np.inf] return cut_point# ------------------------------------------------------------------------------------------------------------# 根据切分点切分变量数据 def get_cut_result(self, data, col_continuous_cut_points): cols = [i for i in data.columns if i not in [i[0] for i in col_continuous_cut_points]] data_cut_result = data[cols].copy() for col, cut_points in col_continuous_cut_points: data_cut_result[col] = pd.cut(data[col], cut_points).astype("str")data_cut_result = data_cut_result.fillna('null') data_cut_result.replace('nan', 'null', inplace=True) return data_cut_result# 获取按切分点的统计数据 def get_woe_iv(self, data_discrete, col): result = data_discrete.groupby(col)['y'].agg([('1_num', lambda y: (y == 1).sum()), ('0_num', lambda y: (y == 0).sum()), ('total_num', 'count')]).reset_index() result['1_pct'] = result['1_num'] / result['1_num'].sum() result['0_pct'] = result['0_num'] / result['0_num'].sum() result['total_pct'] = result['total_num'] / result['total_num'].sum() result['1_rate'] = result['1_num'] / result['total_num'] result['woe'] = np.log(result['1_pct'] / result['0_pct'])# WOE result['iv'] = (result['1_pct'] - result['0_pct']) * result['woe']# IV result.replace([-np.inf, np.inf], [0, 0], inplace=True) result['total_iv'] = result['iv'].sum() result = result.rename(columns={col: "cut_points"}) return result# 批量获取变量IV值 def get_iv(self, data): col_iv = [] for col in [i for i in data.columns if i != 'y']: col_woe_iv = self.get_woe_iv(data, col) col_iv.append([col, col_woe_iv['iv'].sum()]) return col_iv# 数据转换为woe def get_data_woe(self, data_discrete): data_woe = pd.DataFrame() for col in [i for i in data_discrete.columns if i != 'y']: col_woe_iv = self.get_woe_iv(data_discrete, col) data_woe[col] = data_discrete[col].replace(list(col_woe_iv['cut_points']), list(col_woe_iv['woe'])) data_woe['y'] = data_discrete['y'] return data_woe# 通过IV值和相关性以及逻辑回归选择变量(默认参数提出IV值>=0.1,相关系数>0.6,l1正则筛选的变量) def get_iv_corr_logistic_col(self, data_woe, col_iv, min_iv=0.1, max_corr=0.6, C=0.01, penalty='l2'): col_filter = [col for col, iv in col_iv if iv > min_iv] col_iv_filter = [[col, iv] for col, iv in col_iv if iv > min_iv] data_woe_corr = data_woe[col_filter].corr() data_woe_corr_list = data_woe_corr.values.reshape(-1, 1) col_iv_result = [] for col1, iv1 in col_iv_filter: for col2, iv2 in col_iv_filter: col_iv_result.append([col1, col2, iv1, iv2, iv1 - iv2])data_woe_corr_iv = pd.DataFrame(col_iv_result, columns=['col1', 'col2', 'iv1', 'iv2', 'iv1_iv2']) data_woe_corr_iv['corr'] = data_woe_corr_list # 剔除相关性较大,而IV值较低的变量 col_delete = data_woe_corr_iv['col1'][(data_woe_corr_iv['corr'] < 1) & (data_woe_corr_iv['corr'] > max_corr) & ( data_woe_corr_iv['iv1_iv2'] < 0)].unique() col_filter_result = [col for col in col_filter if col not in (col_delete)]# L2正则化筛选 lr = linear_model.LogisticRegression(C=C, penalty=penalty).fit(data_woe[col_filter_result], data_woe['y']) col_result = [col_filter_result[i] for i in range(len(col_filter_result)) if lr.coef_[0][i] != 0] return col_resultdef get_logistic_socre_card(self, data, col_continuous_cut_points, increase_score=50, base_score=600): col_types = self.get_col_discrete_continue(data) col_result = [i for i in data.columns if i != 'y'] data_discrete = self.get_cut_result(data, col_continuous_cut_points)# 按切分点划分数据,得到全部的离散数据 data_woe = self.get_data_woe(data_discrete)# 数据woe化 # 评分卡制作 lr = linear_model.LogisticRegression(C=1, penalty='l2') lr.fit(data_woe[col_result], data_woe['y']) b = -increase_score / np.log(2) # a = base_score - lr.intercept_[0] * bscore_card = pd.DataFrame() for col in col_result: col_cut_point_woe = self.get_woe_iv(data_discrete, col) col_cut_point_woe['col'] = col score_card = pd.concat([score_card, col_cut_point_woe])col_coef = pd.DataFrame(col_result, lr.coef_[0]).reset_index() col_coef.columns = ['col_coef', 'col'] score_card['lr_intercept'] = lr.intercept_[0] score_card = pd.merge(score_card, col_coef, on=['col'], how='left') score_card['score'] = score_card['woe'] * score_card['col_coef'] * b score_card = pd.merge(score_card, pd.DataFrame(col_types, columns=['col', 'type']), on='col', how='left') score_card = pd.merge(score_card, pd.DataFrame(col_continuous_cut_points, columns=['col', 'cuts']), on='col',how='left')# 切分点排序 data_cut_points_id = pd.DataFrame() for col, cut_point in col_continuous_cut_points: result = pd.DataFrame() result['cut_points'] = pd.cut(data[col], cut_point).astype('str').unique() result['cut_points_id'] = pd.cut(data[col], cut_point).unique()._codes result['cut_points'].replace('nan', 'null', inplace=True) result['col'] = col data_cut_points_id = pd.concat([data_cut_points_id, result]) score_card = pd.merge(score_card, data_cut_points_id, on=['col', 'cut_points'], how='left').sort_values( ['col', 'cut_points_id', 'cut_points'])score_card = score_card[ ['col', 'type', 'cuts', 'cut_points', '1_num', '0_num', 'total_num', '1_pct', '0_pct', 'total_pct', '1_rate', 'woe', 'iv', 'total_iv', 'col_coef', 'lr_intercept', 'score']].reset_index(drop=True) # score_card = score_card[ #['变量名', '变量类型', '切分点', '切分分组', 'y为1的数量', 'y为0的数量', '总数', 'y为1的数量占比', 'y为0的数量占比', '总数占比', #'y为1占总数比例', 'woe', '各分组iv', '变量iv值', 'logistic参数col_coef', 'logistic参数lr_intercept', '分组分数']].reset_index(drop=True) return score_carddef fit(self, data): data = https://www.it610.com/article/data.round(self.round_num)# 保留两位小数# -----------------------------------------变量筛选-------------------------------------------------------------- # 划分离散和连续变量 col_types = self.get_col_discrete_continue(data)col_continuous_cut_points = [] for col, col_type in col_types: if col_type =='continuous': point = self.get_cut_point(data[[col, 'y']], col, self.max_depth, self.max_leaf_nodes, self.min_samples_leaf,self.round_num) if point: col_continuous_cut_points.append([col, point]) #else: #col_cut_points.append([col,'discrete',None]) self.col_continuous_cut_points = col_continuous_cut_points# 连续变量的切分点,按小于等于,大于切分,空值单独归位一类,例如:['scorecashon', [-inf, 654.0, 733.0, 754.0, inf]]data_discrete = self.get_cut_result(data, col_continuous_cut_points)# 按切分点划分数据,得到全部的离散数据 col_iv = self.get_iv(data_discrete)# 各变量IV值 col_type_iv = pd.merge(pd.DataFrame(col_types, columns=['col', 'type']),pd.DataFrame(col_iv, columns=['col', 'iv']), on='col', how='left') self.col_type_iv = col_type_iv# 计算连续变量离散化后的IV值 data_woe = self.get_data_woe(data_discrete)# 数据woe化 col_result = self.get_iv_corr_logistic_col(data_woe, col_iv, min_iv=self.min_iv, max_corr=self.max_corr, C=self.C,penalty=self.penalty)# 变量筛选 self.col_result = col_result# 最终评分卡选择的变量# -----------------------------------------评分卡制作--------------------------------------------------------------col_result_continuous_cut_points = [col for col in col_continuous_cut_points if col[0] in col_result] score_card=self.get_logistic_socre_card(data[col_result + ['y']], col_result_continuous_cut_points, increase_score=self.increase_score, base_score=self.increase_score) self.score_card = score_card# 评分卡结果# def predict_score_proba(self, data, score_card, increase_score=50, base_score=600): b = -increase_score / np.log(2) a = base_score - score_card['lr_intercept'][0] * b col_result = score_card['col'].unique().tolist() + ['y'] col_continuous_cut_points = score_card[['col', 'cuts']][score_card['type'] == 'continuous'].drop_duplicates('col').values.tolist() data_discrete = self.get_cut_result(data[col_result], col_continuous_cut_points) data_score_proba = pd.DataFrame() for col in score_card['col'].unique(): col_score = col + 'score' cut_points = score_card['cut_points'][score_card['col'] == col].tolist() score = score_card['score'][score_card['col'] == col].tolist() data_score_proba[col_score] = data_discrete[col].replace(cut_points, score) data_score_proba['score'] = data_score_proba.sum(axis=1)+ score_card['lr_intercept'][0] * b + a data_score_proba['proba'] = 1 - 1 / (1 + np.e ** ((data_score_proba['score'] - a) / b)) return data_score_probadef score(self, data, score_card): data_score_proba = self.predict_score_proba(data, score_card) false_positive_rate, recall, thresholds = roc_curve(data['y'], data_score_proba['proba']) roc_auc = auc(false_positive_rate, recall) ks = max(recall - false_positive_rate) result = {} result['auc'] = roc_auc result['ks'] = ks return result

【python|【Python】基于逻辑回归的金融风控评分卡】参考资料:
【1】https://zhuanlan.zhihu.com/p/148102950
【2】https://blog.csdn.net/csqazwsxedc/article/details/87982257
【3】https://blog.csdn.net/csqazwsxedc/article/details/51225156

    推荐阅读