Summary
This article walks through the full flow from reading data with Spark SQL to storing a trained LightGBM model.
Overall workflow
Import the required packages, read the data, preprocess the data, build the model, evaluate the model, filter fields, and store the model.
Technical details
1. Import the necessary packages
from pyspark.conf import SparkConf      # SparkConf holds the Spark cluster configuration parameters
from pyspark.sql import SQLContext      # Main entry point for DataFrame and SQL methods
from pyspark.sql import SparkSession    # Used to start Spark
import time                             # Measure running time
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, confusion_matrix, recall_score, precision_score  # Evaluation metrics
import warnings

warnings.filterwarnings("ignore")
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

t1 = time.time()
conf = SparkConf()\
    .setExecutorEnv("PYTHONHASHSEED", "123")\
    .setMaster("yarn")
spark = SparkSession \
    .builder\
    .config(conf=conf)\
    .config("hive.exec.dynamic.partition", "true")\
    .config("hive.exec.dynamic.partition.mode", "nonstrict")\
    .config("spark.driver.cores", 2)\
    .config("spark.driver.maxResultSize", "32g")\
    .config("spark.driver.memory", "32g")\
    .config("spark.executor.memory", "45g")\
    .config("spark.executor.instances", 16)\
    .config("spark.executor.cores", 8)\
    .config("spark.kryoserializer.buffer.max", "128m")\
    .config("spark.network.timeout", "10000000")\
    .config("spark.sql.autoBroadcastJoinThreshold", "128")\
    .config("spark.sql.broadcastTimeout", "500000")\
    .config("spark.sql.shuffle.partitions", "800")\
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")\
    .config("spark.yarn.am.memory", "16g")\
    .config("spark.yarn.am.cores", 2)\
    .config("spark.yarn.executor.memoryOverhead", "128g")\
    .config("yarn.nodemanager.vmem-check-enabled", "False")\
    .config("yarn.nodemanager.pmem-check-enabled", "False")\
    .config("spark.dynamicAllocation.maxExecutors", "500")\
    .appName("project")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
print(f"spark started and took {time.time()-t1:.05f}s")
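Before running the heavy query it can be worth confirming that the session came up with the intended settings. A small optional check (not part of the original article):

# Optional sanity check: confirm the session is alive and inspect a few effective settings
print(spark.version)                                      # Spark version in use
print(spark.conf.get("spark.sql.shuffle.partitions"))     # runtime SQL config set above
print(sc.getConf().get("spark.executor.memory"))          # value taken from the SparkConf above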
2. Data reading
data = spark.sql("""select * from temp.tables""")  # Read data with Spark SQL
df = data.toPandas()                               # Convert the result to a pandas DataFrame
df.shape
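toPandas() pulls the entire query result onto the driver, which is why spark.driver.maxResultSize is set so high in step 1. If the table is very large, a hedged alternative is to limit or sample on the Spark side first; the fraction and row cap below are only illustrative:

# Pull only a sample to the driver to keep memory in check (fraction is an example value)
sample_df = spark.sql("select * from temp.tables").sample(fraction=0.1, seed=2023).toPandas()
# Or cap the row count while developing
preview_df = spark.sql("select * from temp.tables limit 100000").toPandas()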
3. Data preprocessing
# Data preprocessing
df.rename(columns={"flag": 'real_flag'}, inplace=True)  # Rename the label column
df.head(3)   # Print the first three rows
del data     # Delete the unused Spark DataFrame reference to relieve memory pressure

# Convert customer_id to numeric (the column contains dirty data such as Chinese characters,
# so it cannot be cast to int directly)
df['customer_id'] = pd.to_numeric(df['customer_id'], errors="coerce")
df = df.dropna(subset=['customer_id'])

df.isnull().sum()  # Check missing values

# Fill missing ages with the median (the mode would be df["age"].mode()[0])
df["age"].fillna(df["age"].median(), inplace=True)
df["age"].value_counts()  # Check the distribution

for col in df.columns:
    df[col] = df[col].astype("int")  # Convert every column to int
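Since the full table already lives in driver memory at this point, it can also help to shrink the pandas frame once it is clean. This is an optional sketch, not part of the original pipeline, using pandas' downcast option on the already-integer columns:

# Downcast each column to the smallest integer type that holds its values
for col in df.columns:
    df[col] = pd.to_numeric(df[col], downcast="integer")
df.info(memory_usage="deep")  # Compare memory usage before and after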
4. Model construction
y = df['real_flag'].values                          # True labels
X = df.drop(['customer_id', 'real_flag'], axis=1)   # Drop the customer ID and the label from the feature set
X.head()

from sklearn.model_selection import train_test_split
# Split ratio and random seed are assumed here; they were not specified in the original text
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2023)

params = {
    'n_estimators': 1500,
    'learning_rate': 0.1,
    'max_depth': 15,
    'metric': 'auc',
    'verbose': -1,
    'seed': 2023,
    'n_jobs': -1
}
model = LGBMClassifier(**params)
model.fit(X_train, y_train,
          eval_set=[(X_train, y_train), (X_test, y_test)],
          eval_metric='auc',
          verbose=50,
          early_stopping_rounds=100)
y_pred = model.predict(X_test, num_iteration=model.best_iteration_)
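GridSearchCV is imported in step 1 but not used above. A minimal sketch of how it could be wired in for hyperparameter tuning; the parameter grid below is only an example, not the values used in this article:

# Hypothetical search space -- adjust to your own data
param_grid = {
    'max_depth': [7, 11, 15],
    'learning_rate': [0.05, 0.1],
    'n_estimators': [500, 1000, 1500],
}
grid = GridSearchCV(LGBMClassifier(seed=2023, n_jobs=-1),
                    param_grid,
                    scoring='roc_auc',
                    cv=3,
                    n_jobs=-1)
grid.fit(X_train, y_train)
print(grid.best_params_, grid.best_score_)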
5. Model evaluation
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)

lgb_acc = model.score(X_test, y_test) * 100
lgb_recall = recall_score(y_test, y_pred) * 100
lgb_precision = precision_score(y_test, y_pred) * 100
lgb_f1 = f1_score(y_test, y_pred, pos_label=1) * 100
print("lgb accuracy: {:.2f}%".format(lgb_acc))
print("lgb recall: {:.2f}%".format(lgb_recall))
print("lgb precision: {:.2f}%".format(lgb_precision))
print("lgb F1 score: {:.2f}%".format(lgb_f1))
# from sklearn.metrics import classification_report
# print(classification_report(y_test, y_pred))

# Confusion matrix
plt.title("Confusion Matrix", fontsize=21)
data_confusion_matrix = confusion_matrix(y_test, y_pred)
sns.heatmap(data_confusion_matrix, annot=True, cmap='Blues', fmt='d', cbar=False, annot_kws={'size': 28})
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()

# ROC curve
from sklearn.metrics import roc_curve, auc
probs = model.predict_proba(X_test)
preds = probs[:, 1]
fpr, tpr, threshold = roc_curve(y_test, preds)
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate (TPR)')
plt.xlabel('False Positive Rate (FPR)')
plt.title('ROC')
plt.legend(loc='lower right')
plt.show()
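predict() uses a 0.5 probability cut-off by default. If recall and precision need to be rebalanced, the threshold can be moved explicitly on top of predict_proba(); a minimal sketch, where the 0.3 cut-off is only an example to be tuned on a validation set:

# Re-label predictions with a custom probability threshold (example value)
threshold = 0.3
y_pred_custom = (model.predict_proba(X_test)[:, 1] >= threshold).astype(int)
print("recall: {:.2f}%".format(recall_score(y_test, y_pred_custom) * 100))
print("precision: {:.2f}%".format(precision_score(y_test, y_pred_custom) * 100))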
6. Field filtering (decide how many fields are needed, then go back to step 3)
import matplotlib.pyplot as plt
import seaborn as sns
warnings.simplefilter(action="ignore", category=FutureWarning)

feature_imp = pd.DataFrame(sorted(zip(model.feature_importances_, X)),
                           columns=['Value', 'Feature'])
plt.figure(figsize=(20, 10))
sns.barplot(x='Value', y='Feature', data=feature_imp)
plt.title("Importance score of model features")
plt.tight_layout()
plt.show()

feature_imp_sort = feature_imp.sort_values(by="Value", ascending=False)[:]  # [:] limits the number of rows displayed
feature_imp_sort
print(feature_imp_sort['Feature'].values)
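To act on the note in the heading (keep only the most important fields and rebuild the model), the loop back to step 3 could look like the sketch below; the top-20 cut-off is only an example, not a value from the article:

# Keep only the top-N features by importance (N = 20 is an arbitrary example)
top_features = feature_imp_sort['Feature'].head(20).tolist()
X_top = X[top_features]
X_train, X_test, y_train, y_test = train_test_split(X_top, y, test_size=0.2, random_state=2023)
model = LGBMClassifier(**params)
model.fit(X_train, y_train)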
7. Model storage
import joblib
joblib.dump(model, 'model_joblib.pkl')  # Persist the trained model to disk
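To use the saved model later, for example in a separate scoring job, it can be loaded back with joblib. A minimal sketch, where X_new is a placeholder for whatever data you want to score:

loaded_model = joblib.load('model_joblib.pkl')  # Reload the persisted model
# y_new_pred = loaded_model.predict(X_new)      # X_new: new feature data with the same columns as X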