Spark data structure: RDD

Table of Contents

1. Overview of RDD

1. RDD definition

2. Core points of RDD design

3. Characteristics of RDD

2. RDD creation

(1) Create RDD

(2) Number of RDD partitions

3. RDD function

(1) Function classification

1. Overview of RDD

1. RDD definition

An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection whose elements can be computed in parallel.

·Dataset: A collection used to store data.

·Distributed: The data in an RDD is stored in a distributed manner and can be used for distributed computing.

·Resilient: The data in an RDD can be stored in memory or on disk.

You can think of an RDD as a distributed list or array, that is, an abstract data structure. In the Scala API, RDD is an abstract class with a type parameter (RDD[T]).

RDD hides the underlying details of Spark (automatic fault tolerance, location awareness, task scheduling and execution, failure retry, etc.), so developers can operate on a distributed dataset in a functional programming style, just as they would on a local collection, and perform all kinds of parallel computation. Many data processing functions of RDD are the same as or similar to those of List.
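The similarity between List and RDD can be sketched as follows. This is a minimal illustration, assuming a local Spark installation; the object name LocalVsRddSketch and the sample numbers are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: applies the same map/filter chain
// to a local List and to an RDD built from that List.
object LocalVsRddSketch {
  def run(): (List[Int], List[Int]) = {
    val conf = new SparkConf().setAppName("LocalVsRddSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val numbers = List(1, 2, 3, 4, 5)

      // Local collection: evaluated immediately inside the driver JVM.
      val localResult = numbers.map(_ * 2).filter(_ > 5)

      // RDD: same method names, but the work is split across 2 partitions.
      val rddResult = sc.parallelize(numbers, numSlices = 2)
        .map(_ * 2)
        .filter(_ > 5)
        .collect() // action: brings the distributed result back to the driver
        .toList

      (localResult, rddResult)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both halves of the returned pair should hold the same elements; only the place where the computation happens differs.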


2. Core points of RDD design

RDD provides an abstract data model. You do not have to worry about the distributed nature of the underlying data; you only need to express the application logic as a series of transformations (functions). Transformations on different RDDs form dependencies, and these dependencies allow the operations to be pipelined. Pipelining avoids storing intermediate results, which greatly reduces data copying, disk I/O and serialization overhead. RDD also provides a rich API (map/reduce/filter/groupBy, etc.).
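Pipelining can be observed with a small experiment. The sketch below is only an illustration: the object name PipelineSketch and the accumulator name "trace" are assumptions, and a single partition is used so that the recorded order is easy to read.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: records the order in which the map and
// filter functions run, to show that no intermediate RDD is materialized.
object PipelineSketch {
  def run(): List[String] = {
    val conf = new SparkConf().setAppName("PipelineSketch").setMaster("local[1]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // Accumulator used only to trace the calls made inside the task.
      val trace = sc.collectionAccumulator[String]("trace")

      sc.parallelize(Seq(1, 2, 3), numSlices = 1)
        .map { x => trace.add(s"map:$x"); x * 10 }
        .filter { x => trace.add(s"filter:$x"); x > 10 }
        .collect()

      // trace.value is a java.util.List; convert it to a Scala List.
      trace.value.toArray().map(_.toString).toList
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

If the two transformations are pipelined, the trace alternates between map and filter for each element (map:1, filter:10, map:2, ...) instead of listing every map call before the first filter call.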

3. Characteristics of RDD

(1) A list of partitions (an RDD has partitions)

① An RDD consists of a list of partitions; the partition is the basic unit of the dataset;

② Each partition is processed by one computing task, so the number of partitions determines the degree of parallelism;

③ Users can specify the number of partitions when creating an RDD. If it is not specified, a default value is used;

(2) A function for computing each split (the compute function acts on each partition)

① A function is applied to each partition;

② Computation on an RDD in Spark is done partition by partition: the compute function is applied to every partition;

(3) A list of dependencies on other RDDs (RDDs depend on one another)

① An RDD may depend on one or more other RDDs;

② Each transformation of an RDD generates a new RDD, so a pipeline-like chain of dependencies forms between RDDs. When the data of some partitions is lost, Spark can use these dependencies to recompute only the lost partitions instead of all partitions of the RDD (Spark's fault tolerance mechanism).

(4) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (a key-value RDD can have a partitioner)

① An RDD of key-value pairs may optionally have a Partitioner, which is the partitioning function of the RDD;

② Spark currently implements two kinds of partitioner: the hash-based HashPartitioner and the range-based RangePartitioner;

③ Only key-value RDDs can have a Partitioner. For a non-key-value RDD the partitioner is None;

④ The Partitioner determines not only the number of partitions of the RDD itself but also the number of partitions of the parent RDD's shuffle output.

(5) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (partition data is read as close to where it is stored as possible)

① Optionally, a list that stores the preferred locations for accessing each partition;

② For an HDFS file, this list holds the locations of the blocks that make up each partition;

③ Following the principle that "moving computation is cheaper than moving data", Spark's task scheduler tries to run each task on a worker node that already holds the data (data locality).

An RDD represents not only a dataset but also where that dataset comes from and how it is computed. Its main attributes are the five listed above. They are worth memorizing and reinforcing through coding, since they are frequently asked about in interviews.

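Most of the five attributes can be inspected through public members of RDD. The following is a rough sketch rather than a complete tour: the object name RddPropertiesSketch, the case class Observed and the sample words are all invented for illustration.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Hypothetical example object: looks at partitions, dependencies,
// partitioner and preferred locations of a few small RDDs.
object RddPropertiesSketch {
  // Illustrative container for what the sketch observes.
  case class Observed(
      partitionCounts: (Int, Int),
      parentIsWords: Boolean,
      shuffleDependency: Boolean,
      partitioners: (Boolean, Boolean),
      preferredLocations: Seq[String])

  def run(): Observed = {
    val conf = new SparkConf().setAppName("RddPropertiesSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
      val pairs = words.map(word => (word, 1))
      val hashed = pairs.partitionBy(new HashPartitioner(3))

      Observed(
        // (1) list of partitions
        partitionCounts = (words.partitions.length, hashed.partitions.length),
        // (2) the compute function is invoked by Spark per partition; not called by hand here
        // (3) dependencies: pairs was derived directly from words
        parentIsWords = pairs.dependencies.head.rdd eq words,
        //     partitionBy introduces a shuffle dependency
        shuffleDependency = hashed.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]],
        // (4) partitioner: absent after map, present after partitionBy
        partitioners = (pairs.partitioner.isEmpty, hashed.partitioner.contains(new HashPartitioner(3))),
        // (5) preferred locations of the first partition
        preferredLocations = words.preferredLocations(words.partitions(0))
      )
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

An RDD built from a local collection normally carries no location hints, so the preferred-locations list is more informative for RDDs read from HDFS.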

An important advantage of the RDD design is its ability to record the dependencies between RDDs, the so-called lineage. With a rich set of transformations, a complex directed acyclic graph (DAG) can be built, and the computation is then carried out step by step along this graph.

Analysis of RDD characteristics in the wordcount code
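A minimal word count can make these characteristics concrete. The sketch below is an assumption-laden stand-in for the usual example: it counts words from an in-memory collection instead of a file, and the object name WordCountLineageSketch is made up. toDebugString is used to print the lineage that Spark recorded.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: runs a tiny word count and returns
// both the counts and the textual description of the lineage.
object WordCountLineageSketch {
  def run(): (Map[String, Int], String) = {
    val conf = new SparkConf().setAppName("WordCountLineageSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // In-memory input used instead of a text file (assumption for the sketch).
      val lines = sc.parallelize(Seq("spark rdd spark", "rdd spark"), numSlices = 2)

      val counts = lines
        .flatMap(_.split(" "))   // one RDD of words
        .map(word => (word, 1))  // key-value RDD, no partitioner yet
        .reduceByKey(_ + _)      // shuffle: the result is hash-partitioned by key

      (counts.collect().toMap, counts.toDebugString)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (counts, lineage) = run()
    println(counts)
    println(lineage)
  }
}
```

The printed lineage lists each RDD in the chain, from the shuffled result back to the parallelized collection, which is the information Spark uses to recompute a lost partition.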

2. RDD creation

(1) Create RDD

There are two main ways to encapsulate data into RDD collections: parallelizing local collections (in Driver Program) and referencing data sets loaded from external storage systems (such as HDFS, Hive, HBase, Kafka, Elasticsearch, etc.).

/* A Spark application that creates a SparkContext, builds RDDs, and releases resources when it finishes (code example) */

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// The wrapper object name is illustrative
object SparkRDDCreateDemo {

    def main(args: Array[String]): Unit = {

        // Build the SparkContext object
        val sc: SparkContext = {
            // a. Create the SparkConf object
            val sparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // b. Pass in the SparkConf object and create the instance
            val context = SparkContext.getOrCreate(sparkConf) // reuse the existing context if there is one, otherwise create it
            // c. Return the instance
            context
        }

        // TODO: create a local collection
        val seq: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8)

        // Parallelize the local collection to create an RDD
        /*
            def parallelize[T: ClassTag](
                seq: Seq[T],
                numSlices: Int = defaultParallelism
            ): RDD[T]
        */
        val inputRDD: RDD[Int] = sc.parallelize(seq, numSlices = 2)
        inputRDD.foreach(item => println(item))

        // TODO: external storage system
        /*
            def textFile(
                path: String,
                minPartitions: Int = defaultMinPartitions
            ): RDD[String]
        */
        val fileRDD: RDD[String] = sc.textFile(path = "datas/wordcount.data", minPartitions = 2)
        // File path: preferably a full path; it may name a file or a directory, and may contain wildcards.
        // In a real project that reads massive data from HDFS and runs on YARN, the number of RDD partitions
        // by default equals the number of blocks of the file on HDFS.

        // End the application and release resources
        sc.stop()
    }
}

(2) Number of RDD partitions

Partitioning is a physical-layer concept and the core of RDD parallel computing. Inside an RDD the data is divided into multiple subsets, and each subset is a partition. The same computation logic is applied to every partition, and each partition is processed by a separate task. Therefore, the more partitions there are, the higher the potential parallelism of the application, up to the number of CPU cores available.

/* Two ways to get the number of RDD partitions: */
    // Method 1: call getNumPartitions directly
    rdd.getNumPartitions
    // Method 2: take the length of the partitions array
    rdd.partitions.length
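The two calls can be tried on a small RDD. This is a sketch under simple assumptions (local mode, a range of eight numbers, four requested partitions); the object name PartitionCountSketch is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: compares the two ways of reading the
// partition count and shows how many elements each partition holds.
object PartitionCountSketch {
  def run(): (Int, Int, List[Int]) = {
    val conf = new SparkConf().setAppName("PartitionCountSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val rdd = sc.parallelize(1 to 8, numSlices = 4)

      // One task per partition: each task reports the size of its partition.
      val sizes = rdd.mapPartitions(it => Iterator(it.size)).collect().toList

      (rdd.getNumPartitions, rdd.partitions.length, sizes)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both methods report the same number, and the per-partition sizes show how the eight elements were spread over the four partitions.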

What factors determine the number of RDD partitions?

① The guiding principle is to make the number of partitions match the number of CPU cores in the cluster as closely as possible, so that the CPU's computing resources are fully used;

② In practice, to make full use of the CPU, the parallelism is usually set to 2 to 3 times the number of CPU cores;

③ The number of RDD partitions depends on the number of cores specified at startup, the number of partitions specified when the creation method is called, and the number of splits of the file itself. The details are as follows:

(1) The number of CPU cores specified at startup determines the value of one parameter:

spark.default.parallelism = the specified number of CPU cores (minimum 2 in cluster mode).

(2) For Scala collections, call the parallelize(collection, number of partitions) method

If the number of partitions is not specified, spark.default.parallelism is used.

If it is specified, the specified number of partitions is used (preferably not more than spark.default.parallelism).

(3) For files, call the textFile(file, number of partitions) method

defaultMinPartitions:

If the number of partitions is not specified, sc.defaultMinPartitions = min(defaultParallelism, 2) is used.

If it is specified, the specified value is used as the minimum number of partitions and takes the place of sc.defaultMinPartitions in the formulas below.

Number of partitions of the RDD

(1) For local files

Number of partitions of the RDD = max(number of local file splits, sc.defaultMinPartitions)

(2) For HDFS files

Number of partitions of the RDD = max(number of blocks of the HDFS file, sc.defaultMinPartitions)

Therefore, if multiple cores are allocated and the RDD is created by reading a file, the resulting Spark RDD may have 2 partitions even if the HDFS file consists of only one block.
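These defaults can be checked in local mode. The sketch below assumes a master of local[4] with spark.default.parallelism left unset, and writes a throwaway temporary file to stand in for real input; the object name DefaultPartitionsSketch and the file contents are invented for the example.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: reads the default parallelism values and the
// partition counts Spark picks when no explicit number is passed.
object DefaultPartitionsSketch {
  // Returns (defaultParallelism, defaultMinPartitions,
  //          partitions from parallelize, partitions from textFile).
  def run(): (Int, Int, Int, Int) = {
    val conf = new SparkConf().setAppName("DefaultPartitionsSketch").setMaster("local[4]")
    val sc = SparkContext.getOrCreate(conf)
    // Small temporary text file used as stand-in input (assumption for the sketch).
    val tmp = Files.createTempFile("rdd-partitions", ".txt")
    try {
      Files.write(tmp, (1 to 100).map(i => s"line $i").mkString("\n").getBytes("UTF-8"))

      val fromCollection = sc.parallelize(1 to 100)    // no numSlices given
      val fromFile = sc.textFile(tmp.toUri.toString)   // no minPartitions given

      (sc.defaultParallelism, sc.defaultMinPartitions,
        fromCollection.getNumPartitions, fromFile.getNumPartitions)
    } finally {
      sc.stop()
      Files.deleteIfExists(tmp)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With four local cores the collection is expected to get four partitions, while the small file is split into at least sc.defaultMinPartitions partitions even though it would fit into a single block.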

3. RDD function

(1) Function classification

The large volumes of data that Spark processes are split up and placed into RDDs, Spark's basic data structure. Developers can apply a rich set of operations to an RDD, and Spark then schedules cluster resources to carry out the computation those operations describe. RDD operations fall into two main types: Transformation and Action.

① Transformation: returns a new RDD

·Creates a new dataset from an existing dataset.

·All Transformation functions are lazy: they are not executed immediately and must be triggered by an Action function.

② Action: the return value is not an RDD (there is no return value, or something else is returned)

·Returns a value to the driver after running a computation on the dataset.

·All Action functions are executed immediately (eager), such as count, first, collect, take, etc.
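The difference shows up in the return types. The following sketch is illustrative only: the object name TransformationVsActionSketch and the sample numbers are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical example object: transformations return RDDs,
// actions return ordinary values to the driver.
object TransformationVsActionSketch {
  def run(): (Long, Int, List[Int], List[Int]) = {
    val conf = new SparkConf().setAppName("TransformationVsActionSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val rdd: RDD[Int] = sc.parallelize(Seq(3, 1, 2, 4), numSlices = 2)

      // Transformations: each call yields a new RDD; rdd itself is unchanged.
      val doubled: RDD[Int] = rdd.map(_ * 2)
      val large: RDD[Int] = doubled.filter(_ > 4)

      // Actions: each call runs a job and returns a plain value.
      val total: Long = large.count()
      val head: Int = large.first()
      val all: List[Int] = large.collect().toList
      val firstOne: List[Int] = large.take(1).toList

      (total, head, all, firstOne)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that each of the four actions starts its own job, so the map and filter steps are recomputed each time unless the RDD is cached.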


Function details in RDD:

① An RDD does not actually store the data to be computed. It records where the data is located and how the data is transformed (which methods are called and which functions are passed in);

② All transformations on an RDD are lazily evaluated (delayed execution): they are not computed right away. They only run when an Action is invoked that needs a result returned to the Driver. The reason for lazy evaluation is that, when the Action is invoked, Spark can build a DAG (directed acyclic graph) of the RDD operations, divide it into stages and optimize the parallel execution. This design lets Spark run more efficiently.
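Lazy evaluation can be observed by counting how often the function passed to map has run. This is a small sketch, assuming local mode and no task retries; the object name LazyEvaluationSketch and the accumulator name "mapCalls" are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object: counts invocations of the map function
// before and after an action is triggered.
object LazyEvaluationSketch {
  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("LazyEvaluationSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val calls = sc.longAccumulator("mapCalls")

      // Transformation only: nothing is executed yet.
      val mapped = sc.parallelize(1 to 5, numSlices = 2).map { x => calls.add(1); x + 1 }
      val before = calls.sum

      // Action: triggers the job, so the map function finally runs.
      mapped.count()
      val after = calls.sum

      (before, after)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The counter stays at zero until count() is called, which is the behavior the lazy evaluation rule describes.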
