Pyspark
Table of contents
- Pyspark
- Foreword
- 1. Spark SQL shuffle partition settings
- 2. Spark SQL data write operations
- 3. WordCount case implementation
- Summary
Foreword
Today I continue sharing Pyspark_SQL2 with you.
1. Spark SQL shuffle partition settings
Under the hood, Spark SQL still runs as Spark RDD programs. The Spark SQL component can be thought of as a translator: it translates SQL/DSL code into Spark RDD programs and then executes them.
Spark SQL also has shuffle partitions: whenever a query triggers a shuffle, the number of partitions after the shuffle is 200 by default. In practice this value usually needs adjusting, because 200 partitions are too many when the data volume is small and too few when the data volume is large.
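To see why a fixed 200 can be too many for small data, here is a pure-Python sketch (no Spark needed) of hash partitioning, where each grouping key goes to partition `hash(key) % num_partitions`. It uses `zlib.crc32` as a deterministic stand-in hash; Spark SQL uses its own hash function, so the exact placements differ, but the counting argument is the same: with only a handful of distinct keys, at most that many partitions receive any data and the rest are empty tasks. The helper names are hypothetical.

```python
import zlib
from collections import Counter

def assign_partitions(keys, num_partitions):
    """Map each key to a partition id via hash(key) % num_partitions.

    zlib.crc32 is a deterministic stand-in for Spark's internal hash function.
    """
    return [zlib.crc32(k.encode("utf-8")) % num_partitions for k in keys]

def partition_usage(keys, num_partitions):
    """Return (non-empty partitions, empty partitions) for the given keys."""
    counts = Counter(assign_partitions(keys, num_partitions))
    non_empty = len(counts)
    return non_empty, num_partitions - non_empty

# Six distinct grouping keys, e.g. the words from the WordCount example
keys = ["hadoop", "hive", "sqoop", "kafka", "hue", "hbase"]

for n in (200, 4):
    used, empty = partition_usage(keys, n)
    print(f"{n} partitions: {used} non-empty, {empty} empty")
```

With 200 partitions at least 194 of them are empty here, which is pure scheduling overhead; with a much larger input the opposite problem appears, as each of the 200 partitions grows too big.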
How do you adjust the number of shuffle partitions? Through the parameter spark.sql.shuffle.partitions, which can be set in three ways:
Solution 1: Modify the spark-defaults.conf configuration file directly. This is a global setting (the default value is 200), and modifying it is not recommended.
Set as:
spark.sql.shuffle.partitions 100
Solution 2: Set the number of shuffle partitions dynamically when submitting the application with the spark-submit command. This is the more commonly used option (for online deployments that are submitted and run via spark-submit).
Set as:
--conf spark.sql.shuffle.partitions=100
Solution 3: Set it directly in the code. This is mainly used in test and development environments, where the program can be run directly from the IDE (right-click and run). Because settings made in code have the highest priority, they are generally removed before deploying online so that the value is configured uniformly through spark-submit.
Set as:
sparkSession.conf.set('spark.sql.shuffle.partitions', 100)
or:
SparkSession.builder.appName(...).master(...) \
    .config('spark.sql.shuffle.partitions', 4).getOrCreate()
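The three solutions differ only in where the value comes from. Below is a minimal pure-Python sketch of the lookup order, assuming the precedence Spark documents for configuration properties (values set in code win over spark-submit flags, which win over spark-defaults.conf), with the built-in default of 200 as the fallback. The helpers `parse_defaults_conf`, `parse_conf_flags` and `effective_shuffle_partitions` are hypothetical and only model the idea; they are not Spark APIs.

```python
KEY = "spark.sql.shuffle.partitions"
BUILTIN_DEFAULT = 200

def parse_defaults_conf(text):
    """Parse spark-defaults.conf style lines: key, whitespace, value."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf

def parse_conf_flags(argv):
    """Collect '--conf key=value' pairs from a spark-submit style argument list."""
    conf = {}
    for flag, arg in zip(argv, argv[1:]):
        if flag == "--conf":
            key, _, value = arg.partition("=")
            conf[key] = value
    return conf

def effective_shuffle_partitions(defaults_conf, submit_conf, code_conf):
    """Resolve the value: code > spark-submit --conf > spark-defaults.conf > 200."""
    for layer in (code_conf, submit_conf, defaults_conf):
        if KEY in layer:
            return int(layer[KEY])
    return BUILTIN_DEFAULT

defaults = parse_defaults_conf("# global settings\nspark.sql.shuffle.partitions 100\n")
submit = parse_conf_flags(
    ["--master", "yarn", "--conf", "spark.sql.shuffle.partitions=50", "app.py"])

print(effective_shuffle_partitions({}, {}, {}))                  # 200
print(effective_shuffle_partitions(defaults, {}, {}))            # 100
print(effective_shuffle_partitions(defaults, submit, {}))        # 50
print(effective_shuffle_partitions(defaults, submit, {KEY: 4}))  # 4

typo = parse_defaults_conf("spark.sql.shuffle.partition 100")  # missing "s"
print(effective_shuffle_partitions(typo, {}, {}))  # 200: the misspelled key is never looked up
```

The last case shows why the exact key name matters: in this model a misspelled key is simply never consulted, so the default silently stays in effect.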
2. Spark SQL data write operations
Demo 1: Write the output to files (json, csv, orc, text, ...)
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
if __name__ == '__main__':
    print("write data")
    spark = SparkSession.builder.appName("write data").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 100) \
        .getOrCreate()
    schema = StructType().add("id", IntegerType()).add("name", StringType()) \
        .add("age", IntegerType())
    df = spark.createDataFrame(data=[
        (1, "zhangsan", 20), (2, "lisi", 21), (3, "wangwu", 22),
        (4, "zhaoliu", 23), (5, "tianqi", 19)
    ], schema=schema)
    df = df.where("age > 20")
    # save modes: append, overwrite, ignore, error (the default)
    df.write.mode("overwrite").format("csv") \
        .option("header", True) \
        .save("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/output1.csv")
    spark.stop()
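The comment in the code lists the four save modes. As a rough pure-Python sketch of their semantics on a file-system target (a model, not Spark's implementation): Spark writes a directory of part files at the output path, so "overwrite" replaces that directory, "append" adds new part files, "ignore" silently skips the write if the path exists, and "error" (also spelled "errorifexists", the default) fails. The function `write_with_mode` and the part-file naming are assumptions for illustration.

```python
import shutil
import tempfile
import uuid
from pathlib import Path

VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}

def write_with_mode(path, rows, mode="error"):
    """Write rows as one CSV part file (with header) under directory `path`.

    Returns True if data was written, False if the write was skipped.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    path = Path(path)
    if path.exists():
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"path {path} already exists")
        if mode == "ignore":
            return False  # keep the existing output, write nothing
        if mode == "overwrite":
            shutil.rmtree(path)  # remove the old output directory first
        # mode == "append": keep existing part files and add a new one
    path.mkdir(parents=True, exist_ok=True)
    part = path / f"part-{uuid.uuid4().hex[:8]}.csv"
    lines = ["id,name,age"] + [",".join(map(str, row)) for row in rows]
    part.write_text("\n".join(lines) + "\n")
    return True

def part_files(path):
    """List the part files currently under the output directory."""
    return sorted(Path(path).glob("part-*.csv"))

rows = [(2, "lisi", 21), (3, "wangwu", 22), (4, "zhaoliu", 23)]  # the rows with age > 20
out = Path(tempfile.mkdtemp()) / "output1.csv"  # like Spark, "output1.csv" is a directory

write_with_mode(out, rows, "error")           # first write: path does not exist yet
write_with_mode(out, rows, "append")          # adds a second part file
print(len(part_files(out)))                   # 2
write_with_mode(out, rows, "overwrite")       # old part files are replaced
print(len(part_files(out)))                   # 1
print(write_with_mode(out, rows, "ignore"))   # False: path exists, nothing written
```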
Demo 2: Write the result data to a relational database via JDBC, for example MySQL:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
if __name__ == '__main__':
    print("write data jdbc")
    spark = SparkSession.builder.appName("write data jdbc").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 100) \
        .getOrCreate()
    schema = StructType().add("id", IntegerType()).add("name", StringType()) \
        .add("age", IntegerType())
    df = spark.createDataFrame(data=[
        (1, "zhangsan", 20), (2, "lisi", 21), (3, "wangwu", 22),
        (4, "zhaoliu", 23), (5, "tianqi", 19)
    ], schema=schema)
    df = df.where("age > 20")
    # save modes: append, overwrite, ignore, error
    # the MySQL JDBC driver jar must be on Spark's classpath
    df.write.jdbc(url="jdbc:mysql://node1:3306/day10_pyspark", table="stu", mode="append",
                  properties={"user": "root", "password": "xxx"})
    spark.stop()
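A common surprise with mode="append" is that re-running the same job inserts the same rows again. The sketch below uses Python's built-in `sqlite3` in-memory database as a stand-in for MySQL (no Spark or JDBC driver involved) to show the effect on the `stu` table; the table and column names follow the example above, and the helper functions are hypothetical.

```python
import sqlite3

rows = [(2, "lisi", 21), (3, "wangwu", 22), (4, "zhaoliu", 23)]  # the rows with age > 20

conn = sqlite3.connect(":memory:")  # stand-in for jdbc:mysql://node1:3306/day10_pyspark
conn.execute("CREATE TABLE stu (id INTEGER, name TEXT, age INTEGER)")

def append_rows(conn, rows):
    """Insert rows into stu without touching existing data, like mode='append'."""
    conn.executemany("INSERT INTO stu (id, name, age) VALUES (?, ?, ?)", rows)
    conn.commit()

def count_rows(conn):
    """Return the total number of rows in stu."""
    return conn.execute("SELECT COUNT(*) FROM stu").fetchone()[0]

append_rows(conn, rows)
print(count_rows(conn))  # 3
append_rows(conn, rows)  # running the same job a second time
print(count_rows(conn))  # 6: every row is now duplicated
```

If the job may be re-run, either write to a fresh table or deduplicate downstream; append by itself does not check for existing rows.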
3. WordCount case implementation
Contents of the word.txt file:
hadoop hive hive hadoop sqoop
sqoop kafka hadoop sqoop hive hive
hadoop hadoop hive sqoop kafka kafka
kafka hue kafka hbase hue hadoop hadoop hive
sqoop sqoop kafka hue hue kafka
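Before running either Spark version, it helps to know the expected answer. Here is a plain-Python reference count over the same five lines, assuming the file contains exactly these lines with single spaces between words, using `collections.Counter`:

```python
from collections import Counter

lines = [
    "hadoop hive hive hadoop sqoop",
    "sqoop kafka hadoop sqoop hive hive",
    "hadoop hadoop hive sqoop kafka kafka",
    "kafka hue kafka hbase hue hadoop hadoop hive",
    "sqoop sqoop kafka hue hue kafka",
]

# Split every line on whitespace and count each word
counts = Counter(word for line in lines for word in line.split())

for word, n in sorted(counts.items()):
    print(word, n)
# hadoop 7, hbase 1, hive 6, hue 4, kafka 7, sqoop 6 (31 words in total)
```

Spark's `show()` should report the same counts, although the row order is not guaranteed without an explicit `orderBy`.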
Method 1: read the file as an RDD, convert it to a DataFrame, then group and count:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
if __name__ == '__main__':
    print("word count")
    spark = SparkSession.builder.appName("word count").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    # read the file as an RDD of lines
    rdd_init = sc.textFile(name="file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/word.txt")
    # split lines into words, then wrap each word in a one-element tuple (one row per word)
    rdd_data = rdd_init.flatMap(lambda line: line.split()).map(lambda word: (word,))
    schema = StructType().add("word", StringType(), True)
    df = spark.createDataFrame(rdd_data, schema=schema)
    # SQL alternative:
    # df.createTempView("word_table")
    # df_res = spark.sql("""
    #     select word, count(word) as count_num
    #     from word_table
    #     group by word
    # """)
    # df.printSchema()
    df.show()
    # DSL: group by word and count the occurrences
    df_res = df.groupBy("word").count()
    df_res.show()
    spark.stop()
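The two RDD steps in Method 1 are easy to mix up. As a plain-Python analogy (no Spark involved), on a two-line sample: `map` keeps one output element per line, `flatMap` flattens the per-line word lists into a single sequence, and the second `map` wraps each word in a one-element tuple so that each record matches the single-column `word` schema passed to `createDataFrame`.

```python
from itertools import chain

sample = ["hadoop hive hive", "sqoop kafka"]

# map: one output element per input line -> a list of lists
mapped = [line.split() for line in sample]

# flatMap: the per-line lists are flattened into one sequence of words
flat = list(chain.from_iterable(line.split() for line in sample))

# map(lambda word: (word,)): wrap each word in a one-element tuple,
# i.e. one row for the single-column "word" schema
rows = [(word,) for word in flat]

print(mapped)  # [['hadoop', 'hive', 'hive'], ['sqoop', 'kafka']]
print(flat)    # ['hadoop', 'hive', 'hive', 'sqoop', 'kafka']
print(rows)    # [('hadoop',), ('hive',), ('hive',), ('sqoop',), ('kafka',)]
```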
Method 2: read the file directly as a DataFrame, then split, explode and count:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
if __name__ == '__main__':
    print("word count")
    spark = SparkSession.builder.appName("word count").master("local[*]").getOrCreate()
    # the text source yields a single string column per line; name it "line"
    schema = StructType().add("line", StringType(), True)
    df = spark.read \
        .format("text") \
        .schema(schema=schema) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/word.txt")
    # SQL alternative:
    # df.createTempView("t1")
    # spark.sql("""
    #     select
    #         words,
    #         count(1) as word_cnt
    #     from t1 lateral view explode(split(line, " ")) as words
    #     group by words
    # """).show()
    # DSL: split each line into an array, explode it into one row per word, then count
    df.select(F.explode(F.split("line", " ")).alias("words")) \
        .groupBy("words") \
        .agg(F.count("words").alias("word_cnt")) \
        .show()
    spark.stop()
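One behavioral difference between the two methods: Method 1 uses Python's `line.split()`, which treats any run of whitespace as one separator, while Method 2 uses `F.split("line", " ")`, which splits on every single space (the pattern is a regular expression). Two consecutive spaces therefore yield an empty-string token, which `explode` turns into its own row and the count then treats as a "word". The sketch below uses Python's `str.split(" ")` as a stand-in for the single-space split, assuming it places empty tokens the same way; the sample line with a double space is made up for illustration, and the data in word.txt uses single spaces, so both methods should agree on it.

```python
from collections import Counter

line = "hadoop  hive hive"  # hypothetical line with a double space after "hadoop"

# Method 1 style: split() on runs of whitespace
tokens_ws = line.split()
# Method 2 style: split on every single space (stand-in for F.split("line", " "))
tokens_sp = line.split(" ")

print(tokens_ws)  # ['hadoop', 'hive', 'hive']
print(tokens_sp)  # ['hadoop', '', 'hive', 'hive']

# exploding and counting tokens_sp would count '' as a word
counts_sp = Counter(tokens_sp)

# One possible fix: drop empty tokens before counting
# (in Spark this could be a filter on the exploded column, e.g. F.col("words") != "")
cleaned = Counter(t for t in tokens_sp if t)
```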
Summary
Today I mainly shared the Pyspark_SQL shuffle partition settings, how to write data out, and the WordCount case.