Pyspark_SQL2

Pyspark

Note: If you find this blog helpful, please like and bookmark it. I update content related to artificial intelligence and big data every week, most of it original: Python, Java, Scala, and SQL code; CV, NLP, and recommendation systems; Spark, Flink, Kafka, HBase, Hive, Flume, and more, all practical material, plus interpretations of papers from top conferences. Let's make progress together.
Today I continue to share Pyspark_SQL2 with you.
#博学谷IT Learning Technical Support

Article directory

  • Pyspark
  • Foreword
  • 1. Spark SQL shuffle partition settings
  • 2. Spark SQL data write operations
  • 3. WordCount case implementation
  • Summary

Foreword

Today I continue to share Pyspark_SQL2.

1. Spark SQL shuffle partition settings

Under the hood, Spark SQL is essentially a Spark RDD program; the Spark SQL component can be thought of as a translation layer that turns SQL/DSL into an RDD program and then executes it.
Spark SQL also has shuffle partitions: after a shuffle, the number of partitions defaults to 200. In practice this value usually needs to be tuned, because 200 partitions are too many when the data volume is small, yet too few when the data volume is large.
How to adjust the number of shuffle partitions? Through spark.sql.shuffle.partitions, in one of three ways (a runnable sketch follows the list):

Solution 1: modify the spark-defaults.conf configuration file directly. This is a global setting (default 200) and is generally not recommended.
Set as:
spark.sql.shuffle.partitions 100
Solution 2: set the number of shuffle partitions dynamically when the client submits with spark-submit. This is the most common approach for online deployment.
Set as:
--conf spark.sql.shuffle.partitions=100
Solution 3: set it directly in code. This is mainly used in test and development environments, where the job can simply be run with a right click. Because a value set in code has the highest priority, it is usually removed before going online and configured uniformly through spark-submit instead.
Set as:
sparkSession.conf.set('spark.sql.shuffle.partitions', 100)
or:
sparkSession.builder.appName(...).master(...) \
    .config('spark.sql.shuffle.partitions', 4).getOrCreate()
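Below is a minimal runnable sketch of Solution 3, assuming a local master; the app name and the partition counts (4 and 8) are placeholder values chosen for illustration. It also shows how to read back the effective value with spark.conf.get.

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Set the shuffle partition count in code (Solution 3); 4 is only an example value
    spark = SparkSession.builder.appName("shuffle partition demo").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    # Check which value actually took effect
    print(spark.conf.get('spark.sql.shuffle.partitions'))  # -> '4'

    # The setting can also be changed at runtime on an existing session
    spark.conf.set('spark.sql.shuffle.partitions', 8)
    print(spark.conf.get('spark.sql.shuffle.partitions'))  # -> '8'

    spark.stop()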

2. Spark SQL data write operations

Demo 1: output to files (json, csv, orc, text, ...)

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    print("write data")

    spark = SparkSession.builder.appName("write data").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 100) \
        .getOrCreate()
    schema = StructType().add("id", IntegerType()).add("name", StringType()) \
        .add("age", IntegerType())
    df = spark.createDataFrame(data=[
        (1, "zhangsan", 20),
        (2, "lisi", 21),
        (3, "wangwu", 22),
        (4, "zhaoliu", 23),
        (5, "tianqi", 19)
    ], schema=schema)

    df = df.where("age > 20")

    # Save modes: append, overwrite, ignore, error (errorifexists)
    # Write the filtered DataFrame out as CSV with a header row
    df.write.mode("overwrite").format("csv")\
        .option("header", True) \
        .save("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/output1.csv")
    spark.stop()
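The other formats listed in the demo title go through the same writer API. Here is a hedged sketch, assuming the same df as above and placed before spark.stop() in the same script; the output paths follow the pattern of the demo but are placeholders of my own, and the text format needs the rows concatenated into a single string column first.

    # JSON output: one JSON object per line
    df.write.mode("overwrite").format("json") \
        .save("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/output_json")

    # ORC output: columnar, compressed binary format
    df.write.mode("overwrite").format("orc") \
        .save("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/output_orc")

    # Text output only accepts a single string column, so concatenate the fields first
    df.selectExpr("concat_ws(',', cast(id as string), name, cast(age as string)) as value") \
        .write.mode("overwrite").format("text") \
        .save("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/output_text")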

Demo 2: output the result data to a relational database via JDBC, for example MySQL.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("write data jdbc")

    spark = SparkSession.builder.appName("write data jdbc").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 100) \
        .getOrCreate()
    schema = StructType().add("id", IntegerType()).add("name", StringType()) \
        .add("age", IntegerType())
    df = spark.createDataFrame(data=[
        (1, "zhangsan", 20),
        (2, "lisi", 21),
        (3, "wangwu", 22),
        (4, "zhaoliu", 23),
        (5, "tianqi", 19)
    ], schema=schema)

    df = df.where("age > 20")

    # Save modes: append, overwrite, ignore, error (errorifexists)
    df.write.jdbc(url="jdbc:mysql://node1:3306/day10_pyspark",
                  table="stu",
                  mode="append",
                  properties={"user": "root", "password": "xxx"})
    spark.stop()
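Writing over JDBC requires the MySQL JDBC driver (mysql-connector-java) to be available to Spark, for example via --jars or the jars directory. As a quick check, the rows can be read back with spark.read.jdbc; this sketch reuses the connection details from the demo (the password is still a placeholder) and would go before spark.stop() in the same script.

    # Read the table back to verify the write succeeded
    df_check = spark.read.jdbc(url="jdbc:mysql://node1:3306/day10_pyspark",
                               table="stu",
                               properties={"user": "root", "password": "xxx"})
    df_check.show()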

3. WordCount case implementation

Contents of the word.txt file:

hadoop hive hive hadoop sqoop
sqoop kafka hadoop sqoop hive hive
hadoop hadoop hive sqoop kafka kafka
kafka hue kafka hbase hue hadoop hadoop hive
sqoop sqoop kafka hue hue kafka
Method 1:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("word count")

    spark = SparkSession.builder.appName("word count").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # Read the text file as an RDD and turn each word into a single-column tuple
    rdd_init = sc.textFile(name="file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/word.txt")
    rdd_data = rdd_init.flatMap(lambda line: line.split()).map(lambda word: (word,))

    schema = StructType().add("word", StringType(), True)
    df = spark.createDataFrame(rdd_data, schema=schema)

    # Equivalent SQL version:
    # df.createTempView("word_table")
    # df_res = spark.sql("""
    #     select word, count(word) as count_num
    #     from word_table
    #     group by word
    # """)

    df.printSchema()
    df.show()

    # DSL version: group by word and count occurrences
    df_res = df.groupBy("word").count()
    df_res.show()

    spark.stop()

Method 2:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("word count")

    spark = SparkSession.builder.appName("word count").master("local[*]").getOrCreate()
    schema = StructType().add("line", StringType(), True)
    df = spark. read\
        .format("text") \
        .schema(schema=schema) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/word.txt")

    # df.createTempView("t1")
    # spark.sql("""
    # select
    # words,
    # count(1) as word_cnt
    # from t1 lateral view explode(split(line," ")) as words
    # group by words
    # """).show()

    # DSL version: split each line into words, explode to one word per row, then group and count
    df.select(F.explode(F.split("line", " ")).alias("words")).groupBy("words").agg(
        F.count("words").alias("word_cnt")
    ).show()

    spark.stop()
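For completeness, the SQL path that is commented out in Method 2 also runs as-is. This is a minimal sketch assuming the same df loaded above, placed before spark.stop() in the same script.

    # SQL version: register a temp view, then explode each line into words and count them
    df.createTempView("t1")
    spark.sql("""
        select
            words,
            count(1) as word_cnt
        from t1 lateral view explode(split(line, " ")) as words
        group by words
    """).show()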

Summary

Today I mainly shared the Pyspark_SQL shuffle partition settings, data write-out operations, and the WordCount case.