

Note: If you find this post useful, please like and bookmark it. I post new content on artificial intelligence and big data every week, most of it original: Python, Java, Scala, and SQL code; CV, NLP, and recommender systems; Spark, Flink, Kafka, HBase, Hive, Flume, and more. It is all practical material, along with walkthroughs of papers from top conferences, so we can make progress together.
#博学谷IT Learning Technical Support

Article directory

  • PySpark
  • Foreword
  • 1. Implementing a UDF with Pandas
  • 2. Implementing a custom UDAF with Pandas
  • 3. Comprehensive example: custom UDF and UDAF with Pandas
  • Summary


This post continues the PySpark SQL series (Pyspark_SQL5).

1. Implementing a UDF with Pandas

Requirement for the custom Python function: Series to Series. The function receives one or more pandas Series and returns a pandas Series of the same length.

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udf")

    spark = SparkSession.builder.appName("spark pandas udf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())

    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")

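    # For DSL format: wrap the function as a pandas UDF (Series to Series)
    @F.pandas_udf(returnType=IntegerType())
    def sum_ab(a: pd.Series, b: pd.Series) -> pd.Series:
        return a + b

    # For SQL format: register the pandas UDF under a name SQL can call
    spark.udf.register("sum_ab", sum_ab)
    spark.sql("""
        select
            *,
            sum_ab(a,b) as sum_ab
        from t1
    """).show()

    df.select("*", sum_ab("a", "b").alias("sum_ab")).show()
    spark.stop()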
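
Because the body of a Series to Series function is plain pandas, it can be checked locally without a Spark session. The sketch below is only an illustration: it defines `sum_ab` as an ordinary function and feeds it the same values as columns `a` and `b` above, confirming that the result has one element per input row.

```python
import pandas as pd


def sum_ab(a: pd.Series, b: pd.Series) -> pd.Series:
    # Element-wise addition: the output has the same length as the inputs.
    return a + b


# Same values as columns a and b in the DataFrame above.
a = pd.Series([1, 2, 3, 4])
b = pd.Series([5, 6, 7, 8])

result = sum_ab(a, b)
print(result.tolist())  # [6, 8, 10, 12]
```

Testing the function this way keeps the pandas logic separate from the Spark wiring, so a wrong result can be traced to one or the other.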

2. Implementing a custom UDAF with Pandas

Requirement for the custom Python function: Series to Scalar. The function receives one or more pandas Series and returns a single value.

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udaf")

    spark = SparkSession.builder.appName("spark pandas udaf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())

    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")
    df.show()

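    # For DSL format: wrap the function as a pandas UDAF (Series to Scalar)
    @F.pandas_udf(returnType=DoubleType())
    def avg_column(a: pd.Series) -> float:
        return a.mean()

    # For SQL format: register the pandas UDAF under a name SQL can call
    spark.udf.register("avg_column", avg_column)
    spark.sql("""
        select
            *,
            avg_column(a) over(order by a) as a_avg,
            avg_column(b) over(order by b) as b_avg
        from t1
    """).show()

    # DSL equivalent of the SQL query above
    df.select("*",
              avg_column("a").over(win.orderBy("a")).alias("a_avg"),
              avg_column("b").over(win.orderBy("b")).alias("b_avg")
              ).show()

    spark.stop()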
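
It helps to see what the windowed call computes. With `over(order by a)` and no explicit frame, Spark SQL's default frame runs from the start of the partition to the current row, so the UDAF is applied to a growing slice of the column. A rough pandas-only imitation, assuming distinct values in the ordering column as in the sample data, is sketched below. The helper name `running_avg` is made up for this illustration and is not part of PySpark.

```python
import pandas as pd


def avg_column(a: pd.Series) -> float:
    # Series to Scalar: many rows in, one value out.
    return float(a.mean())


def running_avg(col: pd.Series) -> list:
    # Hypothetical helper: apply avg_column to rows 0..i for each row i,
    # imitating a frame of "unbounded preceding to current row".
    ordered = col.sort_values().reset_index(drop=True)
    return [avg_column(ordered.iloc[: i + 1]) for i in range(len(ordered))]


a = pd.Series([1, 2, 3, 4])
b = pd.Series([5, 6, 7, 8])

print(running_avg(a))  # [1.0, 1.5, 2.0, 2.5]
print(running_avg(b))  # [5.0, 5.5, 6.0, 6.5]
```

These are the values to expect in the `a_avg` and `b_avg` columns for the four sample rows.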

3. Comprehensive example: custom UDF and UDAF with Pandas

_c0,opponent,win,home and away,hits,number of shots,shooting percentage,3-point shooting percentage,rebounds,assists,scoring
0,Warriors,win,away,10,23,0.435,0.444,6,11,27
1,Kings,win,away,8,21,0.381,0.286,3,9,28
3,Rockets,loss,away,8,19,0.526,0.462,7,9,20
4,Clippers,win,home,8,21,0.526,0.462,7,9,28
5,Heat,loss,away,8,19,0.435,0.444,6,11,18
6,Cavaliers,loss,away,8,21,0.435,0.444,6,11,28
7,Grizzlies,loss,home,10,20,0.435,0.444,6,11,27
9,76ers,win,home,10,21,0.526,0.462,7,9,28
# Requirement 1: Add 10 to the assists column
# Requirement 2: Compute rebounds + assists
# Requirement 3: Compute the average score for wins and for losses

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udf example")

    spark = SparkSession.builder.appName("spark pandas udf example").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

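    # NOTE: placeholder path; the original file location was not preserved
    df = spark.read.format("csv") \
        .option("header", True).option("inferSchema", True) \
        .load("file:///path/to/data.csv")

    df.createTempView("t1")
    df.printSchema()
    df.show()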

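    # Requirement 1: Add 10 to the assists column
    # Requirement 2: Compute rebounds + assists
    # Requirement 3: Compute the average score for wins and for losses

    # Column names below follow the CSV header (assists, rebounds, scoring, win)
    @F.pandas_udf(returnType=IntegerType())
    def method01(score: pd.Series) -> pd.Series:
        return score + 10

    @F.pandas_udf(returnType=IntegerType())
    def method02(score1: pd.Series, score2: pd.Series) -> pd.Series:
        return score1 + score2

    @F.pandas_udf(returnType=DoubleType())
    def method03(score: pd.Series) -> float:
        return score.mean()

    spark.udf.register("method01", method01)
    spark.udf.register("method02", method02)
    spark.udf.register("method03", method03)

    # SQL format: requirements 1 and 2
    spark.sql("""
        select
            *,
            method01(`assists`) as z_10,
            method02(`assists`,`rebounds`) as z_l_plus
        from t1
    """).show()

    # SQL format: requirement 3
    spark.sql("""
        select
            `win`,
            method03(`scoring`) as avg_score
        from t1
        group by `win`
    """).show()

    # DSL format: requirements 1 and 2
    df.select("*",
              method01("assists").alias("z_10"),
              method02("assists", "rebounds").alias("z_l_plus")
              ).show()

    # DSL format: requirement 3
    df.select("win", "scoring").groupBy("win").agg(
        method03("scoring").alias("avg_score")
    ).show()

    spark.stop()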
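
As a cross-check on the three requirements, the same calculations can be run in plain pandas on the sample rows. This is a sketch under a few assumptions: only the columns the requirements use are kept, the column names follow the CSV header (`win`, `rebounds`, `assists`, `scoring`), and the team names and the `win`/`loss` labels are normalized spellings chosen for this illustration.

```python
import io

import pandas as pd

# Sample rows from above, reduced to the columns the requirements need.
# Team names and "win"/"loss" labels are normalized for this sketch.
csv_text = """opponent,win,rebounds,assists,scoring
Warriors,win,6,11,27
Kings,win,3,9,28
Rockets,loss,7,9,20
Clippers,win,7,9,28
Heat,loss,6,11,18
Cavaliers,loss,6,11,28
Grizzlies,loss,6,11,27
76ers,win,7,9,28
"""
pdf = pd.read_csv(io.StringIO(csv_text))

# Requirement 1: assists + 10 (Series to Series)
z_10 = pdf["assists"] + 10
# Requirement 2: rebounds + assists (Series to Series)
z_l_plus = pdf["assists"] + pdf["rebounds"]
# Requirement 3: average score per outcome (Series to Scalar per group)
avg_score = {k: float(v) for k, v in pdf.groupby("win")["scoring"].mean().items()}

print(z_10.tolist())      # [21, 19, 19, 19, 21, 21, 21, 19]
print(z_l_plus.tolist())  # [17, 12, 16, 16, 17, 17, 17, 16]
print(avg_score)          # {'loss': 23.25, 'win': 27.75}
```

The Spark output for `z_10`, `z_l_plus`, and `avg_score` should agree with these values when run on the same rows.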


Today's post covered how to implement UDFs and UDAFs with pandas in PySpark.