spark-5 Spark ML

Spark ML

  • 5. Spark ML
    • 5.1 Spark ML Pipelines
      • 5.1.1 Key concepts:
      • 5.1.2 How to build a workflow
    • 5.2 Typical machine learning workflow
    • 5.3 Introduction to Machine Learning Algorithms
      • 5.3.1 Data processing
      • 5.3.2 Feature Engineering
      • 5.3.3 Model selection and establishment
      • 5.3.4 Model Evaluation
    • 5.4 Code example

5. Spark ML

Functionally, Spark ML is very similar to machine learning libraries such as Scikit-Learn, but its computation engine is Spark, so every step of the computation is distributed. This is the biggest difference between Spark ML and other machine learning libraries.

The Spark machine learning module is divided into two packages: spark.ml and spark.mllib. We call the former the Spark ML API and the latter the Spark MLlib API. The biggest difference between them is that the ML API is based on DataFrame, while the MLlib API is based on RDD, a relationship similar to that between GraphFrames and GraphX. Since Spark 2.0, the RDD-based MLlib API has been in maintenance mode, and the DataFrame-based ML API is the primary API.

There are three reasons why Spark ML provides a DataFrame-based API:
(1) it provides a unified API across languages;
(2) DataFrames integrate naturally with the ML Pipelines feature;
(3) it improves performance.

5.1 Spark ML Pipelines

The Spark ML API introduces the Pipelines API, which is similar to the Pipeline in the Python machine learning library Scikit-Learn. It uses a series of APIs to define and standardize a machine learning workflow, which includes stages such as data collection, preprocessing, feature extraction, feature selection, model fitting, model validation, and model evaluation. For example, classifying documents may include tokenization, feature extraction, training a classification model, and tuning. Most machine learning libraries are not designed for distributed computing, nor do they provide facilities for creating and tuning pipelines; this is the biggest difference between Spark ML Pipelines and traditional machine learning libraries.

Spark ML Pipelines provides a modular abstraction for the distributed machine learning process, which makes it easier to combine multiple algorithms into one Pipeline or workflow.

5.1.1 Key concepts:

(1) DataFrame: the same DataFrame used in Spark SQL; it is the basic data structure of Spark ML and runs through the entire Pipeline. It can hold text, feature vectors, and training and test sets. In addition to the common column types, DataFrame also supports the Vector type specific to Spark MLlib.

(2) Transformer: a Transformer represents a data transformation. It takes a DataFrame as input and produces a new DataFrame. In machine learning it is typically used for feature transformation, and a trained model is also a Transformer that converts a DataFrame of features into a DataFrame with prediction results. A Transformer must implement the transform() method.

(3) Estimator: as noted above, a trained model is itself a Transformer. An Estimator wraps an algorithm that can be fit on a DataFrame to produce a Transformer. An Estimator must implement the fit() method.

(4) Pipeline: A Pipeline assembles multiple Transformers and Estimators into a specific machine learning workflow.

(5) Parameter: All Estimators and Transformers share a common set of APIs to specify parameters, as illustrated in the sketch below.
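
A minimal sketch of how these concepts fit together; the DataFrames df_train and df_test (with a "features" vector column and a "label" column) are assumptions used only for illustration:

from pyspark.ml.classification import LogisticRegression

# An Estimator: its constructor arguments are Params
lr = LogisticRegression(maxIter=10, regParam=0.01)

# fit() reads a DataFrame and returns a LogisticRegressionModel, which is a Transformer
model = lr.fit(df_train)

# transform() appends prediction columns to another DataFrame
predictions = model.transform(df_test)

# Params can also be overridden for a single fit() call via a param map
model2 = lr.fit(df_train, {lr.maxIter: 20})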

5.1.2 How to build a workflow

Structurally, a pipeline contains one or more stages, and each stage completes one task, such as data transformation, model training, parameter setting, or prediction. To build a Pipeline, define each stage of the workflow and then assemble the stages in order according to the processing logic:

pipeline = Pipeline(stages=[stage1, stage2, stage3])

Taking the text classification example on the Spark official website: text stored as rows of a DataFrame is first split into words by the Tokenizer, the words are then converted into feature vectors by HashingTF, and finally a logistic regression (LR) model is trained on the features.
During the training phase, the Pipeline is as follows:


Figure 5.1 The pipeline in the training phase

During the testing phase, the Pipeline is as follows:


Figure 5.2 The pipeline in the testing phase

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

spark = SparkSession.builder.master("local").appName("spark ML").getOrCreate()

#Create DataFrame training set
#Training set includes fields id, text, label
df_train = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Build the Transformers and Estimators
# Define the tokenizer; Spark's built-in Tokenizer splits text on spaces.
# inputCol is the input column name, outputCol the name of the output column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
 
# create training pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
 
# Train the model
model = pipeline.fit(df_train)

# Build the test DataFrame
df_test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])
 
# Test
predict = model.transform(df_test)
# Display prediction results: id|text|words|features|rawPrediction|probability|prediction
predict.show()

5.2 Typical machine learning workflow


Figure 5.3 Typical machine learning modeling process

A typical machine learning application goes through several steps from input through processing to output, forming a scientific workflow, as shown in Figure 5.3. It typically involves the following steps:

(1) Data processing: load the sample data, parse it into the format required by the algorithm, preprocess it and handle missing values, and split it into training and test sets.
(2) Feature engineering: including feature selection and feature extraction.
(3) Model selection and training: select an algorithm according to the application scenario, build and train a machine learning model, make predictions on the training data and inspect the results, evaluate the model on the test data, or validate it on a third data set (the validation set) using cross-validation, tune the model for better accuracy, and tune it for scalability so that it can handle large data sets in the future (a cross-validation sketch follows this list).
(4) Model quantification and deployment.
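
Model selection and tuning can be expressed directly in Spark ML with CrossValidator and ParamGridBuilder. The following is a minimal sketch, assuming the tokenizer/hashingTF/lr pipeline and the df_train/df_test DataFrames from Section 5.1.2 are in scope; the grid values are illustrative, not recommendations.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Grid of candidate parameter values for the pipeline stages defined in Section 5.1.2
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [10, 100, 1000])
             .addGrid(lr.regParam, [0.1, 0.01])
             .build())

# 3-fold cross-validation: every parameter combination is scored by the evaluator
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

# fit() selects the combination with the best cross-validated metric
cvModel = cv.fit(df_train)
predictions = cvModel.transform(df_test)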

5.3 Introduction to Machine Learning Algorithms

5.3.1 Data processing

In a real production environment, the data we get is usually far from ideal: there may be many missing values, features may be on different scales, and some features or feature values may be irrelevant or need to be re-encoded. Such data cannot be fed to training directly, so we need to preprocess it first. Data preprocessing is a very important step in machine learning; if the data is not preprocessed correctly, the training results will often be wrong. A sketch of some common preprocessing steps follows.
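
As a minimal sketch of such preprocessing with built-in Spark ML transformers (the DataFrame df and the column names age and income are assumptions used only for illustration): impute missing values, assemble the numeric columns into a feature vector, standardize the scales, and split into training and test sets.

from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler

# Fill missing values in the numeric columns with the column mean
imputer = Imputer(inputCols=["age", "income"], outputCols=["age_i", "income_i"])
df_imputed = imputer.fit(df).transform(df)

# Combine the cleaned columns into a single feature vector
assembler = VectorAssembler(inputCols=["age_i", "income_i"], outputCol="raw_features")
df_assembled = assembler.transform(df_imputed)

# Standardize the features so that columns on different scales become comparable
scaler = StandardScaler(inputCol="raw_features", outputCol="features",
                        withMean=True, withStd=True)
df_scaled = scaler.fit(df_assembled).transform(df_assembled)

# Hold out 30% of the data as a test set
train, test = df_scaled.randomSplit([0.7, 0.3], seed=42)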

5.3.2 Feature Engineering

Feature engineering is the process of extracting as much useful information (features) as possible from the raw data, so that a model built on these features performs as well as possible on unseen data. There are generally two approaches:
(1) Feature selection: selecting a subset of the original feature set. It is an inclusion relationship and does not change the original feature space.
Feature selection mainly consists of four steps:

  • Generation: generate candidate feature subsets;
  • Evaluation: evaluate the quality of a feature subset;
  • Stopping criterion: decide when to stop searching;
  • Validation: verify that the selected feature subset is valid.

According to how closely they are coupled with the subsequent modeling process, feature selection algorithms are generally divided into three categories:

  • Filter: feature selection is performed first and the learner is trained afterwards, so feature selection is independent of the learner. It is equivalent to filtering the features first and then training the classifier on the selected subset: each feature dimension is "scored", i.e. assigned a weight that represents its importance, and the features are then ranked by these weights.
  • Wrapper: the final classifier itself serves as the evaluation function for feature selection, so the optimal feature subset is chosen for a specific classifier. Subset selection is treated as a search optimization problem: generate different feature combinations, evaluate them, and compare them against one another.
  • Embedded: feature selection is integrated into the training of the learner, so features are selected while the model is being learned. The idea is that, while the model is being fitted, the attributes that matter most to the model are identified and retained.

(2) Feature extraction: deriving new attributes from relationships among the original attributes, for example by combining them, which changes the original feature space. Feature extraction needs to achieve two goals (a sketch of both approaches is given after this list):

  • Reduce data dimensionality;
  • The extracted features are valid.
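
A minimal sketch of both approaches with built-in Spark ML transformers, assuming a DataFrame df with a "features" vector column and a "label" column (these names are illustrative): ChiSqSelector performs filter-style feature selection, and PCA extracts new features by projecting onto principal components.

from pyspark.ml.feature import ChiSqSelector, PCA

# Feature selection: keep the 2 features most correlated with the label (chi-squared test)
selector = ChiSqSelector(numTopFeatures=2, featuresCol="features",
                         labelCol="label", outputCol="selected_features")
df_selected = selector.fit(df).transform(df)

# Feature extraction: project the original features onto 2 principal components
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
df_extracted = pca.fit(df).transform(df)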

5.3.3 Model selection and establishment

Supervised learning: a function is learned from the training data and then used to infer labels for new instances. The training data consists of examples together with their corresponding labels, and the workflow has a learning stage and an inference stage. Supervised problems generally fall into two types: classification and regression.

Unsupervised learning: only the data is available, without labels, so the algorithm must learn structure from the data itself according to some measure of similarity. It usually refers to clustering.

Reinforcement learning: the algorithm maintains a state and interacts with an environment through an agent; the result of each interaction comes back as a reward (or penalty) that updates the algorithm's state, and this process is iterated, so the algorithm continuously interacts with the environment. Methods are generally divided into value-based and policy-based approaches.
The machine learning algorithms currently supported by Spark ML are shown in Table 5.1; a small clustering example follows the table.

Table 5.1 Machine learning algorithms supported by Spark ML

Type | Algorithms supported by Spark ML
Classification | Logistic regression (binomial and multinomial), decision tree classifier, random forest classifier, gradient-boosted tree classifier, multilayer perceptron classifier, one-vs-rest classifier, naive Bayes
Regression | Linear regression, generalized linear regression, decision tree regression, random forest regression, gradient-boosted tree regression, survival regression, isotonic regression
Recommendation | Collaborative filtering
Clustering | k-means, Gaussian mixture model, latent Dirichlet allocation (LDA), bisecting k-means
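
All of these algorithms expose the same Estimator/Transformer interface. As a small illustration with an unsupervised algorithm from the table, the following is a minimal k-means sketch, assuming a DataFrame df with a "features" vector column (an illustrative assumption):

from pyspark.ml.clustering import KMeans

# Cluster the feature vectors into 3 groups
kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="cluster")
kmeans_model = kmeans.fit(df)

# Each row receives a 'cluster' column with its assigned cluster index
clustered = kmeans_model.transform(df)
clustered.select("features", "cluster").show(5)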

5.3.4 Model Evaluation

(1) Binary classification
Binary classifiers assign the elements of a dataset to one of two possible groups (for example, fraudulent or not fraudulent); this is a special case of multi-class classification. Most binary classification metrics can be generalized to multi-class metrics.
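
A minimal sketch of evaluating a binary classifier with Spark ML's BinaryClassificationEvaluator, assuming a predictions DataFrame produced by a model such as the logistic regression pipeline of Section 5.1.2:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve is the default binary classification metric
evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %g" % auc)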


(2) Multi-class and multi-label classification
Multi-label classification problems map each sample in a dataset to a set of class labels. In this type of problem the labels are not mutually exclusive, so the predictions and the true labels are vectors of label sets rather than single labels. Multi-label metrics therefore extend the basic concepts of precision, recall and so on to set operations.
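
A sketch of multi-label metrics using the RDD-based MultilabelMetrics class from spark.mllib; the (prediction, label) pairs below are made-up values used only for illustration:

from pyspark.mllib.evaluation import MultilabelMetrics

# Each element is (predicted label set, true label set), as lists of class indices
prediction_and_labels = spark.sparkContext.parallelize([
    ([0.0, 1.0], [0.0, 2.0]),
    ([0.0, 2.0], [0.0, 1.0]),
    ([2.0], [2.0]),
    ([0.0, 1.0, 2.0], [0.0, 1.0])])

metrics = MultilabelMetrics(prediction_and_labels)
print("Precision = %s" % metrics.precision())
print("Recall = %s" % metrics.recall())
print("F1 measure = %s" % metrics.f1Measure())
print("Subset accuracy = %s" % metrics.subsetAccuracy)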


(3) Regression
Regression metrics quantify how far the predicted continuous values deviate from the true values; commonly used metrics include the mean squared error (MSE), root mean squared error (RMSE), mean absolute error (MAE) and the coefficient of determination (R2).
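
A minimal sketch using Spark ML's RegressionEvaluator, assuming a predictions DataFrame produced by a fitted regression model:

from pyspark.ml.evaluation import RegressionEvaluator

# Root mean squared error; "mse", "mae" and "r2" are also supported metric names
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE = %g" % rmse)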


(4) Ranking algorithms (recommendation algorithms)

A ranking algorithm (often thought of as a recommender system) returns a set of relevant items or documents to the user based on some training data. The definition of relevance can vary and is usually application-specific. Ranking metrics are designed to quantify the effectiveness of these rankings or recommendations in various contexts.
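
A sketch of ranking metrics using the RDD-based RankingMetrics class from spark.mllib; the recommended and ground-truth item lists below are made-up values used only for illustration:

from pyspark.mllib.evaluation import RankingMetrics

# Each element is (recommended items in ranked order, set of relevant items)
prediction_and_labels = spark.sparkContext.parallelize([
    ([1, 6, 2, 7, 8], [1, 2, 3, 5]),
    ([4, 1, 5, 6, 2], [1, 2, 3]),
    ([1, 2, 3, 4, 5], [])])

metrics = RankingMetrics(prediction_and_labels)
print("Precision@5 = %s" % metrics.precisionAt(5))
print("Mean average precision = %s" % metrics.meanAveragePrecision)
print("NDCG@5 = %s" % metrics.ndcgAt(5))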

5.4 Code example

A multi-class classification model is built with an artificial neural network: a four-layer network is used, sigmoid is the activation function of the hidden layers, softmax is used in the output layer, and cross-entropy is used as the loss function.

# -*- coding: utf-8 -*-

from __future__ import print_function
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("multilayer_perceptron_classification_example").getOrCreate()

    data = spark.read.format("libsvm").load("../data/mllib/sample_multiclass_classification_data.txt")
    (train_data, test_data) = data.randomSplit([0.6, 0.4], seed=2019)

    # The input layer is the size of features (4), and the output layer is the size of labels (3)
    layers = [4, 5, 4, 3]

    # train
    trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=2019)
    model = trainer.fit(train_data)

    # Compute the accuracy on the test set
    predictions = model.transform(test_data)
    predictions.select("prediction", "label", "features").show(5)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g" % (1.0 - accuracy))
    print(model)

    spark.stop()