SparkSQL function definition (defining UDF functions, using window functions)

1. UDF function definition

(1) Function definition

(2) Spark supports defining functions

(3) Define UDF function

(4) Define UDF that returns Array type

(5) Define UDF that returns dictionary type

(6) Construct UDAF function through RDD

2. Window function

(1) Brief description of windowing function

(2) Syntax of window function


1. UDF function definition

(1) Function definition

Whether data is analyzed and processed with Hive or with SparkSQL, functions are needed constantly. The SparkSQL module ships with many built-in functions that cover common needs; they live in pyspark.sql.functions. Like Hive, SparkSQL supports user-defined functions: UDF and UDAF. Among these, UDF functions are by far the most widely used in real projects.
There are three types of custom functions in Hive:

The first type: UDF (User-Defined Function)

· One-to-one relationship: one input value produces one output value after passing through the function;

· In Hive, a UDF is written by extending the UDF class and implementing an evaluate method whose return value cannot be void; in essence you just implement one method;

The second type: UDAF (User-Defined Aggregation Function)

· Many-to-one relationship: multiple input values produce one output value; usually used together with GROUP BY;

The third type: UDTF (User-Defined Table-Generating Function)

· One-to-many relationship: one input value produces multiple output values (one row becomes multiple rows);

· A user-defined table-generating function, conceptually similar to flatMap; a sketch contrasting the three types follows this list;
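
To make the three relationships concrete, here is a minimal sketch using built-in Spark SQL functions that behave like each type (upper resembles a UDF, sum a UDAF, explode a UDTF); the DataFrame, view name and column names are made up for the illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('function_types_demo').master('local[*]').getOrCreate()
df = spark.createDataFrame(
    [('tom', 'class_1', 90, 'basketball football'),
     ('jerry', 'class_1', 80, 'swimming')],
    ['name', 'class', 'score', 'hobbies'])
df.createTempView('demo')

# One-to-one (UDF-like): each input value produces one output value
spark.sql("SELECT name, upper(name) FROM demo").show()
# Many-to-one (UDAF-like): many rows are aggregated into one value per group
spark.sql("SELECT class, sum(score) FROM demo GROUP BY class").show()
# One-to-many (UDTF-like): one row is expanded into multiple rows
spark.sql("SELECT name, explode(split(hobbies, ' ')) FROM demo").show()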

(2) Spark supports defining functions

Support for custom functions differs across Spark versions and languages: SparkSQL itself only supports UDF and UDAF functions, and in Python only UDF is currently supported.

Spark versions and supported function definitions: the original support matrix covers Spark SQL UDF (Python, Java, Scala), Spark SQL UDAF (Java, Scala; introduced as experimental in 1.5), Spark SQL UDF (R), and Hive UDF/UDAF/UDTF, across Spark versions 1.1-1.4, 1.5, 1.6 and 2.0.
(3) Define UDF function

①sparksession.udf.register()

A UDF registered this way can be used in both DSL and SQL style: the object returned by register() is used in DSL style, while the name passed as the first parameter is used in SQL style.

②pyspark.sql.functions.udf

Only available for DSL style.

Here F stands for: from pyspark.sql import functions as F. The function registered as a UDF is an ordinary Python function containing the computation logic, for example: def add(x, y): return x + y. The name add is what gets registered as the UDF.

# coding: utf-8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build an RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(['num'])

    # TODO 1: Method 1 sparksession.udf.register(), both DSL and SQL styles can be used
    # UDF processing function
    def num_ride_10(num):
        return num * 10
    # Parameter 1: the name to register the UDF under; this name can only be used in SQL style
    # Parameter 2: the UDF processing logic, i.e. a separately defined function
    # Parameter 3: the declared return type of the UDF. Note: the return type must be declared when registering, and the actual return value must match the declared type
    # A UDF registered this way can be used in SQL style via the name in parameter 1, and in DSL style via the returned object
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # Used in SQL style
    # selectExpr executes SELECT-style expressions given as SQL strings
    # the select method accepts plain string column names or Column objects (e.g. the result of a computation)
    df.selectExpr('udf1(num)').show()

    # DSL style usage
    # The returned UDF object can be called like a function; the argument passed in must be a Column object
    df.select(udf2(df['num'])).show()

    # TODO 2: Method 2 registration, can only be used for DSL style
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

Method 1 result:

Method 2 result:
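
For reference, with the inputs 1 through 7 both methods output the numbers multiplied by 10; only the column header differs (udf1(num) for method 1, num_ride_10(num) for method 2). The output looks roughly like:

+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+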

(4) Define a UDF that returns an Array type

Note: array or list return values can be described using Spark's ArrayType.

Note: declare it as ArrayType(StringType()); the data type of the array's elements is passed into ArrayType.

# coding: utf-8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build an RDD
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # Register UDF, UDF execution function definition
    def split_line(data):
        return data.split(' ')

    # TODO 1: Method 1: register the UDF with spark.udf.register(), usable in both SQL and DSL style
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DSL style
    df.select(udf2(df['line'])).show()

    # SQL style
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2: Construct UDF in the form of method
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)
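
With truncate=False the split result is shown as an array column; the output looks roughly like this (the column header is split_line(line) when the object from F.udf is used):

+----------------------+
|udf1(line)            |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+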

(5) Define the UDF that returns the dictionary type

Note: dictionary-type return values can be described using StructType, a common structured type in Spark. In this context it is used in two places:
· to describe the schema of a DataFrame;
· to describe the return value of a UDF that returns a dictionary.

# coding: utf-8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Suppose there are three numbers: 1, 2, 3. For each input number, return the letter at that ordinal position together with the number, combined into a dict
    # Example: pass in 1 and return {'num': 1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # Register UDF
    def process(data):
        # index with data - 1 so that 1 maps to 'a', matching the example above
        return {'num': data, 'letters': string.ascii_letters[data - 1]}

    '''
    If the UDF return value is a dictionary, you need to use StructType to receive it.
    '''
    udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
                              add('letters', StringType(), nullable=True))
    # SQL style
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL style
    df.select(udf1(df['num'])).show(truncate=False)
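
    # A possible follow-up (a sketch; the alias 'info' is made up for illustration):
    # the struct returned by the UDF can be unpacked back into ordinary columns
    result_df = df.select(udf1(df['num']).alias('info'))
    result_df.select('info.num', 'info.letters').show()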

(6) Construct UDAF function through RDD

# coding: utf-8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # Approach: use the RDD mapPartitions operator to perform the aggregation
    # To simulate a UDAF with mapPartitions, all data must first be gathered into a single partition
    single_partition_rdd = df.rdd.repartition(1)

    def process(rows):
        total = 0
        for row in rows:
            total += row['num']

        return [total]  # Must return a list, because mapPartitions requires the function to return an iterable object

    print(single_partition_rdd.mapPartitions(process).collect())
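
    # For comparison (a sketch): a simple sum does not actually require the
    # single-partition workaround, because the built-in aggregate functions
    # already behave like a UDAF. The view name 'nums' below is an assumption.
    df.agg(F.sum('num')).show()
    df.createTempView('nums')
    spark.sql('SELECT SUM(num) AS total FROM nums').show()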

2. Window function

(1) Brief description of window function

●Introduction

Window (windowing) functions were introduced so that the rows before aggregation and the aggregated result can be shown at the same time: the result of the aggregate function is appended as an extra column on every row. "Opening a window" means defining, for each row, the set of rows that the operation will work over. Unlike GROUP BY, the data does not need to be grouped and collapsed, so the same result set returns both the columns of the original rows and the aggregated columns.

●Aggregation functions and windowing functions

Aggregation functions turn multiple rows into one row (count, avg, ...).

A windowing function, by contrast, does not collapse rows: every input row is kept and the aggregated value is attached to it as an extra column.

With an aggregate function, any other column to be displayed must be added to GROUP BY; a windowing function can display all columns directly without GROUP BY, as the comparison sketch below shows.
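
The difference is easy to see with two queries against the same table (a minimal sketch, assuming the stu view created in the example of section (2) below):

# GROUP BY collapses each class into a single row
spark.sql('SELECT class, AVG(score) AS avg_score FROM stu GROUP BY class').show()
# A window function keeps every original row and appends the aggregate as an extra column
spark.sql('SELECT *, AVG(score) OVER(PARTITION BY class) AS class_avg FROM stu').show()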

●Classification of window functions

1. Aggregate window functions: aggregate_function(column) OVER (option), where the option can be a PARTITION BY clause but cannot be an ORDER BY clause.

2. Ranking window functions: ranking_function(column) OVER (option), where the option can be an ORDER BY clause, or a PARTITION BY clause followed by an ORDER BY clause, but cannot be a PARTITION BY clause alone.

3. Partitioning window function: NTILE

(2) Syntax of window function

Window function syntax:

# coding: utf-8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('Zhang San', 'class_1', 99),
        ('Wang Wu', 'class_2', 35),
        ('Wang San', 'class_3', 57),
        ('Wang Jiu', 'class_4', 12),
        ('Wang Li', 'class_5', 99),
        ('Wang Juan', 'class_1', 90),
        ('Wang Jun', 'class_2', 91),
        ('Wang Jun', 'class_3', 33),
        ('Wang Jun', 'class_4', 55),
        ('Wang Jun', 'class_5', 66),
        ('Zheng Ying', 'class_1', 11),
        ('Zheng Hui', 'class_2', 33),
        ('Zhang Li', 'class_3', 36),
        ('Zhang Zhang', 'class_4', 79),
        ('Huang Kai', 'class_5', 90),
        ('Huang Kai', 'class_1', 90),
        ('Huang Kai', 'class_2', 90),
        ('Wang Kai', 'class_3', 11),
        ('Wang Kaijie', 'class_1', 11),
        ('Wang Kaijie', 'class_2', 3),
        ('Wang Jingliang', 'class_3', 99)])
    schema = StructType().add('name', StringType()).\
        add('class', StringType()).\
        add('score', IntegerType())
    df = rdd.toDF(schema)
    # Register the DataFrame as a temp view
    df.createTempView('stu')

    # TODO 1: Demonstration of aggregate window function
    spark.sql('''
        SELECT *, AVG(score) OVER() AS avg_score FROM stu
    ''').show()

    # TODO 2: Sorting-related window function calculations
    # RANK over, DENSE_RANK over, ROW_NUMBER over
    spark.sql('''
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO 3: NTILE window function
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()

TODO 1 result:

TODO 2 result:

TODO 3 result:
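
The same ranking logic can also be written in DSL style with a Window specification; a minimal sketch, reusing df and F from the example above:

from pyspark.sql.window import Window

# Window spec: rank within each class by score, highest first
w = Window.partitionBy('class').orderBy(F.desc('score'))
df.withColumn('dense_rank', F.dense_rank().over(w)) \
    .withColumn('row_number', F.row_number().over(Window.orderBy(F.desc('score')))) \
    .show()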