A Machine Learning Approach to Credit Risk Forecasting: A Lending Club Case Study


Introduction


This case study details the end-to-end development of a model that identifies likely loan defaulters in the publicly available Lending Club dataset. It covers the full data science lifecycle, from data cleaning and exploratory analysis through feature engineering to training several classification algorithms that estimate the probability of default, a core business need in fintech lending.


We use classifiers to predict whether a borrower will default on a loan, working with a real-world dataset provided by Lending Club, a fintech company that publishes its loan data on its website. If you are interested, you can download the dataset from Kaggle (Lending Club Dataset). The data is well suited to analytical studies and contains well over a hundred features. Examining all of them is beyond the scope of this study, so we use only a subset for our predictions. The study shows how real business problems are approached with exploratory data analysis (EDA) and machine learning.
We use the Apache Spark APIs in Scala, with PySpark for the plots, in an Apache Zeppelin notebook on Ubuntu 22.04.


Import Libraries


%spark
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, count, isnan, when}
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
import org.apache.spark.ml.classification.{DecisionTreeClassifier, RandomForestClassifier, GBTClassifier}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

Load Data


%spark
// Define the file location and selected columns
val fileLocation = "hdfs://localhost:9000/user/hduser/data/accepted_2007_to_2018Q4.csv.gz"
val selectedColumns = Array(
    "id",
    "purpose",
    "term",
    "verification_status",
    "acc_now_delinq",
    "addr_state",
    "annual_inc",
    "application_type",
    "dti",
    "grade",
    "home_ownership",
    "initial_list_status",
    "installment",
    "int_rate",
    "loan_amnt",
    "loan_status",
    "tax_liens",
    "delinq_amnt",
    "pub_rec",
    "last_fico_range_high",
    "last_fico_range_low",
    "recoveries",
    "collection_recovery_fee"
)

// Load the CSV file with the selected columns
val dfraw: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
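    .option("inferSchema", "true")  // costs one extra pass over the data to guess column types
    .option("compression", "gzip")  // optional when reading: gzip is detected from the .gz extension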
    .load(fileLocation)
    .select(selectedColumns.head, selectedColumns.tail: _*)

// Show the first few rows
dfraw.show(5)
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|      id|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|68407277|debt_consolidation| 36 months|       Not Verified|           0.0|        PA|   55000.0|      Individual| 5.91|    C|      MORTGAGE|                  w|     123.03|   13.99|   3600.0| Fully Paid|      0.0|        0.0|    0.0|               564.0|              560.0|       0.0|                    0.0|
|68355089|    small_business| 36 months|       Not Verified|           0.0|        SD|   65000.0|      Individual|16.06|    C|      MORTGAGE|                  w|     820.28|   11.99|  24700.0| Fully Paid|      0.0|        0.0|    0.0|               699.0|              695.0|       0.0|                    0.0|
|68341763|  home_improvement| 60 months|       Not Verified|           0.0|        IL|   63000.0|       Joint App|10.78|    B|      MORTGAGE|                  w|     432.66|   10.78|  20000.0| Fully Paid|      0.0|        0.0|    0.0|               704.0|              700.0|       0.0|                    0.0|
|66310712|debt_consolidation| 60 months|    Source Verified|           0.0|        NJ|  110000.0|      Individual|17.06|    C|      MORTGAGE|                  w|      829.9|   14.85|  35000.0|    Current|      0.0|        0.0|    0.0|               679.0|              675.0|       0.0|                    0.0|
|68476807|    major_purchase| 60 months|    Source Verified|           0.0|        PA|  104433.0|      Individual|25.37|    F|      MORTGAGE|                  w|     289.91|   22.45|  10400.0| Fully Paid|      0.0|        0.0|    0.0|               704.0|              700.0|       0.0|                    0.0|
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows

fileLocation: String = hdfs://localhost:9000/user/hduser/data/accepted_2007_to_2018Q4.csv.gz
selectedColumns: Array[String] = Array(id, purpose, term, verification_status, acc_now_delinq, addr_state, annual_inc, application_type, dti, grade, home_ownership, initial_list_status, installment, int_rate, loan_amnt, loan_status, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
dfraw: org.apache.spark.sql.DataFrame = [id: string, purpose: string ... 21 more fields]

%spark
println(s"Shape of the DataFrame: (${dfraw.count()}, ${dfraw.columns.length})")
Shape of the DataFrame: (2260701, 23)

Missing values


%spark
// Count null values for each column
val nullCounts = dfraw.select(
  dfraw.columns.map(c =>
    count(when(col(c).isNull || isnan(col(c)), true)).alias(c)
  ): _*
)

nullCounts.show()
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
| id|purpose|term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type| dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|  0|     34|  33|                 33|           221|        34|        37|              88|1745|   33|            33|                 50|         33|      33|       33|         33|      338|        258|    125|                  76|                 68|        35|                     39|
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+

nullCounts: org.apache.spark.sql.DataFrame = [id: bigint, purpose: bigint ... 21 more fields]

Dropping all rows with missing values


%spark
// Drop rows with any null values
val dfCleaned = dfraw.na.drop()
dfCleaned: org.apache.spark.sql.DataFrame = [id: string, purpose: string ... 21 more fields]

%spark
println(s"Shape of the DataFrame: (${dfCleaned.count()}, ${dfCleaned.columns.length})")
Shape of the DataFrame: (2258596, 23)
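With no arguments, `na.drop()` uses the "any" policy: a row is removed as soon as one of its columns is null or NaN. That is why 2,105 rows disappear (2,260,701 - 2,258,596) even though the largest single-column count is 1,745 (dti): the missing values are not all in the same rows. The plain-Scala sketch below imitates the "any" and "all" policies on rows modelled as `Seq[Option[Double]]`, with `None` standing in for null. `NaDropSketch`, `dropAny` and `dropAll` are hypothetical names for illustration, not Spark APIs.

```scala
// Hypothetical illustration of na.drop() semantics; not Spark code.
object NaDropSketch {
  type Record = Seq[Option[Double]]

  // A cell counts as missing when it is null (None) or NaN, as in na.drop()
  def isMissing(cell: Option[Double]): Boolean = cell.forall(_.isNaN)

  // how = "any" (the default): drop a row if at least one cell is missing
  def dropAny(rows: Seq[Record]): Seq[Record] = rows.filterNot(_.exists(isMissing))

  // how = "all": drop a row only if every cell is missing
  def dropAll(rows: Seq[Record]): Seq[Record] = rows.filterNot(_.forall(isMissing))
}
```

For the rows (1.0, 2.0), (null, 2.0), (1.0, NaN) and (null, null), `dropAny` keeps one row and `dropAll` keeps three.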

%spark
dfCleaned.printSchema()
root
 |-- id: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- term: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- acc_now_delinq: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- tax_liens: double (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- last_fico_range_high: string (nullable = true)
 |-- last_fico_range_low: string (nullable = true)
 |-- recoveries: string (nullable = true)
 |-- collection_recovery_fee: string (nullable = true)
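Several columns that hold numbers in the sample rows (acc_now_delinq, annual_inc, dti, pub_rec, the FICO ranges, recoveries, collection_recovery_fee, and even id) were typed as string. CSV schema inference widens a column's type until every non-null value fits, so a single unparseable entry anywhere in the file is enough to make the whole column a string. The plain-Scala sketch below reproduces that widening rule on a simplified lattice (int, then double, then string); Spark's real inference also tries long, decimal, timestamp and boolean, and `InferTypeSketch` is a hypothetical name. The usual remedy is an explicit cast such as `col("annual_inc").cast("double")`, which turns unparseable values into null when ANSI mode is off (the default before Spark 4.0).

```scala
import scala.util.Try

// Hypothetical, simplified imitation of CSV type inference; not Spark's implementation.
object InferTypeSketch {
  sealed trait Inferred
  case object IntT extends Inferred
  case object DoubleT extends Inferred
  case object StringT extends Inferred

  // Narrowest type that can hold a single non-null value
  def typeOf(value: String): Inferred =
    if (Try(value.toInt).isSuccess) IntT
    else if (Try(value.toDouble).isSuccess) DoubleT
    else StringT

  // Combine two types by moving up the lattice IntT < DoubleT < StringT
  def widen(a: Inferred, b: Inferred): Inferred = (a, b) match {
    case (StringT, _) | (_, StringT) => StringT
    case (DoubleT, _) | (_, DoubleT) => DoubleT
    case _                           => IntT
  }

  // Column type = widest type over all non-null values; an all-null column falls back to StringT
  def inferColumn(values: Seq[Option[String]]): Inferred =
    values.flatten.map(typeOf).reduceOption[Inferred](widen).getOrElse(StringT)
}
```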


Exploratory Data Analysis


Target feature: loan_status is our target variable. Besides Fully Paid and Charged Off, it contains several other statuses, so each of them has to be mapped to a label. We start by inspecting the distinct values of the loan_status column:


%spark
val uniqueLoanStatuses = dfCleaned.select("loan_status").distinct()
uniqueLoanStatuses.show()
+--------------------+
|         loan_status|
+--------------------+
|          Fully Paid|
|             Default|
|     In Grace Period|
|Does not meet the...|
|         Charged Off|
|  Late (31-120 days)|
|             Current|
|Does not meet the...|
|   Late (16-30 days)|
+--------------------+

uniqueLoanStatuses: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string]

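Current: the borrower is still paying the instalments, i.e. the loan term has not ended yet, so the final outcome of these loans is unknown and they cannot be labelled as either repaid or defaulted. We therefore drop all Current loans, since they tell us nothing about the factors that drive default. The id column carries no predictive information, so we drop it as well. (The two truncated entries in the output above are "Does not meet the credit policy. Status:Fully Paid" and "Does not meet the credit policy. Status:Charged Off"; show() cuts strings longer than 20 characters.)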


%spark
val filteredDF = dfCleaned.filter(col("loan_status") =!= "Current")

// Show the result
// filteredDF.show()
val uniqueLoanStatus = filteredDF.select("loan_status").distinct()
uniqueLoanStatus.show()
+--------------------+
|         loan_status|
+--------------------+
|          Fully Paid|
|             Default|
|     In Grace Period|
|Does not meet the...|
|         Charged Off|
|  Late (31-120 days)|
|Does not meet the...|
|   Late (16-30 days)|
+--------------------+

filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, purpose: string ... 21 more fields]
uniqueLoanStatus: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string]

%spark
val filteredDF1 = filteredDF.drop("id")
filteredDF1.show(5)
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|debt_consolidation| 36 months|       Not Verified|           0.0|        PA|   55000.0|      Individual| 5.91|    C|      MORTGAGE|                  w|     123.03|   13.99|   3600.0| Fully Paid|      0.0|        0.0|    0.0|               564.0|              560.0|       0.0|                    0.0|
|    small_business| 36 months|       Not Verified|           0.0|        SD|   65000.0|      Individual|16.06|    C|      MORTGAGE|                  w|     820.28|   11.99|  24700.0| Fully Paid|      0.0|        0.0|    0.0|               699.0|              695.0|       0.0|                    0.0|
|  home_improvement| 60 months|       Not Verified|           0.0|        IL|   63000.0|       Joint App|10.78|    B|      MORTGAGE|                  w|     432.66|   10.78|  20000.0| Fully Paid|      0.0|        0.0|    0.0|               704.0|              700.0|       0.0|                    0.0|
|    major_purchase| 60 months|    Source Verified|           0.0|        PA|  104433.0|      Individual|25.37|    F|      MORTGAGE|                  w|     289.91|   22.45|  10400.0| Fully Paid|      0.0|        0.0|    0.0|               704.0|              700.0|       0.0|                    0.0|
|debt_consolidation| 36 months|    Source Verified|           0.0|        GA|   34000.0|      Individual| 10.2|    C|          RENT|                  w|     405.18|   13.44|  11950.0| Fully Paid|      0.0|        0.0|    0.0|               759.0|              755.0|       0.0|                    0.0|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows

filteredDF1: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]

%spark
println(s"Shape of the DataFrame: (${filteredDF1.count()}, ${filteredDF1.columns.length})")
Shape of the DataFrame: (1381578, 22)

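Reduce the data size for visualization. Plotting requires collecting the data to the driver, so we work with a random sample of 100,000 loans, which speeds up the following steps and keeps the driver from running out of memory.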

%spark
// Sample 100,000 rows from the DataFrame
val filtered_DF = filteredDF1.orderBy(rand()).limit(100000)

// Optionally, you can cache the sampled DataFrame for faster reuse
filtered_DF.cache()
filtered_DF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: string ... 20 more fields]
res11: filtered_DF.type = [purpose: string, term: string ... 20 more fields]
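`orderBy(rand()).limit(100000)` guarantees exactly 100,000 rows, but it must draw a random key for every one of the 1,381,578 rows before it can keep the rows with the smallest keys. `filteredDF1.sample(withReplacement = false, fraction)` makes a single pass with Bernoulli sampling instead, at the price of returning only about fraction * N rows. The plain-Scala sketch below contrasts the two strategies on an in-memory collection; it illustrates the trade-off only (it is not how Spark executes either operation), and `SamplingSketch` is a hypothetical name. Neither variant is reproducible without a seed, so each run of the notebook draws a different sample unless one is supplied, e.g. `rand(42)`.

```scala
import scala.util.Random

// Hypothetical in-memory illustration; Spark distributes both operations differently.
object SamplingSketch {
  // Exact size: give every element a random key and keep the n elements with the smallest keys,
  // which is the idea behind orderBy(rand()).limit(n)
  def exactSample[A](xs: Seq[A], n: Int, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    xs.map(x => (rng.nextLong(), x)).sortBy(_._1).take(n).map(_._2)
  }

  // Approximate size: keep each element independently with probability `fraction`,
  // which is the idea behind sample(withReplacement = false, fraction)
  def bernoulliSample[A](xs: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    xs.filter(_ => rng.nextDouble() < fraction)
  }
}
```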

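Correlation heatmap of the sample

Pearson correlation, which pandas' corr() computes by default, needs numeric inputs, so every string-typed column is first converted to numeric indices with StringIndexer. Because several numeric fields (for example annual_inc, dti, pub_rec and the FICO ranges) were inferred as strings, as the schema above shows, the loop below index-encodes them as well. Their coefficients in the heatmap therefore reflect frequency ranks rather than actual values; casting those columns to double before plotting would give more meaningful correlations.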

%spark
// Identify categorical columns (assuming they are of StringType)
val categoricalCols = filtered_DF.schema.fields
  .filter(_.dataType.typeName == "string")
  .map(_.name)

// Apply StringIndexer to each categorical column
var dataEncoded = filtered_DF
for (colName <- categoricalCols) {
  val indexer = new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(s"${colName}_encoded")
    .fit(dataEncoded)
  dataEncoded = indexer.transform(dataEncoded).drop(colName).withColumnRenamed(s"${colName}_encoded", colName)
}

// Show the encoded DataFrame
// dataEncoded.show(5)
categoricalCols: Array[String] = Array(purpose, term, verification_status, acc_now_delinq, addr_state, annual_inc, application_type, dti, grade, home_ownership, initial_list_status, loan_status, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
dataEncoded: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [installment: double, int_rate: double ... 20 more fields]
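With its default stringOrderType of "frequencyDesc", StringIndexer gives index 0.0 to the most frequent value, 1.0 to the next most frequent, and so on (Spark 3.0 and later break ties alphabetically). For a genuinely numeric column such as annual_inc, stored here as a string, these indices are unrelated to the size of the values. The plain-Scala imitation below makes that visible; `FrequencyIndexSketch.fit` is a hypothetical helper written for this illustration, not part of Spark.

```scala
// Hypothetical imitation of StringIndexer's default "frequencyDesc" ordering; not Spark code.
object FrequencyIndexSketch {
  // Rank distinct values by descending frequency, breaking ties alphabetically,
  // and map each value to its rank as a Double (StringIndexer also outputs doubles)
  def fit(values: Seq[String]): Map[String, Double] =
    values
      .groupBy(identity)
      .toSeq
      .sortBy { case (value, occurrences) => (-occurrences.size, value) }
      .zipWithIndex
      .map { case ((value, _), rank) => value -> rank.toDouble }
      .toMap
}
```

For the incomes 60000.0 (three times), 45000.0 (twice) and 120000.0 (once), `fit` returns 0.0, 1.0 and 2.0 respectively, so the smallest income ends up between the other two in index space.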

%spark
// Write the DataFrame to a temporary file
val tempPath = "/tmp/encoded_data.csv"

dataEncoded.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv(tempPath)
tempPath: String = /tmp/encoded_data.csv

%spark.pyspark
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("tempfile") \
    .master("local[*]") \
    .getOrCreate()

sparkDF= spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("/tmp/encoded_data.csv")
  
# sparkDF.show(5)

%spark.pyspark

# Collect the 100,000-row Spark DataFrame into pandas
pandasDF = sparkDF.toPandas()

# Plot the heatmap
plt.figure(figsize=(21, 14))
sns.heatmap(pandasDF.corr(), annot=True, cmap='viridis', fmt=".2f", linewidths=0.5)
plt.title("Correlation Heatmap (100,000-row sample)", fontsize=18, pad=12)
plt.tight_layout()
plt.show()

%spark
filtered_DF.show(5)
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|debt_consolidation| 60 months|           Verified|           0.0|        CA|   96000.0|      Individual|17.03|    C|          RENT|                  f|     321.55|   16.78|  13000.0| Fully Paid|      0.0|        0.0|    0.0|               694.0|              690.0|       0.0|                    0.0|
|debt_consolidation| 36 months|       Not Verified|           0.0|        GA|   75000.0|      Individual|19.22|    A|      MORTGAGE|                  f|     557.94|    7.26|  18000.0| Fully Paid|      0.0|        0.0|    0.0|               729.0|              725.0|       0.0|                    0.0|
|  home_improvement| 36 months|       Not Verified|           0.0|        NY|   50000.0|       Joint App|37.21|    B|      MORTGAGE|                  w|     483.45|    9.92|  15000.0| Fully Paid|      0.0|        0.0|    0.0|               699.0|              695.0|       0.0|                    0.0|
|          vacation| 36 months|       Not Verified|           0.0|        CA|   60000.0|      Individual| 11.4|    B|          RENT|                  f|      70.25|   12.49|   2100.0| Fully Paid|      0.0|        0.0|    0.0|               724.0|              720.0|       0.0|                    0.0|
|debt_consolidation| 36 months|       Not Verified|           0.0|        GA|   63000.0|      Individual|10.42|    A|      MORTGAGE|                  f|     281.58|    7.89|   9000.0| Fully Paid|      0.0|        0.0|    1.0|               794.0|              790.0|       0.0|                    0.0|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows


%spark

// Statuses treated as a good outcome; every other status becomes "Bad Loan"
val Good_Loan_statuses = Seq(
  "Fully Paid",
  "In Grace Period",
  "Does not meet the credit policy. Status:Fully Paid"
)

// Broadcast copy of the set (not used below: isin() embeds the three literals in the query plan)
val goodLoanSet = spark.sparkContext.broadcast(Good_Loan_statuses.toSet)

// Update loan_status column
val filtered_df = filtered_DF.withColumn(
  "loan_status",
  when(col("loan_status").isin(Good_Loan_statuses: _*), "Good Loan")
    .otherwise("Bad Loan")
)

// Show unique updated values
filtered_df.select("loan_status").distinct().show(false)
+-----------+
|loan_status|
+-----------+
|Good Loan  |
|Bad Loan   |
+-----------+

Good_Loan_statuses: Seq[String] = List(Fully Paid, In Grace Period, Does not meet the credit policy. Status:Fully Paid)
goodLoanSet: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Set[String]] = Broadcast(61)
filtered_df: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]
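The relabelling rule can be checked in isolation. The plain-Scala sketch below restates it over status strings, using the same three good statuses as Good_Loan_statuses; `LoanLabelSketch` is a hypothetical name. Because every status outside the set falls through to "Bad Loan", a typo in one of the good strings would silently turn those loans into bad ones, so it is worth comparing the label counts with the raw status counts after the mapping.

```scala
// Hypothetical restatement of the when(...).otherwise(...) rule; not Spark code.
object LoanLabelSketch {
  // The statuses treated as a good outcome (same strings as Good_Loan_statuses)
  val goodStatuses: Set[String] = Set(
    "Fully Paid",
    "In Grace Period",
    "Does not meet the credit policy. Status:Fully Paid"
  )

  // Same rule as when(col("loan_status").isin(...), "Good Loan").otherwise("Bad Loan")
  def label(status: String): String =
    if (goodStatuses.contains(status)) "Good Loan" else "Bad Loan"
}
```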

%spark
// Group and filter
val loanStat = filtered_df
  .groupBy("loan_status")
  .count()
  .filter(col("loan_status").isin("Good Loan", "Bad Loan"))

// Serialize the counts to a JSON string so the PySpark paragraph can read them
val jsonStr = loanStat.toJSON.collect().mkString("[", ",", "]")

// Share the string through the Zeppelin context (the key must match z.get("loanstat") in Python)
z.put("loanstat", jsonStr)

// Display as a table (Zeppelin can also render it as a bar chart)
z.show(loanStat)
loan_status	count
Good Loan	78663
Bad Loan	21337
loanStat: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string, count: bigint]
jsonStr: String = [{"loan_status":"Good Loan","count":78663},{"loan_status":"Bad Loan","count":21337}]
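The sample is imbalanced: 21,337 of the 100,000 loans (about 21.3%) are Bad Loan. A trivial model that always predicts Good Loan would already score about 78.7% accuracy, which is a useful floor when judging the accuracy of the classifiers trained later. The short plain-Scala sketch below computes both figures from the counts shown above; `ImbalanceSketch` is a hypothetical name.

```scala
// Hypothetical helper for reading the class balance; not Spark code.
object ImbalanceSketch {
  // Counts from the z.show(loanStat) output above
  val loanStatusCounts: Map[String, Long] = Map("Good Loan" -> 78663L, "Bad Loan" -> 21337L)

  // Fraction of rows carrying each label
  def classShares(counts: Map[String, Long]): Map[String, Double] = {
    val total = counts.values.sum.toDouble
    counts.map { case (label, n) => label -> n / total }
  }

  // Accuracy of a model that always predicts the most frequent label
  def majorityBaseline(counts: Map[String, Long]): Double =
    classShares(counts).values.max
}
```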

%spark.pyspark
# Retrieve JSON string from Scala paragraph
json_list = z.get("loanstat")

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Convert JSON string into RDD with 1 element
rdd = spark.sparkContext.parallelize([json_list])

# Load into DataFrame
df_python = spark.read.json(rdd)

df_python.show()
+-----+-----------+
|count|loan_status|
+-----+-----------+
|78663|  Good Loan|
|21337|   Bad Loan|
+-----+-----------+


%spark.pyspark
# Convert to pandas
loanStatPan = df_python.toPandas()

plt.bar(loanStatPan['loan_status'], loanStatPan['count'], color='lightblue')
plt.title('Loan Status Counts')
plt.xlabel('loan_status')
plt.ylabel('Counts')
plt.tight_layout()
plt.show()

%spark
// Count loans by state and order by count (descending)
val stateCounts = filtered_df
  .groupBy("addr_state")
  .agg(count("*").alias("count"))
  .orderBy(desc("count"))

// Show the result in Zeppelin
z.show(stateCounts)
addr_state	count
CA	14473
TX	8163
NY	8084
FL	7102
IL	3787
NJ	3606
PA	3436
GA	3354
OH	3321
VA	2845
NC	2792
MI	2640
MD	2380
AZ	2373
MA	2301
CO	2280
WA	2108
MN	1835
IN	1628
MO	1610
TN	1580
NV	1493
CT	1449
WI	1309
AL	1213
LA	1212
SC	1202
OR	1188
OK	919
KY	904
KS	805
UT	771
AR	736
HI	532
NM	514
NH	472
MS	466
RI	445
WV	351
DE	301
MT	289
NE	267
DC	248
AK	229
SD	225
WY	194
VT	186
ID	131
ME	127
ND	122
IA	2
stateCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [addr_state: string, count: bigint]

%spark
// Count the occurrences of each grade by loan_status
val gradeCounts = filtered_df
  .groupBy("grade", "loan_status")
  .agg(count("*").alias("count"))
  .orderBy(col("grade"))  // sort grades A to G

// Pivot the data to have loan_status as columns
val gradePivot = gradeCounts
  .groupBy("grade")
  .pivot("loan_status")
  .agg(sum("count"))
  .orderBy(col("grade"))  // sort grades A to G
  .na.fill(0)  // Replace nulls with 0

// Show the result in Zeppelin
z.show(gradePivot)
grade	Bad Loan	Good Loan
A	1103	15881
B	4280	24933
C	6953	21819
D	4840	10142
E	2743	4245
F	1069	1324
G	349	319
gradeCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [grade: string, loan_status: string ... 1 more field]
gradePivot: org.apache.spark.sql.DataFrame = [grade: string, Bad Loan: bigint ... 1 more field]

%spark
// Count the occurrences of each term by loan_status
val termCounts = filtered_df
  .groupBy("term", "loan_status")
  .agg(count("*").alias("count"))
  .orderBy("term")

// Pivot the data to have loan_status as columns
val termPivot = termCounts
  .groupBy("term")
  .pivot("loan_status")
  .agg(sum("count"))
  .orderBy("term")
  .na.fill(0)  // Replace nulls with 0

// Show the result in Zeppelin
z.show(termPivot)
term	Bad Loan	Good Loan
 36 months	12954	62482
 60 months	8383	16181
termCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: string, loan_status: string ... 1 more field]
termPivot: org.apache.spark.sql.DataFrame = [term: string, Bad Loan: bigint ... 1 more field]
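Raw counts are hard to compare across groups of very different size, so it helps to turn each pivot row into a bad-loan rate, Bad / (Bad + Good). Computed from the two tables above (numbers copied by hand, so they describe this particular random sample), the rate climbs steadily from about 6.5% for grade A to about 52% for grade G, and 60-month loans go bad at roughly twice the rate of 36-month loans (about 34% versus 17%). `BadRateSketch` is a hypothetical name used for this plain-Scala illustration.

```scala
// Hypothetical post-processing of the pivot tables above; not Spark code.
object BadRateSketch {
  // Share of loans labelled "Bad Loan" in a group
  def badRate(bad: Long, good: Long): Double = bad.toDouble / (bad + good)

  // (grade, Bad Loan, Good Loan), copied from the gradePivot output
  val byGrade: Seq[(String, Long, Long)] = Seq(
    ("A", 1103L, 15881L), ("B", 4280L, 24933L), ("C", 6953L, 21819L),
    ("D", 4840L, 10142L), ("E", 2743L, 4245L), ("F", 1069L, 1324L),
    ("G", 349L, 319L)
  )

  // (term, Bad Loan, Good Loan), copied from the termPivot output (leading space trimmed)
  val byTerm: Seq[(String, Long, Long)] = Seq(
    ("36 months", 12954L, 62482L), ("60 months", 8383L, 16181L)
  )

  // Bad-loan rate for every group, in the input order
  def rates(rows: Seq[(String, Long, Long)]): Seq[(String, Double)] =
    rows.map { case (group, bad, good) => group -> badRate(bad, good) }
}
```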

%spark
val filtered = filtered_df.filter($"loan_status".isin("Good Loan", "Bad Loan"))

val jsonRDD = filtered
  .select(to_json(struct(filtered.columns.map(col): _*)).alias("json"))
  .rdd
  .map(_.getString(0))
  .take(100000) // limit rows to avoid memory issues

z.put("scala_json", jsonRDD)
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: string ... 20 more fields]
jsonRDD: Array[String] = Array({"purpose":"debt_consolidation","term":" 60 months","verification_status":"Verified","acc_now_delinq":"0.0","addr_state":"CA","annual_inc":"96000.0","application_type":"Individual","dti":"17.03","grade":"C","home_ownership":"RENT","initial_list_status":"f","installment":321.55,"int_rate":16.78,"loan_amnt":13000.0,"loan_status":"Good Loan","tax_liens":0.0,"delinq_amnt":0.0,"pub_rec":"0.0","last_fico_range_high":"694.0","last_fico_range_low":"690.0","recoveries":"0.0","collection_recovery_fee":"0.0"}, {"purpose":"debt_consolidation","term":" 36 months","verification_status":"Not Verified","a...

%spark.pyspark

json_list = z.get("scala_json")

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Load the JSON strings into a PySpark DataFrame
df_python = spark.read.json(spark.sparkContext.parallelize(json_list))

# df_python.show(5)

%spark.pyspark
pdf = df_python.select("loan_amnt", "loan_status").toPandas()

plt.boxplot([
    pdf[pdf.loan_status=="Good Loan"].loan_amnt,
    pdf[pdf.loan_status=="Bad Loan"].loan_amnt
], labels=["Good Loan", "Bad Loan"])

plt.title("Loan Status vs Loan Amount")
plt.xlabel("Loan Status")
plt.ylabel("Loan Amount")
plt.show()

%spark.pyspark
# Convert to Pandas for plotting
pandas_df = df_python.select('installment', 'loan_status').toPandas()
fig, ax = plt.subplots(figsize=(8, 5))

# Installment - Histograms
good_loan_inst = pandas_df[pandas_df['loan_status'] == 'Good Loan']['installment']
bad_loan_inst = pandas_df[pandas_df['loan_status'] == 'Bad Loan']['installment']

ax.hist([good_loan_inst, bad_loan_inst], bins=20, label=['Good Loan', 'Bad Loan'], alpha=0.7)
ax.set_title('Installment Distribution by Status')
ax.set_xlabel('Installment')
ax.set_ylabel('Frequency')
ax.legend()
plt.tight_layout()
plt.show()


Data Preprocessing


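From here on we work with the full filtered dataset (filteredDF1, 1,381,578 rows) rather than the 100,000-row sample. Group by ‘purpose’ and count the occurrences: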


%spark
val dfWithCount = filteredDF1.groupBy("purpose").count()

// Show the result
dfWithCount.show()
+------------------+------+
|           purpose| count|
+------------------+------+
|           wedding|  2346|
|       educational|   403|
|             other| 80690|
|    small_business| 16061|
|debt_consolidation|801085|
|       credit_card|301993|
|            moving|  9796|
|          vacation|  9304|
|  renewable_energy|   955|
|             house|  7578|
|               car| 14941|
|    major_purchase| 30392|
|           medical| 16049|
|  home_improvement| 89985|
+------------------+------+

dfWithCount: org.apache.spark.sql.DataFrame = [purpose: string, count: bigint]

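Replace rare values in the ‘purpose’ column: if a purpose occurs fewer than 300 times, set it to “other”; otherwise keep the original value. With this threshold nothing actually changes here: the rarest purpose, educational, has 403 loans, so all 14 values survive, as the output below confirms.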


%spark
// Attach each purpose's count by left-joining with dfWithCount on 'purpose'
val joinedDF = filteredDF1.join(dfWithCount, Seq("purpose"), "left")

// Replace 'purpose' with "other" if count is less than 300
val updatedDF = joinedDF.withColumn(
  "purpose",
  when(col("count") < 300, "other").otherwise(col("purpose"))
).drop("count")

// Get unique purposes
val uniquePurposes = updatedDF.select("purpose").distinct()

// Show unique purposes
uniquePurposes.show()
+------------------+
|           purpose|
+------------------+
|           wedding|
|       educational|
|             other|
|    small_business|
|debt_consolidation|
|       credit_card|
|            moving|
|          vacation|
|  renewable_energy|
|             house|
|               car|
|    major_purchase|
|           medical|
|  home_improvement|
+------------------+

joinedDF: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 21 more fields]
updatedDF: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]
uniquePurposes: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string]
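Because there are only 14 purposes, the same rule can also be applied without a join: collect the small set of rare labels to the driver and replace them, for example with `when(col("purpose").isin(rare: _*), "other").otherwise(col("purpose"))`. The plain-Scala sketch below shows the rule itself, with the counts copied from the groupBy("purpose") output; `RareCategorySketch` and `bucketRare` are hypothetical names. At the notebook's threshold of 300 every purpose is kept, while a threshold of 1,000 would fold educational (403) and renewable_energy (955) into other.

```scala
// Hypothetical illustration of rare-category bucketing; not Spark code.
object RareCategorySketch {
  // Counts copied from the groupBy("purpose").count() output above
  val purposeCounts: Map[String, Long] = Map(
    "wedding" -> 2346L, "educational" -> 403L, "other" -> 80690L,
    "small_business" -> 16061L, "debt_consolidation" -> 801085L,
    "credit_card" -> 301993L, "moving" -> 9796L, "vacation" -> 9304L,
    "renewable_energy" -> 955L, "house" -> 7578L, "car" -> 14941L,
    "major_purchase" -> 30392L, "medical" -> 16049L, "home_improvement" -> 89985L
  )

  // Build a mapping that sends every label seen fewer than `threshold` times to `fallback`
  def bucketRare(counts: Map[String, Long], threshold: Long, fallback: String = "other"): String => String = {
    val rare: Set[String] = counts.collect { case (label, n) if n < threshold => label }.toSet
    label => if (rare.contains(label)) fallback else label
  }
}
```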

Group by ‘term’ and count the occurrences


%spark
// updatedDF.show()
val termCounts = updatedDF.groupBy("term").count()

// Show the result
termCounts.show()
+----------+-------+
|      term|  count|
+----------+-------+
| 36 months|1042464|
| 60 months| 339114|
+----------+-------+

termCounts: org.apache.spark.sql.DataFrame = [term: string, count: bigint]

Apply a regular expression to extract the number of months from the ‘term’ column and cast the result to an integer.


%spark
// Extract the numeric part from the 'term' column and cast it to Int
val updatedDF1 = updatedDF.withColumn("term", regexp_extract(col("term"), "(\\d+)", 0).cast("int"))
updatedDF1: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
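regexp_extract with group index 0 returns the whole first match of `(\\d+)`, which here is the same as capture group 1, so " 36 months" becomes "36" before the cast. If a value contained no digits, regexp_extract would return an empty string, and casting that to int gives null when ANSI mode is off. The plain-Scala sketch below expresses the same extraction with `scala.util.matching.Regex.findFirstIn`; `TermSketch.months` is a hypothetical helper, with `None` playing the role of null.

```scala
// Hypothetical plain-Scala counterpart of regexp_extract(...).cast("int"); not Spark code.
object TermSketch {
  private val digits = "\\d+".r

  // " 36 months" -> Some(36); a value without digits -> None (Spark would give null)
  def months(term: String): Option[Int] =
    digits.findFirstIn(term).map(_.toInt)
}
```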

Group by ‘verification_status’ and count the occurrences


%spark
val verificationStatusCounts = updatedDF1.groupBy("verification_status").count()

// Show the result
verificationStatusCounts.show()
+-------------------+------+
|verification_status| count|
+-------------------+------+
|           Verified|429534|
|    Source Verified|535993|
|       Not Verified|416051|
+-------------------+------+

verificationStatusCounts: org.apache.spark.sql.DataFrame = [verification_status: string, count: bigint]

Encode the ‘verification_status’ values into a new column, ‘verification_status_encoded’: 0 if the status is “Verified” or “Source Verified”, 1 otherwise. The original column is then dropped.


%spark
// Encode 'verification_status' as 0 for "Verified" or "Source Verified", otherwise 1
val updatedDF2 = updatedDF1.withColumn(
  "verification_status_encoded",
  when(col("verification_status").isin("Verified", "Source Verified"), 0)
    .otherwise(1)
).drop("verification_status")

// Group by 'verification_status_encoded' and count the occurrences
val encodedCounts = updatedDF2.groupBy("verification_status_encoded").count()

// Show the result
encodedCounts.show()
+---------------------------+------+
|verification_status_encoded| count|
+---------------------------+------+
|                          1|416051|
|                          0|965527|
+---------------------------+------+

updatedDF2: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
encodedCounts: org.apache.spark.sql.DataFrame = [verification_status_encoded: int, count: bigint]

Group by ‘acc_now_delinq’ and count the occurrences


%spark
val acc_now_delinqCounts = updatedDF2.groupBy("acc_now_delinq").count()

// Show the result
acc_now_delinqCounts.show()
+--------------+-------+
|acc_now_delinq|  count|
+--------------+-------+
|           1.0|   6106|
|           0.0|1375104|
|          14.0|      1|
|           5.0|      3|
|           6.0|      1|
|           4.0|     10|
|           2.0|    311|
|           3.0|     42|
+--------------+-------+

acc_now_delinqCounts: org.apache.spark.sql.DataFrame = [acc_now_delinq: string, count: bigint]

Define the valid values for ‘acc_now_delinq’
valid_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

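Modify the ‘acc_now_delinq’ column:

  1. Cast the column to IntegerType
  2. Cap values greater than or equal to 4 at 4, keeping the other valid values as they are
  3. Keep only rows whose value is in valid_values (after capping, this drops only null or negative values)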

%spark
// Define valid values
val validValues = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Cast 'acc_now_delinq' to Int, cap values >= 4 to 4, and filter for valid values
val updatedDF3 = updatedDF2
  .withColumn("acc_now_delinq", col("acc_now_delinq").cast("int"))
  .withColumn("acc_now_delinq", when(col("acc_now_delinq") >= 4, 4).otherwise(col("acc_now_delinq")))
  .filter(col("acc_now_delinq").isin(validValues: _*))

// Group by 'acc_now_delinq' and count the occurrences
val delinqCounts = updatedDF3.groupBy("acc_now_delinq").count()

// Show the result
delinqCounts.show()
+--------------+-------+
|acc_now_delinq|  count|
+--------------+-------+
|             1|   6106|
|             3|     42|
|             4|     15|
|             2|    311|
|             0|1375104|
+--------------+-------+

validValues: Seq[Int] = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
updatedDF3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: int ... 20 more fields]
delinqCounts: org.apache.spark.sql.DataFrame = [acc_now_delinq: int, count: bigint]
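The capping rule can be checked against the counts shown before and after this cell. The plain-Scala sketch below (hypothetical ‘DelinqCapping’ object) applies the same cap-then-filter logic to the raw counts, with None standing in for a value that is null after the cast.

```scala
object DelinqCapping {
  // Valid values, as in the Spark cell above
  val validValues: Set[Int] = (0 to 10).toSet

  // Cap values >= 4 at 4, then keep only valid values; a None result means "row is filtered out"
  // (an input of None stands in for a value that is null after the cast)
  def capAndFilter(value: Option[Int]): Option[Int] =
    value.map(v => if (v >= 4) 4 else v).filter(v => validValues.contains(v))

  // Counts copied from the acc_now_delinq output above (1.0 -> 1, 14.0 -> 14, and so on)
  val rawCounts: Map[Int, Long] = Map(
    0 -> 1375104L, 1 -> 6106L, 2 -> 311L, 3 -> 42L,
    4 -> 10L, 5 -> 3L, 6 -> 1L, 14 -> 1L
  )

  // Apply the rule to every raw value and re-aggregate the surviving counts
  val cappedCounts: Map[Int, Long] =
    rawCounts.toList
      .flatMap { case (v, n) => capAndFilter(Some(v)).toList.map(c => c -> n) }
      .groupBy(_._1)
      .map { case (c, pairs) => c -> pairs.map(_._2).sum }
}
```

Values 4, 5, 6 and 14 collapse into the capped bucket (10 + 3 + 1 + 1 = 15), and no rows are filtered out, matching the Spark output.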

Group by ‘application_type’ and count the occurrences


%spark
val application_typeCounts = updatedDF3.groupBy("application_type").count()

// Show the result
application_typeCounts.show()
+----------------+-------+
|application_type|  count|
+----------------+-------+
|       Joint App|  29172|
|      Individual|1352406|
+----------------+-------+

application_typeCounts: org.apache.spark.sql.DataFrame = [application_type: string, count: bigint]

Define the valid values for ‘application_type’
valid_values = ['Joint App', 'Individual']

Modify the ‘application_type’ column:

  1. Map ‘Joint App’ to 0 and ‘Individual’ to 1
  2. Remove rows with any other value
  3. Cast the column to IntegerType

%spark
// Define valid values
val validValues = Seq("Joint App", "Individual")

// Encode 'application_type' as 0 for "Joint App", 1 for "Individual", otherwise null
val updatedDF4 = updatedDF3.withColumn(
  "application_type",
  when(col("application_type") === "Joint App", 0)
    .when(col("application_type") === "Individual", 1)
    .otherwise(null)
)

// Filter out null values and cast 'application_type' to Int
val updatedDF5 = updatedDF4
  .filter(col("application_type").isNotNull)
  .withColumn("application_type", col("application_type").cast("int"))
validValues: Seq[String] = List(Joint App, Individual)
updatedDF4: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
updatedDF5: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
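This cell prints no counts, but the expected result can be derived from the ‘application_type’ counts above. The sketch below (hypothetical ‘ApplicationTypeEncoding’ object, plain Scala) uses Option to mirror the when/otherwise(null) expression followed by the isNotNull filter.

```scala
object ApplicationTypeEncoding {
  // "Joint App" -> 0, "Individual" -> 1, anything else -> None (null in Spark, then filtered out)
  def encode(appType: String): Option[Int] = appType match {
    case "Joint App"  => Some(0)
    case "Individual" => Some(1)
    case _            => None
  }

  // Counts copied from the application_type output above
  val rawCounts: Map[String, Long] = Map("Joint App" -> 29172L, "Individual" -> 1352406L)

  // Rows that would survive the isNotNull filter, keyed by encoded value
  val encodedCounts: Map[Int, Long] =
    rawCounts.toList
      .flatMap { case (t, n) => encode(t).toList.map(code => code -> n) }
      .groupBy(_._1)
      .map { case (code, pairs) => code -> pairs.map(_._2).sum }

  // Rows that would be dropped because their category is not recognised
  val droppedRows: Long =
    rawCounts.filter { case (t, _) => encode(t).isEmpty }.values.sum
}
```

Both categories map cleanly, so no rows should be dropped: 29172 + 1352406 = 1381578, the same total as in the earlier tables.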

Group by ‘grade’ and count the occurrences


%spark
val gradeCounts = updatedDF5.groupBy("grade").count()

// Show the result
gradeCounts.show()
+-----+------+
|grade| count|
+-----+------+
|    F| 33289|
|    E| 97494|
|    B|400448|
|    D|209155|
|    C|394127|
|    A|237375|
|    G|  9690|
+-----+------+

gradeCounts: org.apache.spark.sql.DataFrame = [grade: string, count: bigint]

Use a StringIndexer with alphabetical ordering (‘alphabetAsc’) to convert the ‘grade’ column into numerical indices that follow the A-to-G grade order


%spark
// Create a StringIndexer for the 'grade' column
val gradeIndexer = new StringIndexer()
  .setInputCol("grade")
  .setOutputCol("grade_index")
  .setStringOrderType("alphabetAsc")

// Fit and transform the DataFrame, then drop the original 'grade' column
val updatedDF6 = gradeIndexer.fit(updatedDF5).transform(updatedDF5).drop("grade")
gradeIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_ef68e929d71f
updatedDF6: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
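The choice of ‘alphabetAsc’ matters here. StringIndexer's default, ‘frequencyDesc’, would give index 0.0 to the most frequent grade (B, with 400448 loans) and break the A-to-G order, whereas alphabetical order keeps the indices aligned with the grades. A plain-Scala sketch of the alphabetical assignment, using a hypothetical ‘GradeIndexing’ object:

```scala
object GradeIndexing {
  // Distinct grade labels, in the order they appear in the grade output above
  val grades: Seq[String] = Seq("F", "E", "B", "D", "C", "A", "G")

  // alphabetAsc: sort the labels ascending and number them 0.0, 1.0, ... in that order
  val gradeIndex: Map[String, Double] =
    grades.distinct.sorted.zipWithIndex.map { case (g, i) => g -> i.toDouble }.toMap
}
```

This yields A → 0.0, B → 1.0, C → 2.0, and so on up to G → 6.0.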

%spark
// One-hot encode a categorical column: index it with StringIndexer, then encode the index with OneHotEncoder (both with default settings)
def oneHotEncodeColumn(df: org.apache.spark.sql.DataFrame, inputCol: String): org.apache.spark.sql.DataFrame = {
  // StringIndexer: Convert categorical values to indices
  val indexer = new StringIndexer()
    .setInputCol(inputCol)
    .setOutputCol(inputCol + "_indexed")

  val indexedDF = indexer.fit(df).transform(df)

  // OneHotEncoder: Convert indices to one-hot encoded vectors
  val encoder = new OneHotEncoder()
    .setInputCol(inputCol + "_indexed")
    .setOutputCol(inputCol + "_encoded")

  val encodedDF = encoder.fit(indexedDF).transform(indexedDF)

  // Drop the original and indexed columns
  encodedDF.drop(inputCol, inputCol + "_indexed")
}
oneHotEncodeColumn: (df: org.apache.spark.sql.DataFrame, inputCol: String)org.apache.spark.sql.DataFrame

%spark

// List of columns to encode
val columnsToEncode = Array("purpose", "addr_state", "home_ownership", "initial_list_status")

// Apply one-hot encoding to each column
val encodedDF = columnsToEncode.foldLeft(updatedDF6)((df, column) => oneHotEncodeColumn(df, column))
columnsToEncode: Array[String] = Array(purpose, addr_state, home_ownership, initial_list_status)
encodedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
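Because oneHotEncodeColumn relies on the default settings of both stages, two details follow. StringIndexer orders categories by descending frequency (index 0 is the most frequent; Spark 3.x documents ties as broken alphabetically), and OneHotEncoder uses dropLast = true, so a column with k categories becomes a vector of length k - 1 in which the last category is all zeros. That is why the two-valued ‘initial_list_status’ column appears as a length-1 vector such as (1,[0],[1.0]) in the sample rows later on. A plain-Scala sketch of both steps, with hypothetical names:

```scala
object OneHotSketch {
  // StringIndexer default (frequencyDesc): most frequent label gets index 0, ties broken alphabetically
  def frequencyDescIndex(values: Seq[String]): Map[String, Int] =
    values
      .groupBy(identity)
      .map { case (label, occurrences) => (label, occurrences.size) }
      .toSeq
      .sortBy { case (label, count) => (-count, label) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // OneHotEncoder with dropLast = true: k categories -> vector of length k - 1 (shown densely here);
  // the last index (least frequent category) becomes all zeros
  def oneHot(index: Int, numCategories: Int): Vector[Double] =
    Vector.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
}
```

For example, with toy input Seq("w", "w", "w", "f"), frequencyDescIndex assigns w index 0 and f index 1, so oneHot gives Vector(1.0) for w and Vector(0.0) for f.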

Cast the numeric columns to FloatType


%spark
// List of columns to cast to Float
val columnsToCast = Array(
  "installment",
  "int_rate",
  "loan_amnt",
  "annual_inc",
  "dti",
  "tax_liens",
  "delinq_amnt",
  "pub_rec",
  "last_fico_range_high",
  "last_fico_range_low",
  "recoveries",
  "collection_recovery_fee"
)

// Cast each column to Float
val castedDF = columnsToCast.foldLeft(encodedDF) { (df, columnName) =>
  df.withColumn(columnName, col(columnName).cast(FloatType))
}
columnsToCast: Array[String] = Array(installment, int_rate, loan_amnt, annual_inc, dti, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
castedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]

%spark
castedDF.printSchema()
root
 |-- term: integer (nullable = true)
 |-- acc_now_delinq: integer (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- application_type: integer (nullable = true)
 |-- dti: float (nullable = true)
 |-- installment: float (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- tax_liens: float (nullable = true)
 |-- delinq_amnt: float (nullable = true)
 |-- pub_rec: float (nullable = true)
 |-- last_fico_range_high: float (nullable = true)
 |-- last_fico_range_low: float (nullable = true)
 |-- recoveries: float (nullable = true)
 |-- collection_recovery_fee: float (nullable = true)
 |-- verification_status_encoded: integer (nullable = false)
 |-- grade_index: double (nullable = false)
 |-- purpose_encoded: vector (nullable = true)
 |-- addr_state_encoded: vector (nullable = true)
 |-- home_ownership_encoded: vector (nullable = true)
 |-- initial_list_status_encoded: vector (nullable = true)


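Encode ‘loan_status’ as a binary ‘target’ column (0 for fully paid or in-grace-period loans, 1 for charged-off, late, or defaulted loans), drop the original ‘loan_status’ column, remove any row whose status is not listed (it becomes null), and cast ‘target’ to Int.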


%spark
val updatedcastedDF = castedDF.withColumn(
  "target",
  when(col("loan_status") === "Fully Paid", 0)
    .when(col("loan_status").isin("In Grace Period"), 0)
    .when(col("loan_status") === "Does not meet the credit policy. Status:Fully Paid", 0)
    .when(
      col("loan_status").isin(
        "Does not meet the credit policy. Status:Charged Off",
        "Charged Off",
        "Late (16-30 days)",
        "Late (31-120 days)",
        "Default"
      ), 1
    )
    .otherwise(null)
).drop("loan_status")

val finalDF = updatedcastedDF
  .filter(col("target").isNotNull)
  .withColumn("target", col("target").cast("int"))
updatedcastedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
finalDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
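The when chain above amounts to a lookup against two sets of statuses. The plain-Scala sketch below (hypothetical ‘TargetEncoding’ object) makes the three outcomes explicit, including the None case that corresponds to the null rows removed by filter(col("target").isNotNull).

```scala
object TargetEncoding {
  // Statuses encoded as 0 (non-default), as in the Spark cell above
  private val nonDefaultStatuses = Set(
    "Fully Paid",
    "In Grace Period",
    "Does not meet the credit policy. Status:Fully Paid"
  )

  // Statuses encoded as 1 (default)
  private val defaultStatuses = Set(
    "Does not meet the credit policy. Status:Charged Off",
    "Charged Off",
    "Late (16-30 days)",
    "Late (31-120 days)",
    "Default"
  )

  // Some(0) or Some(1) for listed statuses; None for anything else (null in Spark, then filtered out)
  def target(status: String): Option[Int] =
    if (nonDefaultStatuses.contains(status)) Some(0)
    else if (defaultStatuses.contains(status)) Some(1)
    else None
}
```

For example, target("Charged Off") is Some(1), while an unlisted status such as "Current" gives None. In this dataset every row carries one of the listed statuses: the class counts in the next cell (295020 + 1086558 = 1381578) match the row count from the earlier tables.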

%spark
val targetdf = finalDF.groupBy("target").count()
targetdf.show()
+------+-------+
|target|  count|
+------+-------+
|     1| 295020|
|     0|1086558|
+------+-------+

targetdf: org.apache.spark.sql.DataFrame = [target: int, count: bigint]
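The target is imbalanced, which changes how the accuracy figures later on should be read: on the full dataset, a trivial model that always predicts 0 would already be right about 78.6% of the time. The calculation below (hypothetical ‘ClassBalance’ object) derives that baseline from the counts above.

```scala
object ClassBalance {
  // Class counts copied from the target output above
  val defaults: Long = 295020L
  val nonDefaults: Long = 1086558L
  val total: Long = defaults + nonDefaults

  // Share of loans labelled 1 (default)
  val defaultRate: Double = defaults.toDouble / total

  // Accuracy of a trivial model that always predicts the majority class, 0
  val majorityBaselineAccuracy: Double = nonDefaults.toDouble / total
}
```

About 21.4% of the loans are defaults, so class-1 precision and recall say more about a model's usefulness than accuracy alone.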

%spark
z.show(targetdf)
target	count
1	295020
0	1086558

%spark
finalDF.show(3)
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
|term|acc_now_delinq|annual_inc|application_type|  dti|installment|int_rate|loan_amnt|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|verification_status_encoded|grade_index|purpose_encoded|addr_state_encoded|home_ownership_encoded|initial_list_status_encoded|target|
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
|  36|             0|   55000.0|               1| 5.91|     123.03|   13.99|   3600.0|      0.0|        0.0|    0.0|               564.0|              560.0|       0.0|                    0.0|                          1|        2.0| (13,[0],[1.0])|    (50,[6],[1.0])|         (5,[0],[1.0])|              (1,[0],[1.0])|     0|
|  36|             0|   65000.0|               1|16.06|     820.28|   11.99|  24700.0|      0.0|        0.0|    0.0|               699.0|              695.0|       0.0|                    0.0|                          1|        2.0| (13,[5],[1.0])|   (50,[45],[1.0])|         (5,[0],[1.0])|              (1,[0],[1.0])|     0|
|  60|             0|   63000.0|               0|10.78|     432.66|   10.78|  20000.0|      0.0|        0.0|    0.0|               704.0|              700.0|       0.0|                    0.0|                          1|        1.0| (13,[2],[1.0])|    (50,[4],[1.0])|         (5,[0],[1.0])|              (1,[0],[1.0])|     0|
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
only showing top 3 rows


Split the data into training and test sets (20% held out for testing)


%spark
// Randomly split the DataFrame into training (80%) and test (20%) sets
val Array(trainDF, testDF) = finalDF.randomSplit(Array(0.8, 0.2), seed = 42)

// Print the number of rows in each set
println(s"\nTraining set: ${trainDF.count()} rows")
println(s"Test set: ${testDF.count()} rows")
Training set: 1105548 rows
Test set: 276030 rows
trainDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]
testDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]

%spark
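// Subsample 50% of the training rows (without replacement, seed 42) and cache the result.
// Scala names are case-sensitive: traindf (the sample) is distinct from trainDF (the full training set).
val traindf = trainDF.sample(false, 0.5, 42)
traindf.cache()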
traindf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]
res37: traindf.type = [term: int, acc_now_delinq: int ... 20 more fields]
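sample(false, 0.5, 42) keeps each row independently with probability 0.5 (Bernoulli sampling without replacement), so traindf holds approximately, not exactly, half of the 1105548 training rows. The sketch below illustrates the idea in plain Scala with scala.util.Random; it is not Spark's own sampler, so the selected rows and the exact count will differ, and ‘BernoulliSampleSketch’ is a hypothetical name.

```scala
import scala.util.Random

object BernoulliSampleSketch {
  // Keep each row independently with probability `fraction` (sampling without replacement),
  // so the result size is only approximately fraction * rows.size
  def sample[A](rows: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    rows.filter(_ => rng.nextDouble() < fraction)
  }
}
```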

Get all columns except ‘target’


%spark
val allColumns = finalDF.columns
val featureColumns = allColumns.filter(_ != "target")
allColumns: Array[String] = Array(term, acc_now_delinq, annual_inc, application_type, dti, installment, int_rate, loan_amnt, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee, verification_status_encoded, grade_index, purpose_encoded, addr_state_encoded, home_ownership_encoded, initial_list_status_encoded, target)
featureColumns: Array[String] = Array(term, acc_now_delinq, annual_inc, application_type, dti, installment, int_rate, loan_amnt, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee, verification_status_encoded, grade_index, purpose_encoded, addr_state_encoded, home_ownership_encoded, initial_list_status_en...

Create a VectorAssembler to combine the feature columns into a single vector


%spark
val assembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("rawFeatures")
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_20dd9c707548, handleInvalid=error, numInputCols=21

Create a MinMaxScaler to rescale each feature to the [0, 1] range


%spark
val scaler = new MinMaxScaler()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
scaler: org.apache.spark.ml.feature.MinMaxScaler = minMaxScal_3daaa6642b2c
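By default, MinMaxScaler maps each feature value e to (e - E_min) / (E_max - E_min) * (max - min) + min with min = 0 and max = 1, where E_min and E_max are the column's minimum and maximum in the data it was fitted on; a constant column (E_max = E_min) is mapped to 0.5 * (max + min). The plain-Scala sketch below (hypothetical ‘MinMaxSketch’ object) implements that rule for a single value. Since the transformation is monotone per feature, it should not change which splits the tree-based classifiers below can make, so it matters mainly if a scale-sensitive model is swapped in.

```scala
object MinMaxSketch {
  // MinMaxScaler rule for one feature value: eMin and eMax are the column's observed
  // minimum and maximum; [newMin, newMax] is the target range (Spark's defaults are 0 and 1)
  def rescale(x: Double, eMin: Double, eMax: Double,
              newMin: Double = 0.0, newMax: Double = 1.0): Double =
    if (eMax == eMin) 0.5 * (newMax + newMin) // constant column
    else (x - eMin) / (eMax - eMin) * (newMax - newMin) + newMin
}
```

For instance, rescale(5.0, 0.0, 10.0) gives 0.5.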

Classification


Decision tree classifier


%spark
// Create a DecisionTreeClassifier
val dt = new DecisionTreeClassifier()
  .setLabelCol("target")
  .setFeaturesCol("features")

// Chain assembler, scaler, and tree in a Pipeline
val dtPipeline = new Pipeline()
  .setStages(Array(assembler, scaler, dt))
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_27456753ae61
dtPipeline: org.apache.spark.ml.Pipeline = pipeline_1547b5809387

%spark
// Train the model using the pipeline on the training DataFrame
val dtModel = dtPipeline.fit(traindf)

// Make predictions on the test DataFrame
val dtPredictions = dtModel.transform(testDF)

// Select and show example rows with prediction, target, and features
dtPredictions.select("prediction", "target", "features").show(5)
+----------+------+--------------------+
|prediction|target|            features|
+----------+------+--------------------+
|       0.0|     1|(86,[2,4,5,6,7,11...|
|       0.0|     0|(86,[2,4,5,6,7,11...|
|       1.0|     1|(86,[2,4,5,6,7,10...|
|       0.0|     1|(86,[2,4,5,6,7,11...|
|       0.0|     0|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+
only showing top 5 rows

dtModel: org.apache.spark.ml.PipelineModel = pipeline_1547b5809387
dtPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]

%spark
// Evaluate accuracy and F1-score using MulticlassClassificationEvaluator
// (metric "f1" is the F1-score averaged over both classes, weighted by class support)
val dtEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("target")
  .setPredictionCol("prediction")

val dtAccuracy = dtEvaluator.setMetricName("accuracy").evaluate(dtPredictions)
val dtF1 = dtEvaluator.setMetricName("f1").evaluate(dtPredictions)

// Cast prediction and target to Double before creating RDD
val dtPredictionAndLabels = dtPredictions
  .select("prediction", "target")
  .withColumn("prediction", col("prediction").cast("double"))
  .withColumn("target", col("target").cast("double"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

// Create MulticlassMetrics
val dtMetrics = new MulticlassMetrics(dtPredictionAndLabels)

// Precision and recall for class 1
val dtPrecision = dtMetrics.precision(1.0)
val dtRecall = dtMetrics.recall(1.0)

// Print evaluation metrics
println("Decision tree Evaluation Metrics:")
println(f"Accuracy: ${dtAccuracy}%.4f")
println(f"F1-Score: ${dtF1}%.4f")
println(f"Precision (Class 1): ${dtPrecision}%.4f")
println(f"Recall (Class 1): ${dtRecall}%.4f")

// Show confusion matrix
println("Confusion Matrix:")
println(dtMetrics.confusionMatrix.toString)
Decision tree Evaluation Metrics:
Accuracy: 0.9385
F1-Score: 0.9374
Precision (Class 1): 0.8912
Recall (Class 1): 0.8107
Confusion Matrix:
211312.0  5831.0   
11147.0   47740.0  
dtEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_880cc59b4b68, metricName=f1, metricLabel=0.0, beta=1.0, eps=1.0E-15
dtAccuracy: Double = 0.9384921928775858
dtF1: Double = 0.9374103251527375
dtPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[649] at map at :64
dtMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@4936ec1d
dtPrecision: Double = 0.8911537958970338
dtRecall: Double = 0.8107052490362898
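MulticlassMetrics.confusionMatrix puts actual labels in rows and predicted labels in columns, both in ascending order (0.0, then 1.0). The plain-Scala sketch below (hypothetical ‘ConfusionMatrixMetrics’ object) recomputes the decision-tree metrics from that matrix, which also makes the class-1 F1-score visible.

```scala
object ConfusionMatrixMetrics {
  // Decision-tree confusion matrix from the output above:
  // rows = actual label (0.0, 1.0), columns = predicted label (0.0, 1.0)
  val tn = 211312.0 // actual 0, predicted 0
  val fp = 5831.0   // actual 0, predicted 1
  val fn = 11147.0  // actual 1, predicted 0
  val tp = 47740.0  // actual 1, predicted 1

  val accuracy: Double = (tp + tn) / (tp + tn + fp + fn)
  val precisionClass1: Double = tp / (tp + fp) // share of predicted defaults that are real defaults
  val recallClass1: Double = tp / (tp + fn)    // share of real defaults that are caught
  val f1Class1: Double = 2 * precisionClass1 * recallClass1 / (precisionClass1 + recallClass1)
}
```

The class-1 F1-score comes out at about 0.849, well below the 0.9374 reported by the evaluator, because the evaluator's "f1" is weighted by class support and therefore dominated by the majority class.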

Random forest classifier


%spark
val rf = new RandomForestClassifier()
  .setLabelCol("target")
  .setFeaturesCol("features")
  .setNumTrees(50)
  .setSeed(42)
  .setMaxDepth(10)

// Chain assembler, scaler, and RandomForest in a Pipeline
val rfPipeline = new Pipeline()
  .setStages(Array(assembler, scaler, rf))
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_a1182570578f
rfPipeline: org.apache.spark.ml.Pipeline = pipeline_b6a25706740a

%spark
// Train the model using the pipeline on the training DataFrame
val rfModel = rfPipeline.fit(traindf)

// Make predictions on the test DataFrame
val rfPredictions = rfModel.transform(testDF)

// Select and show example rows with prediction, target, probability, and features
println("Sample predictions:")
rfPredictions.select("prediction", "target", "probability", "features").show(5)
Sample predictions:
+----------+------+--------------------+--------------------+
|prediction|target|         probability|            features|
+----------+------+--------------------+--------------------+
|       0.0|     1|[0.95265186520119...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.95488686594927...|(86,[2,4,5,6,7,11...|
|       1.0|     1|[0.01311484207614...|(86,[2,4,5,6,7,10...|
|       1.0|     1|[0.41052203996494...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.94446260966670...|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+--------------------+
only showing top 5 rows

rfModel: org.apache.spark.ml.PipelineModel = pipeline_b6a25706740a
rfPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]

%spark
val rfEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("target")
  .setPredictionCol("prediction")

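// Calculate evaluation metrics ("f1", "weightedPrecision" and "weightedRecall" are averaged over both classes, weighted by class support)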
val rfAccuracy = rfEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
val rfF1 = rfEvaluator.setMetricName("f1").evaluate(rfPredictions)
val rfWeightedPrecision = rfEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val rfWeightedRecall = rfEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)

// Print evaluation metrics
println("\nRandomForest Evaluation Metrics:")
println(f"Accuracy: ${rfAccuracy}%.4f")
println(f"F1-Score: ${rfF1}%.4f")
println(f"Weighted Precision: ${rfWeightedPrecision}%.4f")
println(f"Weighted Recall: ${rfWeightedRecall}%.4f")

// Calculate confusion matrix using MulticlassMetrics
val rfPredictionAndLabels = rfPredictions
  .select("prediction", "target")
  .withColumn("prediction", col("prediction").cast("double"))
  .withColumn("target", col("target").cast("double"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val rfMetrics = new MulticlassMetrics(rfPredictionAndLabels)

// Print confusion matrix
println("\nConfusion Matrix:")
println(rfMetrics.confusionMatrix.toString)
RandomForest Evaluation Metrics:
Accuracy: 0.9413
F1-Score: 0.9394
Weighted Precision: 0.9409
Weighted Recall: 0.9413

Confusion Matrix:
213704.0  3439.0   
12751.0   46136.0  
rfEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_44d2660b6e9b, metricName=weightedRecall, metricLabel=0.0, beta=1.0, eps=1.0E-15
rfAccuracy: Double = 0.9413469550411188
rfF1: Double = 0.9394447495588897
rfWeightedPrecision: Double = 0.9409062805086816
rfWeightedRecall: Double = 0.9413469550411188
rfPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[829] at map at :73
rfMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@b36082d
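Unlike the decision-tree cell, this one reports weighted precision and recall: each per-class value is averaged with weights proportional to the class support (the number of actual examples of that class). One consequence is that weighted recall always equals accuracy, as it does here (0.9413). The plain-Scala sketch below (hypothetical ‘WeightedMetrics’ object) recomputes both from the random-forest confusion matrix.

```scala
object WeightedMetrics {
  // Random-forest confusion matrix from the output above: rows = actual, columns = predicted
  val m: Array[Array[Double]] = Array(
    Array(213704.0, 3439.0),
    Array(12751.0, 46136.0)
  )

  // Support of each class = row total (number of actual examples of that class)
  val support: Array[Double] = m.map(row => row.sum)
  val n: Double = support.sum

  // Per-class precision (diagonal / column total) and recall (diagonal / row total)
  val precision: Array[Double] = Array.tabulate(2)(c => m(c)(c) / m.map(row => row(c)).sum)
  val recall: Array[Double] = Array.tabulate(2)(c => m(c)(c) / support(c))

  // Weighted metrics: per-class values averaged with weights support(c) / n
  val weightedPrecision: Double = (0 until 2).map(c => support(c) / n * precision(c)).sum
  val weightedRecall: Double = (0 until 2).map(c => support(c) / n * recall(c)).sum
  val accuracy: Double = (m(0)(0) + m(1)(1)) / n
}
```

It also exposes the class-1 recall, about 0.783, which is lower than the decision tree's 0.811 even though the overall accuracy is higher.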

Gradient Boosting


%spark
val gbt = new GBTClassifier()
  .setLabelCol("target")
  .setFeaturesCol("features")
  .setMaxIter(10)          // Number of trees (iterations)
  .setSeed(42)
  .setMaxDepth(10)         // Maximum depth of each tree

// Chain assembler, scaler, and GBT in a Pipeline
val gbtPipeline = new Pipeline()
  .setStages(Array(assembler, scaler, gbt))
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_c5dfebb505ce
gbtPipeline: org.apache.spark.ml.Pipeline = pipeline_845aaf26109f

%spark
// Train the model using the pipeline on the training DataFrame
val gbtModel = gbtPipeline.fit(traindf)

// Make predictions on the test DataFrame
val gbtPredictions = gbtModel.transform(testDF)

// Select and show example rows with prediction, target, probability, and features
println("Sample predictions:")
gbtPredictions.select("prediction", "target", "probability", "features").show(5)
Sample predictions:
+----------+------+--------------------+--------------------+
|prediction|target|         probability|            features|
+----------+------+--------------------+--------------------+
|       0.0|     1|[0.91545855834496...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.90142643143001...|(86,[2,4,5,6,7,11...|
|       1.0|     1|[0.06587782434719...|(86,[2,4,5,6,7,10...|
|       1.0|     1|[0.44432633845501...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.91543345506188...|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+--------------------+
only showing top 5 rows

gbtModel: org.apache.spark.ml.PipelineModel = pipeline_845aaf26109f
gbtPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]

%spark
val gbtEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("target")
  .setPredictionCol("prediction")

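// Calculate evaluation metrics ("f1", "weightedPrecision" and "weightedRecall" are averaged over both classes, weighted by class support)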
val gbtAccuracy = gbtEvaluator.setMetricName("accuracy").evaluate(gbtPredictions)
val gbtF1 = gbtEvaluator.setMetricName("f1").evaluate(gbtPredictions)
val gbtWeightedPrecision = gbtEvaluator.setMetricName("weightedPrecision").evaluate(gbtPredictions)
val gbtWeightedRecall = gbtEvaluator.setMetricName("weightedRecall").evaluate(gbtPredictions)

// Print evaluation metrics
println("\nGradient Boosting Evaluation Metrics:")
println(f"Accuracy: ${gbtAccuracy}%.4f")
println(f"F1-Score: ${gbtF1}%.4f")
println(f"Weighted Precision: ${gbtWeightedPrecision}%.4f")
println(f"Weighted Recall: ${gbtWeightedRecall}%.4f")

// Calculate confusion matrix using MulticlassMetrics
val gbtPredictionAndLabels = gbtPredictions
  .select("prediction", "target")
  .withColumn("prediction", col("prediction").cast("double"))
  .withColumn("target", col("target").cast("double"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val gbtMetrics = new MulticlassMetrics(gbtPredictionAndLabels)

// Print confusion matrix
println("\nConfusion Matrix:")
println(gbtMetrics.confusionMatrix.toString)
Gradient Boosting Evaluation Metrics:
Accuracy: 0.9432
F1-Score: 0.9421
Weighted Precision: 0.9423
Weighted Recall: 0.9432

Confusion Matrix:
212324.0  4819.0   
10847.0   48040.0  
gbtEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_724f979234cd, metricName=weightedRecall, metricLabel=0.0, beta=1.0, eps=1.0E-15
gbtAccuracy: Double = 0.9432452994239756
gbtF1: Double = 0.9421030107560974
gbtWeightedPrecision: Double = 0.9423157980755705
gbtWeightedRecall: Double = 0.9432452994239757
gbtPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[1321] at map at :73
gbtMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@2b...

Analysis of Classifier Results


Our three classifiers show strong but slightly varying performance:

  • Decision Tree: Accuracy 0.9385
  • Random Forest: Accuracy 0.9413
  • Gradient Boosting (GBT): Accuracy 0.9432 (Best)
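Because defaults are the minority class, it is worth comparing class-1 (default) precision and recall alongside accuracy. Both can be recomputed from the three confusion matrices printed above (rows = actual, columns = predicted). A plain-Scala sketch, using a hypothetical ‘ModelComparison’ object:

```scala
object ModelComparison {
  // (model, TN, FP, FN, TP) copied from the three confusion matrices above
  val matrices: Seq[(String, Double, Double, Double, Double)] = Seq(
    ("Decision Tree", 211312.0, 5831.0, 11147.0, 47740.0),
    ("Random Forest", 213704.0, 3439.0, 12751.0, 46136.0),
    ("GBT", 212324.0, 4819.0, 10847.0, 48040.0)
  )

  // (model, accuracy, class-1 precision, class-1 recall)
  val metrics: Seq[(String, Double, Double, Double)] = matrices.map {
    case (name, tn, fp, fn, tp) =>
      (name, (tp + tn) / (tn + fp + fn + tp), tp / (tp + fp), tp / (tp + fn))
  }
}
```

On these figures GBT also has the highest class-1 recall (about 0.816, versus 0.811 for the decision tree and 0.783 for the random forest), while the random forest has the highest class-1 precision (about 0.931, versus 0.909 for GBT and 0.891 for the decision tree).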

However, these results come from a single train–test split, which may not show how well the models generalize, for three reasons:

  1. Dependence on a Single Split
    A model may score well on one particular split but worse on another. Cross-validation measures performance across multiple folds, which reduces this split bias.
  2. More Reliable Model Comparison
    Ranking DT, RF, and GBT on a single split can be misleading. CV reports an average over folds, which makes the comparison more robust.
  3. Better Hyperparameter Tuning
    CV is the standard way to tune learning rate, tree depth, number of trees, and other parameters. Without CV, the chosen parameters may be overfitted to a single test split.
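k-fold cross-validation, as used below with k = 5, splits the training data into k randomly assigned folds; each candidate setting is trained k times, each time holding out a different fold for validation, and the k validation scores are averaged. The plain-Scala sketch below shows the fold assignment only (hypothetical ‘KFoldSketch’ object); it assigns rows with scala.util.Random and is not Spark's internal splitting code.

```scala
import scala.util.Random

object KFoldSketch {
  // Assign each row index to one of k folds at random, then yield (trainIdx, validationIdx) pairs
  def kFold(n: Int, k: Int, seed: Long): Seq[(Seq[Int], Seq[Int])] = {
    val rng = new Random(seed)
    val foldOf = Array.fill(n)(rng.nextInt(k))
    (0 until k).map { f =>
      val validation = (0 until n).filter(i => foldOf(i) == f)
      val train = (0 until n).filter(i => foldOf(i) != f)
      (train, validation)
    }
  }
}
```

Every row lands in exactly one validation fold, and in each round the training and validation indices are disjoint.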

Conclusion

Since Gradient Boosting (GBT) achieves the highest accuracy of the three, we carry it forward for cross-validated hyperparameter tuning.


Cross-Validation of Gradient Boosted Tree (GBT) Classifier


%spark
val gbtParamGrid = new ParamGridBuilder()
  .addGrid(gbt.maxDepth, Array(3, 6, 8))
  .addGrid(gbt.maxIter, Array(20, 50))
  .build()
gbtParamGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_c5dfebb505ce-maxDepth: 3,
	gbtc_c5dfebb505ce-maxIter: 20
}, {
	gbtc_c5dfebb505ce-maxDepth: 6,
	gbtc_c5dfebb505ce-maxIter: 20
}, {
	gbtc_c5dfebb505ce-maxDepth: 8,
	gbtc_c5dfebb505ce-maxIter: 20
}, {
	gbtc_c5dfebb505ce-maxDepth: 3,
	gbtc_c5dfebb505ce-maxIter: 50
}, {
	gbtc_c5dfebb505ce-maxDepth: 6,
	gbtc_c5dfebb505ce-maxIter: 50
}, {
	gbtc_c5dfebb505ce-maxDepth: 8,
	gbtc_c5dfebb505ce-maxIter: 50
})
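ParamGridBuilder forms the Cartesian product of the listed values, so this grid has 3 × 2 = 6 settings. With 5 folds, CrossValidator fits the pipeline 6 × 5 = 30 times and then refits the best setting once on the full training data, 31 fits in total, which is far more expensive than the single fits earlier. A plain-Scala sketch of that arithmetic (hypothetical ‘GridSize’ object), listing the settings in the same order as the output above:

```scala
object GridSize {
  val maxDepths = Seq(3, 6, 8)
  val maxIters = Seq(20, 50)
  val numFolds = 5

  // Cartesian product of the two lists as (maxDepth, maxIter), maxDepth varying fastest
  val grid: Seq[(Int, Int)] = for {
    iters <- maxIters
    depth <- maxDepths
  } yield (depth, iters)

  // One fit per setting per fold, plus a final refit of the best setting on the whole training set
  val totalFits: Int = grid.size * numFolds + 1
}
```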

%spark
// Evaluator for CV
val gbtCVEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("target")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

// CrossValidator (k=5)
val gbtCV = new CrossValidator()
  .setEstimator(gbtPipeline)
  .setEvaluator(gbtCVEvaluator)
  .setEstimatorParamMaps(gbtParamGrid)
  .setNumFolds(5)
gbtCVEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_e4bd8f9f934e, metricName=accuracy, metricLabel=0.0, beta=1.0, eps=1.0E-15
gbtCV: org.apache.spark.ml.tuning.CrossValidator = cv_becbc246d5c8

%spark
// Train using cross validation
/*
val gbtCVModel = gbtCV.fit(traindf)

// Make predictions on the test set
val gbtCVPredictions = gbtCVModel.transform(testDF)

// Show example predictions
gbtCVPredictions.select("prediction", "target", "features").show(5)
*/

%spark
// Compute accuracy and F1 after CV
/*
val gbtCVAccuracy = gbtCVEvaluator.setMetricName("accuracy").evaluate(gbtCVPredictions)
val gbtCVF1 = gbtCVEvaluator.setMetricName("f1").evaluate(gbtCVPredictions)

// Prepare RDD for MulticlassMetrics
val gbtCVPredictionAndLabels = gbtCVPredictions
  .select("prediction", "target")
  .withColumn("prediction", col("prediction").cast("double"))
  .withColumn("target", col("target").cast("double"))
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val gbtCVMetrics = new MulticlassMetrics(gbtCVPredictionAndLabels)

// Precision and recall for class 1
val gbtCVPrecision = gbtCVMetrics.precision(1.0)
val gbtCVRecall = gbtCVMetrics.recall(1.0)

// Print results
println("GBT Classifier Cross-Validation Evaluation Metrics:")
println(f"Accuracy: ${gbtCVAccuracy}%.4f")
println(f"F1-Score: ${gbtCVF1}%.4f")
println(f"Precision (Class 1): ${gbtCVPrecision}%.4f")
println(f"Recall (Class 1): ${gbtCVRecall}%.4f")

println("Confusion Matrix:")
println(gbtCVMetrics.confusionMatrix.toString)
*/

References