1. UDF function definition
(1) Function definition
Whether you analyze and process data with Hive or with SparkSQL, you often need functions. The SparkSQL module ships with many built-in functions for common operations, available in pyspark.sql.functions. Like Hive, SparkSQL also supports user-defined functions, namely UDFs and UDAFs. UDFs are by far the most widely used in real projects.
There are three types of custom functions in Hive:
The first type: UDF (User-Defined Function)
· One-to-one: one value goes in, and one value comes out after passing through the function;
· In Hive, you extend the UDF class and implement a method named evaluate whose return type cannot be void. In essence, writing a UDF means implementing a single method;
The second type: UDAF (User-Defined Aggregation Function)
· Many-to-one: multiple values go in and one value comes out; usually used together with GROUP BY;
The third type: UDTF (User-Defined Table-Generating Function)
· One-to-many: one value goes in and multiple values come out (one row becomes multiple rows);
· A user-defined table-generating function, somewhat like flatMap;
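To make the three relationships concrete, here is a plain-Python analogy with no Spark involved. The function names to_upper, sum_by_key and split_words are hypothetical; they only mirror the one-to-one, many-to-one and one-to-many shapes described above.

```python
from collections import defaultdict


# One-to-one (UDF-like): one input value produces one output value
def to_upper(value):
    return value.upper()


# Many-to-one (UDAF-like): many input values produce one value per group,
# similar to an aggregate used with GROUP BY
def sum_by_key(pairs):
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# One-to-many (UDTF-like): one input row produces several output rows, like flatMap
def split_words(line):
    return line.split()


lines = ['hadoop spark flink', 'hadoop flink java']
upper_lines = [to_upper(line) for line in lines]                    # 2 rows in, 2 rows out
word_rows = [word for line in lines for word in split_words(line)]  # 2 rows in, 6 rows out
totals = sum_by_key([('hadoop', 1), ('spark', 1), ('hadoop', 1)])   # 3 rows in, 2 groups out
```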
(2) Spark supports defining functions
Support for custom functions varies across Spark versions and languages. SparkSQL supports only UDFs and UDAFs, and the Python API supports only UDFs (newer releases also provide pandas UDFs, which can perform grouped aggregation).
| Apache Spark version | Spark SQL UDF (Python, Java, Scala) | Spark SQL UDAF (Java, Scala) | Spark SQL UDF (R) | Hive UDF, UDAF, UDTF |
|---|---|---|---|---|
| 1.1-1.4 | √ | √ | | |
| 1.5 | √ | experimental | √ | |
| 1.6 | √ | √ | √ | |
| 2.0 | √ | √ | √ | √ |
(3) Define a UDF
① sparkSession.udf.register(name, f, returnType)
A UDF registered this way can be used in both DSL and SQL style: in DSL style you call the object that register() returns, and in SQL style you use the name passed as the first argument.
② pyspark.sql.functions.udf(f, returnType)
A UDF created this way can only be used in DSL style.
Here F refers to the module imported with from pyspark.sql import functions as F. The function passed to register() or F.udf() is the Python function that performs the actual computation. For example, in def add(x, y): return x + y, add is the function that gets registered as a UDF.
```python
# coding:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build an RDD of single-element lists and turn it into a DataFrame with one column, num
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
    df = rdd.toDF(['num'])

    # TODO 1: Method 1, spark.udf.register(); usable in both DSL and SQL style
    # UDF processing function
    def num_ride_10(num):
        return num * 10

    # Argument 1: the name of the registered UDF; this name can only be used in SQL style
    # Argument 2: the UDF processing logic, a separately defined function
    # Argument 3: the declared return type of the UDF. A return type must be declared when
    #   registering, and the value the function actually returns must match it.
    # The UDF is used in SQL style through the name in argument 1, and in DSL style
    # through the returned object (udf2)
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL style
    # selectExpr evaluates SQL-style expressions (strings), as in a SELECT clause
    df.selectExpr('udf1(num)').show()

    # DSL style
    # select accepts plain column-name strings or Column objects; when the returned
    # UDF object is called, its argument must be a Column object
    df.select(udf2(df['num'])).show()

    # TODO 2: Method 2, F.udf(); usable in DSL style only
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()
```
Method 1 result:
Method 2 results:
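The difference between the two naming styles can be modeled in plain Python. ToyUDFRegistry below is a hypothetical, simplified stand-in, not Spark's implementation: register() stores the function under a name (the SQL-style handle) and returns a callable (the DSL-style handle). It also imitates the rule that the returned value must match the declared type: for a regular Python UDF, PySpark usually turns a mismatching value into null rather than raising, and the toy approximates that by returning None.

```python
class ToyUDFRegistry:
    """Hypothetical, simplified model of the two naming styles; not Spark's implementation."""

    def __init__(self):
        self._by_name = {}

    def register(self, name, f, return_type):
        def wrapped(*args):
            result = f(*args)
            # Approximation of the "declared type must match" rule: mismatches become None
            if result is not None and not isinstance(result, return_type):
                return None
            return result

        self._by_name[name] = wrapped  # SQL style looks the UDF up by this name
        return wrapped                 # DSL style calls the returned object directly

    def call_by_name(self, name, *args):
        # Stand-in for evaluating a SQL expression such as 'add_udf(x, y)'
        return self._by_name[name](*args)


def add(x, y):
    return x + y


registry = ToyUDFRegistry()
add_udf = registry.register('add_udf', add, int)

dsl_result = add_udf(2, 3)                         # DSL style: call the returned object
sql_result = registry.call_by_name('add_udf', 2, 3)  # SQL style: look up by name

half_udf = registry.register('half', lambda x: x / 2, int)
mismatch = half_udf(4)  # 4 / 2 is the float 2.0, not an int, so the toy returns None
```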
(4) Define a UDF that returns an Array type
Note: Array (list) return values are described with Spark's ArrayType.
Note: When declaring an ArrayType, pass in the type of the array's elements, for example ArrayType(StringType()) for an array of strings.
```python
# coding:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build an RDD with one line of text per row, then a DataFrame with one column, line
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # UDF processing function: split a line into a list of words
    def split_line(data):
        return data.split(' ')

    # TODO 1: Method 1, spark.udf.register(); the return type is an array of strings
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DSL style
    df.select(udf2(df['line'])).show()

    # SQL style
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2: Method 2, F.udf()
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)
```
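Because the function behind a UDF is ordinary Python, its logic can be checked without starting Spark. This sketch repeats split_line with one assumed addition, a guard for None: Spark hands a SQL NULL to a Python UDF as None, and None.split(' ') would raise. It also shows that split(' ') produces empty strings when words are separated by more than one space, while split() with no argument does not.

```python
def split_line(data):
    # Same splitting logic as the UDF above; the None guard is an added assumption
    if data is None:
        return None
    return data.split(' ')


words = split_line('hadoop spark flink')
# Every element is a str, which is what ArrayType(StringType()) declares
all_strings = all(isinstance(w, str) for w in words)

# split(' ') keeps an empty string between consecutive spaces; split() does not
with_gaps = split_line('hadoop  spark')
without_gaps = 'hadoop  spark'.split()
```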
(5) Define a UDF that returns a dictionary type
Note: Dictionary return values are described with StructType, the general structured type supported by Spark.
It is mainly used to:
· describe the schema of a DataFrame;
· describe the return value of a UDF that returns a dictionary.
```python
# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Given the numbers 1, 2 and 3, return for each number a dict that combines the
    # number with the letter at that position in the alphabet
    # Example: passing in 1 returns {'num': 1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # UDF processing function
    def process(data):
        # string.ascii_letters is 0-indexed, so subtract 1 to map 1 -> 'a'
        return {'num': data, 'letters': string.ascii_letters[data - 1]}

    # If the UDF returns a dictionary, its return type must be declared as a StructType
    # whose field names match the dictionary keys
    udf1 = spark.udf.register(
        'udf1',
        process,
        StructType().add('num', IntegerType(), nullable=True)
                    .add('letters', StringType(), nullable=True),
    )

    # SQL style
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL style
    df.select(udf1(df['num'])).show(truncate=False)
```
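The dictionary-returning function can also be checked in plain Python before it is registered. PySpark matches the dictionary values to the struct fields by key name, so the keys should stay in step with the field names declared in the StructType (num and letters). DECLARED_FIELDS and check_struct_keys are hypothetical helpers for such a unit test, not part of PySpark.

```python
import string

# Field names declared in the StructType above, kept in sync by hand (hypothetical helper data)
DECLARED_FIELDS = ('num', 'letters')


def process(data):
    # Same logic as the UDF above: 1 -> 'a', 2 -> 'b', 3 -> 'c'
    return {'num': data, 'letters': string.ascii_letters[data - 1]}


def check_struct_keys(value, fields=DECLARED_FIELDS):
    # Hypothetical helper: True only if the dict keys are exactly the declared field names
    return set(value) == set(fields)


results = [process(n) for n in (1, 2, 3)]
```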
(6) Construct a UDAF through the RDD API
```python
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build a DataFrame with one column, num, spread over 3 partitions
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # Approach: use the RDD mapPartitions operator to perform the aggregation.
    # mapPartitions calls the function once per partition, so to aggregate over all
    # rows with it, the data must first be moved into a single partition.
    single_partition_rdd = df.rdd.repartition(1)

    def process(rows):
        total = 0
        for row in rows:
            total += row['num']
        # mapPartitions expects an iterable back, so wrap the single result in a list
        return [total]

    print(single_partition_rdd.mapPartitions(process).collect())  # [15]
```
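The reason for repartition(1) is that mapPartitions runs the function once per partition. The plain-Python model below shows that three partitions yield three partial sums, while one partition yields the total. map_partitions is a hypothetical stand-in for RDD.mapPartitions, and the 3-way split is an assumed example, not necessarily how Spark divides the data. The model also shows a two-phase alternative: keep the partitions, compute partial sums, and add them up afterwards, which avoids funneling every row through a single task.

```python
def process(rows):
    # Same aggregation as above: sum the 'num' field of every row in one partition
    total = 0
    for row in rows:
        total += row['num']
    return [total]


def map_partitions(partitions, func):
    # Hypothetical stand-in for RDD.mapPartitions: call func once per partition
    # and concatenate the iterables it returns
    out = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out


rows = [{'num': n} for n in [1, 2, 3, 4, 5]]
three_partitions = [rows[0:1], rows[1:3], rows[3:5]]  # assumed example split
one_partition = [rows]                                # what repartition(1) produces

partial_sums = map_partitions(three_partitions, process)  # one partial sum per partition
single_total = map_partitions(one_partition, process)
two_phase_total = sum(partial_sums)                       # combine partial sums afterwards
```

In PySpark itself, the same two-phase idea is available through RDD.reduce or RDD.aggregate, or simply through the built-in F.sum aggregate.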
2. Window function
(1) Brief description of window function
●Introduction
Window functions were introduced to display both the data before aggregation and the result after aggregation, by appending the result of the aggregate function as an extra column on each row. A window function defines a window for each row, where the window is the set of rows the operation works on. It computes over a set of values without needing a GROUP BY clause to group the data, so the same row can return both the columns of the base row and the aggregated columns.
●Aggregation functions and windowing functions
Aggregate functions such as count and avg turn multiple rows into one row;
A window function keeps every input row and attaches the aggregated value to each of them, so one aggregate result is spread over multiple rows;
With an aggregate function, any other column you want to display must be added to GROUP BY; a window function can display all columns directly without GROUP BY.
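The difference can be sketched in plain Python on a few rows taken from the student data used later in this section. group_by_avg mirrors SELECT class, AVG(score) ... GROUP BY class, and window_avg mirrors SELECT *, AVG(score) OVER(PARTITION BY class); both helpers are hypothetical illustrations, not Spark code.

```python
from collections import defaultdict

rows = [
    ('Zhang San', 'class_1', 99),
    ('Wang Juan', 'class_1', 90),
    ('Zheng Ying', 'class_1', 11),
    ('Wang Jun', 'class_2', 91),
    ('Zheng Hui', 'class_2', 33),
]


def group_by_avg(rows):
    # Like GROUP BY class: the rows of each class collapse into one (class, avg) row
    sums, counts = defaultdict(int), defaultdict(int)
    for _, cls, score in rows:
        sums[cls] += score
        counts[cls] += 1
    return [(cls, sums[cls] / counts[cls]) for cls in sums]


def window_avg(rows):
    # Like AVG(score) OVER(PARTITION BY class): every input row is kept and the
    # average of its class is appended as an extra column
    averages = dict(group_by_avg(rows))
    return [(name, cls, score, averages[cls]) for name, cls, score in rows]


grouped = group_by_avg(rows)   # 2 rows, one per class
windowed = window_avg(rows)    # 5 rows, all original columns plus the class average
```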
●Classification of window functions
1. Aggregate window functions: aggregate_function(column) OVER (option), where the option is typically a PARTITION BY clause. (If an ORDER BY clause is added, the aggregate becomes a running aggregate from the start of the partition up to the current row and any rows tied with it.)
2. Ranking window functions: ranking_function() OVER (option), where the option can be an ORDER BY clause or PARTITION BY ... ORDER BY ..., but not a PARTITION BY clause alone.
3. Slicing (bucketing) window function: NTILE(n), which divides the ordered rows into n groups.
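The ranking functions differ only in how they treat ties, and NTILE in how it sizes its groups. The plain-Python versions below are simplified models of ROW_NUMBER, RANK, DENSE_RANK and NTILE for a single partition whose rows are already sorted in window order; the function names and the sample scores are assumptions for illustration, not Spark code.

```python
def row_number(scores):
    # ROW_NUMBER: consecutive numbers, ties still get different numbers
    return list(range(1, len(scores) + 1))


def rank(scores):
    # RANK: ties share a rank, and the following rank skips ahead (1, 1, 3, ...)
    ranks = []
    for i, s in enumerate(scores):
        if i > 0 and s == scores[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(i + 1)
    return ranks


def dense_rank(scores):
    # DENSE_RANK: ties share a rank, and the following rank does not skip (1, 1, 2, ...)
    ranks = []
    for i, s in enumerate(scores):
        if i == 0:
            ranks.append(1)
        elif s == scores[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks


def ntile(n_rows, n_buckets):
    # NTILE: split n_rows into n_buckets groups; the first n_rows % n_buckets
    # groups each receive one extra row
    base, extra = divmod(n_rows, n_buckets)
    buckets = []
    for b in range(1, n_buckets + 1):
        size = base + (1 if b <= extra else 0)
        buckets.extend([b] * size)
    return buckets


scores = [99, 99, 90, 90, 90, 57]  # assumed sample, sorted by score DESC
```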
(2) Syntax of window function
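Window function syntax: function_name(arguments) OVER ([PARTITION BY column, ...] [ORDER BY column [ASC | DESC], ...]). The following example demonstrates each category: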
```python
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # (name, class, score) records
    rdd = sc.parallelize([
        ('Zhang San', 'class_1', 99), ('王五', 'class_2', 35), ('王三', 'class_3', 57),
        ('Wang Jiu', 'class_4', 12), ('Wang Li', 'class_5', 99), ('Wang Juan', 'class_1', 90),
        ('Wang Jun', 'class_2', 91), ('Wang Jun', 'class_3', 33), ('王君', 'class_4', 55),
        ('Wang Jun', 'class_5', 66), ('Zheng Ying', 'class_1', 11), ('Zheng Hui', 'class_2', 33),
        ('Zhang Li', 'class_3', 36), ('Zhang Zhang', 'class_4', 79), ('Huang Kai', 'class_5', 90),
        ('黄凯', 'class_1', 90), ('Huang Kai', 'class_2', 90), ('Wang Kai', 'class_3', 11),
        ('王凯杰', 'class_1', 11), ('Wang Kaijie', 'class_2', 3), ('Wang Jingliang', 'class_3', 99),
    ])
    schema = (StructType()
              .add('name', StringType())
              .add('class', StringType())
              .add('score', IntegerType()))
    df = rdd.toDF(schema)

    # Register the DataFrame as a temporary view named stu
    df.createTempView('stu')

    # TODO 1: aggregate window function; append the overall average score to every row
    spark.sql('''
        SELECT *, AVG(score) OVER() AS avg_score FROM stu
    ''').show()

    # TODO 2: ranking window functions: ROW_NUMBER, DENSE_RANK, RANK
    spark.sql('''
        SELECT *,
               ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
               DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
               RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO 3: NTILE; split the rows, ordered by score, into 6 groups
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()
```
TODO 1 result:
TODO 2 result:
TODO 3 result:
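As a quick check on the NTILE(6) output: the example dataset has 21 rows, and 21 = 6 * 3 + 3, so every group gets 3 rows and the first 3 groups get one extra row each. The arithmetic, assuming the usual NTILE rule that leftover rows go to the lowest-numbered groups:

```python
n_rows, n_groups = 21, 6
base, extra = divmod(n_rows, n_groups)  # rows per group, leftover rows
# The first `extra` groups receive one additional row each
group_sizes = [base + (1 if g < extra else 0) for g in range(n_groups)]
```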