Creation and use of DataSet

Level 1: DataSet Creation

Knowledge points

1. Strongly typed DataSet creation

(1) Create a DataSet using a sequence of basic types

// Create a collection
val seq = Seq(1, 2, 3, 4)
// Call the toDS() method
val ds: Dataset[Int] = seq.toDS()
ds.show()

(2) Create a DataSet from a sequence of case class instances (the case class must not be defined inside the main method, otherwise compilation will fail)

// Define the case class User
case class User(id: Int, name: String, age: Int)
// Call the toDS() method
val ds1: Dataset[User] = Seq(User(202201, "Anne", 20)).toDS()
ds1.show()

(3) Convert RDD to DataSet

// Define the case class User
case class User(id: Int, name: String, age: Int)
// Map each tuple in the RDD to a User, then call toDS()
val ds2: Dataset[User] = sc.makeRDD(List((202201, "Anne", 20), (202202, "Jack", 21)))
  .map(t => User(t._1, t._2, t._3)).toDS()
ds2.show()

(4) Convert DataFrame to DataSet

// Define the case class User
case class User(id: Int, name: String, age: Int)
// Create an RDD and call the toDF() method
val df: DataFrame = sc.makeRDD(List((202203, "Anna", 22), (202204, "Peter", 24))).toDF("id", "name", "age")
// Call the as[] method
val ds3: Dataset[User] = df.as[User]
ds3.show()
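
The snippets above assume a SparkSession is already in scope and that its implicits are imported; a minimal, self-contained setup is sketched below (the object and application names are illustrative, not part of the exercise).

import org.apache.spark.sql.{Dataset, SparkSession}

object DatasetCreationDemo {
  // Case classes used with toDS()/as[] must be defined outside main,
  // otherwise the encoder cannot be derived and compilation fails.
  case class User(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("datasetDemo").getOrCreate()
    import spark.implicits._ // brings toDS(), toDF() and the 'col / $"col" syntax into scope

    val ds: Dataset[Int] = Seq(1, 2, 3, 4).toDS()
    ds.show()

    val users: Dataset[User] = Seq(User(202201, "Anne", 20)).toDS()
    users.show()

    spark.stop()
  }
}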

Programming requirements

  • Create a DataSet using the basic type String

  • Create a DataSet using the case class Staff

  • Convert RDD to DataSet of type Staff

  • Convert DataFrame to DataSet of type Staff

  • Read the /root/depart.csv file to create a DataSet

  • Field separator: comma (,)

    Path of the data file: /root/depart.csv

depart table structure:

INFO TYPE
depart_id int
depart_name string
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object first {

  def main(args: Array[String]): Unit = {

    // TODO Create the SparkSQL running environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val sc = new SparkContext(sparkConf)
    val spark =SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

  /********* Begin *********/
    // Create a DataSet using the basic type String and print the output
    val seq = Seq("hello","spark")
    // Call the toDS() method
    val ds:Dataset[String]=seq.toDS()
    ds.show()

    // Use the sample class Staff to create a DataSet and print the output
    val seq1 = Seq(Staff(1011,"JayChou","male"))
    val ds1:Dataset[Staff]=seq1.toDS()
    ds1.show()


    // Convert RDD to Staff type DataSet and print the output
    val rdd1: RDD[(Int, String, String)] = sc.makeRDD(List((1012, "Eason", "male")))
    val ds2:Dataset[Staff]=rdd1.map(t=>Staff(t._1,t._2,t._3)).toDS()
    ds2.show()


    // Convert the DataFrame to a Staff type DataSet and print the output
    val rdd2: RDD[(Int, String, String)] = sc.makeRDD(List((1013, "MJ", "female")))
    val df3: DataFrame = rdd2.toDF("id", "name", "sex")
    val ds3: Dataset[Staff] = df3.as[Staff]
    ds3.show()

    // Read the '/root/depart.csv' file to create a Dataset, keep the header, and print the output
    val ds4: Dataset[Row] = spark.read.format("csv").option("sep", ",").option("header", "true").load("/root/depart.csv")
    ds4.show()


/********* End *********/

    // TODO Close the environment
    spark.close()
    sc.stop()

  }
  case class User(id:Int,name:String,age:Int)
  case class Staff(id:Int,name:String,sex:String)
}

Level 2: Untyped DataSet operations

Knowledge points

1. Selection-type operations

(1) select: used to choose the columns to return;

(2) selectExpr: lets you use SQL expressions and functions directly in the query; it is equivalent to combining select with the expr function;

(3) withColumn: used to create a new column or modify the original column;

(4) withColumnRenamed: used to modify column names.
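
A short sketch of these selection-type operations, assuming a DataFrame df that already holds the staff data described below and that spark.implicits._ is imported:

// Sketch only; df is assumed to have the columns id, name, age and depart_name.
df.select("name").show()                          // select: pick the columns to return
df.selectExpr("avg(age)").show()                  // selectExpr: evaluate a SQL expression, same as select(expr("avg(age)"))
df.withColumn("staff_id", $"id").show()           // withColumn: add a new column (or overwrite one with the same name)
df.withColumnRenamed("name", "staffName").show()  // withColumnRenamed: rename an existing column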

2. Column-pruning operations

(1) drop: used to remove (delete) a column.
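
A one-line sketch of drop on the same assumed df:

// drop returns a new DataFrame without the listed column; the original df is left unchanged.
df.drop("age").show()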

3. Aggregation-type operations

(1) groupBy: used to specify the column(s) to group by.
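
A short sketch of groupBy followed by an aggregation, again on the assumed df:

// groupBy groups the rows by one or more columns; an aggregation such as count() is then applied per group.
df.groupBy("depart_name").count().show()
// Other aggregations work the same way, e.g. the average age per department
// (requires import org.apache.spark.sql.functions.avg):
df.groupBy("depart_name").agg(avg("age")).show()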

Programming requirements

  • Read the /root/staff.csv file to create a DataSet

  • Query the data in the name column

  • Query the average age of the staff table

  • Modify the original id column as staff_id

  • Rename the name column to staffName

  • Delete the age column

  • Group by depart_name and count the number of people in each department

  • Field separator: comma (,)

    Path of the data file: /root/staff.csv

staff table structure:

INFO TYPE
id int
name string
age int
depart_name string
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object second {

  def main(args: Array[String]): Unit = {

    // TODO Create the SparkSQL running environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val sc = new SparkContext(sparkConf)
    val spark =SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

  /********* Begin *********/
    //case class User(id:Int,name:String,age:Int,depart_name:String)
    // Read the '/root/staff.csv' file to create a DataSet
    val df: DataFrame = spark.read.format("csv").option("sep", ",").option("header", "true").load("/root/staff.csv")
    //df.show()
    // Query 'name' column data
    df.select("name").show()

    // Query the average age
    df.selectExpr("avg(age)").show()

    // Use withColumn to derive a 'staff_id' column from the original 'id' column
    df.withColumn("staff_id", 'id).show()

    // Modify the 'name' column to 'staffName'
    df.withColumnRenamed("name","staffName").show()

    // Delete the 'age' column
    df.drop("age").show()

    //Group statistics on the number of people in different departments by 'depart_name'
    df.groupBy("depart_name").count().show()


/********* End *********/

    // TODO Close the environment
    spark.close()
    sc.stop()

  }
}
