Table of contents
- RDD operations
- Function classification
- Transformation functions
- Action functions
- Basic exercises [Wordcount quick demonstration]
- Transformation operators
- Action operators
- Important functions
- Postscript
RDD operations
Function classification
- *Transformation operations only build up the computation relationship (the lineage); Action operations are what actually execute it*.
- Transformation operators: they are lazy, so calling one computes nothing. To see a result, trigger execution with an action operator.
- Action operators: they trigger the execution of a Job and return the result information.
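To make the lazy-evaluation rule concrete, here is a toy model in plain Python (no Spark required). The MiniRDD class is hypothetical and only mimics the idea: a transformation such as map or filter records a step and returns a new object, and only the action collect() runs the recorded steps. It is a sketch of the principle, not how Spark schedules Jobs.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional

Step = Callable[[Iterator[Any]], Iterator[Any]]


class MiniRDD:
    """Hypothetical toy class: records transformations, runs them only on collect()."""

    def __init__(self, data: Iterable[Any], steps: Optional[List[Step]] = None):
        self._data = list(data)
        self._steps = steps or []  # the recorded "lineage"

    def map(self, f: Callable[[Any], Any]) -> "MiniRDD":
        # Transformation: nothing is computed; a new MiniRDD with one more step is returned
        return MiniRDD(self._data, self._steps + [lambda it: (f(x) for x in it)])

    def filter(self, pred: Callable[[Any], bool]) -> "MiniRDD":
        return MiniRDD(self._data, self._steps + [lambda it: (x for x in it if pred(x))])

    def collect(self) -> List[Any]:
        # Action: run every recorded step now and materialize the result
        it: Iterator[Any] = iter(self._data)
        for step in self._steps:
            it = step(it)
        return list(it)


seen = []  # records which elements the map function has actually processed
doubled = MiniRDD([1, 2, 3, 4, 5, 6]).map(lambda x: seen.append(x) or x * 2)
big = doubled.filter(lambda x: x > 6)
print(seen)           # [] (two transformations, still no work done)
print(big.collect())  # [8, 10, 12]
print(seen)           # [1, 2, 3, 4, 5, 6]
```

Calling collect() a second time would run the whole pipeline again, which is also why Spark offers cache()/persist() for RDDs that are reused.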
Transformation functions
Single-Value type
- map
- flatMap
- filter
- mapValues
Double-Value type (operations on two RDDs)
- intersection
- union
- difference
- distinct
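The semantics of the double-Value operators can be sketched with ordinary Python lists standing in for RDDs. This is a model, not Spark's implementation: real results come back in partition order, so the sketch sorts them. It shows that union keeps duplicates while intersection and distinct remove them, and that subtract keeps duplicates from the left-hand side.

```python
from typing import Hashable, List


def union(a: List[Hashable], b: List[Hashable]) -> List[Hashable]:
    # RDD.union simply concatenates: duplicates are kept (like SQL UNION ALL)
    return a + b


def intersection(a: List[Hashable], b: List[Hashable]) -> List[Hashable]:
    # RDD.intersection returns elements present in both, without duplicates
    return sorted(set(a) & set(b))


def subtract(a: List[Hashable], b: List[Hashable]) -> List[Hashable]:
    # RDD.subtract keeps every element of a (duplicates included) that does not occur in b
    exclude = set(b)
    return sorted(x for x in a if x not in exclude)


def distinct(a: List[Hashable]) -> List[Hashable]:
    return sorted(set(a))


rdd1 = [1, 2, 3, 4, 5]
rdd2 = [1, 2, 3, 4, 5, 6, 7, 8]
print(sorted(union(rdd1, rdd2)))    # [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 8]
print(intersection(rdd1, rdd2))     # [1, 2, 3, 4, 5]
print(subtract(rdd2, rdd1))         # [6, 7, 8]
print(distinct(union(rdd1, rdd2)))  # [1, 2, 3, 4, 5, 6, 7, 8]
```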
Key-Value type
- reduceByKey
- groupByKey
- sortByKey
- combineByKey (the underlying API)
- foldByKey
- aggregateByKey
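Why reduceByKey is usually preferred to groupByKey: reduceByKey combines values per key inside each partition before the shuffle, so only one partial result per key per partition crosses the network, while groupByKey ships every value. The plain-Python sketch below models partitions as lists of lists; reduce_by_key and group_by_key are hypothetical helpers, not PySpark functions, and the two-partition layout is an assumption.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Pair = Tuple[str, int]


def reduce_by_key(partitions: List[List[Pair]], func: Callable[[int, int], int]) -> Dict[str, int]:
    # Step 1 (map side): combine values per key inside each partition
    partials = []
    for part in partitions:
        local: Dict[str, int] = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        partials.append(local)
    # Step 2 (after the shuffle): merge the per-partition partial results
    merged: Dict[str, int] = {}
    for local in partials:
        for k, v in local.items():
            merged[k] = func(merged[k], v) if k in merged else v
    return merged


def group_by_key(partitions: List[List[Pair]]) -> Dict[str, List[int]]:
    # Every single value is moved and collected; nothing is pre-aggregated
    grouped: Dict[str, List[int]] = defaultdict(list)
    for part in partitions:
        for k, v in part:
            grouped[k].append(v)
    return dict(grouped)


parts = [[("a", 1), ("b", 2)], [("c", 1), ("b", 3)]]
print(reduce_by_key(parts, lambda x, y: x + y))  # {'a': 1, 'b': 5, 'c': 1}
print(group_by_key(parts))                       # {'a': [1], 'b': [2, 3], 'c': [1]}
```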
Action functions
- collect
- saveAsTextFile
- first
- take
- takeSample
- top
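The differences between first, take and top can be modeled on a list of partitions. This is a plain-Python sketch: the helper names mirror the Spark actions but are not the PySpark implementation, and the two-partition layout is assumed. first and take scan partitions in order, while top returns the largest elements in descending order.

```python
import heapq
from itertools import chain, islice
from typing import Any, List

# Assumed layout: [("a", 1), ("b", 2)] spread over two partitions
partitions: List[List[Any]] = [[("a", 1)], [("b", 2)]]


def first(parts: List[List[Any]]) -> Any:
    # first(): the first element in partition order
    for x in chain.from_iterable(parts):
        return x
    raise ValueError("RDD is empty")


def take(parts: List[List[Any]], n: int) -> List[Any]:
    # take(n): the first n elements, scanning partitions in order
    return list(islice(chain.from_iterable(parts), n))


def top(parts: List[List[Any]], n: int) -> List[Any]:
    # top(n): the n largest elements (natural ordering), in descending order
    return heapq.nlargest(n, chain.from_iterable(parts))


print(first(partitions))    # ('a', 1)
print(take(partitions, 2))  # [('a', 1), ('b', 2)]
print(top(partitions, 2))   # [('b', 2), ('a', 1)]
```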
Basic exercises [Wordcount quick demonstration]
Transformation operators
- Single-Value type code
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate the transformation operators of single-Value RDDs
import re

from pyspark import SparkConf, SparkContext

'''
Within a partition: an RDD is divided into partitions, each holding many elements,
and each partition is processed by its own task (thread).
Between partitions: some operations must combine (accumulate) results across partitions.
'''
if __name__ == '__main__':
    # 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")  # At work you would usually set this in a log4j file instead
    # 2-map
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
    rdd__map = rdd1.map(lambda x: x * 2)
    # collect() gives [2, 4, 6, 8, 10, 12]; glom() groups by partition, e.g. [[2, 4, 6], [8, 10, 12]] with 2 partitions
    print(rdd__map.glom().collect())
    # 3-filter
    print(rdd1.glom().collect())
    print(rdd1.filter(lambda x: x > 3).glom().collect())
    # 4-flatMap: split each line on runs of whitespace and flatten the words
    rdd2 = sc.parallelize([" hello you", "hello me "])
    print(rdd2.flatMap(lambda line: re.split(r"\s+", line.strip())).collect())  # ['hello', 'you', 'hello', 'me']
    # 5-groupBy
    x = sc.parallelize([1, 2, 3])
    y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
    print(y.mapValues(list).collect())  # [('A', [1, 3]), ('B', [2])]
    # 6-mapValues: transform only the value of each (key, value) pair
    x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])

    def f(x):
        return len(x)

    print(x1.mapValues(f).collect())  # [('a', 3), ('b', 1)]
```
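The difference between map and flatMap, and the whitespace-splitting regex used in step 4, can be checked without Spark. In this sketch a list comprehension plays the role of map and itertools.chain plays the role of flatMap's flattening; split_words is a hypothetical helper name.

```python
import re
from itertools import chain
from typing import List

lines = [" hello you", "hello me "]


def split_words(line: str) -> List[str]:
    # strip() drops leading/trailing blanks; r"\s+" splits on runs of whitespace
    return re.split(r"\s+", line.strip())


mapped = [split_words(line) for line in lines]                       # like rdd.map: one list per line
flat = list(chain.from_iterable(split_words(line) for line in lines))  # like rdd.flatMap: flattened
print(mapped)  # [['hello', 'you'], ['hello', 'me']]
print(flat)    # ['hello', 'you', 'hello', 'me']
```

Without the strip(), the leading blank in " hello you" would produce an empty string as the first word.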
- Double-Value type code
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate the transformation operators that combine two RDDs (double-Value type)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # 2-Set-style operations on two RDDs
    rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
    Union_RDD = rdd1.union(rdd2)
    print(Union_RDD.collect())  # [1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7, 8]: union keeps duplicates
    print(rdd1.intersection(rdd2).collect())  # elements in both RDDs, without duplicates
    print(rdd2.subtract(rdd1).collect())  # elements of rdd2 not in rdd1: 6, 7, 8 (order may vary)
    # distinct: return a new RDD containing the distinct elements of this RDD
    print(Union_RDD.distinct().collect())
    print(Union_RDD.distinct().glom().collect())
```
- Key-Value operators
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate the transformation operators of Key-Value RDDs
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # 2-Key-Value operators
    # groupByKey
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("c", 1), ("b", 3)])
    rdd3 = rdd1.union(rdd2)
    key1 = rdd3.groupByKey()
    print("groupByKey:", key1.collect())
    # groupByKey: [('b', <pyspark.resultiterable.ResultIterable object at 0x7f001c469c40>),
    #              ('c', <pyspark.resultiterable.ResultIterable object at 0x7f001c469310>),
    #              ('a', <pyspark.resultiterable.ResultIterable object at 0x7f001c469a00>)]
    # The grouped values are iterables; use mapValues to turn them into lists or tuples
    print(key1.mapValues(list).collect())
    print(key1.mapValues(tuple).collect())
    # reduceByKey
    key2 = rdd3.reduceByKey(lambda x, y: x + y)
    print(key2.collect())  # a -> 1, b -> 5, c -> 1 (order may vary)
    # sortByKey: swap to (total, key), then sort by total in descending order
    print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())  # [(5, 'b'), (1, 'c'), (1, 'a')]
    # countByValue: an action that counts each distinct (key, value) element
    print(rdd3.countByValue())
    # e.g. defaultdict(<class 'int'>, {('a', 1): 1, ('b', 2): 1, ('c', 1): 1, ('b', 3): 1})
```
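countByValue and countByKey are easy to confuse. In this plain-Python sketch, collections.Counter stands in for both actions (Spark returns a dict to the driver in both cases): countByValue counts whole (key, value) elements, countByKey counts keys only. The last lines model the swap-then-sortByKey(False) step with sorted(); Spark does not guarantee the order of ties, while Python's sort here is stable.

```python
from collections import Counter

# The rdd3 data from the Key-Value example above, as a plain list
pairs = [("a", 1), ("b", 2), ("c", 1), ("b", 3)]

count_by_value = Counter(pairs)              # models rdd3.countByValue(): counts whole elements
count_by_key = Counter(k for k, _ in pairs)  # models rdd3.countByKey(): counts keys only
print(count_by_value[("b", 2)])  # 1
print(count_by_key["b"])         # 2

# Model of key2.map(swap).sortByKey(False): sort (total, key) pairs by total, descending
totals = {"a": 1, "b": 5, "c": 1}
by_total_desc = sorted(((v, k) for k, v in totals.items()), key=lambda t: t[0], reverse=True)
print(by_total_desc)  # [(5, 'b'), (1, 'a'), (1, 'c')]
```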
Action operators
- Common actions
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate common action operators
from operator import add, mul

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # 2-Actions on a key-value RDD
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    print(rdd1.first())    # ('a', 1)
    print(rdd1.take(2))    # [('a', 1), ('b', 2)]
    print(rdd1.top(2))     # [('b', 2), ('a', 1)]
    print(rdd1.collect())  # [('a', 1), ('b', 2)]
    # 3-reduce
    rdd3 = sc.parallelize([1, 2, 3, 4, 5])
    print(rdd3.reduce(add))  # 15
    print(rdd3.reduce(mul))  # 120
    # 4-takeSample(withReplacement, num, seed)
    rdd4 = sc.parallelize(range(0, 10))
    # Is the sample the same every time? With the same seed, yes; a different seed usually gives a different sample.
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 123))
    print(rdd4.takeSample(True, 3, 34))
```
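The seed argument of takeSample behaves like any seeded pseudo-random generator. The sketch below uses Python's random module instead of Spark's sampler, so its concrete samples will not match Spark's output; it only illustrates why the same seed reproduces the same sample. take_sample is a hypothetical helper.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def take_sample(data: Sequence[T], with_replacement: bool, num: int, seed: int) -> List[T]:
    rng = random.Random(seed)  # a private generator: the same seed yields the same sequence
    if with_replacement:
        return [rng.choice(data) for _ in range(num)]  # elements may repeat
    return rng.sample(data, num)  # distinct positions, no repeats


data = list(range(0, 10))
print(take_sample(data, True, 3, 123) == take_sample(data, True, 3, 123))  # True
print(take_sample(data, True, 3, 123), take_sample(data, True, 3, 34))
```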
- Other supplementary operators
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate foreach, foreachPartition, map and mapPartitions
from pyspark import SparkConf, SparkContext


def f(iterator):
    # Called once per partition; the iterator yields that partition's elements
    for x in iterator:
        print(x)


def f1(iterator):
    # Called once per partition; yields one sum per partition, e.g. [1, 2] -> 3 and [3, 4] -> 7
    yield sum(iterator)


if __name__ == '__main__':
    # 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # 2-foreach: applies a function to every element of this RDD
    # (the prints run on the executors; in local mode they appear in the console)
    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    print(rdd1.glom().collect())
    rdd1.foreach(lambda x: print(x))
    # 3-foreachPartition: applies a function to each partition of this RDD.
    # The function runs once per partition instead of once per element, which pays off
    # when each call has a setup cost (e.g. opening a database connection).
    rdd1.foreachPartition(f)
    # 4-map: transforms element by element
    rdd2 = sc.parallelize([1, 2, 3, 4], 2)  # 2 partitions: [1, 2] and [3, 4]
    print(rdd2.map(lambda x: x * 2).collect())  # [2, 4, 6, 8]
    # 5-mapPartitions: returns a new RDD by applying a function to each partition
    print(rdd2.mapPartitions(f1).collect())  # [3, 7]
```
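Why foreachPartition can be cheaper than foreach: any per-call setup runs once per partition instead of once per element. In this plain-Python sketch, open_connection is a hypothetical stand-in for an expensive resource such as a database connection, and the two partitions [1, 2] and [3, 4] are assumed; the loops model what Spark does, they are not Spark calls.

```python
from typing import Dict, Iterable, List

# Assumed layout: an RDD [1, 2, 3, 4] split into two partitions
partitions: List[List[int]] = [[1, 2], [3, 4]]
connections_opened: Dict[str, int] = {"foreach": 0, "foreachPartition": 0}


def open_connection(kind: str) -> List[int]:
    # Hypothetical stand-in for an expensive resource (e.g. a database connection)
    connections_opened[kind] += 1
    return []


def write_one(x: int) -> None:
    sink = open_connection("foreach")  # one "connection" per element
    sink.append(x)


def write_partition(it: Iterable[int]) -> None:
    sink = open_connection("foreachPartition")  # one "connection" per partition
    for x in it:
        sink.append(x)


# Model of rdd.foreach(write_one): the function is called for every element
for part in partitions:
    for x in part:
        write_one(x)

# Model of rdd.foreachPartition(write_partition): called once with each partition's iterator
for part in partitions:
    write_partition(iter(part))

print(connections_opened)  # {'foreach': 4, 'foreachPartition': 2}
```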
Important functions
Basic functions
- Basic transformation and action operations
Partition operation functions
- mapPartitions
- foreachPartition
Repartition functions
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate repartition, coalesce and partitionBy
# Tip: in PyCharm, Alt+6 opens the TODO tool window, which lists every "# TODO" comment below
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # TODO: 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # TODO: 2-Repartition with repartition
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print("partitions num:", rdd1.getNumPartitions())  # 3
    print(rdd1.glom().collect())  # [[1, 2], [3, 4], [5, 6]]
    print("repartition result:")
    # TODO: repartition can increase or decrease the number of partitions, but it always shuffles.
    # To reduce the number of partitions, prefer coalesce, which avoids the shuffle.
    rdd__repartition1 = rdd1.repartition(5)
    print("increase partition", rdd__repartition1.glom().collect())  # e.g. [[], [1, 2], [5, 6], [3, 4], []]
    rdd__repartition2 = rdd1.repartition(2)
    print("decrease partition", rdd__repartition2.glom().collect())  # e.g. [[1, 2, 5, 6], [3, 4]]
    # TODO: 3-Reduce partitions with coalesce
    print(rdd1.coalesce(2).glom().collect())  # [[1, 2], [3, 4, 5, 6]]
    print(rdd1.coalesce(5).glom().collect())  # [[1, 2], [3, 4], [5, 6]]: without a shuffle, coalesce cannot add partitions
    print(rdd1.coalesce(5, True).glom().collect())  # e.g. [[], [1, 2], [5, 6], [3, 4], []]
    # Conclusion: repartition(n) is simply coalesce(n, shuffle=True).
    # TODO: 4-partitionBy changes the number of partitions and lets you choose how keys are partitioned.
    # Spark has a hash partitioner (spreads keys across partitions) and a range partitioner
    # (keeps keys in sorted ranges, as used when sorting).
    # partitionBy is only available on key-value (pair) RDDs.
    # Of the five main properties of an RDD, the fourth is the optional partitioner for key-value RDDs;
    # partitionBy uses hash partitioning by default.
    rdd__map = rdd1.map(lambda x: (x, x))
    print("partitions length:", rdd__map.getNumPartitions())  # partitions length: 3
    print(rdd__map.partitionBy(2).glom().collect())
```
Aggregate functions
- Code:
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate the aggregation actions reduce, fold and aggregate
from operator import add

from pyspark import SparkConf, SparkContext


def addNum(x, y):
    return x + y


if __name__ == '__main__':
    # TODO: 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # TODO: 2-Aggregate with reduce
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd1.reduce(add))  # 21, returned directly to the driver
    # TODO: 3-Aggregate with fold
    # The first parameter, zeroValue, is the initial value; it is applied once in every partition
    # and once more when the partition results are merged.
    # The second parameter is the operation to apply.
    print(rdd1.fold(0, add))  # 21
    print(rdd1.getNumPartitions())  # 3
    print(rdd1.glom().collect())  # [[1, 2], [3, 4], [5, 6]]
    print("fold result:", rdd1.fold(10, add))  # fold result: 61 = 21 + 10 * (3 + 1)
    # TODO: 4-Aggregate with aggregate
    # seqOp works within a partition, combOp merges the results between partitions
    print(rdd1.aggregate(0, add, add))  # 21
    print(rdd1.glom().collect())
    print("aggregate result:", rdd1.aggregate(1, add, add))  # aggregate result: 25 = 21 + 1 * (3 + 1)
    # Conclusion: fold is a special case of aggregate that uses the same function within and between partitions.
    print("aggregate result:", rdd1.aggregate(1, addNum, addNum))  # aggregate result: 25
```
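The 61 and 25 above follow from how fold and aggregate use the initial value: once inside every partition and once more when the partition results are merged on the driver. This plain-Python model (fold and aggregate here are hypothetical helpers, and the layout [[1, 2], [3, 4], [5, 6]] is taken from glom() above) reproduces those numbers. It also shows aggregate returning a (sum, count) pair, which fold cannot do because its result must have the same type as the elements.

```python
from functools import reduce
from operator import add

# Partition layout printed by rdd1.glom().collect() above
partitions = [[1, 2], [3, 4], [5, 6]]


def fold(parts, zero, op):
    # zero starts the accumulation in every partition...
    per_partition = [reduce(op, part, zero) for part in parts]
    # ...and is used once more when the partition results are merged
    return reduce(op, per_partition, zero)


def aggregate(parts, zero, seq_op, comb_op):
    # Same structure as fold, but seq_op (within a partition) and comb_op (between partitions) may differ
    per_partition = [reduce(seq_op, part, zero) for part in parts]
    return reduce(comb_op, per_partition, zero)


print(fold(partitions, 0, add))            # 21
print(fold(partitions, 10, add))           # 61 = 21 + 10 * (3 partitions + 1 merge)
print(aggregate(partitions, 1, add, add))  # 25 = 21 + 1 * (3 partitions + 1 merge)

# aggregate can return a different type than the elements, e.g. (sum, count)
sum_count = aggregate(partitions, (0, 0),
                      lambda acc, x: (acc[0] + x, acc[1] + 1),
                      lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(sum_count, sum_count[0] / sum_count[1])  # (21, 6) 3.5
```

The takeaway: a non-neutral zero value makes the result depend on the number of partitions, so in practice zeroValue should be the identity of the operation (0 for addition, 1 for multiplication).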
- Aggregation functions of the byKey family
- **groupByKey**: how do you get at the grouped values? Answer: result.mapValues(list).collect()
- **reduceByKey**
- foldByKey
- aggregateByKey
- combineByKey: the lower-level implementation behind the byKey aggregation operators, which can express more complex aggregations.
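foldByKey and aggregateByKey get no demo of their own here. Unlike fold, they apply the zero value once per key in each partition, and not again when partitions are merged. The sketch below models that behavior in plain Python; aggregate_by_key and fold_by_key are hypothetical helpers, and the two-partition layout of the Case 1 data is an assumption.

```python
from copy import deepcopy
from operator import add
from typing import Any, Callable, Dict, List, Tuple

Partitions = List[List[Tuple[str, Any]]]


def aggregate_by_key(partitions: Partitions, zero: Any,
                     seq_func: Callable[[Any, Any], Any],
                     comb_func: Callable[[Any, Any], Any]) -> Dict[str, Any]:
    partials = []
    for part in partitions:
        local: Dict[str, Any] = {}
        for k, v in part:
            # A fresh copy of the zero value starts each key's accumulator in each partition
            acc = local[k] if k in local else deepcopy(zero)
            local[k] = seq_func(acc, v)
        partials.append(local)
    merged: Dict[str, Any] = {}
    for local in partials:
        for k, acc in local.items():
            # Across partitions only comb_func is used; the zero value is NOT applied again
            merged[k] = comb_func(merged[k], acc) if k in merged else acc
    return merged


def fold_by_key(partitions: Partitions, zero: Any, func: Callable[[Any, Any], Any]) -> Dict[str, Any]:
    # foldByKey behaves like aggregateByKey with the same function inside and across partitions
    return aggregate_by_key(partitions, zero, func, func)


# Assumed layout of the Case 1 data [("a", 1), ("b", 1), ("a", 1)] over two partitions
parts = [[("a", 1), ("b", 1)], [("a", 1)]]
print(fold_by_key(parts, 0, add))   # {'a': 2, 'b': 1}
print(fold_by_key(parts, 10, add))  # {'a': 22, 'b': 11}
print(aggregate_by_key(parts, [], lambda acc, v: acc + [v], lambda a, b: a + b))  # {'a': [1, 1], 'b': [1]}
```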
Case 1:
```python
# -*- coding: utf-8 -*-
# Program function: demonstrate combineByKey by collecting each key's values into a list
from pyspark import SparkConf, SparkContext


def createCombiner(value):
    # Operates on the first value seen for a key in a partition: 1 -> [1], e.g. ('a', [1])
    return [value]


def mergeValue(x, y):
    # x is the list built so far for this key in this partition (from createCombiner),
    # y is the next value of the same key, e.g. 1 for the second ('a', 1)
    x.append(y)  # e.g. ('a', [1, 1]), ('b', [1])
    return x


def mergeCombiners(a, b):
    # Merges the lists built for the same key in different partitions
    a.extend(b)
    return a


if __name__ == '__main__':
    # TODO: 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # TODO: 2-Basic data processing
    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(sorted(rdd.groupByKey().mapValues(list).collect()))  # [('a', [1, 1]), ('b', [1])]
    # Generic function to combine the elements for each key using a custom set of aggregation functions:
    # - `createCombiner`, which turns a V into a C (e.g., creates a one-element list): operates on the initial value
    # - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list): merges elements within a partition
    # - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists): merges results between partitions
    by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
    print(sorted(by_key_result.collect()))  # [('a', [1, 1]), ('b', [1])]
```
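To see which of the three combineByKey functions runs when, here is a plain-Python model that counts the calls for two assumed partition layouts of the Case 1 data. combine_by_key is a hypothetical helper, not the PySpark API. createCombiner runs the first time a key appears in a partition, mergeValue for later values of that key in the same partition, and mergeCombiners only when a key appears in more than one partition.

```python
calls = {"createCombiner": 0, "mergeValue": 0, "mergeCombiners": 0}


def createCombiner(value):
    calls["createCombiner"] += 1
    return [value]


def mergeValue(x, y):
    calls["mergeValue"] += 1
    x.append(y)
    return x


def mergeCombiners(a, b):
    calls["mergeCombiners"] += 1
    a.extend(b)
    return a


def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    partials = []
    for part in partitions:  # phase 1: inside each partition
        local = {}
        for k, v in part:
            if k in local:
                local[k] = merge_value(local[k], v)
            else:
                local[k] = create_combiner(v)
        partials.append(local)
    merged = {}
    for local in partials:  # phase 2: across partitions (after the shuffle)
        for k, c in local.items():
            merged[k] = merge_combiners(merged[k], c) if k in merged else c
    return merged


data = [("a", 1), ("b", 1), ("a", 1)]
one_partition = combine_by_key([data], createCombiner, mergeValue, mergeCombiners)
print(one_partition, calls)
# {'a': [1, 1], 'b': [1]} {'createCombiner': 2, 'mergeValue': 1, 'mergeCombiners': 0}

for key in calls:
    calls[key] = 0
two_partitions = combine_by_key([data[:2], data[2:]], createCombiner, mergeValue, mergeCombiners)
print(two_partitions, calls)
# {'a': [1, 1], 'b': [1]} {'createCombiner': 3, 'mergeValue': 0, 'mergeCombiners': 1}
```

The result is the same for both layouts, which is exactly what Spark requires of these three functions; only the call pattern changes.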
- Case 2
```python
# -*- coding: utf-8 -*-
# Program function: use combineByKey to compute each student's average score
from pyspark import SparkConf, SparkContext


def createCombiner(value):
    # Turns the first score seen for a student in a partition into [total, count]:
    # ("Fred", 88) -> [88, 1], where 1 counts how many exams have been seen so far
    return [value, 1]


def mergeValue(x, y):
    # x is the [total, count] accumulator, e.g. [88, 1]; y is the next score of the same key,
    # e.g. 95 from ("Fred", 95). Accumulates within a partition.
    return [x[0] + y, x[1] + 1]


def mergeCombiners(a, b):
    # a[0] / b[0] are partial totals, a[1] / b[1] partial exam counts from different partitions
    return [a[0] + b[0], a[1] + b[1]]


if __name__ == '__main__':
    # TODO: 1-Create the SparkContext to request resources
    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")
    # TODO: 2-Requirement: compute each student's average score
    x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91),
                        ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
    print(x.glom().collect())
    # First partition:  ("Fred", 88), ("Fred", 95)
    # Second partition: ("Fred", 91), ("Wilma", 93)
    # Third partition:  ("Wilma", 95), ("Wilma", 98)
    # reduceByKey gives the totals, but not the number of exams
    reduce_by_key_rdd = x.reduceByKey(lambda a, b: a + b)
    print("reduceByKey:", reduce_by_key_rdd.collect())  # [('Fred', 274), ('Wilma', 286)]
    # How do we get the average? combineByKey can carry [total, count] per key:
    # - `createCombiner`, which turns a V into a C (e.g., creates a one-element list): operates on the initial value
    # - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list): merges elements within a partition
    # - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists): merges results between partitions
    combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
    print(combine_by_key_rdd.collect())  # [('Fred', [274, 3]), ('Wilma', [286, 3])]
    # Average: for ('Fred', [274, 3]), x[0] = 'Fred', x[1] = [274, 3], x[1][0] = 274, x[1][1] = 3
    # int() truncates: 274 / 3 = 91.33... -> 91
    print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())  # [('Fred', 91), ('Wilma', 95)]
```
- Interview questions:
- Related functions
Postscript
Blog homepage: https://manor.blog.csdn.net
Likes, bookmarks and comments are welcome. Please point out any errors!
This article was originally written by Maynor and first appeared on the CSDN blog
The column is continuously updated, welcome to subscribe: https://blog.csdn.net/xianyu120/category_12453356.html