Pyspark_SQL1

Pyspark

Note: if you find this blog helpful, please like and bookmark it. I post content on artificial intelligence and big data every week, most of it original: Python, Java, Scala, and SQL code; CV, NLP, and recommendation systems; Spark, Flink, Kafka, HBase, Hive, Flume, and more, all practical material, plus interpretations of papers from the top conferences. Let's make progress together.
Today I continue sharing Pyspark_SQL1 with you.


Article directory

  • Pyspark
  • Foreword
  • 1. Data structure comparison of Spark SQL
  • 2. Introductory case of Spark SQL
  • 3. Detailed explanation of DataFrame
    • 3.1 Basic introduction to DataFrame
    • 3.2 Construction method of DataFrame
  • Summary

Foreword

In day-to-day work, Spark SQL is used far more often than the Spark RDD API; most of the offline jobs I write are Spark SQL, so its importance is self-evident.

Although Spark SQL still runs on Spark RDDs underneath, its queries are optimized automatically, so they generally run faster; Spark SQL is also the API that is officially promoted.
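To see this optimization at work, you can ask Spark SQL to print a query's execution plan with explain(). A minimal sketch (the app name and the sample rows are made up for illustration):

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("explain demo").master("local[*]").getOrCreate()

    # A tiny DataFrame built from local data, just to have something to query
    df = spark.createDataFrame([(1, "Beijing"), (2, "Shanghai")], schema=["id", "address"])

    # explain() prints the physical plan produced by the optimizer;
    # the SQL/DataFrame code is translated into optimized RDD operations
    df.where("address = 'Beijing'").explain()

    spark.stop()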

1. Data structure comparison of Spark SQL

For comparison, pandas is a NumPy-based tool created to solve data analysis tasks on a single machine.

Comparison:
pandas DataFrame: a two-dimensional table, for single-machine data
Spark Core (RDD): any data structure, for large-scale distributed data
Spark SQL (DataFrame): a two-dimensional table, for large-scale distributed data
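As a small illustration of this comparison (the sample data and app name below are invented), an RDD can hold objects of any shape, while a Spark SQL DataFrame is always a two-dimensional table with named columns:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("structure demo").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # Spark Core: an RDD can store any Python objects, with no fixed structure
    rdd = sc.parallelize([{"name": "zhangsan", "hobbies": ["ball", "chess"]}, 42, "plain text"])
    print(rdd.collect())

    # Spark SQL: a DataFrame is always a two-dimensional table with named columns
    df = spark.createDataFrame([(1, "zhangsan"), (2, "lisi")], schema=["id", "name"])
    df.show()

    spark.stop()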


RDD: stores data directly as objects. For example, an RDD may hold Person objects, but Spark cannot see which fields are inside them.

DataFrame: takes the data of each field of Person and stores it as a column of a two-dimensional table, so the field values are directly visible.

DataSet: stores the Person data in the same structured way while also retaining the object's type, so each record is still known to be a Person.

Since Python does not support generics, the DataSet type is not available; the Python client only supports the DataFrame type.
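As a rough sketch of what this means in practice (the fields below stand in for a Person object and are made up), every record of a PySpark DataFrame comes back as a generic Row whose structure lives in the schema rather than in a typed class:

from pyspark.sql import Row, SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("row demo").master("local[*]").getOrCreate()

    # Each record is a generic Row, not a typed Person object
    df = spark.createDataFrame([Row(name="zhangsan", age=20), Row(name="lisi", age=18)])

    first = df.first()
    print(first.name, first.age)  # fields are accessed by name; the "type" lives in the schema

    spark.stop()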

2. Introductory case of Spark SQL

data set:
1, Zhang San, male, Beijing
2, Li Si, female, Shanghai
3, Wang Wu, female, Beijing
4, Zhao Liu, male, Guangzhou
5, Tian Qi, male, Beijing
6, Zhou Wu, female, Hangzhou

Requirement: given the structured data above, query which students are in the Beijing area.

from pyspark import SparkContext, SparkConf
import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("Spark init")
    spark = SparkSession.builder.appName("Spark Sql init").master("local[*]").getOrCreate()

    # The file is assumed to contain a header row (id, name, gender, address);
    # otherwise pass header=False and supply the column names through a schema
    df = spark.read.csv(
        path="file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/a.txt",
        header=True,
        inferSchema=True,
        sep=",",
        encoding="UTF-8"
    )

    df.printSchema()
    df.show()

    # Method one: register a temporary view and query it with SQL
    df.createOrReplaceTempView("df_temp")
    df_res = spark.sql("""
        select * from df_temp where address = 'Beijing'
    """)

    # Method two: use the DataFrame (DSL) API directly
    df_res = df.where("address = 'Beijing'")

    df_res.show()
    spark.stop()

3. Detailed explanation of DataFrame

3.1 Basic introduction to DataFrame


A DataFrame represents a two-dimensional table, and a two-dimensional table must have rows, columns, and table-structure description information.

Table structure description information (metadata): StructType
Field: StructField
    Defines the name of the field, the type of the field, and whether the field can be null

So a StructType object is composed of multiple StructFields, which together form the complete metadata.
Row: Row object
Column: Column object
Note: a DataFrame is essentially an RDD; it just wraps the RDD and adds schema metadata on top of it so that structured data can be processed.
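A minimal sketch of how these pieces fit together (the field names and sample rows are invented): the StructType/StructField metadata describes the table, and the underlying RDD of Row objects can be reached through df.rdd:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName("schema demo").master("local[*]").getOrCreate()

    # StructType = the whole table description, built from one StructField per column
    schema = StructType([
        StructField("name", StringType(), True),   # field name, field type, nullable
        StructField("age", IntegerType(), True),
    ])

    df = spark.createDataFrame([("zhangsan", 20), ("lisi", 18)], schema=schema)

    print(df.schema)         # the StructType metadata
    print(df.columns)        # the column names
    print(df.rdd.collect())  # the underlying RDD of Row objects

    spark.stop()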

3.2 Construction method of DataFrame

Method 1: Build a DataFrame from an RDD

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("Spark Rdd Df")

    spark = SparkSession.builder.appName("Spark Rdd Df").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # Build an RDD of (name, age) tuples from the raw strings
    rdd_init = sc.parallelize(['zhangsan 20', 'lisi 18', 'wangwu 23'])
    rdd_map = rdd_init.map(lambda line: (line.split()[0], int(line.split()[1])))

    # Describe the table structure: field name, field type, nullable
    schema = StructType().add("name", StringType(), True).add("age", IntegerType(), True)

    # Equivalent way of writing the same schema:
    # schema = StructType(
    #     fields=[
    #         StructField("name", StringType(), True),
    #         StructField("age", IntegerType(), True)
    #     ]
    # )

    df = spark.createDataFrame(data=rdd_map, schema=schema)
    df.printSchema()
    df.show()

    spark.stop()

Method 2: Convert a pandas DataFrame into a Spark SQL DataFrame

from pyspark import SparkContext, SparkConf
import os
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("pandas df to Spark df")

    spark = SparkSession.builder.appName("pandas df to Spark df").master("local[*]").getOrCreate()

    # Build a single-machine pandas DataFrame first
    pd_df = pd.DataFrame({
        "id": [1, 2, 3, 4],
        "name": ["zhangsan", "lisi", "wangwu", "zhaoliu"],
        "address": ["beijing", "shanghai", "shenzhen", "guangzhou"]
    })

    schema = StructType().add("id", IntegerType(), False).add("name", StringType(), False) \
        .add("address", StringType(), False)

    # Convert the pandas DataFrame into a distributed Spark DataFrame
    spark_df = spark.createDataFrame(data=pd_df, schema=schema)
    spark_df.printSchema()
    spark_df.show()

    spark.stop()

Method 3: Build a DataFrame directly from data initialized in the program

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("create spark df")

    spark = SparkSession.builder.appName("create spark df").master("local[*]").getOrCreate()

    schema = StructType().add("id", IntegerType(), False).add("name", StringType(), False) \
        .add("address", StringType(), False)

    # Build the DataFrame directly from a local list of tuples
    df = spark.createDataFrame(
        data=[(1, "zhangsan", "beijing"),
              (2, "lisi", "shanghai"),
              (3, "wangwu", "guangzhou")],
        schema=schema
    )
    df.printSchema()
    df.show()

    spark.stop()

Method 4: Build a DataFrame by reading an external text file

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("read text")

    spark = SparkSession.builder.appName("read text").master("local[*]").getOrCreate()

    # The text source reads each line into a single string column
    schema = StructType().add("dept", StringType(), False)

    df = spark.read \
        .format("text") \
        .schema(schema=schema) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/dept.txt")

    df.printSchema()
    df.show()

    spark.stop()

Method 5: Build a DataFrame by reading an external CSV file

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("read csv")

    spark = SparkSession.builder.appName("read csv").master("local[*]").getOrCreate()
    schema = StructType().add("id", IntegerType(), False).add("dept", StringType(), False)

    # The csv source splits each line by the separator and maps the parts onto the schema
    df = spark.read \
        .format("csv") \
        .option("sep", " ") \
        .option("header", True) \
        .option("encoding", "utf-8") \
        .schema(schema=schema) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/dept.txt")

    df.printSchema()
    df.show()

    spark.stop()

Method 6: Build a DataFrame by reading an external JSON file

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("read json")

    spark = SparkSession.builder.appName("read json").master("local[*]").getOrCreate()

    # The json source matches fields by name, so the schema field names must match the JSON keys
    schema = StructType().add("id", IntegerType(), False).add("name", StringType(), False) \
        .add("age", IntegerType(), False).add("address", StringType(), True)

    df = spark.read \
        .format("json") \
        .schema(schema=schema) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/stu.json")

    df.printSchema()
    df.show()

    spark.stop()

Summary

Today I mainly shared the basic concepts of Pyspark_SQL, an introductory case, and the common ways to construct a DataFrame.