From Spark SQL data reading to LightGBM model storage

Summary

This article walks through the full pipeline, from reading data with Spark SQL to storing a trained LightGBM model.

Overall architecture process

Import the necessary packages, read the data, preprocess it, build the model, evaluate it, filter fields, and store the model.

Technical details

1. Import necessary tool packages
from pyspark.conf import SparkConf #SparkConf contains various parameters for spark cluster configuration
from pyspark.sql import SQLContext #Main entry point for DataFrame and SQL functionality
from pyspark.sql import SparkSession #Entry point for starting a Spark session
import time #Calculate running time
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, confusion_matrix, recall_score, precision_score #Import evaluation criteria

import warnings
warnings.filterwarnings("ignore")
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns',None)

t1 = time.time()

conf = SparkConf() \
    .setExecutorEnv("PYTHONHASHSEED", "123") \
    .setMaster("yarn")

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.driver.cores", 2) \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "45g") \
    .config("spark.executor.instances", 16) \
    .config("spark.executor.cores", 8) \
    .config("spark.kryoserializer.buffer.max", "128m") \
    .config("spark.network.timeout", "10000000") \
    .config("spark.sql.autoBroadcastJoinThreshold", "128") \
    .config("spark.sql.broadcastTimeout", "500000") \
    .config("spark.sql.shuffle.partitions", "800") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config("spark.yarn.am.memory", "16g") \
    .config("spark.yarn.am.cores", 2) \
    .config("spark.yarn.executor.memoryOverhead", "128g") \
    .config("yarn.nodemanager.vmem-check-enabled", "false") \
    .config("yarn.nodemanager.pmem-check-enabled", "false") \
    .config("spark.dynamicAllocation.maxExecutors", "500") \
    .appName("project") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

print(f"spark started and took {time.time()-t1:.05f}s")
2. Data reading
data = spark.sql("""select * from temp.tables""") # Read the data with Spark SQL
df = data.toPandas() # Convert the Spark DataFrame into a pandas DataFrame
df.shape
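Note that toPandas() collects the whole result set onto the driver, so for large tables it can be safer to select only the needed columns, or sample rows, before converting. A minimal sketch, where the column list and sampling fraction are illustrative assumptions:

data_small = spark.sql("""select customer_id, age, flag from temp.tables""") # hypothetical narrowed column list
df = data_small.sample(fraction=0.1, seed=123).toPandas() # optionally down-sample before collecting to the driver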
3. Data preprocessing
# Data preprocessing

df.rename(columns={"flag": "real_flag"}, inplace=True) # Rename the label column
df.head(3) #Print the first three lines

del data # Delete unused variables, release memory, and reduce memory pressure
df['customer_id'] = pd.to_numeric(df['customer_id'], errors="coerce") # Convert to numeric; the column contains dirty values (e.g. Chinese characters), so a direct cast to int would fail
df = df.dropna(subset=['customer_id'])

df.isnull().sum() # Check missing values

# Fill in the median for missing values of age;
df["age"].fillna(df["age"].median(), inplace = True) #Mode: df["age"].mode()[0]
df["age"].value_counts() # Check the distribution

for col in df.columns:
    df[col] = df[col].astype("int") # Convert data type to int
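Since the model is later evaluated with AUC, recall and precision, it is also worth checking how imbalanced the target is before modelling. A quick check, assuming the label column is real_flag as above:

print(df['real_flag'].value_counts(normalize=True)) # share of each class in the target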
4. Model construction
y = df['real_flag'].values #Real label
X = df.drop(['customer_id', 'real_flag'], axis=1) # Delete the customer ID and real label from the data set
X.head()

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2023) # Typical 80/20 split; adjust the ratio as needed

params = {'n_estimators': 1500,
          'learning_rate': 0.1,
          'max_depth': 15,
          'metric': 'auc',
          'verbose': -1,
          'seed': 2023,
          'n_jobs': -1}

model = LGBMClassifier(**params)
model.fit(X_train, y_train,
          eval_set=[(X_train, y_train), (X_test, y_test)],
          eval_metric='auc',
          verbose=50,
          early_stopping_rounds=100)
y_pred = model.predict(X_test, num_iteration=model.best_iteration_)
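GridSearchCV is imported above but not used in the main flow; if you prefer to tune the key hyperparameters rather than fixing them by hand, a minimal sketch could look like the following (the parameter grid, cv value and scoring choice are illustrative assumptions, not part of the original pipeline):

param_grid = {'max_depth': [7, 15],
              'learning_rate': [0.05, 0.1],
              'n_estimators': [500, 1500]}
grid = GridSearchCV(estimator=LGBMClassifier(random_state=2023, n_jobs=-1),
                    param_grid=param_grid,
                    scoring='roc_auc',  # consistent with the AUC metric used above
                    cv=3,
                    n_jobs=1)  # LightGBM already parallelises internally, so avoid oversubscribing cores
grid.fit(X_train, y_train)
print(grid.best_params_, grid.best_score_)
# model = grid.best_estimator_  # optionally continue with the tuned model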
5. Model evaluation
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)
lgb_acc = model.score(X_test, y_test) * 100
lgb_recall = recall_score(y_test, y_pred) * 100
lgb_precision = precision_score(y_test, y_pred) * 100
lgb_f1 = f1_score(y_test, y_pred, pos_label=1) * 100
print("lgb accuracy:{:.2f}%".format(lgb_acc))
print("lgb recall rate:{:.2f}%".format(lgb_recall))
print("lgb precision:{:.2f}%".format(lgb_precision))
print("lgb F1 score:{:.2f}%".format(lgb_f1))


#from sklearn.metrics import classification_report
#print(classification_report(y_test, y_pred))

# Confusion matrix
plt.title("Confusion Matrix", fontsize=21)
data_confusion_matrix = confusion_matrix(y_test, y_pred)
sns.heatmap(data_confusion_matrix, annot=True, cmap='Blues', fmt='d', cbar=False, annot_kws={'size': 28})
plt.xlabel('Predicted label')
plt.ylabel('True label')


from sklearn.metrics import roc_curve, auc
probs = model.predict_proba(X_test)
preds = probs[:, 1]
fpr, tpr, threshold = roc_curve(y_test, preds)
# Draw ROC curve
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label = 'AUC = %0.2f' % roc_auc)
plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate (TPR)')
plt.xlabel('False Positive Rate (FPR)')
plt.title('ROC')
plt.legend(loc='lower right')
plt.show()
6. Field filtering (determine the number of fields required and return to the third step)
import matplotlib.pyplot as plt
import seaborn as sns
warnings.simplefilter(action="ignore", category=FutureWarning)

feature_imp = pd.DataFrame(sorted(zip(model.feature_importances_, X.columns)), columns=['Value', 'Feature']) # Pair each feature name with its importance score
plt.figure(figsize=(20, 10))
sns.barplot(x='Value', y='Feature', data=feature_imp.sort_values(by='Value', ascending=False)) # Pass the DataFrame explicitly so the most important features appear first
plt.title("Importance score of model features")
plt.tight_layout()
plt.show()

feature_imp_sort = feature_imp.sort_values(by = "Value", ascending=False)[:] # Replace [:] with e.g. [:20] to limit the number of rows shown
feature_imp_sort

print(feature_imp_sort['Feature'].values)
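As the step heading suggests, once the importance ranking is known you can keep only the top-ranked fields and go back to step 3 with a reduced feature set. A minimal sketch, where top_n is a hypothetical cut-off chosen from the importance plot:

top_n = 20 # hypothetical cut-off, pick it from the importance ranking above
top_features = feature_imp_sort['Feature'].head(top_n).tolist()
X_reduced = X[top_features] # rebuild the feature matrix with only the selected fields, then retrain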
7. Model storage
import joblib
joblib.dump(model, 'model_joblib.pkl')
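To use the stored model later (for example to score a new batch of data), it can be loaded back with joblib. A minimal sketch, where new_X stands for a hypothetical feature matrix with the same columns as X:

loaded_model = joblib.load('model_joblib.pkl') # restore the trained LGBMClassifier
# new_preds = loaded_model.predict(new_X) # new_X: new data with the same feature columns as the training set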