Python Big Data with PySpark (6): RDD Operations

Article directory

    • RDD operations
      • Function classification
      • Transformation functions
      • Action functions
      • Basic exercise [WordCount quick demonstration]
      • Transformation operators
      • Action operators
      • Important functions
    • Postscript

RDD operations

Function classification

  • Transformation operations only build up the computation (the lineage); Action operations are what actually execute it.
  • Transformation operators (conversion operators)
  • Transformations are lazy: calling one does not compute anything by itself. To see a result, you have to trigger execution with an action operator (see the sketch right after this list).
  • Action operators
  • Actions trigger the execution of a Job and return or save the result.
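A minimal sketch of the laziness described above, assuming a local SparkContext (names are illustrative only): the map call merely records the computation, and nothing runs until collect is called.

# -*- coding: utf-8 -*-
# Minimal sketch: transformations are lazy, actions trigger the Job
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("lazy_demo").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    rdd = sc.parallelize([1, 2, 3, 4])
    doubled = rdd.map(lambda x: x * 2)  # transformation: only records the computation
    print(doubled.collect())            # action: triggers the Job -> [2, 4, 6, 8]
    sc.stop()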

Transformation functions

  • Single-value type (one RDD)

  • map

  • flatMap

  • filter

  • mapValues

Double-value type (two RDDs)

  • intersection
  • union
  • difference
  • distinct

Key-value type

  • reduceByKey
  • groupByKey
  • sortByKey
  • combineByKey (the underlying API behind the byKey aggregations)
  • foldByKey
  • aggregateByKey

Action functions

  • collect
  • saveAsTextFile (not demonstrated below; see the sketch after this list)
  • first
  • take
  • takeSample
  • top
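Most of the operators above are demonstrated in the code further down, but saveAsTextFile is not. A minimal sketch, assuming a purely illustrative local output path (the directory must not already exist):

# -*- coding: utf-8 -*-
# Minimal sketch of saveAsTextFile (the output path is hypothetical)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("save_demo").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    rdd = sc.parallelize([("a", 1), ("b", 2)], 2)
    # Writes one part-xxxxx file per partition under the given directory
    rdd.saveAsTextFile("file:///tmp/rdd_output")  # hypothetical path; must not already exist
    sc.stop()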

Basic exercise [WordCount quick demonstration]
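The post announces a WordCount demonstration but does not include the code itself. A minimal sketch, assuming an in-memory list of lines instead of a file on disk:

# -*- coding: utf-8 -*-
# Minimal WordCount sketch (in-memory data used instead of a file)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("wordcount").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    lines = sc.parallelize(["hello you", "hello me", "hello spark"])
    counts = (lines.flatMap(lambda line: line.split(" "))  # split each line into words
                   .map(lambda word: (word, 1))            # pair each word with a count of 1
                   .reduceByKey(lambda a, b: a + b))       # sum the counts per word
    print(counts.collect())  # e.g. [('hello', 3), ('you', 1), ('me', 1), ('spark', 1)]
    sc.stop()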

Transformation operators

  • Single-value type code
# -*- coding: utf-8 -*-
# Program purpose: demonstrate the transformation operators of a single-value RDD
from pyspark import SparkConf, SparkContext
import re
'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''
if __name__ == '__main__':
    # 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # 2 - map
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
    rdd__map = rdd1.map(lambda x: x * 2)
    print(rdd__map.glom().collect())  # e.g. [[2, 4, 6], [8, 10, 12]] (glom shows the elements per partition)

    # 3 - filter
    print(rdd1.glom().collect())
    print(rdd1.filter(lambda x: x > 3).glom().collect())

    # 4 - flatMap
    rdd2 = sc.parallelize([" hello you", "hello me "])
    print(rdd2.flatMap(lambda line: re.split(r"\s+", line.strip())).collect())

    # 5 - groupBy
    x = sc.parallelize([1, 2, 3])
    y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
    print(y.mapValues(list).collect())  # [('A', [1, 3]), ('B', [2])]

    # 6 - mapValues
    x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    def f(x): return len(x)
    print(x1.mapValues(f).collect())  # [('a', 3), ('b', 1)]

  • Double-value type code
# -*- coding: utf-8 -*-
# Program purpose: demonstrate the transformation operators between two (double-value) RDDs
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''
if __name__ == '__main__':
    # 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # 2 - Union, intersection and difference of two RDDs
    rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
    union_rdd = rdd1.union(rdd2)
    print(union_rdd.collect())
    print(rdd1.intersection(rdd2).collect())
    print(rdd2.subtract(rdd1).collect())

    # distinct: return a new RDD containing the distinct elements of this RDD
    print(union_rdd.distinct().collect())
    print(union_rdd.distinct().glom().collect())

  • Key-value operator code
# -*- coding: utf-8 -*-
# Program purpose: demonstrate the transformation operators of key-value RDDs
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''
if __name__ == '__main__':
    # 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # 2 - Key-value operators
    # groupByKey
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("c", 1), ("b", 3)])
    rdd3 = rdd1.union(rdd2)
    key1 = rdd3.groupByKey()
    print("groupByKey:", key1.collect())
    # groupByKey returns ResultIterable objects, e.g.
    # [('b', <pyspark.resultiterable.ResultIterable object at 0x...>), ('c', ...), ('a', ...)]
    print(key1.mapValues(list).collect())   # use mapValues to materialize the grouped values
    print(key1.mapValues(tuple).collect())

    # reduceByKey
    key2 = rdd3.reduceByKey(lambda x, y: x + y)
    print(key2.collect())

    # sortByKey (swap key and value first, then sort by key descending)
    print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())  # [(5, 'b'), (1, 'c'), (1, 'a')]

    # countByKey / countByValue
    print(rdd3.countByKey())    # defaultdict(<class 'int'>, {'a': 1, 'b': 2, 'c': 1})
    print(rdd3.countByValue())  # defaultdict(<class 'int'>, {('a', 1): 1, ('b', 2): 1, ('c', 1): 1, ('b', 3): 1})

Action operators

  • Common action operations
# -*- coding: utf-8 -*-
# Program purpose: demonstrate the action operators of an RDD
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''
if __name__ == '__main__':
    # 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # 2 - first / take / top / collect
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("c", 1), ("b", 3)])

    print(rdd1.first())
    print(rdd1.take(2))
    print(rdd1.top(2))
    print(rdd1.collect())

    # 3 - reduce
    rdd3 = sc.parallelize([1, 2, 3, 4, 5])
    from operator import add
    from operator import mul

    print(rdd3.reduce(add))
    print(rdd3.reduce(mul))

    # 4 - takeSample: can the sampling result be reproduced? Yes, by fixing the random seed.
    rdd4 = sc.parallelize(range(0, 10))
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 34))

  • Other supplementary operators
# -*- coding: utf-8 -*-
# Program purpose: demonstrate foreach/foreachPartition and map/mapPartitions
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''


def f(iterator):  # e.g. partitions [1, 2, 3] and [4, 5]
    for x in iterator:  # prints every element of the partition
        print(x)


def f1(iterator):  # e.g. sum([1, 2, 3]) and sum([4, 5]), one result per partition
    yield sum(iterator)


if __name__ == '__main__':
    # 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # 2 - foreach: applies a function to all elements of this RDD
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    print(rdd1.glom().collect())
    rdd1.foreach(lambda x: print(x))

    # 3 - foreachPartition: applies a function to each partition of this RDD
    # Processing whole partitions is usually more efficient than processing element by element.
    rdd1.foreachPartition(f)

    # 4 - map: transforms element by element
    rdd2 = sc.parallelize([1, 2, 3, 4])
    print(rdd2.map(lambda x: x * 2).collect())

    # 5 - mapPartitions: transforms partition by partition
    # Return a new RDD by applying a function to each partition of this RDD.
    print(rdd2.mapPartitions(f1).collect())  # e.g. [3, 7]

Important functions


Basic functions

  • The basic transformation and action operations shown above.

Partition operation functions

  • mapPartitions
  • foreachPartition

Repartition functions

# -*- coding: utf-8 -*-
# Program purpose: demonstrate repartition, coalesce and partitionBy
from pyspark import SparkConf, SparkContext
'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
(In PyCharm, Alt+6 opens the TODO tool window; TODO is simply a comment marker the IDE tracks.)
'''
if __name__ == '__main__':
    # TODO: 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # TODO: 2 - Execute the repartition function
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print("partitions num:", rdd1.getNumPartitions())
    print(rdd1.glom().collect())  # [[1, 2], [3, 4], [5, 6]]
    print("repartition result:")
    # repartition can increase or decrease the number of partitions, but it always shuffles.
    # When reducing partitions, prefer coalesce to avoid the shuffle.
    rdd__repartition1 = rdd1.repartition(5)
    print("increase partition", rdd__repartition1.glom().collect())  # [[], [1, 2], [5, 6], [3, 4], []]
    rdd__repartition2 = rdd1.repartition(2)
    print("decrease partition", rdd__repartition2.glom().collect())  # decrease partition [[1, 2, 5, 6], [3, 4]]

    # TODO: 3 - Reduce the number of partitions with coalesce
    print(rdd1.coalesce(2).glom().collect())        # [[1, 2], [3, 4, 5, 6]]
    print(rdd1.coalesce(5).glom().collect())        # [[1, 2], [3, 4], [5, 6]] (cannot grow without shuffle)
    print(rdd1.coalesce(5, True).glom().collect())  # [[], [1, 2], [5, 6], [3, 4], []]
    # Conclusion: repartition is simply coalesce with shuffle=True.

    # TODO: 4 - partitionBy adjusts both the number of partitions and the partitioner
    # (a hash partitioner, which scatters the data evenly, or a range partitioner, which keeps sorted ranges together).
    # It is only available on key-value RDDs.
    # Among the five main RDD properties, the fourth is the partitioner for key-value RDDs;
    # the default is the hash partitioner.
    rdd__map = rdd1.map(lambda x: (x, x))
    print("partitions length:", rdd__map.getNumPartitions())  # partitions length: 3
    print(rdd__map.partitionBy(2).glom().collect())

Aggregate functions

  • Code:
# -*- coding: utf-8 -*-
# Program purpose: demonstrate the aggregate functions reduce, fold and aggregate
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''
def addNum(x, y):
    return x + y

if __name__ == '__main__':
    # TODO: 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # TODO: 2 - Aggregate with reduce
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    from operator import add
    print(rdd1.reduce(add))  # returns 21 directly

    # TODO: 3 - Aggregate with fold
    # The first parameter, zeroValue, is the initial value; it participates in every partition's
    # computation and once more when the partition results are combined.
    # The second parameter is the function to apply.
    print(rdd1.fold(0, add))  # 21
    print(rdd1.getNumPartitions())  # 3
    print(rdd1.glom().collect())
    print("fold result:", rdd1.fold(10, add))  # 21 + 10 * (3 partitions + 1 combine) = 61

    # TODO: 4 - Aggregate with aggregate
    # seqOp works within partitions, combOp works between partitions
    print(rdd1.aggregate(0, add, add))  # 21
    print(rdd1.glom().collect())
    print("aggregate result:", rdd1.aggregate(1, add, add))  # aggregate result: 25
    # Conclusion: fold is a simplified aggregate whose within-partition and
    # between-partition functions are the same.
    print("aggregate result:", rdd1.aggregate(1, addNum, addNum))  # aggregate result: 25

byKey aggregation functions

  • groupByKey: how do you get at the grouped values? Answer: result.mapValues(list).collect()
  • reduceByKey
  • foldByKey
  • aggregateByKey
  • combineByKey: the lowest-level of the byKey aggregation operators; it can express more complex logic than the others.
  • foldByKey and aggregateByKey are not shown in the two cases below; a short sketch of them follows after Case 2.

  • Case 1:

# -*- coding: utf-8 -*-
# Program purpose: demonstrate combineByKey (case 1: collect all values per key into a list)
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.
'''

def createCombiner(value):  # turns the first value of a key into the initial combiner, e.g. ('a', [1])
    return [value]

def mergeValue(x, y):  # x is the combiner built so far, y is the next value of the same key
    x.append(y)        # e.g. ('a', [1, 1]), ('b', [1])
    return x

def mergeCombiners(a, b):  # merges two combiners of the same key coming from different partitions
    a.extend(b)
    return a

if __name__ == '__main__':
    # TODO: 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # TODO: 2 - Basic data processing
    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

    # groupByKey produces [('a', [1, 1]), ('b', [1])]
    print(sorted(rdd.groupByKey().mapValues(list).collect()))

    # combineByKey: generic functionality to combine the elements of each key using
    # custom aggregation functions:
    # - createCombiner: turns a V into a C (e.g. creates a one-element list) -- operates on the first value
    # - mergeValue: merges a V into a C (e.g. appends it to the list) -- within a partition
    # - mergeCombiners: combines two C's into one (e.g. merges the lists) -- between partitions
    by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
    print(sorted(by_key_result.collect()))  # [('a', [1, 1]), ('b', [1])]


  • Case 2
# -*- coding: utf-8 -*-
# Program purpose: demonstrate combineByKey (case 2: compute each student's average score)
from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is split into many partitions, each holding part of the elements,
and each partition is processed by its own task.
Between partitions: some operations additionally accumulate results across partitions.

The combiner is [value, 1]: value is the current score and 1 counts one exam,
so that we can later work out how many exams each student took.
("Fred", 88) -> [88, 1]
'''


def createCombiner(value):
    return [value, 1]


def mergeValue(x, y):
    # x is the combiner [total, count] built so far, e.g. [88, 1]
    # y is the next score for the same key within the partition, e.g. 95 from ("Fred", 95)
    return [x[0] + y, x[1] + 1]


def mergeCombiners(a, b):
    # a and b are [total, count] combiners of the same key coming from different partitions
    return [a[0] + b[0], a[1] + b[1]]


if __name__ == '__main__':
    # TODO: 1 - Create the SparkContext and request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # In real jobs you normally configure the log4j file instead

    # TODO: 2 - Basic data processing
    # Requirement: compute the average score of each student
    x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91),
                        ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
    print(x.glom().collect())
    # Partition 1: ("Fred", 88), ("Fred", 95)
    # Partition 2: ("Fred", 91), ("Wilma", 93)
    # Partition 3: ("Wilma", 95), ("Wilma", 98)

    # reduceByKey only gives the totals
    reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y)
    print("reduceByKey:", reduce_by_key_rdd.collect())  # [('Fred', 274), ('Wilma', 286)]

    # To get the average we also need a count per key, which combineByKey can carry along:
    # - createCombiner: turns a V into a C ([score, 1]) -- operates on the first value
    # - mergeValue: merges a V into a C -- within a partition
    # - mergeCombiners: combines two C's into one -- between partitions
    combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
    print(combine_by_key_rdd.collect())  # [('Fred', [274, 3]), ('Wilma', [286, 3])]

    # Finally divide total by count: for ('Fred', [274, 3]),
    # x[0] = 'Fred', x[1] = [274, 3], x[1][0] = 274, x[1][1] = 3
    print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
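
foldByKey and aggregateByKey are listed above but have no demonstration in the original post. A minimal sketch reusing the score data from Case 2 (the zero values and combining lambdas are my own illustrative choices):

# -*- coding: utf-8 -*-
# Minimal sketch of foldByKey and aggregateByKey (not part of the original post)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("bykey_sketch").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91),
                        ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)

    # foldByKey: like reduceByKey, but a zeroValue seeds the aggregation in every partition
    print(x.foldByKey(0, lambda a, b: a + b).collect())  # e.g. [('Fred', 274), ('Wilma', 286)]

    # aggregateByKey: like combineByKey, but the initial combiner is a fixed zeroValue
    # (here a (total, count) pair) instead of a function of the first value
    sums_and_counts = x.aggregateByKey(
        (0, 0),
        lambda acc, v: (acc[0] + v, acc[1] + 1),   # within a partition
        lambda a, b: (a[0] + b[0], a[1] + b[1]))   # between partitions
    print(sums_and_counts.mapValues(lambda t: t[0] / t[1]).collect())  # per-student average
    sc.stop()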
  • Interview question (given only as an image in the original post)

  • Association (join) functions (a minimal sketch follows below)
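
The post lists the association (join) functions without any code. A minimal sketch of the standard pair-RDD joins, using two small illustrative RDDs of my own:

# -*- coding: utf-8 -*-
# Minimal sketch of pair-RDD join operations (not part of the original post)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("join_sketch").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    emp = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu")])
    dept = sc.parallelize([(1001, "sales"), (1002, "tech")])

    print(emp.join(dept).collect())           # inner join: keeps only keys present in both RDDs
    print(emp.leftOuterJoin(dept).collect())  # keeps every key of emp; missing matches become None
    sc.stop()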


Postscript

Blog homepage: https://manor.blog.csdn.net

Welcome to like, collect, and leave a message. Please correct me if there are any errors!
This article was originally written by Maynor and first published on the CSDN blog.
It feels like in this life, the most affectionate and lasting gazes are all given to the mobile phone.
The column is continuously updated, welcome to subscribe: https://blog.csdn.net/xianyu120/category_12453356.html