Concepts and operations of DataFrame and Spark SQL

Spark SQL background

  1. SparkContext:
    • SparkContext is the entry point for a Spark application and represents the connection to the Spark cluster.
    • It was the main entry point in Spark 1.x; in Spark 2.x, SparkSession was introduced as the higher-level entry point that largely supersedes it.
    • SparkContext is responsible for managing task scheduling, data partitioning and communication with the cluster.
    • You can use SparkContext to create RDDs (Resilient Distributed Datasets) and perform basic Spark operations such as map, reduce, etc.
    • Typically, you would use SparkContext in Spark 1.x versions to initialize Spark applications.
  2. SparkSession:
    • SparkSession is a new concept introduced in Spark 2.x version. It is the entry point of Spark SQL and is used to process structured data.
    • SparkSession encapsulates the functionality of SparkContext and additionally provides support for DataFrames and SQL queries.
    • It is more convenient and allows you to seamlessly use Spark SQL, DataFrame operations and raw RDD operations in a single session.
    • With SparkSession, you can create and manipulate DataFrames, execute SQL queries, and interact with Spark clusters.
    • In Spark 2.x and later versions, it is generally recommended to use SparkSession as the entry point for Spark applications.

In Spark 2.x and later, you can create a SparkSession in the following ways:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

This will create a SparkSession that acts as the entry point to your Spark application, allowing you to use both Spark SQL and Spark's primitive operations. You can use this SparkSession to load data, process structured data, and execute SQL queries. SparkSession is the more powerful and flexible option, especially when working with structured data.

Note that when you start the pyspark interactive shell, it already provides a SparkContext object (named sc) and a SparkSession object (named spark) by default.

SparkSession does not directly inherit from SparkContext; rather, it is a higher-level, more comprehensive interface introduced in Spark 2.x that wraps a SparkContext and adds many new features, especially for structured data processing.

To put it simply, use SparkContext when you work directly with RDDs; for everything else, SparkSession is recommended, as in the sketch below.
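
As a minimal sketch of this division of labor (the names here are illustrative), note that the SparkContext is still available from a SparkSession via spark.sparkContext for RDD work:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EntryPointsDemo").getOrCreate()

# RDD work goes through the SparkContext (available as spark.sparkContext)
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4])
print(rdd.map(lambda x: x * 2).collect())   # [2, 4, 6, 8]

# Structured data work goes through the SparkSession directly
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df.show()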

In summary, SparkSession is the recommended entry point for Spark 2.x and later versions, which provides more functionality, especially when it comes to processing structured data and executing SQL queries. While SparkContext still has its uses, in most cases it is more convenient to use SparkSession, especially when processing and analyzing structured data. Therefore, if you are using Spark 2.x or newer, it is generally recommended to use SparkSession.

Spark SQL Architecture

Catalyst Optimizer:

  • Parser: Spark SQL first parses the SQL query string into an abstract syntax tree (AST).
  • Analyzer: The AST is analyzed, checking for syntax and semantic errors and performing name resolution and type checking in preparation for optimization.
  • Logical Plan: A logical query plan is generated, representing the logical structure of the query.
  • Optimization: The Catalyst Optimizer applies a series of optimization rules to the logical plan to improve query performance.
  • Physical Plan: A physical query plan is generated, specifying how the query is executed on the distributed cluster, including choices about data distribution and task scheduling.

Tungsten Execution Engine:

  • Code Generation: Spark SQL uses Tungsten code generation to convert physical plans into efficient Java bytecode, accelerating data processing.
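
To see these stages in practice, a DataFrame's explain() method prints the parsed and analyzed logical plans, the optimized logical plan, and the physical plan (a minimal sketch; the data and column names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExplainDemo").getOrCreate()
df = spark.createDataFrame([(1, "Alice", 20), (2, "Bob", 25)], ["id", "name", "age"])

# extended=True prints all plan stages instead of just the physical plan
df.filter(df.age > 21).select("name").explain(extended=True)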

Data Sources:

  • Spark SQL can connect to a variety of data sources, including relational databases, Parquet, Avro, JSON, CSV, etc. The data source module is responsible for loading and parsing data to make it available for querying.

Hive Integration:

  • Spark SQL integrates Hive metastore and query capabilities, allowing users to query Hive tables and use Hive UDFs (user-defined functions).
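
For example, Hive support is enabled when the SparkSession is built (a sketch assuming a working Hive metastore and an existing Hive table; the table name employees is illustrative):

from pyspark.sql import SparkSession

# enableHiveSupport() connects the session to the Hive metastore
spark = SparkSession.builder \
    .appName("HiveDemo") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("SELECT * FROM employees LIMIT 10").show()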

DataFrame API:

  • The DataFrame API provides a way to programmatically manipulate data, similar to tables in a relational database. DataFrames can be created, transformed, and manipulated programmatically, or used with SQL queries.
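
A brief sketch of the two equivalent styles mentioned here (assuming a SparkSession named spark as created earlier; data and column names are illustrative):

df = spark.createDataFrame([("Alice", 20), ("Bob", 25), ("Carol", 30)], ["name", "age"])

# DataFrame API style
df.filter(df.age > 21).select("name").show()

# Equivalent SQL style on the same data
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()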

User-Defined Functions (UDFs):

  • Users can define their own functions and then use these UDFs in Spark SQL queries. This allows users to extend the capabilities of Spark SQL to meet specific needs.
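
For instance, a plain Python function can be wrapped as a UDF and used from both the DataFrame API and SQL (a minimal sketch; the function and column names are illustrative, and the SparkSession spark from the earlier examples is assumed):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap a Python function as a UDF with an explicit return type
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())

df = spark.createDataFrame([("Alice",), ("Bob",)], ["name"])
df.select(to_upper(df.name).alias("name_upper")).show()

# Register the same logic for use inside SQL queries
spark.udf.register("to_upper_sql", lambda s: s.upper() if s is not None else None, StringType())
df.createOrReplaceTempView("names")
spark.sql("SELECT to_upper_sql(name) AS name_upper FROM names").show()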

Query Execution:

  • The query execution engine converts the physical plan into actual distributed tasks and executes these tasks on the Spark cluster. This includes distributed processing of data, partitioning of data, and data transfer between nodes.

Result Presentation:

  • Query results can be presented in a variety of formats, including display in the console, written to a file, or written to an external system.

Cluster Manager Integration:

  • Spark SQL can be integrated with different cluster managers such as Apache Hadoop YARN, Apache Mesos or a standalone Spark cluster manager.

DataFrame

Differences between DataFrame and RDD

DataFrame:
DataFrame is a structured data abstraction, similar to tables in relational databases. Its features are as follows:
Column names and data types:

A DataFrame contains multiple columns, each column has a name and data type. This means that the data in the DataFrame is organized in tabular form, and the data type of each column is clear, such as integer, string, date, etc.
Schema information:

DataFrames usually carry explicit schema information describing the name and data type of each column. This makes the structure of the data clear and facilitates interpretation and querying.
Optimization and querying:

DataFrame supports advanced queries and operations, including SQL queries, filtering, mapping, grouping, aggregation, etc. Queries can be optimized by the Catalyst query optimizer to improve performance.
SQL integration:

DataFrame integrates with Spark SQL, so data can be queried and manipulated with the SQL query language, which makes structured data processing more convenient.

Here is an example of DataFrame data; it looks like a table:
+----+-------+-----+
| ID | Name  | Age |
+----+-------+-----+
| 1  | Alice | 20  |
| 2  | Bob   | 22  |
| 3  | Carol | 21  |
| 4  | David | 23  |
+----+-------+-----+

RDD:
RDD (Resilient Distributed Dataset) is a set of data objects distributed in the cluster. It has no fixed structure and can contain various types of data. Its features are as follows:
Weakly typed:

RDD is weakly typed, it does not contain column names and data type information. This means you need to manage the type conversion and validation of the data yourself.
Versatility:

RDDs can contain various types of data, including structured and unstructured data. It has no fixed table structure and can adapt to different types of data.
Operations and transformations:

RDD provides basic operations and transformation methods, such as map, reduce, filter, etc. You need to write the data-processing logic yourself.
Data management:

RDDs require manual management of partitioning, data distribution, and data processing. This usually requires more programming effort and manual optimization.

An RDD can hold elements of various types, for example:
[3, 1, 4, 1, 5, 9, 2, 6, 5, 3]

Because an RDD is weakly typed and can hold data of any type, the data in a DataFrame can also be converted into an RDD through the .rdd method, as shown below.
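
A short sketch of this conversion (assuming the SparkSession spark from the earlier examples):

df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# .rdd returns the underlying RDD of Row objects
rdd = df.rdd
print(rdd.collect())   # [Row(id=1, name='Alice'), Row(id=2, name='Bob')]

# Each Row can be turned back into plain Python values
print(rdd.map(lambda row: (row.id, row.name)).collect())   # [(1, 'Alice'), (2, 'Bob')]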

Concept

DataFrame is a tabular data structure that is widely used for data storage and analysis, especially in data processing tools such as Python’s Pandas library, R language, and Apache Spark. DataFrame provides a two-dimensional, tabular data structure in which each column can contain different types of data (numeric, string, Boolean, etc.), similar to a SQL table or Excel spreadsheet.

A DataFrame in Apache Spark is an immutable distributed collection of data designed to handle large amounts of structured data. Spark's DataFrame is built on top of RDDs (Resilient Distributed Datasets) and provides a higher-level abstraction and optimized interfaces for structured and semi-structured data.

Key features of DataFrame include:

  • Structured data: DataFrame is designed to handle structured data, which means the data is organized in a table with rows and columns. Each column has a name and data type, and each row contains specific data. This makes DataFrame suitable for processing data similar to that found in relational databases.
  • Strongly typed: DataFrame is strongly typed, which means each column has an explicit data type, such as integer, string, date, etc. This helps catch and prevent type errors, making data processing more stable and maintainable.
  • Immutability: A DataFrame is immutable, which means that once created, you cannot modify it directly. Any operation on a DataFrame creates a new DataFrame. This immutability facilitates concurrent processing and avoids data corruption.
  • Lazy evaluation: DataFrame operations are usually lazily evaluated, which means that the operation is not performed immediately but only when needed. This allows Spark to perform optimizations and execution plan generation to improve performance.
  • Distributed processing: DataFrame allows you to process data on a distributed computing cluster. It can automatically split data into partitions and perform operations on different nodes, enabling high-performance data processing.
  • API support: Spark SQL provides API support for multiple programming languages, including Scala, Java, Python, and R. You can use the programming language you are most familiar with to manipulate DataFrames.
  • Operations and transformations: DataFrame supports various operations and transformations, such as filtering, mapping, grouping, aggregation, joining, sorting, etc. These operations allow you to perform complex data processing and analysis.
  • Integrated Spark SQL: DataFrame is part of Spark SQL, allowing you to use SQL query language to query and process data. You can also mix DataFrame and SQL queries to meet different needs more flexibly.
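
A small sketch of immutability and lazy evaluation (assuming the SparkSession spark from the earlier examples): transformations such as filter only describe the computation and return a new DataFrame; nothing runs until an action such as count or show is called.

df = spark.createDataFrame([("Alice", 20), ("Bob", 25)], ["name", "age"])

# filter is a transformation: it returns a new DataFrame and triggers no computation yet
adults = df.filter(df.age > 21)

# The original DataFrame is unchanged (immutability); count is an action, so execution happens here
print(df.count())       # 2
print(adults.count())   # 1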

Read

The process of creating a DataFrame in Apache Spark typically involves the following steps:

  1. Initialize SparkSession: SparkSession is the entry point for interaction with Spark. Before you start using Spark, you need to create a SparkSession object.
  2. Prepare data: Data can come from local collections (such as Python lists) or from external data sources (such as HDFS, Hive tables, relational databases, JSON files, etc.).
  3. Create DataFrame: Use SparkSession to read data into DataFrame format. For local collections, you can use the spark.createDataFrame(data) method.

Here is a specific example:

Let’s say you have a list containing user information and want to create a DataFrame. In PySpark you can do this:

from pyspark.sql import SparkSession
from pyspark.sql import Row
#Initialize SparkSession
spark = SparkSession.builder.appName("ExampleApp").getOrCreate()
# Prepare data using Row objects
data = [Row(name="Alice", age=12, grade=6),
        Row(name="Bob", age=15, grade=9),
        Row(name="Cindy", age=14, grade=8)]
# Create DataFrame using SparkSession
df = spark.createDataFrame(data)
# Display DataFrame
df.show()

When the above code is run, df.show() will output the contents of the DataFrame in tabular form. This is a very simple example just to demonstrate the process of creating a DataFrame in Spark.

In practical applications, data is usually read from file systems such as HDFS, cloud storage, or databases to create DataFrames. Examples are as follows:

# Read JSON file to create DataFrame
df = spark.read.json("path/to/jsonfile.json")
# Read CSV file to create DataFrame
df = spark.read.csv("path/to/csvfile.csv", inferSchema=True, header=True)

Save

In PySpark, you can use the write method to save a DataFrame to a different data storage system. DataFrame offers several saving format options, including Parquet, JSON, CSV, Avro, JDBC databases, and more. Here is an example of saving a DataFrame to different formats:

  1. Save as Parquet file:

    df.write.parquet("data.parquet")
    

    This will save the DataFrame as a Parquet file.

  2. Save as JSON file:

    df.write.json("data.json")
    

    This will save the DataFrame as a JSON file.

  3. Save as CSV file:

    df.write.csv("data.csv")
    

    This will save the DataFrame as a CSV file.

  4. Save to JDBC database:

    df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/database_name") \
        .option("dbtable", "table_name") \
        .option("user", "username") \
        .option("password", "password") \
        .save()
    

    This will save the DataFrame to the specified JDBC database table.

  5. Other formats: Spark provides a variety of other formats that you can choose according to your needs, such as Avro, ORC, Delta Lake, etc.

Spark writes results to a target directory and stores the data as partition files. This is the normal way Spark handles output, especially in distributed environments.
When a JSON file is written, Spark splits the data according to its default partitioning and saves each partition as a separate JSON file. That is why you see a file such as part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json: it is a partition file containing a portion of the data. A _SUCCESS file is also created to mark successful job execution.

The output directory typically contains the following files:

  • Data files: the files that store the actual data. Each partition produces one data file containing the records of that partition.
  • Metadata files: the directory may include metadata files describing the structure, schema, and other properties of the data; these are typically .metadata or .crc files used to check data integrity.
  • Success marker file: a _SUCCESS file is usually generated to mark a successful job execution. It is normally empty and simply indicates that the save operation completed successfully.

Here is an example:

mydata/
|-- part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json
|-- part-00001-7c3b459f-d5a2-4857-8a69-4417c1509e99.json
|-- .metadata
|-- _SUCCESS
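
The number of output files follows the number of partitions, so if fewer (or just one) output files are wanted, the DataFrame can be repartitioned before writing. A sketch, assuming the DataFrame df from the earlier examples (note that coalescing to a single partition gives up write parallelism):

# One partition -> a single part-xxxxx file in the output directory
df.coalesce(1).write.mode("overwrite").json("mydata_single")

# Or control the number of partitions (and therefore files) explicitly
df.repartition(4).write.mode("overwrite").json("mydata_four_parts")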

Inferring an RDD schema via reflection

To create a DataFrame from an RDD using reflection, the data in the RDD must itself be structured, and each element should carry field names and values from which types can be inferred. Specifically, each element of the RDD should be a structured data object, such as a dictionary or a Row.

from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.appName("ReflectionExample").getOrCreate()

#Create a structured RDD containing student information
student_data = [
    {"name": "Alice", "age": 20},
    {"name": "Bob", "age": 22},
    {"name": "Carol", "age": 21},
    {"name": "David", "age": 23}
]

# Create structured data as RDD
rdd = spark.sparkContext.parallelize(student_data)

# Convert RDD to DataFrame using reflection mechanism
from pyspark.sql import Row
df = rdd.map(lambda x: Row(**x)).toDF()

# Display DataFrame data
df.show()

The following two steps are the key to the conversion into a DataFrame:

  1. rdd.map(lambda x: Row(**x)): In this step, the map operation applies a function to each element of the RDD. The function accepts a dictionary x (an element of the RDD) and uses Row(**x) to convert it into a Row object. Row is a special class in Spark SQL that represents structured data as an object with explicit fields.
  2. toDF(): This step converts an RDD containing Row objects into a DataFrame. At this point, Spark uses reflection to automatically infer the field names and data types in each Row object and build a DataFrame with an explicit schema.
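
The inferred schema can be inspected with printSchema(), which shows the field names and types that reflection derived from the Row objects (continuing the example above):

df.printSchema()
# Example output (field order may vary by Spark version):
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)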

Introduction to Row

pyspark.sql.Row is a class in PySpark used to create and represent data records with named fields. The Row class allows you to create data records with explicit field names, often useful when using reflection to create a DataFrame.

from pyspark.sql import Row

#Create a Row object to represent a person's information
person = Row(name="Alice", age=30, city="New York")
# The field names name, age, and city were fixed when the Row object was created above

#Access fields of Row object
print(person.name) # Output "Alice"
print(person.age) # Output 30
print(person.city) # Output "New York"

# Register a DataFrame as a temporary view
df.createOrReplaceTempView("temp_view_name")
# Execute an SQL query against the temporary view
result = spark.sql("SELECT name, age FROM temp_view_name WHERE age > 20")
# Once the temporary view has been created, you can treat the DataFrame as a database table and operate on it with SQL query syntax. This means you can filter, sort, aggregate, and transform the data with SQL, just as in a relational database.

Specifying an RDD schema programmatically

from pyspark.sql.types import *
#Equivalent to from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# The schema is defined with StructType. Each StructField specifies the field name, the data type, and whether null values are allowed (True means nullable).
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# Assume 'lines' is an RDD of comma-separated text, e.g.:
# lines = spark.sparkContext.textFile("people.txt")   # each line looks like "Alice,20"
# Convert the data to a DataFrame using the defined schema; each line is split into fields
# and the age field is converted to int so that it matches the IntegerType in the schema.
df = spark.createDataFrame(
    lines.map(lambda line: line.split(","))
         .map(lambda p: (p[0].strip(), int(p[1].strip()))),
    schema
)
# createDataFrame takes two arguments: the first is the data (usually an RDD in which each
# element represents one row, here a tuple of field values), and the second is the schema,
# which defines the field names and data types and determines how the data is interpreted.
# The number of values in each row must match the number of fields in the schema, because
# each field corresponds to one column.

# Register DataFrame as a temporary table
df.createOrReplaceTempView("people")
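
Once the temporary view exists, it can be queried with SQL like any other table (a short sketch continuing the example above):

result = spark.sql("SELECT name, age FROM people WHERE age > 20")
result.show()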

The difference between the two:

  1. Programmatic schema specification:
    • How the schema is defined: You define the schema manually, i.e. the field names and data types. This usually means explicitly specifying the name and type of each field in your code to ensure the data is interpreted correctly.
    • Data processing method: You programmatically map the data onto the fields of the schema so that data and schema correspond one to one. This method suits data whose structure is not known until runtime, or cases where precise control over the data structure is required.
  2. Reflection-based schema inference:
    • How the schema is defined: The schema is inferred automatically from the data in the RDD, with no manual definition; Spark attempts to derive field names and data types from the data itself.
    • Data processing method: The structure of the data is inferred automatically, so you do not need to map the data onto schema fields by hand. This method suits data whose structure is already known and lets you create a DataFrame quickly.

Overall, specifying the schema programmatically requires more manual work but gives greater flexibility, and is suitable when the structure is not fixed in advance. Reflection-based inference is more automated, suits data with a known structure, and creates a DataFrame faster. Which method to choose depends on the characteristics of your data and your needs.

Using Spark SQL

  1. Create SparkSession: First, you need to create a SparkSession object, which is the entry point for interacting with Spark SQL. You can create it using:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("MySparkSQLApp") \
        .getOrCreate()
    

    This will create a SparkSession which will provide you with access to Spark SQL functionality.

  2. Load data: Use SparkSession to load your data. You can load data from various data sources such as text files, CSV files, JSON files, Parquet files, databases, etc. For example, to load a CSV file:

    df = spark.read.csv("data.csv", header=True, inferSchema=True)
    

    This creates a DataFrame object df containing the data loaded from the CSV file.

  3. Execute SQL queries: You can use the spark.sql() method to execute SQL queries. For example:

    df.createOrReplaceTempView("mytable") # Create a temporary view
    result = spark.sql("SELECT * FROM mytable WHERE age > 25")
    result.show()
    

    This will execute the SQL query and display the results.

  4. Perform DataFrame operations: In addition to executing SQL queries, you can use the DataFrame API to perform various operations such as filtering, aggregating, joining, and transforming data. The DataFrame API provides a more flexible way to work with data (a short sketch of such operations follows these steps).

  5. Save data: If necessary, you can save the processed data to different data storage systems, such as Parquet files, JSON files, databases, etc.

  6. Close SparkSession: After completing the task, remember to close SparkSession to release resources:

    spark.stop()
    

These are general steps for using Spark SQL. You can perform more complex operations based on your specific needs, including connecting multiple data sources, performing complex transformations and analyses, and saving results to different data storage systems. Make sure you familiarize yourself with the documentation for Spark SQL and PySpark to gain a deeper understanding of their functionality and usage.
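
As referenced in step 4, here is a brief sketch of typical DataFrame API operations (the data and column names are illustrative, and the SparkSession spark from step 1 is assumed):

people = spark.createDataFrame(
    [("Alice", "HR", 30), ("Bob", "IT", 25), ("Carol", "IT", 35)],
    ["name", "dept", "age"])
depts = spark.createDataFrame(
    [("HR", "Building A"), ("IT", "Building B")],
    ["dept", "location"])

# Filtering and selecting
people.filter(people.age > 26).select("name", "dept").show()

# Grouping and aggregation
people.groupBy("dept").agg({"age": "avg"}).show()

# Joining two DataFrames
people.join(depts, on="dept", how="inner").show()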

Connect to database

jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "student") \
    .option("user", "root") \
    .option("password", "123456") \
    .load()
jdbcDF.show()

Below is sample code showing how to use Spark SQL to write data to a database. Suppose you already have a DataFrame df that you want to write to a database table named "mytable":

from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.appName("WriteToDatabase").getOrCreate()

# Assume df is a DataFrame containing the data to be written to the database (its creation is omitted here)

# Database connection information
url = "jdbc:mysql://localhost:3306/your_database"
properties = {
    "user": "your_username",
    "password": "your_password",
    "driver": "com.mysql.jdbc.Driver"
}

# Write data to the database table "mytable"
# Write mode: "overwrite" replaces any existing data, "append" adds to it
df.write.jdbc(url, "mytable", mode="overwrite", properties=properties)
# Close SparkSession
spark.stop()

Code example from the slides

#!/usr/bin/env python3

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

#Define the schema below
schema = StructType([StructField("id", IntegerType(), True), \
StructField("name", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("age", IntegerType(), True)])
#Create two records below representing the information of two students
studentRDD = spark \
.sparkContext \
.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \
.map(lambda x:x.split(" "))
 
#Create Row objects below. Each Row object is a row in rowRDD.
rowRDD = studentRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
 
#Establish the correspondence between the Row object and the schema, that is, match the data with the schema
studentDF = spark.createDataFrame(rowRDD, schema)
 
#Write to database
prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

Please see the explanation of studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark", 'student', 'append', prop):

  • jdbc: This section specifies the use of the JDBC protocol to connect to the database.
  • mysql: This is the type of database, specifying the type of database to be connected, here is MySQL.
  • localhost: This is the hostname or IP address of the database server, indicating a connection to the local database server.
  • 3306: This is the default port number of the MySQL database, indicating that port 3306 on the MySQL server will be used.
  • spark: This part is the name of the database, indicating the name of the database to be connected. Here, it connects to a database called “spark”.

So, jdbc:mysql://localhost:3306/spark means the “spark” database that will be connected to the local MySQL database server using the JDBC protocol. You can modify this connection string according to the actual situation and replace the host name, port number, database name and other information with the information of the database server you want to connect to.