This case study details the end-to-end development of a predictive model designed to identify potential loan defaulters from the publicly available Lending Club dataset. The project encapsulates the entire data science lifecycle, employing advanced feature engineering and multiple classification algorithms to build a system capable of accurately forecasting default probability, thereby addressing a critical business need in the fintech lending sector.
We will use classifiers to predict whether a borrower will default on a loan. For this, we will use a real-world dataset provided by Lending Club, a fintech firm that makes its loan data publicly available. If you are interested, you can download the data from Kaggle (Lending Club Dataset).
The data is useful for analytical studies and contains hundreds of features. Examining all of them is beyond the scope of this study, so we will only use a subset of features for our predictions. The study will give us an idea of how real business problems are solved using EDA and machine learning.
We will use the Apache Spark APIs in Scala inside a Zeppelin notebook on Ubuntu 22.04 (Linux).
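The Zeppelin Spark interpreter predefines a few handles that the paragraphs below rely on, notably the SparkSession spark, the SparkContext sc, and the ZeppelinContext z (used later with z.put and z.show to pass data and tables between paragraphs). A quick sanity check of the environment might look like this (a minimal sketch; the printed values depend on your installation):
%spark
// `spark`, `sc` and `z` are provided by Zeppelin's %spark interpreter; no manual setup is needed here.
println(s"Spark version: ${spark.version}")
println(s"Application:   ${sc.appName}")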
%spark
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, count, isnan, when}
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
import org.apache.spark.ml.classification.{DecisionTreeClassifier, RandomForestClassifier, GBTClassifier}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
%spark
// Define the file location and selected columns
val fileLocation = "hdfs://localhost:9000/user/hduser/data/accepted_2007_to_2018Q4.csv.gz"
val selectedColumns = Array(
"id",
"purpose",
"term",
"verification_status",
"acc_now_delinq",
"addr_state",
"annual_inc",
"application_type",
"dti",
"grade",
"home_ownership",
"initial_list_status",
"installment",
"int_rate",
"loan_amnt",
"loan_status",
"tax_liens",
"delinq_amnt",
"pub_rec",
"last_fico_range_high",
"last_fico_range_low",
"recoveries",
"collection_recovery_fee"
)
// Load the CSV file with the selected columns
val dfraw: DataFrame = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("compression", "gzip")
.load(fileLocation)
.select(selectedColumns.head, selectedColumns.tail: _*)
// Show the first few rows
dfraw.show(5)
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|      id|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|68407277|debt_consolidation| 36 months| Not Verified| 0.0| PA| 55000.0| Individual| 5.91| C| MORTGAGE| w| 123.03| 13.99| 3600.0| Fully Paid| 0.0| 0.0| 0.0| 564.0| 560.0| 0.0| 0.0|
|68355089| small_business| 36 months| Not Verified| 0.0| SD| 65000.0| Individual|16.06| C| MORTGAGE| w| 820.28| 11.99| 24700.0| Fully Paid| 0.0| 0.0| 0.0| 699.0| 695.0| 0.0| 0.0|
|68341763| home_improvement| 60 months| Not Verified| 0.0| IL| 63000.0| Joint App|10.78| B| MORTGAGE| w| 432.66| 10.78| 20000.0| Fully Paid| 0.0| 0.0| 0.0| 704.0| 700.0| 0.0| 0.0|
|66310712|debt_consolidation| 60 months| Source Verified| 0.0| NJ| 110000.0| Individual|17.06| C| MORTGAGE| w| 829.9| 14.85| 35000.0| Current| 0.0| 0.0| 0.0| 679.0| 675.0| 0.0| 0.0|
|68476807| major_purchase| 60 months| Source Verified| 0.0| PA| 104433.0| Individual|25.37| F| MORTGAGE| w| 289.91| 22.45| 10400.0| Fully Paid| 0.0| 0.0| 0.0| 704.0| 700.0| 0.0| 0.0|
+--------+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows

fileLocation: String = hdfs://localhost:9000/user/hduser/data/accepted_2007_to_2018Q4.csv.gz
selectedColumns: Array[String] = Array(id, purpose, term, verification_status, acc_now_delinq, addr_state, annual_inc, application_type, dti, grade, home_ownership, initial_list_status, installment, int_rate, loan_amnt, loan_status, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
dfraw: org.apache.spark.sql.DataFrame = [id: string, purpose: string ... 21 more fields]
%spark
println(s"Shape of the DataFrame: (${dfraw.count()}, ${dfraw.columns.length})")
Shape of the DataFrame: (2260701, 23)
Missing values
%spark
// Count null values for each column
val nullCounts = dfraw.select(
dfraw.columns.map(c =>
count(when(col(c).isNull || isnan(col(c)), true)).alias(c)
): _*
)
nullCounts.show()
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
| id|purpose|term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type| dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|  0|     34|  33| 33| 221| 34| 37| 88|1745| 33| 33| 50| 33| 33| 33| 33| 338| 258| 125| 76| 68| 35| 39|
+---+-------+----+-------------------+--------------+----------+----------+----------------+----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+

nullCounts: org.apache.spark.sql.DataFrame = [id: bigint, purpose: bigint ... 21 more fields]
%spark
// Drop rows with any null values
val dfCleaned = dfraw.na.drop()
dfCleaned: org.apache.spark.sql.DataFrame = [id: string, purpose: string ... 21 more fields]
%spark
println(s"Shape of the DataFrame: (${dfCleaned.count()}, ${dfCleaned.columns.length})")
Shape of the DataFrame: (2258596, 23)
%spark
dfCleaned.printSchema()
root
 |-- id: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- term: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- acc_now_delinq: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- tax_liens: double (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- last_fico_range_high: string (nullable = true)
 |-- last_fico_range_low: string (nullable = true)
 |-- recoveries: string (nullable = true)
 |-- collection_recovery_fee: string (nullable = true)
Target feature: the loan_status column, our target variable, contains values other than Fully Paid and Charged Off, so we will need to map all of them. Let us first inspect the unique values of loan_status.
%spark
val uniqueLoanStatuses = dfCleaned.select("loan_status").distinct()
uniqueLoanStatuses.show()
+--------------------+
|         loan_status|
+--------------------+
|          Fully Paid|
|             Default|
|     In Grace Period|
|Does not meet the...|
|         Charged Off|
|  Late (31-120 days)|
|             Current|
|Does not meet the...|
|   Late (16-30 days)|
+--------------------+

uniqueLoanStatuses: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string]
Current: the applicant is still in the process of paying the instalments, i.e. the loan tenure is not yet complete, so these records cannot yet be labelled as defaulted or fully paid. We drop the Current loans, since they are not useful when looking for the driving factors of default. The id column is also not required.
%spark
val filteredDF = dfCleaned.filter(col("loan_status") =!= "Current")
// Show the result
// filteredDF.show()
val uniqueLoanStatus = filteredDF.select("loan_status").distinct()
uniqueLoanStatus.show()
+--------------------+
|         loan_status|
+--------------------+
|          Fully Paid|
|             Default|
|     In Grace Period|
|Does not meet the...|
|         Charged Off|
|  Late (31-120 days)|
|Does not meet the...|
|   Late (16-30 days)|
+--------------------+

filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, purpose: string ... 21 more fields]
uniqueLoanStatus: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string]
%spark
val filteredDF1 = filteredDF.drop("id")
filteredDF1.show(5)
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|debt_consolidation| 36 months| Not Verified| 0.0| PA| 55000.0| Individual| 5.91| C| MORTGAGE| w| 123.03| 13.99| 3600.0| Fully Paid| 0.0| 0.0| 0.0| 564.0| 560.0| 0.0| 0.0|
| small_business| 36 months| Not Verified| 0.0| SD| 65000.0| Individual|16.06| C| MORTGAGE| w| 820.28| 11.99| 24700.0| Fully Paid| 0.0| 0.0| 0.0| 699.0| 695.0| 0.0| 0.0|
| home_improvement| 60 months| Not Verified| 0.0| IL| 63000.0| Joint App|10.78| B| MORTGAGE| w| 432.66| 10.78| 20000.0| Fully Paid| 0.0| 0.0| 0.0| 704.0| 700.0| 0.0| 0.0|
| major_purchase| 60 months| Source Verified| 0.0| PA| 104433.0| Individual|25.37| F| MORTGAGE| w| 289.91| 22.45| 10400.0| Fully Paid| 0.0| 0.0| 0.0| 704.0| 700.0| 0.0| 0.0|
|debt_consolidation| 36 months| Source Verified| 0.0| GA| 34000.0| Individual| 10.2| C| RENT| w| 405.18| 13.44| 11950.0| Fully Paid| 0.0| 0.0| 0.0| 759.0| 755.0| 0.0| 0.0|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows

filteredDF1: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]
%spark
println(s"Shape of the DataFrame: (${filteredDF1.count()}, ${filteredDF1.columns.length})")
Shape of the DataFrame: (1381578, 22)
%spark
// Sample 100,000 rows from the DataFrame
val filtered_DF = filteredDF1.orderBy(rand()).limit(100000)
// Optionally, cache the sampled DataFrame for faster reuse
filtered_DF.cache()
filtered_DF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: string ... 20 more fields]
res11: filtered_DF.type = [purpose: string, term: string ... 20 more fields]
%spark
// Identify categorical columns (assuming they are of StringType)
val categoricalCols = filtered_DF.schema.fields
.filter(_.dataType.typeName == "string")
.map(_.name)
// Apply StringIndexer to each categorical column
var dataEncoded = filtered_DF
for (colName <- categoricalCols) {
val indexer = new StringIndexer()
.setInputCol(colName)
.setOutputCol(s"${colName}_encoded")
.fit(dataEncoded)
dataEncoded = indexer.transform(dataEncoded).drop(colName).withColumnRenamed(s"${colName}_encoded", colName)
}
// Show the encoded DataFrame
// dataEncoded.show(5)
categoricalCols: Array[String] = Array(purpose, term, verification_status, acc_now_delinq, addr_state, annual_inc, application_type, dti, grade, home_ownership, initial_list_status, loan_status, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
dataEncoded: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [installment: double, int_rate: double ... 20 more fields]
%spark
// Write the DataFrame to a temporary file
val tempPath = "/tmp/encoded_data.csv"
dataEncoded.coalesce(1)
.write
.mode("overwrite")
.option("header", "true")
.csv(tempPath)
tempPath: String = /tmp/encoded_data.csv
%spark.pyspark
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("tempfile") \
.master("local[*]") \
.getOrCreate()
sparkDF= spark.read.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.option("sep", ",") \
.load("/tmp/encoded_data.csv")
# sparkDF.show(5)
%spark.pyspark
# Read the CSV file into Pandas
pandasDF = sparkDF.toPandas()
# Plot the heatmap
plt.figure(figsize=(21, 14))
sns.heatmap(pandasDF.corr(), annot=True, cmap='viridis', fmt=".2f", linewidths=0.5)
plt.title("Correlation Heatmap (Full Data)", fontsize=18, pad=12)
plt.tight_layout()
plt.show()
%spark
filtered_DF.show(5)
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|           purpose|      term|verification_status|acc_now_delinq|addr_state|annual_inc|application_type|  dti|grade|home_ownership|initial_list_status|installment|int_rate|loan_amnt|loan_status|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
|debt_consolidation| 60 months| Verified| 0.0| CA| 96000.0| Individual|17.03| C| RENT| f| 321.55| 16.78| 13000.0| Fully Paid| 0.0| 0.0| 0.0| 694.0| 690.0| 0.0| 0.0|
|debt_consolidation| 36 months| Not Verified| 0.0| GA| 75000.0| Individual|19.22| A| MORTGAGE| f| 557.94| 7.26| 18000.0| Fully Paid| 0.0| 0.0| 0.0| 729.0| 725.0| 0.0| 0.0|
| home_improvement| 36 months| Not Verified| 0.0| NY| 50000.0| Joint App|37.21| B| MORTGAGE| w| 483.45| 9.92| 15000.0| Fully Paid| 0.0| 0.0| 0.0| 699.0| 695.0| 0.0| 0.0|
| vacation| 36 months| Not Verified| 0.0| CA| 60000.0| Individual| 11.4| B| RENT| f| 70.25| 12.49| 2100.0| Fully Paid| 0.0| 0.0| 0.0| 724.0| 720.0| 0.0| 0.0|
|debt_consolidation| 36 months| Not Verified| 0.0| GA| 63000.0| Individual|10.42| A| MORTGAGE| f| 281.58| 7.89| 9000.0| Fully Paid| 0.0| 0.0| 1.0| 794.0| 790.0| 0.0| 0.0|
+------------------+----------+-------------------+--------------+----------+----------+----------------+-----+-----+--------------+-------------------+-----------+--------+---------+-----------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+
only showing top 5 rows
%spark
// Define good loan statuses
val Good_Loan_statuses = Seq(
"Fully Paid",
"In Grace Period",
"Does not meet the credit policy. Status:Fully Paid"
)
// (Optional) broadcast the set of good statuses; this would help if the check were done inside a UDF.
// The isin() call below works directly on the Seq, so the broadcast is not actually used here.
val goodLoanSet = spark.sparkContext.broadcast(Good_Loan_statuses.toSet)
// Update loan_status column
val filtered_df = filtered_DF.withColumn(
"loan_status",
when(col("loan_status").isin(Good_Loan_statuses: _*), "Good Loan")
.otherwise("Bad Loan")
)
// Show unique updated values
filtered_df.select("loan_status").distinct().show(false)
+-----------+
|loan_status|
+-----------+
|Good Loan  |
|Bad Loan   |
+-----------+

Good_Loan_statuses: Seq[String] = List(Fully Paid, In Grace Period, Does not meet the credit policy. Status:Fully Paid)
goodLoanSet: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Set[String]] = Broadcast(61)
filtered_df: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]
%spark
// Group and filter
val loanStat = filtered_df
.groupBy("loan_status")
.count()
.filter(col("loan_status").isin("Good Loan", "Bad Loan"))
// Display as table (Zeppelin will allow bar chart)
// Convert to JSON for export
val jsonStr = loanStat.toJSON.collect().mkString("[", ",", "]")
// Store inside notebook (key must be loanstat)
z.put("loanstat", jsonStr)
z.show(loanStat)
loan_status    count
Good Loan      78663
Bad Loan       21337

loanStat: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_status: string, count: bigint]
jsonStr: String = [{"loan_status":"Good Loan","count":78663},{"loan_status":"Bad Loan","count":21337}]
%spark.pyspark
# Retrieve JSON string from Scala paragraph
json_list = z.get("loanstat")
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Convert JSON string into RDD with 1 element
rdd = spark.sparkContext.parallelize([json_list])
# Load into DataFrame
df_python = spark.read.json(rdd)
df_python.show()
+-----+-----------+
|count|loan_status|
+-----+-----------+
|78663|  Good Loan|
|21337|   Bad Loan|
+-----+-----------+
%spark.pyspark
# Convert to pandas
loanStatPan = df_python.toPandas()
plt.bar(loanStatPan['loan_status'], loanStatPan['count'], color='lightblue')
plt.title('Loan Status Counts')
plt.xlabel('loan_status')
plt.ylabel('Counts')
plt.tight_layout()
plt.show()
%spark
// Count loans by state and order by count (descending)
val stateCounts = filtered_df
.groupBy("addr_state")
.agg(count("*").alias("count"))
.orderBy(desc("count"))
// Show the result in Zeppelin
z.show(stateCounts)
addr_state  count
CA          14473
TX          8163
NY          8084
FL          7102
IL          3787
NJ          3606
PA          3436
GA          3354
OH          3321
VA          2845
NC          2792
MI          2640
MD          2380
AZ          2373
MA          2301
CO          2280
WA          2108
MN          1835
IN          1628
MO          1610
TN          1580
NV          1493
CT          1449
WI          1309
AL          1213
LA          1212
SC          1202
OR          1188
OK          919
KY          904
KS          805
UT          771
AR          736
HI          532
NM          514
NH          472
MS          466
RI          445
WV          351
DE          301
MT          289
NE          267
DC          248
AK          229
SD          225
WY          194
VT          186
ID          131
ME          127
ND          122
IA          2

stateCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [addr_state: string, count: bigint]
%spark
// Count the occurrences of each grade by loan_status
val gradeCounts = filtered_df
.groupBy("grade", "loan_status")
.agg(count("*").alias("count"))
.orderBy(col("grade")) // Use col("grade") instead of "grade"
// Pivot the data to have loan_status as columns
val gradePivot = gradeCounts
.groupBy("grade")
.pivot("loan_status")
.agg(sum("count"))
.orderBy(col("grade")) // Use col("grade") instead of "grade"
.na.fill(0) // Replace nulls with 0
// Show the result in Zeppelin
z.show(gradePivot)
grade  Bad Loan  Good Loan
A      1103      15881
B      4280      24933
C      6953      21819
D      4840      10142
E      2743      4245
F      1069      1324
G      349       319

gradeCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [grade: string, loan_status: string ... 1 more field]
gradePivot: org.apache.spark.sql.DataFrame = [grade: string, Bad Loan: bigint ... 1 more field]
%spark
// Count the occurrences of each term by loan_status
val termCounts = filtered_df
.groupBy("term", "loan_status")
.agg(count("*").alias("count"))
.orderBy("term")
// Pivot the data to have loan_status as columns
val termPivot = termCounts
.groupBy("term")
.pivot("loan_status")
.agg(sum("count"))
.orderBy("term")
.na.fill(0) // Replace nulls with 0
// Show the result in Zeppelin
z.show(termPivot)
term       Bad Loan  Good Loan
36 months  12954     62482
60 months  8383      16181

termCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: string, loan_status: string ... 1 more field]
termPivot: org.apache.spark.sql.DataFrame = [term: string, Bad Loan: bigint ... 1 more field]
%spark
val filtered = filtered_df.filter($"loan_status".isin("Good Loan", "Bad Loan"))
val jsonRDD = filtered
.select(to_json(struct(filtered.columns.map(col): _*)).alias("json"))
.rdd
.map(_.getString(0))
.take(100000) // limit rows to avoid memory issues
z.put("scala_json", jsonRDD)
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: string ... 20 more fields]
jsonRDD: Array[String] = Array({"purpose":"debt_consolidation","term":" 60 months","verification_status":"Verified","acc_now_delinq":"0.0","addr_state":"CA","annual_inc":"96000.0","application_type":"Individual","dti":"17.03","grade":"C","home_ownership":"RENT","initial_list_status":"f","installment":321.55,"int_rate":16.78,"loan_amnt":13000.0,"loan_status":"Good Loan","tax_liens":0.0,"delinq_amnt":0.0,"pub_rec":"0.0","last_fico_range_high":"694.0","last_fico_range_low":"690.0","recoveries":"0.0","collection_recovery_fee":"0.0"}, {"purpose":"debt_consolidation","term":" 36 months","verification_status":"Not Verified","a...
%spark.pyspark
json_list = z.get("scala_json")
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Load the JSON strings into a PySpark DataFrame
df_python = spark.read.json(spark.sparkContext.parallelize(json_list))
# df_python.show(5)
%spark.pyspark
pdf = df_python.select("loan_amnt", "loan_status").toPandas()
plt.boxplot([
pdf[pdf.loan_status=="Good Loan"].loan_amnt,
pdf[pdf.loan_status=="Bad Loan"].loan_amnt
], labels=["Good Loan", "Bad Loan"])
plt.title("Loan Status vs Loan Amount")
plt.xlabel("Loan Status")
plt.ylabel("Loan Amount")
plt.show()
%spark.pyspark
# Convert to Pandas for plotting
pandas_df = df_python.select('installment', 'loan_status').toPandas()
fig, ax = plt.subplots(figsize=(8, 5))
# Installment - Histograms
good_loan_inst = pandas_df[pandas_df['loan_status'] == 'Good Loan']['installment']
bad_loan_inst = pandas_df[pandas_df['loan_status'] == 'Bad Loan']['installment']
ax.hist([good_loan_inst, bad_loan_inst], bins=20, label=['Good Loan', 'Bad Loan'], alpha=0.7)
ax.set_title('Installment Distribution by Status')
ax.set_xlabel('Installment')
ax.set_ylabel('Frequency')
ax.legend()
plt.show()
Group by ‘purpose’ and count the occurrences
%spark
val dfWithCount = filteredDF1.groupBy("purpose").count()
// Show the result
dfWithCount.show()
+------------------+------+
|           purpose| count|
+------------------+------+
|           wedding|  2346|
|       educational|   403|
|             other| 80690|
|    small_business| 16061|
|debt_consolidation|801085|
|       credit_card|301993|
|            moving|  9796|
|          vacation|  9304|
|  renewable_energy|   955|
|             house|  7578|
|               car| 14941|
|    major_purchase| 30392|
|           medical| 16049|
|  home_improvement| 89985|
+------------------+------+

dfWithCount: org.apache.spark.sql.DataFrame = [purpose: string, count: bigint]
Next, we replace values in the 'purpose' column based on the count: if a purpose occurs fewer than 300 times, it is replaced with "other"; otherwise the original value is kept.
%spark
// Join the DataFrame with df_with_count on 'purpose' using a left join
val joinedDF = filteredDF1.join(dfWithCount, Seq("purpose"), "left")
// Replace 'purpose' with "other" if count is less than 300
val updatedDF = joinedDF.withColumn(
"purpose",
when(col("count") < 300, "other").otherwise(col("purpose"))
).drop("count")
// Get unique purposes
val uniquePurposes = updatedDF.select("purpose").distinct()
// Show unique purposes
uniquePurposes.show()
+------------------+
|           purpose|
+------------------+
|           wedding|
|       educational|
|             other|
|    small_business|
|debt_consolidation|
|       credit_card|
|            moving|
|          vacation|
|  renewable_energy|
|             house|
|               car|
|    major_purchase|
|           medical|
|  home_improvement|
+------------------+

joinedDF: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 21 more fields]
updatedDF: org.apache.spark.sql.DataFrame = [purpose: string, term: string ... 20 more fields]
uniquePurposes: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string]
Group by ‘term’ and count the occurrences
%spark
// updatedDF.show()
val termCounts = updatedDF.groupBy("term").count()
// Show the result
termCounts.show()
+----------+-------+
|      term|  count|
+----------+-------+
| 36 months|1042464|
| 60 months| 339114|
+----------+-------+

termCounts: org.apache.spark.sql.DataFrame = [term: string, count: bigint]
Applying a regular expression to extract numbers from the ‘term’ column and then casting it to the Integer data type.
%spark
// Extract the numeric part from the 'term' column and cast it to Int
val updatedDF1 = updatedDF.withColumn("term", regexp_extract(col("term"), "(\\d+)", 0).cast("int"))
updatedDF1: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
Group by ‘verification_status’ and count the occurrences
%spark
val verificationStatusCounts = updatedDF1.groupBy("verification_status").count()
// Show the result
verificationStatusCounts.show()
+-------------------+------+
|verification_status| count|
+-------------------+------+
|           Verified|429534|
|    Source Verified|535993|
|       Not Verified|416051|
+-------------------+------+

verificationStatusCounts: org.apache.spark.sql.DataFrame = [verification_status: string, count: bigint]
Encode the 'verification_status' column into a new column 'verification_status_encoded': if the status is "Verified" or "Source Verified", the encoded value is 0; otherwise it is 1.
%spark
// Encode 'verification_status' as 0 for "Verified" or "Source Verified", otherwise 1
val updatedDF2 = updatedDF1.withColumn(
"verification_status_encoded",
when(col("verification_status").isin("Verified", "Source Verified"), 0)
.otherwise(1)
).drop("verification_status")
// Group by 'verification_status_encoded' and count the occurrences
val encodedCounts = updatedDF2.groupBy("verification_status_encoded").count()
// Show the result
encodedCounts.show()
+---------------------------+------+
|verification_status_encoded| count|
+---------------------------+------+
|                          1|416051|
|                          0|965527|
+---------------------------+------+

updatedDF2: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
encodedCounts: org.apache.spark.sql.DataFrame = [verification_status_encoded: int, count: bigint]
Group by ‘acc_now_delinq’ and count the occurrences
%spark
val acc_now_delinqCounts = updatedDF2.groupBy("acc_now_delinq").count()
// Show the result
acc_now_delinqCounts.show()
+--------------+-------+
|acc_now_delinq|  count|
+--------------+-------+
|           1.0|   6106|
|           0.0|1375104|
|          14.0|      1|
|           5.0|      3|
|           6.0|      1|
|           4.0|     10|
|           2.0|    311|
|           3.0|     42|
+--------------+-------+

acc_now_delinqCounts: org.apache.spark.sql.DataFrame = [acc_now_delinq: string, count: bigint]
Define the valid values for 'acc_now_delinq' (0 through 10) and modify the column: cast it to IntegerType, cap values greater than or equal to 4 at 4, and keep the remaining valid values as they are.
%spark
// Define valid values
val validValues = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Cast 'acc_now_delinq' to Int, cap values >= 4 to 4, and filter for valid values
val updatedDF3 = updatedDF2
.withColumn("acc_now_delinq", col("acc_now_delinq").cast("int"))
.withColumn("acc_now_delinq", when(col("acc_now_delinq") >= 4, 4).otherwise(col("acc_now_delinq")))
.filter(col("acc_now_delinq").isin(validValues: _*))
// Group by 'acc_now_delinq' and count the occurrences
val delinqCounts = updatedDF3.groupBy("acc_now_delinq").count()
// Show the result
delinqCounts.show()
+--------------+-------+
|acc_now_delinq|  count|
+--------------+-------+
|             1|   6106|
|             3|     42|
|             4|     15|
|             2|    311|
|             0|1375104|
+--------------+-------+

validValues: Seq[Int] = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
updatedDF3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [purpose: string, term: int ... 20 more fields]
delinqCounts: org.apache.spark.sql.DataFrame = [acc_now_delinq: int, count: bigint]
Group by ‘application_type’ and count the occurrences
%spark
val application_typeCounts = updatedDF3.groupBy("application_type").count()
// Show the result
application_typeCounts.show()
+----------------+-------+
|application_type|  count|
+----------------+-------+
|       Joint App|  29172|
|      Individual|1352406|
+----------------+-------+

application_typeCounts: org.apache.spark.sql.DataFrame = [application_type: string, count: bigint]
Define the valid values for 'application_type' ('Joint App', 'Individual') and modify the column: map 'Joint App' to 0 and 'Individual' to 1, remove any other values, and cast the column to IntegerType.
%spark
// Define valid values
val validValues = Seq("Joint App", "Individual")
// Encode 'application_type' as 0 for "Joint App", 1 for "Individual", otherwise null
val updatedDF4 = updatedDF3.withColumn(
"application_type",
when(col("application_type") === "Joint App", 0)
.when(col("application_type") === "Individual", 1)
.otherwise(null)
)
// Filter out null values and cast 'application_type' to Int
val updatedDF5 = updatedDF4
.filter(col("application_type").isNotNull)
.withColumn("application_type", col("application_type").cast("int"))
validValues: Seq[String] = List(Joint App, Individual)
updatedDF4: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
updatedDF5: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
Group by ‘grade’ and count the occurrences
%spark
val gradeCounts = updatedDF5.groupBy("grade").count()
// Show the result
gradeCounts.show()
+-----+------+
|grade| count|
+-----+------+
|    F| 33289|
|    E| 97494|
|    B|400448|
|    D|209155|
|    C|394127|
|    A|237375|
|    G|  9690|
+-----+------+

gradeCounts: org.apache.spark.sql.DataFrame = [grade: string, count: bigint]
We use a StringIndexer to convert the 'grade' column into numerical indices; with alphabetical ordering, A maps to 0.0 and G maps to 6.0.
%spark
// Create a StringIndexer for the 'grade' column
val gradeIndexer = new StringIndexer()
.setInputCol("grade")
.setOutputCol("grade_index")
.setStringOrderType("alphabetAsc")
// Fit and transform the DataFrame, then drop the original 'grade' column
val updatedDF6 = gradeIndexer.fit(updatedDF5).transform(updatedDF5).drop("grade")
gradeIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_ef68e929d71f
updatedDF6: org.apache.spark.sql.DataFrame = [purpose: string, term: int ... 20 more fields]
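If you want to confirm which index each grade received, the fitted indexer model exposes its label order. A small sketch (assumes a Spark 3.x StringIndexerModel, where labelsArray is available):
%spark
// Sketch: re-fit the indexer and print the label-to-index mapping.
// With alphabetAsc ordering this gives A -> 0.0, B -> 1.0, ..., G -> 6.0.
val gradeModel = gradeIndexer.fit(updatedDF5)
gradeModel.labelsArray(0).zipWithIndex.foreach { case (grade, idx) =>
  println(s"$grade -> $idx.0")
}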
%spark
// Define a function to one-hot encode a column
def oneHotEncodeColumn(df: org.apache.spark.sql.DataFrame, inputCol: String): org.apache.spark.sql.DataFrame = {
// StringIndexer: Convert categorical values to indices
val indexer = new StringIndexer()
.setInputCol(inputCol)
.setOutputCol(inputCol + "_indexed")
val indexedDF = indexer.fit(df).transform(df)
// OneHotEncoder: Convert indices to one-hot encoded vectors
val encoder = new OneHotEncoder()
.setInputCol(inputCol + "_indexed")
.setOutputCol(inputCol + "_encoded")
val encodedDF = encoder.fit(indexedDF).transform(indexedDF)
// Drop the original and indexed columns
encodedDF.drop(inputCol, inputCol + "_indexed")
}
oneHotEncodeColumn: (df: org.apache.spark.sql.DataFrame, inputCol: String)org.apache.spark.sql.DataFrame
%spark
// List of columns to encode
val columnsToEncode = Array("purpose", "addr_state", "home_ownership", "initial_list_status")
// Apply one-hot encoding to each column
var encodedDF = updatedDF6
for (column <- columnsToEncode) {
encodedDF = oneHotEncodeColumn(encodedDF, column)
}
columnsToEncode: Array[String] = Array(purpose, addr_state, home_ownership, initial_list_status)
encodedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
Cast to float type
%spark
// List of columns to cast to Float
val columnsToCast = Array(
"installment",
"int_rate",
"loan_amnt",
"annual_inc",
"dti",
"tax_liens",
"delinq_amnt",
"pub_rec",
"last_fico_range_high",
"last_fico_range_low",
"recoveries",
"collection_recovery_fee"
)
// Cast each column to Float
var castedDF = encodedDF
for (columnName <- columnsToCast) {
castedDF = castedDF.withColumn(columnName, col(columnName).cast(FloatType))
}
columnsToCast: Array[String] = Array(installment, int_rate, loan_amnt, annual_inc, dti, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee)
castedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
%spark
castedDF.printSchema()
root
 |-- term: integer (nullable = true)
 |-- acc_now_delinq: integer (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- application_type: integer (nullable = true)
 |-- dti: float (nullable = true)
 |-- installment: float (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- tax_liens: float (nullable = true)
 |-- delinq_amnt: float (nullable = true)
 |-- pub_rec: float (nullable = true)
 |-- last_fico_range_high: float (nullable = true)
 |-- last_fico_range_low: float (nullable = true)
 |-- recoveries: float (nullable = true)
 |-- collection_recovery_fee: float (nullable = true)
 |-- verification_status_encoded: integer (nullable = false)
 |-- grade_index: double (nullable = false)
 |-- purpose_encoded: vector (nullable = true)
 |-- addr_state_encoded: vector (nullable = true)
 |-- home_ownership_encoded: vector (nullable = true)
 |-- initial_list_status_encoded: vector (nullable = true)
Encode 'loan_status' as 0 (good loan) and 1 (bad loan) in a new column named target, drop the original loan_status column, and cast target to Int.
%spark
val updatedcastedDF = castedDF.withColumn(
"target",
when(col("loan_status") === "Fully Paid", 0)
.when(col("loan_status").isin("In Grace Period"), 0)
.when(col("loan_status") === "Does not meet the credit policy. Status:Fully Paid", 0)
.when(
col("loan_status").isin(
"Does not meet the credit policy. Status:Charged Off",
"Charged Off",
"Late (16-30 days)",
"Late (31-120 days)",
"Default"
), 1
)
.otherwise(null)
).drop("loan_status")
val finalDF = updatedcastedDF
.filter(col("target").isNotNull)
.withColumn("target", col("target").cast("int"))
updatedcastedDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
finalDF: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 20 more fields]
%spark
val targetdf = finalDF.groupBy("target").count()
targetdf.show()
+------+-------+
|target|  count|
+------+-------+
|     1| 295020|
|     0|1086558|
+------+-------+

targetdf: org.apache.spark.sql.DataFrame = [target: int, count: bigint]
%spark
z.show(targetdf)
target  count
1       295020
0       1086558
%spark
finalDF.show(3)
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
|term|acc_now_delinq|annual_inc|application_type|  dti|installment|int_rate|loan_amnt|tax_liens|delinq_amnt|pub_rec|last_fico_range_high|last_fico_range_low|recoveries|collection_recovery_fee|verification_status_encoded|grade_index|purpose_encoded|addr_state_encoded|home_ownership_encoded|initial_list_status_encoded|target|
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
| 36| 0| 55000.0| 1| 5.91| 123.03| 13.99| 3600.0| 0.0| 0.0| 0.0| 564.0| 560.0| 0.0| 0.0| 1| 2.0| (13,[0],[1.0])| (50,[6],[1.0])| (5,[0],[1.0])| (1,[0],[1.0])| 0|
| 36| 0| 65000.0| 1|16.06| 820.28| 11.99| 24700.0| 0.0| 0.0| 0.0| 699.0| 695.0| 0.0| 0.0| 1| 2.0| (13,[5],[1.0])| (50,[45],[1.0])| (5,[0],[1.0])| (1,[0],[1.0])| 0|
| 60| 0| 63000.0| 0|10.78| 432.66| 10.78| 20000.0| 0.0| 0.0| 0.0| 704.0| 700.0| 0.0| 0.0| 1| 1.0| (13,[2],[1.0])| (50,[4],[1.0])| (5,[0],[1.0])| (1,[0],[1.0])| 0|
+----+--------------+----------+----------------+-----+-----------+--------+---------+---------+-----------+-------+--------------------+-------------------+----------+-----------------------+---------------------------+-----------+---------------+------------------+----------------------+---------------------------+------+
only showing top 3 rows
Split the data into training and test sets (20% held out for testing)
%spark
// Randomly split the DataFrame into training (80%) and test (20%) sets
val Array(trainDF, testDF) = finalDF.randomSplit(Array(0.8, 0.2), seed = 42)
// Print the number of rows in each set
println(s"\nTraining set: ${trainDF.count()} rows")
println(s"Test set: ${testDF.count()} rows")
Training set: 1105548 rows
Test set: 276030 rows

trainDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]
testDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]
%spark
// Down-sample the training set to roughly 50% (without replacement, seed 42) to speed up model fitting
val traindf = trainDF.sample(false, 0.5, 42)
traindf.cache()
traindf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [term: int, acc_now_delinq: int ... 20 more fields]
res37: traindf.type = [term: int, acc_now_delinq: int ... 20 more fields]
Get all columns except 'target'
%spark
val allColumns = finalDF.columns
val featureColumns = allColumns.filter(_ != "target")
allColumns: Array[String] = Array(term, acc_now_delinq, annual_inc, application_type, dti, installment, int_rate, loan_amnt, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee, verification_status_encoded, grade_index, purpose_encoded, addr_state_encoded, home_ownership_encoded, initial_list_status_encoded, target)
featureColumns: Array[String] = Array(term, acc_now_delinq, annual_inc, application_type, dti, installment, int_rate, loan_amnt, tax_liens, delinq_amnt, pub_rec, last_fico_range_high, last_fico_range_low, recoveries, collection_recovery_fee, verification_status_encoded, grade_index, purpose_encoded, addr_state_encoded, home_ownership_encoded, initial_list_status_en...
Create VectorAssembler to combine feature columns into a single vector
%spark
val assembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("rawFeatures")
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_20dd9c707548, handleInvalid=error, numInputCols=21
Create MinMaxScaler to scale the features
%spark
val scaler = new MinMaxScaler()
.setInputCol("rawFeatures")
.setOutputCol("features")
scaler: org.apache.spark.ml.feature.MinMaxScaler = minMaxScal_3daaa6642b2c
%spark
// Create a DecisionTreeClassifier
val dt = new DecisionTreeClassifier()
.setLabelCol("target")
.setFeaturesCol("features")
// Chain assembler, scaler, and tree in a Pipeline
val dtPipeline = new Pipeline()
.setStages(Array(assembler, scaler, dt))
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_27456753ae61
dtPipeline: org.apache.spark.ml.Pipeline = pipeline_1547b5809387
%spark
// Train the model using the pipeline on the training DataFrame
val dtModel = dtPipeline.fit(traindf)
// Make predictions on the test DataFrame
val dtPredictions = dtModel.transform(testDF)
// Select and show example rows with prediction, target, and features
dtPredictions.select("prediction", "target", "features").show(5)
+----------+------+--------------------+
|prediction|target|            features|
+----------+------+--------------------+
|       0.0|     1|(86,[2,4,5,6,7,11...|
|       0.0|     0|(86,[2,4,5,6,7,11...|
|       1.0|     1|(86,[2,4,5,6,7,10...|
|       0.0|     1|(86,[2,4,5,6,7,11...|
|       0.0|     0|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+
only showing top 5 rows

dtModel: org.apache.spark.ml.PipelineModel = pipeline_1547b5809387
dtPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]
%spark
// Evaluate accuracy and F1-score using MulticlassClassificationEvaluator
val dtEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("target")
.setPredictionCol("prediction")
val dtAccuracy = dtEvaluator.setMetricName("accuracy").evaluate(dtPredictions)
val dtF1 = dtEvaluator.setMetricName("f1").evaluate(dtPredictions)
// Cast prediction and target to Double before creating RDD
val dtPredictionAndLabels = dtPredictions
.select("prediction", "target")
.withColumn("prediction", col("prediction").cast("double"))
.withColumn("target", col("target").cast("double"))
.rdd
.map(row => (row.getDouble(0), row.getDouble(1)))
// Create MulticlassMetrics
val dtMetrics = new MulticlassMetrics(dtPredictionAndLabels)
// Precision and recall for class 1
val dtPrecision = dtMetrics.precision(1.0)
val dtRecall = dtMetrics.recall(1.0)
// Print evaluation metrics
println("Decision tree Evaluation Metrics:")
println(f"Accuracy: ${dtAccuracy}%.4f")
println(f"F1-Score: ${dtF1}%.4f")
println(f"Precision (Class 1): ${dtPrecision}%.4f")
println(f"Recall (Class 1): ${dtRecall}%.4f")
// Show confusion matrix
println("Confusion Matrix:")
println(dtMetrics.confusionMatrix.toString)
Decision tree Evaluation Metrics:
Accuracy: 0.9385
F1-Score: 0.9374
Precision (Class 1): 0.8912
Recall (Class 1): 0.8107
Confusion Matrix:
211312.0  5831.0
11147.0   47740.0

dtEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_880cc59b4b68, metricName=f1, metricLabel=0.0, beta=1.0, eps=1.0E-15
dtAccuracy: Double = 0.9384921928775858
dtF1: Double = 0.9374103251527375
dtPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[649] at map at:64
dtMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@4936ec1d
dtPrecision: Double = 0.8911537958970338
dtRecall: Double = 0.8107052490362898
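As a quick check on how these numbers relate, class-1 precision and recall can be recomputed directly from the confusion matrix entries (a minimal sketch using the dtMetrics object from the paragraph above; rows of the matrix are actual labels, columns are predictions):
%spark
// Derive class-1 precision and recall from the confusion matrix by hand.
val cm = dtMetrics.confusionMatrix
val tp = cm(1, 1) // actual 1, predicted 1 (47740)
val fp = cm(0, 1) // actual 0, predicted 1 (5831)
val fn = cm(1, 0) // actual 1, predicted 0 (11147)
println(f"Precision (Class 1): ${tp / (tp + fp)}%.4f") // 47740 / 53571 = 0.8912
println(f"Recall (Class 1):    ${tp / (tp + fn)}%.4f") // 47740 / 58887 = 0.8107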
%spark
val rf = new RandomForestClassifier()
.setLabelCol("target")
.setFeaturesCol("features")
.setNumTrees(50)
.setSeed(42)
.setMaxDepth(10)
// Chain assembler, scaler, and RandomForest in a Pipeline
val rfPipeline = new Pipeline()
.setStages(Array(assembler, scaler, rf))
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_a1182570578f
rfPipeline: org.apache.spark.ml.Pipeline = pipeline_b6a25706740a
%spark
// Train the model using the pipeline on the training DataFrame
val rfModel = rfPipeline.fit(traindf)
// Make predictions on the test DataFrame
val rfPredictions = rfModel.transform(testDF)
// Select and show example rows with prediction, target, probability, and features
println("Sample predictions:")
rfPredictions.select("prediction", "target", "probability", "features").show(5)
Sample predictions:
+----------+------+--------------------+--------------------+
|prediction|target|         probability|            features|
+----------+------+--------------------+--------------------+
|       0.0|     1|[0.95265186520119...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.95488686594927...|(86,[2,4,5,6,7,11...|
|       1.0|     1|[0.01311484207614...|(86,[2,4,5,6,7,10...|
|       1.0|     1|[0.41052203996494...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.94446260966670...|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+--------------------+
only showing top 5 rows

rfModel: org.apache.spark.ml.PipelineModel = pipeline_b6a25706740a
rfPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]
%spark
val rfEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("target")
.setPredictionCol("prediction")
// Calculate evaluation metrics
val rfAccuracy = rfEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
val rfF1 = rfEvaluator.setMetricName("f1").evaluate(rfPredictions)
val rfWeightedPrecision = rfEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val rfWeightedRecall = rfEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)
// Print evaluation metrics
println("\nRandomForest Evaluation Metrics:")
println(f"Accuracy: ${rfAccuracy}%.4f")
println(f"F1-Score: ${rfF1}%.4f")
println(f"Weighted Precision: ${rfWeightedPrecision}%.4f")
println(f"Weighted Recall: ${rfWeightedRecall}%.4f")
// Calculate confusion matrix using MulticlassMetrics
val rfPredictionAndLabels = rfPredictions
.select("prediction", "target")
.withColumn("prediction", col("prediction").cast("double"))
.withColumn("target", col("target").cast("double"))
.rdd
.map(row => (row.getDouble(0), row.getDouble(1)))
val rfMetrics = new MulticlassMetrics(rfPredictionAndLabels)
// Print confusion matrix
println("\nConfusion Matrix:")
println(rfMetrics.confusionMatrix.toString)
RandomForest Evaluation Metrics:
Accuracy: 0.9413
F1-Score: 0.9394
Weighted Precision: 0.9409
Weighted Recall: 0.9413
Confusion Matrix:
213704.0  3439.0
12751.0   46136.0

rfEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_44d2660b6e9b, metricName=weightedRecall, metricLabel=0.0, beta=1.0, eps=1.0E-15
rfAccuracy: Double = 0.9413469550411188
rfF1: Double = 0.9394447495588897
rfWeightedPrecision: Double = 0.9409062805086816
rfWeightedRecall: Double = 0.9413469550411188
rfPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[829] at map at:73
rfMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@b36082d
%spark
val gbt = new GBTClassifier()
.setLabelCol("target")
.setFeaturesCol("features")
.setMaxIter(10) // Number of trees (iterations)
.setSeed(42)
.setMaxDepth(10) // Maximum depth of each tree
// Chain assembler, scaler, and GBT in a Pipeline
val gbtPipeline = new Pipeline()
.setStages(Array(assembler, scaler, gbt))
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_c5dfebb505ce
gbtPipeline: org.apache.spark.ml.Pipeline = pipeline_845aaf26109f
%spark
// Train the model using the pipeline on the training DataFrame
val gbtModel = gbtPipeline.fit(traindf)
// Make predictions on the test DataFrame
val gbtPredictions = gbtModel.transform(testDF)
// Select and show example rows with prediction, target, probability, and features
println("Sample predictions:")
gbtPredictions.select("prediction", "target", "probability", "features").show(5)
Sample predictions:
+----------+------+--------------------+--------------------+
|prediction|target|         probability|            features|
+----------+------+--------------------+--------------------+
|       0.0|     1|[0.91545855834496...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.90142643143001...|(86,[2,4,5,6,7,11...|
|       1.0|     1|[0.06587782434719...|(86,[2,4,5,6,7,10...|
|       1.0|     1|[0.44432633845501...|(86,[2,4,5,6,7,11...|
|       0.0|     0|[0.91543345506188...|(86,[2,4,5,6,7,11...|
+----------+------+--------------------+--------------------+
only showing top 5 rows

gbtModel: org.apache.spark.ml.PipelineModel = pipeline_845aaf26109f
gbtPredictions: org.apache.spark.sql.DataFrame = [term: int, acc_now_delinq: int ... 25 more fields]
%spark
val gbtEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("target")
.setPredictionCol("prediction")
// Calculate evaluation metrics
val gbtAccuracy = gbtEvaluator.setMetricName("accuracy").evaluate(gbtPredictions)
val gbtF1 = gbtEvaluator.setMetricName("f1").evaluate(gbtPredictions)
val gbtWeightedPrecision = gbtEvaluator.setMetricName("weightedPrecision").evaluate(gbtPredictions)
val gbtWeightedRecall = gbtEvaluator.setMetricName("weightedRecall").evaluate(gbtPredictions)
// Print evaluation metrics
println("\nGradient Boosting Evaluation Metrics:")
println(f"Accuracy: ${gbtAccuracy}%.4f")
println(f"F1-Score: ${gbtF1}%.4f")
println(f"Weighted Precision: ${gbtWeightedPrecision}%.4f")
println(f"Weighted Recall: ${gbtWeightedRecall}%.4f")
// Calculate confusion matrix using MulticlassMetrics
val gbtPredictionAndLabels = gbtPredictions
.select("prediction", "target")
.withColumn("prediction", col("prediction").cast("double"))
.withColumn("target", col("target").cast("double"))
.rdd
.map(row => (row.getDouble(0), row.getDouble(1)))
val gbtMetrics = new MulticlassMetrics(gbtPredictionAndLabels)
// Print confusion matrix
println("\nConfusion Matrix:")
println(gbtMetrics.confusionMatrix.toString)
Gradient Boosting Evaluation Metrics:
Accuracy: 0.9432
F1-Score: 0.9421
Weighted Precision: 0.9423
Weighted Recall: 0.9432
Confusion Matrix:
212324.0  4819.0
10847.0   48040.0

gbtEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_724f979234cd, metricName=weightedRecall, metricLabel=0.0, beta=1.0, eps=1.0E-15
gbtAccuracy: Double = 0.9432452994239756
gbtF1: Double = 0.9421030107560974
gbtWeightedPrecision: Double = 0.9423157980755705
gbtWeightedRecall: Double = 0.9432452994239757
gbtPredictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[1321] at map at:73
gbtMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@2b...
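For a side-by-side view, the test-set scores already computed in the evaluation paragraphs above can be collected into one small table (a sketch; it assumes the dt*/rf*/gbt* values are still in scope in the same notebook session):
%spark
// Collect the accuracy and F1 scores of the three models into one DataFrame for comparison.
import spark.implicits._
val comparison = Seq(
  ("DecisionTree", dtAccuracy, dtF1),
  ("RandomForest", rfAccuracy, rfF1),
  ("GradientBoosting", gbtAccuracy, gbtF1)
).toDF("model", "accuracy", "f1")
z.show(comparison)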
Our three classifiers show strong but slightly varying performance: on the held-out test set the decision tree reaches an accuracy of 0.9385 (F1 0.9374), the random forest 0.9413 (F1 0.9394), and gradient boosting 0.9432 (F1 0.9421).
However, these results are based only on a single train–test split, so they depend on that particular split and random seed and may not reflect how well the models generalise; cross-validation over several folds gives a more reliable estimate.
Conclusion
Since Gradient Boosting (GBT) performs best among the three, it is the classifier we carry forward into hyper-parameter tuning with cross-validation.
%spark
val gbtParamGrid = new ParamGridBuilder()
  .addGrid(gbt.maxDepth, Array(3, 6, 8))
  .addGrid(gbt.maxIter, Array(20, 50))
  .build()
gbtParamGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
gbtc_c5dfebb505ce-maxDepth: 3,
gbtc_c5dfebb505ce-maxIter: 20
}, {
gbtc_c5dfebb505ce-maxDepth: 6,
gbtc_c5dfebb505ce-maxIter: 20
}, {
gbtc_c5dfebb505ce-maxDepth: 8,
gbtc_c5dfebb505ce-maxIter: 20
}, {
gbtc_c5dfebb505ce-maxDepth: 3,
gbtc_c5dfebb505ce-maxIter: 50
}, {
gbtc_c5dfebb505ce-maxDepth: 6,
gbtc_c5dfebb505ce-maxIter: 50
}, {
gbtc_c5dfebb505ce-maxDepth: 8,
gbtc_c5dfebb505ce-maxIter: 50
})
%spark
// Evaluator for CV
val gbtCVEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("target")
.setPredictionCol("prediction")
.setMetricName("accuracy")
// CrossValidator (k=5)
val gbtCV = new CrossValidator()
.setEstimator(gbtPipeline)
.setEvaluator(gbtCVEvaluator)
.setEstimatorParamMaps(gbtParamGrid)
.setNumFolds(5)
gbtCVEvaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_e4bd8f9f934e, metricName=accuracy, metricLabel=0.0, beta=1.0, eps=1.0E-15
gbtCV: org.apache.spark.ml.tuning.CrossValidator = cv_becbc246d5c8
%spark
// Train using cross validation
/*
val gbtCVModel = gbtCV.fit(traindf)
// Make predictions on the test set
val gbtCVPredictions = gbtCVModel.transform(testDF)
// Show example predictions
gbtCVPredictions.select("prediction", "target", "features").show(5)
*/
%spark
// Compute accuracy and F1 after CV
/*
val gbtCVAccuracy = gbtCVEvaluator.setMetricName("accuracy").evaluate(gbtCVPredictions)
val gbtCVF1 = gbtCVEvaluator.setMetricName("f1").evaluate(gbtCVPredictions)
// Prepare RDD for MulticlassMetrics
val gbtCVPredictionAndLabels = gbtCVPredictions
.select("prediction", "target")
.withColumn("prediction", col("prediction").cast("double"))
.withColumn("target", col("target").cast("double"))
.rdd
.map(row => (row.getDouble(0), row.getDouble(1)))
val gbtCVMetrics = new MulticlassMetrics(gbtCVPredictionAndLabels)
// Precision and recall for class 1
val gbtCVPrecision = gbtCVMetrics.precision(1.0)
val gbtCVRecall = gbtCVMetrics.recall(1.0)
// Print results
println("GBT Classifier Cross-Validation Evaluation Metrics:")
println(f"Accuracy: ${gbtCVAccuracy}%.4f")
println(f"F1-Score: ${gbtCVF1}%.4f")
println(f"Precision (Class 1): ${gbtCVPrecision}%.4f")
println(f"Recall (Class 1): ${gbtCVRecall}%.4f")
println("Confusion Matrix:")
println(gbtCVMetrics.confusionMatrix.toString)
*/
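Once the commented-out cross-validation fit above has actually been run, the fitted CrossValidatorModel can also be inspected to see the mean accuracy of each parameter combination and the winning GBT settings. A sketch, kept commented out like the paragraphs above since it depends on gbtCVModel existing:
%spark
/*
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.GBTClassificationModel
// Mean cross-validated accuracy for each entry of the parameter grid
gbtCVModel.getEstimatorParamMaps.zip(gbtCVModel.avgMetrics).foreach {
  case (params, metric) => println(f"$params -> mean accuracy $metric%.4f")
}
// Hyper-parameters of the best pipeline (the GBT stage is the last one)
val bestGbt = gbtCVModel.bestModel.asInstanceOf[PipelineModel]
  .stages.last.asInstanceOf[GBTClassificationModel]
println(s"Best maxDepth = ${bestGbt.getMaxDepth}, best maxIter = ${bestGbt.getMaxIter}")
*/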