Level 1: DataSet Creation
Knowledge points
1. Strongly typed DataSet creation
(1) Create a DataSet using a sequence of basic types
```scala
// Create a collection
val seq = Seq(1, 2, 3, 4)
// Call the toDS() method
val ds: Dataset[Int] = seq.toDS()
ds.show()
```
(2) Create a DataSet from a sequence of case class instances (the case class must not be defined inside the main method, otherwise compilation will fail)
```scala
// Define the case class User
case class User(id: Int, name: String, age: Int)

// Call the toDS() method
val ds1: Dataset[User] = Seq(User(202201, "Anne", 20)).toDS()
ds1.show()
```
(3) Convert RDD to DataSet
```scala
// Define the case class User
case class User(id: Int, name: String, age: Int)

// Map each tuple in the RDD to a User, then call toDS()
val ds2: Dataset[User] = sc.makeRDD(List((202201, "Anne", 20), (202202, "Jack", 21)))
  .map(t => User(t._1, t._2, t._3)).toDS()
ds2.show()
```
(4) Convert DataFrame to DataSet
```scala
// Define the case class User
case class User(id: Int, name: String, age: Int)

// Create an RDD and call toDF() to build a DataFrame
val df: DataFrame = sc.makeRDD(List((202203, "Anna", 22), (202204, "Peter", 24)))
  .toDF("id", "name", "age")

// Call the as[] method to convert the DataFrame to a DataSet
val ds3: Dataset[User] = df.as[User]
ds3.show()
```
Programming requirements

- Create a DataSet using the basic type String
- Create a DataSet using the case class Staff
- Convert an RDD to a DataSet of type Staff
- Convert a DataFrame to a DataSet of type Staff
- Read the /root/depart.csv file to create a DataSet

Data segmentation method: comma (,)
Directory where the data is located: /root/depart.csv

depart table structure:

| INFO | TYPE |
|---|---|
| depart_id | int |
| depart_name | string |
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object first {
  def main(args: Array[String]): Unit = {
    // TODO Create the SparkSQL running environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    /********* Begin *********/
    // Create a DataSet using the basic type String and print the output
    val seq = Seq("hello", "spark")
    // Call the toDS() method
    val ds: Dataset[String] = seq.toDS()
    ds.show()

    // Use the case class Staff to create a DataSet and print the output
    val seq1 = Seq(Staff(1011, "JayChou", "male"))
    val ds1: Dataset[Staff] = seq1.toDS()
    ds1.show()

    // Convert an RDD to a DataSet of type Staff and print the output
    val rdd1: RDD[(Int, String, String)] = sc.makeRDD(List((1012, "Eason", "male")))
    val ds2: Dataset[Staff] = rdd1.map(t => Staff(t._1, t._2, t._3)).toDS()
    ds2.show()

    // Convert a DataFrame to a DataSet of type Staff and print the output
    val rdd2: RDD[(Int, String, String)] = sc.makeRDD(List((1013, "MJ", "female")))
    val df: DataFrame = rdd2.toDF("id", "name", "sex")
    val ds3: Dataset[Staff] = df.as[Staff]
    ds3.show()

    // Read the '/root/depart.csv' file to create a DataSet, retain the header, and print the output
    val ds4: Dataset[Row] = spark.read.format("csv")
      .option("sep", ",")        // comma as the field delimiter
      .option("header", "true")  // first line supplies the column names
      .load("/root/depart.csv")
    ds4.show()
    /********* End *********/

    // TODO Close the environment
    spark.close()
    sc.stop()
  }

  case class User(id: Int, name: String, age: Int)
  case class Staff(id: Int, name: String, sex: String)
}
```
Level 2: Untyped DataSet operations
Knowledge points
1. Selection-type operations (see the sketch after this list)
(1) select: used to choose the columns to retrieve;
(2) selectExpr: allows SQL expressions to be used directly in the query, equivalent to combining select with the expr function;
(3) withColumn: used to create a new column or modify an existing one;
(4) withColumnRenamed: used to rename a column.
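A minimal sketch of these four operations, assuming a SparkSession named spark with spark.implicits._ imported (as in the solution code further below); the demo DataFrame here is hypothetical in-memory data, not the course's CSV file:

```scala
import org.apache.spark.sql.functions.expr

// Hypothetical sample data for illustration only
val demo = Seq((1, "Anne", 20), (2, "Jack", 21)).toDF("id", "name", "age")

demo.select("name", "age").show()                           // pick columns by name
demo.selectExpr("name", "age + 1 AS next_age").show()       // SQL expression directly
demo.select($"name", expr("age + 1").as("next_age")).show() // equivalent: select + expr
demo.withColumn("age", $"age" + 1).show()                   // overwrite an existing column
demo.withColumnRenamed("name", "staffName").show()          // rename a column
```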
2. Column-removal operations (a short example follows)
(1) drop: used to delete one or more columns.
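A quick illustration of drop, continuing with the hypothetical demo DataFrame from the previous sketch:

```scala
// drop returns a new DataFrame without the named column(s)
demo.drop("age").printSchema() // 'age' no longer appears in the schema
demo.drop("id", "age").show()  // several columns can be dropped at once
```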
3. Aggregation-type operations (see the sketch below)
(1) groupBy: used to specify a column for grouping.
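A minimal groupBy sketch, using hypothetical rows shaped like the staff table from this level's exercise (the department names are made up for illustration):

```scala
import org.apache.spark.sql.functions.avg

// Hypothetical staff rows; only the column layout matches the real table
val staffDemo = Seq(
  (1, "Anne", 20, "Sales"),
  (2, "Jack", 21, "Sales"),
  (3, "Mary", 23, "IT")
).toDF("id", "name", "age", "depart_name")

staffDemo.groupBy("depart_name").count().show()                       // headcount per department
staffDemo.groupBy("depart_name").agg(avg("age").as("avg_age")).show() // average age per department
```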
Programming requirements

- Read the /root/staff.csv file to create a DataSet
- Query the name column data
- Query the average age in the staff table
- Modify the original id column to staff_id
- Change the name column to staffName
- Delete the age column
- Group by depart_name and count the number of people in each department

Data segmentation method: comma (,)
Directory where the data is located: /root/staff.csv

staff table structure:

| INFO | TYPE |
|---|---|
| id | int |
| name | string |
| age | int |
| depart_name | string |
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object second {
  def main(args: Array[String]): Unit = {
    // TODO Create the SparkSQL running environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    /********* Begin *********/
    // Read the '/root/staff.csv' file to create a DataSet
    val df: DataFrame = spark.read.format("csv")
      .option("sep", ",")        // comma as the field delimiter
      .option("header", "true")  // first line supplies the column names
      .load("/root/staff.csv")
    // df.show()

    // Query the 'name' column data
    df.select("name").show()

    // Query the average age
    df.selectExpr("avg(age)").show()

    // Derive a 'staff_id' column from the original 'id' column
    df.withColumn("staff_id", 'id).show()

    // Rename the 'name' column to 'staffName'
    df.withColumnRenamed("name", "staffName").show()

    // Delete the 'age' column
    df.drop("age").show()

    // Group by 'depart_name' and count the number of people in each department
    df.groupBy("depart_name").count().show()
    /********* End *********/

    // TODO Close the environment
    spark.close()
    sc.stop()
  }
}
```