Write programs based on Spark SQL to implement a few simple indicators.

Table of Contents

Data preparation:
Demand indicators:
Data preprocessing:
Requirement indicator realization:
Summary:


Data preparation:

I have prepared membership data from a certain aerospace company as the dataset for this project. The details are as follows:

– The data comes from the author’s course teacher.

Demand indicators:

  • To visualize the ratio of member counts among the cities of each province, screen out the cities ranked by the number of aerospace company members (in descending order).
  • Group the aerospace company’s members by province and, within each province’s cities, sort them in descending order by total points (keeping the top three).
  • The aerospace company has decided to upgrade members with more than 100 flights to platinum membership, so the customers who meet this condition need to be screened out.
  • Look at the age distribution in each city, examine the age gap, and screen out the maximum and minimum ages within each city.
  • Screen out members’ membership time (registration time) as needed.

Data preprocessing:

To convert the Excel file on hand to a JSON file, you can search online for a conversion tool. Here I used tableconvert.com, an online table conversion tool that makes the conversion easy: https://tableconvert.com/zh-cn/
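Before moving on, it helps to confirm that Spark can read the converted file. Below is a minimal sketch (not part of the original project) that prints the inferred schema; the column names mentioned in the comment (member_id, joining_time, city, province, age, gender, number_of_flights, total_score) are the ones the later programs use, and the sample values are only illustrative.

import org.apache.spark.sql.SparkSession

object SchemaCheck {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("SchemaCheck").master("local").getOrCreate
    // Spark expects line-delimited JSON, i.e. one record per line, for example:
    // {"member_id":"M0001","joining_time":"2008/03/15","city":"Guangzhou","province":"Guangdong",
    //  "age":34,"gender":"Male","number_of_flights":120,"total_score":38000}
    val df = ss.read.json("date.json")
    df.printSchema()   // confirm the column names and inferred types
    df.show(5, false)  // peek at the first few converted records
    ss.stop()
  }
}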

Requirement indicator realization:

1. To visually understand the ratio of member counts among the cities of each province, screen out the cities ranked by the number of aerospace company members (in descending order).
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object App1 {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App1").master("local").getOrCreate
    val df = ss.read.json("date.json")
    // Count members per (city, province) and sort by the count in descending order
    val he: DataFrame = df.groupBy("city", "province")
      .count
      .orderBy(new Column("count").desc)
    // Save the result to a local directory as a single JSON partition
    he.repartition(1).write.format("json").save("output")
    he.show(10, false)

    ss.stop()

  }

}

Create the project with Scala, read the data file in the main class as shown above, and run the code:

Saving the queried data to a local directory is just for experimentation and is optional.
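One note about the save step: if you run App1 a second time, write.save fails because the output directory already exists. The variation below (a sketch, not part of the original code) adds an explicit overwrite mode and uses coalesce(1) so re-runs keep producing a single JSON part file.

import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}

object App1Rerun {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App1Rerun").master("local").getOrCreate
    val df = ss.read.json("date.json")
    val he: DataFrame = df.groupBy("city", "province")
      .count
      .orderBy(new Column("count").desc)
    // Overwrite the previous run's output instead of failing,
    // and collapse the result into one partition / one output file
    he.coalesce(1)
      .write
      .mode(SaveMode.Overwrite)
      .format("json")
      .save("output")
    ss.stop()
  }
}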

2. Group the members of the aerospace company by province and, after screening out the members in the cities within each province, sort them in descending order (top three) by total points.
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object App2 {
  def main(args: Array[String]): Unit = {
    val ss: SparkSession = SparkSession.builder.appName("App2").master("local").getOrCreate
    val df: Dataset[Row] = ss.read.json("date.json")
    // Register the DataFrame as a temporary view
    df.createTempView("df")
    // Rank members within each province by total points, then keep the top three
    ss.sql("select city, province, country, total_score, dense_rank() over(partition by province order by total_score desc) as rank_no from df")
      .where("rank_no <= 3")
      .show(15)
    ss.stop()
  }
}

The result:

You can see the top three cities in each province by members’ total points.
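The same top-three ranking can also be expressed with the DataFrame API instead of a SQL string. The sketch below is an assumed equivalent using Window.partitionBy over the same date.json columns:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

object App2DF {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App2DF").master("local").getOrCreate
    val df = ss.read.json("date.json")
    // Rank members inside each province by total points, highest first
    val byProvince = Window.partitionBy("province").orderBy(col("total_score").desc)
    df.select("city", "province", "total_score")
      .withColumn("rank_no", dense_rank().over(byProvince))
      .where(col("rank_no") <= 3)   // keep the top three per province
      .show(15)
    ss.stop()
  }
}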

3. The aerospace company has decided to upgrade members with more than 100 trips (number of flights) to platinum membership, so it is necessary to screen out the customers who meet this condition.
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class App3 {
    public static void main(String[] args) throws AnalysisException {
        SparkSession ss = SparkSession.builder()
                .appName("App3")
                .master("local")
                .getOrCreate();

        Dataset<Row> df = ss.read()
                .json("date.json");

        // UDF that classifies a member by number of flights: more than 100 flights => platinum
        ss.udf().register("susu", new UDF1<String, String>() {
            @Override
            public String call(String number_of_flights) throws Exception {
                return Long.parseLong(number_of_flights) > 100 ? "Platinum Member" : "Ordinary Member";
            }
        }, DataTypes.StringType);

        df.createTempView("user");

        ss.sql("select member_id, joining_time, city, age, number_of_flights, " +
                        "susu(number_of_flights) as `membership level` from user")
                .show();
        ss.stop();
    }
}

In this part, we write a user-defined function (UDF) and then call it from the SQL statement:

You can see that members are classified by their number of flights, which is clear at a glance.
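For readers following the rest of this post in Scala, roughly the same UDF can be registered in a couple of lines. This sketch assumes number_of_flights is inferred as a numeric column; if it comes in as a string (as the Java version assumes), parse it first:

import org.apache.spark.sql.SparkSession

object App3Scala {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App3Scala").master("local").getOrCreate
    val df = ss.read.json("date.json")
    // Same classification rule as the Java UDF above
    ss.udf.register("susu", (flights: Long) =>
      if (flights > 100) "Platinum Member" else "Ordinary Member")
    df.createTempView("user")
    ss.sql("select member_id, city, number_of_flights, " +
           "susu(number_of_flights) as membership_level from user").show()
    ss.stop()
  }
}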

4. Look at the age distribution in each city, examine the age gap, and screen out the maximum and minimum ages within each city.
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object App4 {
  def main(args: Array[String]): Unit = {
    val ss: SparkSession = SparkSession.builder.appName("App4").master("local").getOrCreate
    val df: Dataset[Row] = ss.read.json("date.json")
    df.createTempView("user")
    // Oldest and youngest member per city
    ss.sql("select city, max(age) as max_age, min(age) as min_age from user group by city").show()
    ss.stop()
  }
}

The effect is as follows:
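The same per-city aggregation, together with the age gap mentioned in the requirement, can also be written with the DataFrame API; below is a minimal sketch under the same date.json assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, min}

object App4DF {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App4DF").master("local").getOrCreate
    val df = ss.read.json("date.json")
    // Oldest and youngest member per city, plus the gap between them
    df.groupBy("city")
      .agg(max("age").as("max_age"), min("age").as("min_age"))
      .withColumn("age_gap", col("max_age") - col("min_age"))
      .show()
    ss.stop()
  }
}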

5. Screen out the member’s membership time (registration time) as needed.
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object App5 {
  def main(args: Array[String]): Unit = {
    val ss: SparkSession = SparkSession.builder.appName("App5").master("local").getOrCreate
    val df: Dataset[Row] = ss.read.json("date.json")
    // Fuzzy match on the joining_time prefix: keep records whose date string starts with "08";
    // adjust the pattern to match the actual date format in your data
    df.where("joining_time like '08%'").show()
    ss.stop()
  }
}

Use where() with a LIKE pattern to fuzzily match and find the required membership (registration) dates:
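Note that the LIKE pattern depends entirely on how joining_time is formatted in the data (for example "2008/08/01"), so treat the pattern above as something to adjust. If the column can be parsed as a date, an alternative sketch filters by month explicitly; the "yyyy/MM/dd" format here is an assumption:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, month, to_date}

object App5ByMonth {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder.appName("App5ByMonth").master("local").getOrCreate
    val df = ss.read.json("date.json")
    // Parse joining_time with an assumed "yyyy/MM/dd" pattern, then keep August registrations;
    // change the pattern and the month to match the actual data
    df.withColumn("join_date", to_date(col("joining_time"), "yyyy/MM/dd"))
      .where(month(col("join_date")) === 8)
      .show()
    ss.stop()
  }
}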

Summary:

That completes this project.

The author only wrote five simple requirements, just to make it easier for everyone to understand Spark SQL.

Since I first came into contact with Spark, I have run into many problems on the way to getting started, hunting down errors and searching for solutions. My learning ability may not match others’, but however many obstacles I meet, I will work hard to make up for the setbacks.

Hadoop MapReduce is limited to offline analysis of massive data and its performance is relatively poor. Spark, as a rising star, offers good performance, supports many scenarios, is widely used in enterprises, and can replace MapReduce in most cases.

Let us encourage each other and walk together on the road of programming.