(Step-by-step) Spark meteorological monitoring data analysis – Step 2.2 Calculate the distribution of PM2.5 concentration across the five major concentration limit intervals

Table of Contents

Foreword

Prerequisites

Software versions used

Dataset

Code principle

Part of the code

Task2 function code (main part)

Main function code

Run Spark

Importing packages

Spark session and schema definitions

If the Spark connection reports an error

Run result

General outline

(Step-by-step) Spark meteorological monitoring data analysis – general outline

Foreword

First of all, this post is entirely original. If you run into problems while programming, leave a comment and I will reply when I see it. Some readers are working through their big data production internship; if you are in a hurry, you can add me on QQ: 1259097853.

Prerequisites

This post assumes that you have already installed Hadoop, Spark, and the required IDEA plug-ins. If you have trouble installing any of them, search CSDN for an installation guide that matches the version number you are using.

Software versions used

Hadoop 2.7.7; Java 1.8.0; sbt 1.4.0; Spark 2.4.0; Hive 2.1.1; ZooKeeper 3.5.10; Python 3.7.9

Dataset

The data set can be downloaded from the link below.

Link: https://pan.baidu.com/s/13T8IHjAjvbsvQtQ01Ro__Q?pwd=494j
Extraction code: 494j

Code Principle

This part of the analysis focuses on the PM2.5 concentration data. The concentration limit intervals are listed in Table 3.1. The analysis is not restricted to daily data; only the interval boundaries from the table are used. Because no PM2.5 records fall in the sixth interval or beyond, only the distribution over the first five intervals is of interest. The analysis consists of three steps:
(1) Read res.csv, create a temporary view, and select the field PM2.5 monitoring concentration (μg/m3) from the temporary view;
(2) Use the count function to compute the number of records that fall in each interval;
(3) Store the result in a new DataFrame with the header (range, count), that is, the interval and the number of records in it.
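
To make the interval semantics concrete, here is a minimal plain-Scala sketch of step (2) that runs without Spark. The object and method names (IntervalCounts, countByInterval) are made up for illustration; the bounds are the ones used in the Task2 queries. The first interval is closed on both ends, every later one is open on the left.

```scala
object IntervalCounts {
  // Interval boundaries for PM2.5 (μg/m3), taken from the Task2 queries.
  val pm25Bounds: Seq[Double] = Seq(0.0, 35.0, 75.0, 115.0, 150.0, 250.0, 350.0, 500.0)

  /** Counts values per interval: the first is [b0, b1], the rest are (lo, hi]. */
  def countByInterval(values: Seq[Double], bounds: Seq[Double]): Seq[(String, Long)] =
    bounds.sliding(2).zipWithIndex.map { case (Seq(lo, hi), i) =>
      // Include the lower bound only for the first interval.
      val n = values.count(v => (if (i == 0) v >= lo else v > lo) && v <= hi)
      (s"${lo.toInt}-${hi.toInt}", n.toLong)
    }.toList
}
```

A value that sits exactly on a boundary, such as 35, is counted in the lower interval, which matches the `<=` comparisons in the SQL.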

Part of the code

I have split the code into several parts. Some parts differ only slightly from the earlier steps.

Task2 function code (main part)
 def Task2(df: DataFrame): Unit = {
    // Register the DataFrame as a temporary view so it can be queried with SQL.
    df.createOrReplaceTempView("PM25")
    // Total number of valid (non-negative) PM2.5 records.
    val PM25_total = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` >= 0").count()
    // One count per interval: the first is [0, 35], every later one is (lo, hi].
    val PM25_1 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` >= 0 and `PM2.5 monitoring concentration (μg/m3)` <= 35").count()
    // println(PM25_total)
    // println(PM25_1)
    val PM25_2 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 35 and `PM2.5 monitoring concentration (μg/m3)` <= 75").count()
    val PM25_3 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 75 and `PM2.5 monitoring concentration (μg/m3)` <= 115").count()
    val PM25_4 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 115 and `PM2.5 monitoring concentration (μg/m3)` <= 150").count()
    val PM25_5 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 150 and `PM2.5 monitoring concentration (μg/m3)` <= 250").count()
    val PM25_6 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 250 and `PM2.5 monitoring concentration (μg/m3)` <= 350").count()
    val PM25_7 = spark.sql("select `PM2.5 monitoring concentration (μg/m3)` from PM25 " + "where `PM2.5 monitoring concentration (μg/m3)` > 350 and `PM2.5 monitoring concentration (μg/m3)` <= 500").count()
    // Pair each label with its count and write the result as a CSV with a header row.
    val ranges = Array("Total", "0-35", "35-75", "75-115", "115-150", "150-250", "250-350", "350-500")
    val counts = Array(PM25_total, PM25_1, PM25_2, PM25_3, PM25_4, PM25_5, PM25_6, PM25_7)
    val pm25DistDF = spark.createDataFrame(ranges.zip(counts)).toDF("range", "count")
    pm25DistDF.write.option("header", "true").mode("overwrite").csv("file:///root/work/Task2/pm25_dist.csv")
    print("@@@@@@PM25")
    pm25DistDF.show()

    // Same procedure for SO2.
    // NOTE: these filters reuse the PM2.5 bounds (35, 75, ..., 500), while the labels
    // in rangesso2 list the SO2 limits (50, 150, ..., 2620). Align the bounds with the
    // labels before relying on the per-interval SO2 counts.
    df.createOrReplaceTempView("SO2")
    val SO2_total = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` >= 0").count()
    val SO2_1 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` >= 0 and `SO2 monitoring concentration (μg/m3)` <= 35").count()
    // println(SO2_total)
    // println(SO2_1)
    val SO2_2 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 35 and `SO2 monitoring concentration (μg/m3)` <= 75").count()
    val SO2_3 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 75 and `SO2 monitoring concentration (μg/m3)` <= 115").count()
    val SO2_4 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 115 and `SO2 monitoring concentration (μg/m3)` <= 150").count()
    val SO2_5 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 150 and `SO2 monitoring concentration (μg/m3)` <= 250").count()
    val SO2_6 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 250 and `SO2 monitoring concentration (μg/m3)` <= 350").count()
    val SO2_7 = spark.sql("select `SO2 monitoring concentration (μg/m3)` from SO2 " + "where `SO2 monitoring concentration (μg/m3)` > 350 and `SO2 monitoring concentration (μg/m3)` <= 500").count()
    val rangesso2 = Array("Total", "0-50", "50-150", "150-475", "475-800", "800-1600", "1600-2100", "2100-2620")
    val countsso2 = Array(SO2_total, SO2_1, SO2_2, SO2_3, SO2_4, SO2_5, SO2_6, SO2_7)
    val SO2DistDF = spark.createDataFrame(rangesso2.zip(countsso2)).toDF("range", "count")
    SO2DistDF.write.option("header", "true").mode("overwrite").csv("file:///root/work/Task2/SO2_dist.csv")
    SO2DistDF.show()
  }
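
Because the eight queries differ only in their bounds, the SQL strings could also be generated rather than typed by hand, which keeps the back-quoted column name identical in every query. The following is only a sketch under that assumption; IntervalSql and queries are hypothetical names, not part of the original program.

```scala
object IntervalSql {
  /** Builds one (label, SQL) pair per interval for a back-quoted column. */
  def queries(view: String, column: String, bounds: Seq[Int]): Seq[(String, String)] = {
    val col = s"`$column`" // back-quote once, reuse everywhere
    bounds.sliding(2).zipWithIndex.map { case (Seq(lo, hi), i) =>
      // The first interval includes its lower bound, later ones exclude it.
      val lower = if (i == 0) s"$col >= $lo" else s"$col > $lo"
      (s"$lo-$hi", s"select $col from $view where $lower and $col <= $hi")
    }.toList
  }
}
```

Each generated SQL string could then be passed to spark.sql(...).count() in the same way Task2 does, and the labels can be reused as the range column.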
Main function code
 def main(args: Array[String]): Unit = {

    // Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    // Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Only log errors so the results are not buried in Spark's log output.
    Logger.getLogger("org").setLevel(Level.ERROR)
    println("Test Begin")
    // println(SparkSession.getClass)

    // Read res.csv with the explicit schema (see the schema section).
    val df = spark.read
      .schema(schema)
      .option("header", "true")
      .csv("file:///root/res.csv")
    // df.show()
    // Task1(df)
    Task2(df)
    // data2.csv is only needed by Task3, which is disabled in this step.
    val df_data2 = spark.read
      .schema(schema_data2)
      .option("header", "true")
      .csv("file:///root/data2.csv")
    // df_data2.show()
    // Task3(df_data2)
    // Task4(df)

  }
Run Spark
[root@master ~]# ./spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
Importing packages

I will write up the installation of the package dependencies in a few days as step 0.1. If you see this and I have forgotten to update it, please remind me!

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.log4j.{Level,Logger}
import org.apache.spark.mllib.stat.Statistics

import scala.collection.mutable.ArrayBuffer
Spark session and schema definitions
 val schema = StructType(Array(
    StructField("", FloatType),
    StructField("Monitoring Time", StringType),
    StructField("SO2 monitoring concentration (μg/m3)", FloatType),
    StructField("NO2 monitoring concentration (μg/m3)", FloatType),
    StructField("PM10 monitoring concentration (μg/m3)", FloatType),
    StructField("PM2.5 monitoring concentration (μg/m3)", FloatType),
    StructField("O3 monitoring concentration (μg/m3)", FloatType),
    StructField("CO monitoring concentration (mg/m3)", FloatType),
    StructField("Temperature(℃)", FloatType),
    StructField("Humidity(%)", FloatType),
    StructField("Barometric Pressure (MBar)", FloatType),
    StructField("Wind speed (m/s)", FloatType),
    StructField("Wind direction(°)", FloatType),
    StructField("Cloud amount", FloatType),
    StructField("Long wave radiation (W/m2)", FloatType)
  ))

  val schema_data2 = StructType(Array(
    StructField("Monitoring Date", StringType),
    StructField("SO2 monitoring concentration (μg/m3)", FloatType),
    StructField("NO2 monitoring concentration (μg/m3)", FloatType),
    StructField("PM10 monitoring concentration (μg/m3)", FloatType),
    StructField("PM2.5 monitoring concentration (μg/m3)", FloatType),
    StructField("O3 maximum eight-hour moving average monitoring concentration (μg/m3)", FloatType),
    StructField("CO monitoring concentration (mg/m3)", FloatType)
  ))

  val spark = SparkSession
    .builder()
    .master("spark://192.168.244.130:7077")
    .getOrCreate()

If the Spark connection reports an error

If connecting to Spark fails, you can replace the previous definition with the following code. Use this for testing only: it runs Spark in local mode inside a single process instead of on the cluster. I will also write up how to debug the specific error in a few days; if I forget and you need it, please remind me.

 val spark = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()
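
Rather than editing the source each time you switch between the cluster and local mode, the master URL could be chosen at start-up. The sketch below assumes the URL is passed as the first program argument; MasterUrl and resolve are hypothetical names introduced here, not part of the original program.

```scala
object MasterUrl {
  // Hypothetical helper: use the first program argument as the master URL,
  // falling back to local mode with two threads when none is given.
  def resolve(args: Array[String], fallback: String = "local[2]"): String =
    args.headOption.map(_.trim).filter(_.nonEmpty).getOrElse(fallback)
}
```

The result would be passed to the builder as .master(MasterUrl.resolve(args)), which means the session has to be created inside main, where args is available.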
Run result
Test Begin
@@@@@@PM25+-------+-----+
|  range|count|
+-------+-----+
|  Total|19057|
|   0-35|14887|
|  35-75| 3844|
| 75-115|  288|
|115-150|   36|
|150-250|    2|
|250-350|    0|
|350-500|    0|
+-------+-----+

+---------+-----+
|    range|count|
+---------+-----+
|    Total|19025|
|     0-50|19020|
|   50-150|    5|
|  150-475|    0|
|  475-800|    0|
| 800-1600|    0|
|1600-2100|    0|
|2100-2620|    0|
+---------+-----+


Process finished with exit code 0
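
As a quick arithmetic check on the output above, the per-interval counts in each table should add up to the Total row, since the intervals do not overlap. The small plain-Scala sketch below copies the counts from the run result; the names DistCheck and intervalsSumToTotal are made up for illustration.

```scala
object DistCheck {
  // (range, count) rows copied from the run result, "Total" first.
  val pm25: Seq[(String, Long)] = Seq(
    "Total" -> 19057L, "0-35" -> 14887L, "35-75" -> 3844L, "75-115" -> 288L,
    "115-150" -> 36L, "150-250" -> 2L, "250-350" -> 0L, "350-500" -> 0L)

  val so2: Seq[(String, Long)] = Seq(
    "Total" -> 19025L, "0-50" -> 19020L, "50-150" -> 5L, "150-475" -> 0L,
    "475-800" -> 0L, "800-1600" -> 0L, "1600-2100" -> 0L, "2100-2620" -> 0L)

  /** True when the interval rows add up to the leading "Total" row. */
  def intervalsSumToTotal(rows: Seq[(String, Long)]): Boolean =
    rows.tail.map(_._2).sum == rows.head._2
}
```

Both tables pass this check, so every non-negative reading was assigned to exactly one of the queried intervals.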